Flink教程(5)-Flink常用API

Flink教程(5)-Flink常用APIFlink 教程 FlinkAPI 讲解 Flink 是一个同时具备流数据处理和批数据处理的分布式计算框架 Flink 代码主要是由 Java 实现 部分代码由 Scala 实现 Flink 既可以处理有界的批量数据集 也可以处理无界的实时数据集 Flink 处理的主要场景是流式数据 Flink 称得上是一款真正的流 批统一的大数据计算框架

【订阅专栏合集,作者所有付费文章都能看】

FlinkAPI

Environment

执行Flink程序首先要判断flink环境。Flink中有3种获取执行环境的方式。

1)getExecutionEnvironment

获取当前执行程序的上下文。如果是直接在IDEA中运行的JAVA代码,则此方法返回本地执行环境。如果是从命令行或web页面提交flink任务到集群中,则此方法返回的是集群执行环境。这种方式是最常用的,Flink底层帮我们判断具体调用本地还是远程环境。

ExecutionEnvironment.getExecutionEnvironment();//获取批处理执行环境 StreamExecutionEnvironment.getExecutionEnvironment(); //获取流处理执行环境  

2)createLocalEnvironment

直接返回本地执行环境,这种方式可以指定并行度。如不指定则使用当前机器可用cpu核数作为并行度。其实第1种方式判断当前环境是本地环境的话,底层也会调此方法。

ExecutionEnvironment.createLocalEnvironment(); 

3)createRemoteEnvironment

获取远程集群执行环境。如果将Jar包提交到远程Flink集群执行则需指定JobManager的IP和port,并指定jar包路径

ExecutionEnvironment.createRemoteEnvironment(hostname,port,"hdfs://wordCount.jar"); 

Source

source是Flink应用程序的数据来源。作为一款通用的数据处理框架,flink既可以处理静态的历史数据集,也可以处理实时的流式数据。流式计算场景下只要数据源源不断传入,flink就能一直处理。下面讲解Flink中的几种数据输入方式。

1)从本地集合中读取

executionEnvironment.fromCollection(Arrays.asList("a", "b", "c","d"));//从JAVA Collection中读取数据 executionEnvironment.fromElements(1, 2, 3, 4);//从给定的对象序列中读取数据 

2)从文件中读取

String inputPath = "F:\\data\\file"; executionEnvironment.readTextFile(inputPath);//使用默认的文件格式 

3)从socket中读取

env.socketTextStream("localhost", 9999);//从指定的IP地址和端口处读取数据,使用默认行分隔符 env.socketTextStream(hostname, port, delimiter);//指定行分隔符 

4)从Kafka中读取

实际开发中,Kafka作为Flink数据源非常常见,可以说Kafka和Flink在流式数据处理领域是天生的一对。

引入Kafka 连接器pom依赖,连接器的版本和Flink版本保持一致

<dependency> <groupId>org.apache.flink 
     groupId> <artifactId>flink-connector-kafka-0.11_2.11 
      artifactId> <version>1.9.2 
       version>  
        dependency> 

Flink中添加kafka数据源

 public static void main(String[] args) throws Exception { 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置参数 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.174.129:9092"); props.put("zookeeper.connect", "192.168.174.129:2181"); //props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); //FlinkKafkaConsumer011 表示对应的kafka版本是0.11.x DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "test01", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 env.execute("Flink add kafka data source"); } 

addSource是一般化的添加数据源的算子,前面几种source都是Flink根据特定应用场景封装好的算子,底层还是调用了addSource。

可以测试一下上述程序。在linux服务器上启动kafka集群,并通过命令行运行一个Producer发送消息。查看Flink是否消费到数据。

Kafka相关教程可以参考这篇文章:《Kafka 实战教程》

5)自定义source

有时为了方便测试Flink应用程序,我们需要手动造数据,这就要用到自定义数据源。

自定义的DataSource只要实现org.apache.flink.streaming.api.functions.source.SourceFunction接口即可被作为数据源添加。

下面的例子展示了如何自定义数据源。需求是实现一个实时数字生成器,1秒钟产生1个自增数字发送到Flink。Flink收到数据后放大两倍输出。

/ * 自定义Flink数据源,重写SourceFunction的run和cancel方法 */ import org.apache.flink.streaming.api.functions.source.SourceFunction; public class MyDataSource implements SourceFunction<Integer> { 
    private boolean isRunning = true; / * run方法里编写数据产生逻辑 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Integer> ctx) throws Exception { 
    int i = 1; while (isRunning) { 
    ctx.collect(i); i++; Thread.sleep(1000); } } @Override public void cancel() { 
    isRunning = false; } } 
public class MyDataSourceTest { 
    public static void main(String[] args) throws Exception { 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> mySource = env.addSource(new MyDataSource()); SingleOutputStreamOperator<Integer> res = mySource.map(e -> 2 * e); res.print(); env.execute("Flink add dataSource"); } } 

掌握了自定义数据源的使用有助于实际开发,比如可以写个实时读取MySql数据源的工具!

Transformation

Transform也可称为Operator,翻译为中文为“算子”,其实就是数据转换操作。下面讲解Flink中的几种数据转换操作,先从流式处理,即DataStream 操作讲起。批处理与之类似。

1)Map

Map就是映射,顾名思义,就是将输入数据进行转换操作。

map算子的输入参数是一个MapFunction,我们只要实现它重写其中的map函数即可

public interface MapFunction<T, O> extends Function, Serializable { 
    O map(T value) throws Exception; } 

比如将商品数据流中的每个商品价格翻倍:

SingleOutputStreamOperator<Product> map = dataStreamSource.map(new MapFunction<Product, Product>() { 
    @Override public Product map(Product product) throws Exception { 
    product.price = product.price * 2; return product; } }); map.print(); 

对于简单的转换操作我们也可以直接使用lambda 表达式,比如.map(e -> 2 * e);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> mySource = env.addSource(new MyDataSource()); SingleOutputStreamOperator<Integer> res = mySource.map(e -> 2 * e);//把数据源中的每个元素放大2倍 res.print(); 

2)FlatMap

FlatMap意指扁平化的map,即将每个元素map后的数据打散,重新组成一个“宽”的集合。和JDK8中的flatMap本质一样。

FlatMap的输入参数是一个FlatMapFunction,只要重写其flatMap方法即可,value是输入数据,out是输出数据收集器

public interface FlatMapFunction<T, O> extends Function, Serializable { 
    void flatMap(T value, Collector<O> out) throws Exception; } 
dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
    //接收一个字符串(Wordcount中表示一行数据),输出一个2元组 @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { 
    String[] splits = value.split("\\s"); for (String word : splits) { 
    out.collect(new Tuple2<String, Integer>(word, 1)); } } }) 

FlatMap和Map的区别在第一篇快速入门案例中已讲解,此处不再赘述。

3)Filter

对元素进行过滤,重写FilterFunction的filter实现过滤逻辑,简单的过滤逻辑可以直接使用lambda表达式

public interface FilterFunction<T> extends Function, Serializable { 
    boolean filter(T value) throws Exception; } 

例如过滤价格超过100的商品

SingleOutputStreamOperator<Product> res = mySource.filter(new FilterFunction<Product>() { 
    @Override public boolean filter(Product product) throws Exception { 
    if (product.price >= 100) { 
    return true; } return false; } }) res.print(); 

4)KeyBy

根据指定的key对流数据元素进行分区,底层基于hash算法,hashCode相同的key被分到同一个分区,即分到下游算子并行节点中的一个。比如快速入门案例中,flatMap之后的数据按照单词分组,即按照二元组数据的第一个字段:Tuple2.f0 分组

DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource .flatMap(new Splitter()) .keyBy(value -> value.f0) 

keyBy的参数是KeySelector

,前一个泛型表示来源数据类型,后一个泛型表示从原数据中提取出来的key的类型

再比如根据商品的品牌来分组

KeyedStream<Product, String> keyByedProd = productStream.keyBy(new KeySelector<Product, String>() { 
    @Override public String getKey(Product product) throws Exception { 
    return product.brand; } }); keyByedProd.print(); 

简写:.keyBy(product-> product.brand)

5)Reduce

reduce俗称“约减”,就是将元素进行聚合处理。常见的sum、min、max、count、average等聚合操作都可以使用原生的reduce实现。reduce算子的入参是ReduceFunction,value1表示前一个元素,value2表示后一个元素,reduce方法是具体的数据处理逻辑。reduce操作实质上就是不断地将数据源中两个值合并为同一类型的一个值,reduce函数连续应用于输入数据流中的所有值,直到只剩下一个值(聚合之后的结果)。

public interface ReduceFunction<T> extends Function, Serializable { 
    T reduce(T value1, T value2) throws Exception; } 

比如统计各个品牌商品的总价

SingleOutputStreamOperator<Product> reduceRes = productStream.keyBy(new KeySelector<Product, String>() { 
    @Override public String getKey(Product product) throws Exception { 
    return product.brand; } }).reduce(new ReduceFunction<Product>() { 
    @Override public Product reduce(Product product1, Product product2) throws Exception { 
    product2.price = (product1.price + product2.price); return product2; } }); reduceRes.print(); 

6)Aggregation

Flink中支持对数据流的各种聚合操作,并封装了很多聚集函数。像min、max、sum等聚集函数都可以应用于 KeyedStream获得聚合结果。聚合算子参数如果是int类型,则表示聚合字段的下标(从0开始)。比如快速入门案例中对二元组数据求和:.sum(1)表示求二元组中第二个字段(单词计数)的和。如果是string类型,则表示聚合字段名,通常是一个pojo对象的public属性。

KeyedStream.sum(0) KeyedStream.sum("field0") KeyedStream.min(1) KeyedStream.min("field1") KeyedStream.max(2) KeyedStream.max("field2") KeyedStream.minBy(3) KeyedStream.minBy("field3") KeyedStream.maxBy(4) KeyedStream.maxBy("field4") 

7)Split和Select

Split是根据指定条件将数据流拆分为两个或多个流,可以单独处理每个数据流。Select是从拆分的流中选择特定的流。select和split一般结合使用,正如keyBy和聚集函数一起使用一样。实现这样的需求:按照商品价格比如100元为界将商品分为优品(>100)和良品。

public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); List<Product> products = new ArrayList<>(); products.add(new Product("A", "阿迪", 990)); products.add(new Product("B", "安踏", 90)); products.add(new Product("C", "耐克", 880)); products.add(new Product("D", "特步", 80)); DataStreamSource<Product> streamSource = executionEnvironment.fromCollection(products); SplitStream<Product> splitStream = streamSource.split(new OutputSelector<Product>() { 
    @Override public Iterable<String> select(Product product) { 
    List<String> list = new ArrayList<>();//使用list作为临时数据结构存储标签 if (product.getPrice() > 100) { 
    list.add("优品"); } else { 
    list.add("良品"); } return list; } }); DataStream<Product> superiorProducts = splitStream.select("优品"); DataStream<Product> acceptedProducts = splitStream.select("良品"); DataStream<Product> allProducts = splitStream.select("良品","优品"); superiorProducts.print("优品"); //启动计算任务 executionEnvironment.execute("Stream operator"); } 

控制台输出“优品”的数据:

优品:1> Product{ 
   name='C', brand='耐克', price=880.0} 优品:4> Product{ 
   name='A', brand='阿迪', price=990.0} 

有分流操作,那么与之对应的必然有合流操作。Flink中合流操作有2种:Union和Connect。

8)Union

Union函数表示将两个或多个数据类型相同的流组合在一起,即求并集。

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream1 = executionEnvironment.fromCollection(Arrays.asList("a", "b", "c", "d")); DataStreamSource<String> dataStream2 = executionEnvironment.fromCollection(Arrays.asList("1", "2", "3", "4")); DataStream<String> union = dataStream2.union(dataStream1); union.print(); 

控制台输出合并之后的数据:

3> b 4> c 1> d 1> 3 4> 2 2> a 3> 1 2> 4 

9)connect和CoMap

两个datastream连接后转变成connectedstreams,即:datastream,datastream->connectedstreams。与union不同的是connect不要求被连接的两个流数据类型相同。两个流虽然被connect到了同一个流中,但是合并之后的流内部依然保持各自的数据格式不变,相互独立。connect通常和coMap一起使用,coMap对connect之后的流做数据处理。

实际应用中一个数据流过来可能先根据元素的某种特征分开处理,到了一定阶段又需要合并处理,此时就需要用到分流和合流操作。

实现这样的需求:连接一个二元组类型的数据流和一个Product的数据流。并分别对连接后的数据流做map操作。

还是沿用之前的例子:

public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); List<Product> products = new ArrayList<>(); products.add(new Product("A", "阿迪", 999)); products.add(new Product("B", "安踏", 99)); products.add(new Product("C", "耐克", 888)); products.add(new Product("D", "特步", 89)); DataStreamSource<Product> streamSource = executionEnvironment.fromCollection(products); SplitStream<Product> splitStream = streamSource.split(new OutputSelector<Product>() { 
    @Override public Iterable<String> select(Product product) { 
    List<String> list = new ArrayList<>(); if (product.getPrice() > 100) { 
    list.add("优品"); } else { 
    list.add("良品"); } return list; } }); DataStream<Product> superiorProducts = splitStream.select("优品"); DataStream<Product> acceptedProducts = splitStream.select("良品"); //先将优品数据转换为2元组类型 DataStream<Tuple2<String, Double>> superiorProductsStream = superiorProducts.map(new MapFunction<Product, Tuple2<String, Double>>() { 
    @Override public Tuple2<String, Double> map(Product product) throws Exception { 
    return new Tuple2<>(product.getName(), product.getPrice()); } }); //连接二元组数据流和Product数据类型,并做算子操作,都转换为3元组 SingleOutputStreamOperator<Object> operator = superiorProductsStream.connect(acceptedProducts).map(new CoMapFunction<Tuple2<String, Double>, Product, Object>() { 
    @Override public Object map1(Tuple2 value) throws Exception { 
    return new Tuple3<>(value.f0, value.f1, "优品"); } @Override public Object map2(Product value) throws Exception { 
    return new Tuple3<>(value.getName(), value.getPrice(), "良品"); } }); operator.print("coMap"); //启动计算任务 executionEnvironment.execute("Stream operator"); } 

控制台输出如下内容,说明案例中不同数据类型的流连接(connect)、处理(coMap)成功。

coMap:1> (A,999.0,优品) coMap:2> (C,888.0,优品) coMap:4> (D,89.0,良品) coMap:3> (B,99.0,良品) 

观察上面的介绍的几种操作可以总结一些规律。

比如keyBy操作总是和聚集函数一起使用、split通常和select一起使用、connect和coMap一起使用。datastream split后得到splitstream,再select之后又转换为datastream ;同样的,datastream connect之后得到connectedstreams,再经coMap操作后又转换为datastream 。union合并的两个流数据类型必须相同,合并过程不涉及流类型的转换。而connect不要求数据流的元素类型相同。union操作可以操作多个流,connect操作只能操作两个流。

上述介绍的 DataStream(流处理) 数据转换操作中,有些也适合DataSet(批处理)。比如 Map、FlatMap、Reduce、Filter 等。

当然DataSet也有一些特有算子。

比如在DataStream中分区是 KeyBy,而DataSet中是GroupBy。这在快速入门案例中已经演示,不再赘述。

DataSet有个first(n)方法可以返回DataSet中前 n个元素,比如:env.readTextFile(inputPath).first(2);返回数据集中前2个元素。

Flink数据类型

前文中介绍Flink中算子的使用时提到了数据类型,下面简单介绍一下Flink中所支持的数据类型。

Flink应用程序处理的是由数据对象组成的连续不断的数据流。这些数据对象需要被序列化和反序列化,以便能够通过网络传输以及从检查点、保存点、状态后端存储读取。为了明确应用程序所处理的数据类型,Flink底层提供了一套完备的数据类型信息,并且为每一种类型提供了序列化器、反序列化器以及比较器。

此外,Flink还提供了类型提取系统,自动分析函数的输入类型和输出类型,以获得对应的序列化器和反序列化器。在使用lambda函数或者泛型类型时,需显式指定类型信息。

Flink DataStream里的元素类型支持JAVA和Scala中的所有基本类型,像Int、Long、Double、String等。此外还支持Tuple元组类型、Java简单对象(pojo)、scala样例类以及一些集合类型,比如Java的ArrayList、HashMap、Enum等。

Flink的每个函数都提供了对应的Rich版本。富函数相比普通的函数可以获取flink运行时上下文、生命周期方法。生命周期方法中通常可以做一些初始化及收尾操作,比如连接数据库、关闭数据库连接。

Sink

sink,顾名思义,下沉,在Flink中意指数据输出、数据落地的意思。最简单的数据输出方式就是打印到控制台,调用datastream的print()方法即可,print就是一种sink操作。对于不同的sink方式,Flink提供了各种内置的输出格式。

除了基本的输入输出数据源外,flink目前还支持下列第三方组件作为数据源。

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

本节介绍几种常用的数据输出方式。

1)普通文件、socket

writeAsText()/TextOutputFormat:将元素按行写入字符串。字符串通过调用每个元素的toString()方法获得。

writeAsCsv(…)/CsvOutputFormat:将数据以逗号分隔的形式写入文件。换行符和字段分隔符可配置。每个字段的值来自对象的toString()方法。

writeUsingOutputFormat() / FileOutputFormat:自定义文件输出格式,支持自定义对象到字节的转换。

writeToSocket:根据指定格式(Serialization Schema)将元素写入网络套接字。

举例如下:

public class SinkDemo { 
    public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1,输出结果全部写出到一个文件,否则开发环境会使用默认并行度分区,分区数为当前机器逻辑cpu核数 env.setParallelism(1); //准备数据源 List<Tuple2<String, Integer>> list = new ArrayList<>(); list.add(new Tuple2<>("A", 100)); list.add(new Tuple2<>("B", 200)); list.add(new Tuple2<>("C", 300)); list.add(new Tuple2<>("D", 400)); DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromCollection(list); dataStreamSource.print(); //除了路径参数是必填外,还可以通过指定第二个参数来定义输出模式 dataStreamSource.writeAsText("d://sink-text.txt", FileSystem.WriteMode.OVERWRITE); //如果想要将输出结果全部写出到一个文件,可以单独设置算子的并行度为 1 dataStreamSource.writeAsCsv("d://sink-csv.txt", FileSystem.WriteMode.OVERWRITE, "\n", ",").setParallelism(1); //自定义的输出格式,writeAsText/writeAsCsv底层调用的都是该方法 dataStreamSource.writeUsingOutputFormat(new TextOutputFormat(new Path("d://sink-file.txt"), "UTF-8")); //以字符串的形式输出到socket服务器 dataStreamSource.map(t -> t.f0 + ":" + t.f1 + "\r\n").writeToSocket("192.168.244.131", 9999, new SimpleStringSchema()); env.execute("sink demo"); } } 

测试socket输出时,先在linux服务器上使用nc -lk 9999 模拟socket服务器开启监听。

引入依赖

<dependency> <groupId>org.apache.flink 
     groupId> <artifactId>flink-connector-kafka-0.11_2.11 
      artifactId> <version>1.11.0 
       version>  
        dependency> 

需求:实现Flink消费kafka消息队列的消息,转换处理后再次输出到kafka中。

实体类Product

public class Product { 
    private String name; private String brand; private double price; //省略get/set } 

通过linux命令行创建2个topic,一个由flink消费,另一个由flink写入。

bin/kafka-topics.sh --create \ --bootstrap-server 192.168.244.131:9092 \ --replication-factor 1 \ --partitions 1 \ --topic flink-stream-in-topic 
bin/kafka-topics.sh --create \ --bootstrap-server 192.168.244.131:9092 \ --replication-factor 1 \ --partitions 1 \ --topic flink-stream-out-topic 

查看topic: bin/kafka-topics.sh --list --bootstrap-server 192.168.244.131:9092

启动一个消费者,接收flink的输出

bin/kafka-console-consumer.sh --bootstrap-server 192.168.244.131:9092 --topic flink-stream-out-topic

启动一个生产者,向flink应用程序监听的topic发送消息

bin/kafka-console-producer.sh --topic flink-stream-in-topic --bootstrap-server 192.168.244.131:9092

在生产者端输入json串:{"name":"跑鞋","brand":"Nike","price":1000}

Flink应用程序集成Kafka:

public class KafkaSink { 
    public static void main(String[] args) throws Exception { 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.244.131:9092"); props.put("zookeeper.connect", "192.168.244.131:2181"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value 反序列化 props.put("auto.offset.reset", "latest"); //从kafka中消费数据 DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "flink-stream-in-topic", //kafka topic new SimpleStringSchema(), // String序列化 props)).setParallelism(1);//并行度一般不超过kafka topic分区数 dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 //对数据进行业务处理 SingleOutputStreamOperator<String> streamOperated = dataStreamSource.map(new MapFunction<String, String>() { 
    @Override public String map(String value) throws Exception { 
    Product product = JSON.parseObject(value, Product.class); //将商品价格翻倍 product.setPrice(product.getPrice()*2); return JSON.toJSONString(product); } }); //将处理完的数据再次发送到kafka中 streamOperated.addSink(new FlinkKafkaProducer011<>( "flink-stream-out-topic", new SimpleStringSchema(), props)).setParallelism(1); env.execute("kafka data source"); } } 

运行flink应用后,kafka消费者端将收到处理后的数据:

{"brand":"Nike","name":"跑鞋","price":2000.0}

3)redis

Redis Connector 用于向 Redis 发送数据。可以使用三种不同的方法与不同类型的 Redis 环境进行通信

  • 单 Redis 服务器
  • Redis 集群
  • Redis Sentinel(哨兵)

不同模式主要是Config类的不同,本例展示了单机模式下Flink写入redis

引入redis连接器依赖

<dependency> <groupId>org.apache.bahir 
     groupId> <artifactId>flink-connector-redis_2.11 
      artifactId> <version>1.0 
       version>  
        dependency> 

编写flink应用代码

public class RedisSinkDemo { 
    public static void main(String[] args) throws Exception { 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源,简单起见,这里使用本地集合数据 List<Tuple2<String, String>> list = new ArrayList<>(); list.add(new Tuple2<>("A", "apple")); list.add(new Tuple2<>("B", "bird")); list.add(new Tuple2<>("C", "cat")); list.add(new Tuple2<>("D", "dog")); DataStreamSource<Tuple2<String, String>> dataStreamSource = env.fromCollection(list); //单机Redis配置,这里只简单配置ip/端口,还支持其它配置比如maxTotal、maxIdle、timeout FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build(); //数据写入redis dataStreamSource.addSink(new RedisSink<>(redisConf, new RedisMapper<Tuple2<String, String>>() { 
    @Override public RedisCommandDescription getCommandDescription() { 
    //指定redis命令,这里只演示最简单的设置字符串key return new RedisCommandDescription(RedisCommand.SET); } @Override public String getKeyFromData(Tuple2<String, String> data) { 
    //提取要存到redis的key return data.f0; } @Override public String getValueFromData(Tuple2<String, String> data) { 
    //提取要存到redis的value return data.f1; } })); env.execute("redis data sink"); } } 

通过redis Cli 查看写入的数据:

127.0.0.1:6379> get A "apple" 127.0.0.1:6379> get B "bird" 127.0.0.1:6379> get C "cat" 

Redis 集群 配置:

 FlinkJedisClusterConfig config = new FlinkJedisClusterConfig.Builder() .setNodes(new HashSet<InetSocketAddress>( Arrays.asList(new InetSocketAddress("host1", 6379), new InetSocketAddress("host2", 6379)))).build(); 

Redis Sentinels 配置:

FlinkJedisSentinelConfig sentinelConfig = new FlinkJedisSentinelConfig.Builder() .setMasterName("master") .setSentinels(new HashSet<>(Arrays.asList("sentinel1", "sentinel2"))) .setPassword("12345") .setDatabase(1).build(); 

4)JDBC

Flink官方提供了JDBC连接器,只要引入连接器和mysql驱动即可。

引入依赖

<dependency> <groupId>org.apache.flink 
     groupId> <artifactId>flink-connector-jdbc_2.11 
      artifactId> <version>1.11.2 
       version>  
        dependency> <dependency> <groupId>mysql 
         groupId> <artifactId>mysql-connector-java 
          artifactId> <version>5.1.49 
           version>  
            dependency> 

flink应用代码:

public class JDBCSinkDemo { 
    public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源 List<Product> list = new ArrayList<>(); list.add(new Product("跑鞋", "耐克", 998)); list.add(new Product("短裤", "李宁", 119)); list.add(new Product("袜子", "耐克", 68)); list.add(new Product("西服", "海澜之家", 1000)); DataStreamSource<Product> dataStreamSource = env.fromCollection(list); String url = "jdbc:mysql://localhost:3306/flink_data?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT"; String sql = "insert into t_product(name, brand, price) values (?,?,?)"; dataStreamSource.addSink(JdbcSink.sink(sql, new JdbcStatementBuilder<Product>() { 
    @Override public void accept(PreparedStatement ps, Product product) throws SQLException { 
    ps.setString(1, product.getName()); ps.setString(2, product.getBrand()); ps.setDouble(3, product.getPrice()); } }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(url) .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("") .build())); env.execute("jdbc data sink"); } } 

运行完毕,查看数据库

use flink_data; select * from t_product; 

image-20201104222546393.png

5)自定义

除了Flink官方提供的第三方连接器外,我们也可以自定义 Sink 来满足各种输出需求。自定义的 Sink需要直接或者间接实现 SinkFunction 接口,一般直接继承抽象的富函数RichSinkFunction,重写其open、close、invoke方法。相比于SinkFunction ,富函数提供了操作生命周期的相关方法。

需求:实现一个自定义的sink,将数据输出到Mysql数据库。

MysqlSink.java

import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class MysqlSink extends RichSinkFunction<Product> { 
    private PreparedStatement stmt; private Connection conn; @Override public void open(Configuration parameters) throws Exception { 
    super.open(parameters); String url = "jdbc:mysql://localhost:3306/flink_data?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT&autoReconnect=true"; String sql = "insert into t_product(name, brand, price) values (?,?,?)"; Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(url, "root", ""); stmt = conn.prepareStatement(sql); } @Override public void close() throws Exception { 
    super.close(); if (stmt != null) { 
    stmt.close(); } if (conn != null) { 
    conn.close(); } } @Override public void invoke(Product product, Context context) throws Exception { 
    stmt.setString(1, product.getName()); stmt.setString(2, product.getBrand()); stmt.setDouble(3, product.getPrice()); stmt.executeUpdate(); } } 

flink应用代码

import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; public class MysqlSinkDemo { 
    public static void main(String[] args) throws Exception { 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源 List<Product> list = new ArrayList<>(); list.add(new Product("笔记本", "联想", 3998)); list.add(new Product("硬盘", "希捷", 219)); list.add(new Product("CPU", "Intel", 668)); list.add(new Product("显示器", "飞利浦", 1400)); DataStreamSource<Product> dataStreamSource = env.fromCollection(list); dataStreamSource.addSink(new MysqlSink()); env.execute("mysql data sink"); } } 

运行flink应用程序,查看mysql数据库数据是否写入成功。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/224829.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午11:03
下一篇 2026年3月17日 上午11:03


相关推荐

  • 2021DIY电脑配置入门篇(包含各cpu显卡天梯图对比)

    2021DIY电脑配置入门篇(包含各cpu显卡天梯图对比)前言:我本来以为一篇文章可以把电脑配置讲清楚的,但是发现电脑比我想象的要复杂,所以可能分了几篇来写如何查看自己的电脑配置最简单的右键桌面此电脑->点击属性下载个电脑管家等电脑助手软件也可以查看详细配置如何DIY自己的第一台电脑篇幅有限,这里我只详细分析一台电脑的核心配置(CPU、主板、显卡),外加内存定好预算对于电脑来说,预算是最重要的!没有预算,一切都是空谈。没预算默认外星人Area51M(价格在2万左右),现在电脑往往充当一种娱乐需求,相对来说比较次要,因此大多数人配电脑.

    2022年7月12日
    31
  • 将图片存储到mysql数据库[通俗易懂]

    将图片存储到mysql数据库[通俗易懂]正常的图片储存要么放进本地磁盘,要么就存进数据库。存入本地很简单,现在我在这里记下如何将图片存进mysql数据库 如果要图片存进数据库 要将图片转化成二进制。1.数据库存储图片的字段类型要为blob二进制大对象类型2.将图片流转化为二进制下面放上代码实例一、数据库CREATETABLE`photo`(`id`int(11)NOTNULL,`na

    2022年7月12日
    24
  • Jenkins详细安装与构建部署使用教程[通俗易懂]

    Jenkins详细安装与构建部署使用教程[通俗易懂]     Jenkins是一个开源软件项目,旨在提供一个开放易用的软件平台,使软件的持续集成变成可能。Jenkins是基于Java开发的一种持续集成工具,用于监控持续重复的工作,功能包括:1、持续的软件版本发布/测试项目。2、监控外部调用执行的工作。本文使用的Linux:Ubuntu其中JDK、Tomcat、SVN服务器请看这里Ubuntu安装配置JDK、Tomcat、SVN…

    2022年5月14日
    52
  • 关于Java中的Arrays.copyOfRange()方法

    关于Java中的Arrays.copyOfRange()方法要使用这个方法 首先要 importjava util Arrays copyOfRange T original intfrom intto 将一个原始的数组 original 从下标 from 开始复制 复制到上标 to 生成一个新的数组 注意这里包括下标 from 不包括上标 to 这个方法在一些处理数组的编程题里很好用 效率和 clone 基本一致 都是 nativ

    2026年3月16日
    2
  • 常见广域网技术

    常见广域网技术广域网封装技术广域网分装方式 HDLC PPP FR 其中 HDLC 和 FR 相继已经被淘汰 HDLCHDLC highleveldat 高级数据链路控制 简称 HDLC 是一种面相比特的链路层协议 广域网中会使用串行链路来提供远距离数据传输 HDLC 是思科研发 思科串行接口默认封装 HDLC 二层技术 注意 1 如果思科和华为串行接口对接需要将封装类型改为一致 CISCO 私有的 HDLC 和工业标准的 HDLC 不是一回事 假设一条链路两端设备使用不同的 HDLC 是不能

    2026年3月18日
    2
  • IIS7.5配置防盗链

    IIS7.5配置防盗链首先,要下载、安装一个IIS重写模块。是到微软站点下载的,可以放心了。(靠,之前以为IIS7是内置了的,想不到还是要另外安装东西)64位:http://www.microsoft.com/downloads/zh-cn/details.aspx?familyid=1b8c7bd8-8824-4408-b8fc-49dc7f951a0032位:http://www.microsoft.com/…

    2022年7月23日
    13

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号