【订阅专栏合集,作者所有付费文章都能看】
FlinkAPI
Environment
执行Flink程序首先要判断flink环境。Flink中有3种获取执行环境的方式。
1)getExecutionEnvironment
获取当前执行程序的上下文。如果是直接在IDEA中运行的JAVA代码,则此方法返回本地执行环境。如果是从命令行或web页面提交flink任务到集群中,则此方法返回的是集群执行环境。这种方式是最常用的,Flink底层帮我们判断具体调用本地还是远程环境。
ExecutionEnvironment.getExecutionEnvironment();//获取批处理执行环境 StreamExecutionEnvironment.getExecutionEnvironment(); //获取流处理执行环境
2)createLocalEnvironment
直接返回本地执行环境,这种方式可以指定并行度。如不指定则使用当前机器可用cpu核数作为并行度。其实第1种方式判断当前环境是本地环境的话,底层也会调此方法。
ExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
获取远程集群执行环境。如果将Jar包提交到远程Flink集群执行则需指定JobManager的IP和port,并指定jar包路径
ExecutionEnvironment.createRemoteEnvironment(hostname,port,"hdfs://wordCount.jar");
Source
source是Flink应用程序的数据来源。作为一款通用的数据处理框架,flink既可以处理静态的历史数据集,也可以处理实时的流式数据。流式计算场景下只要数据源源不断传入,flink就能一直处理。下面讲解Flink中的几种数据输入方式。
1)从本地集合中读取
executionEnvironment.fromCollection(Arrays.asList("a", "b", "c","d"));//从JAVA Collection中读取数据 executionEnvironment.fromElements(1, 2, 3, 4);//从给定的对象序列中读取数据
2)从文件中读取
String inputPath = "F:\\data\\file"; executionEnvironment.readTextFile(inputPath);//使用默认的文件格式
3)从socket中读取
env.socketTextStream("localhost", 9999);//从指定的IP地址和端口处读取数据,使用默认行分隔符 env.socketTextStream(hostname, port, delimiter);//指定行分隔符
4)从Kafka中读取
实际开发中,Kafka作为Flink数据源非常常见,可以说Kafka和Flink在流式数据处理领域是天生的一对。
引入Kafka 连接器pom依赖,连接器的版本和Flink版本保持一致
<dependency> <groupId>org.apache.flink
groupId> <artifactId>flink-connector-kafka-0.11_2.11
artifactId> <version>1.9.2
version>
dependency>
Flink中添加kafka数据源
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置参数 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.174.129:9092"); props.put("zookeeper.connect", "192.168.174.129:2181"); //props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); //FlinkKafkaConsumer011 表示对应的kafka版本是0.11.x DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "test01", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 env.execute("Flink add kafka data source"); }
addSource是一般化的添加数据源的算子,前面几种source都是Flink根据特定应用场景封装好的算子,底层还是调用了addSource。
可以测试一下上述程序。在linux服务器上启动kafka集群,并通过命令行运行一个Producer发送消息。查看Flink是否消费到数据。
Kafka相关教程可以参考这篇文章:《Kafka 实战教程》
5)自定义source
有时为了方便测试Flink应用程序,我们需要手动造数据,这就要用到自定义数据源。
自定义的DataSource只要实现org.apache.flink.streaming.api.functions.source.SourceFunction接口即可被作为数据源添加。
下面的例子展示了如何自定义数据源。需求是实现一个实时数字生成器,1秒钟产生1个自增数字发送到Flink。Flink收到数据后放大两倍输出。
/ * 自定义Flink数据源,重写SourceFunction的run和cancel方法 */ import org.apache.flink.streaming.api.functions.source.SourceFunction; public class MyDataSource implements SourceFunction<Integer> {
private boolean isRunning = true; / * run方法里编写数据产生逻辑 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Integer> ctx) throws Exception {
int i = 1; while (isRunning) {
ctx.collect(i); i++; Thread.sleep(1000); } } @Override public void cancel() {
isRunning = false; } }
public class MyDataSourceTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> mySource = env.addSource(new MyDataSource()); SingleOutputStreamOperator<Integer> res = mySource.map(e -> 2 * e); res.print(); env.execute("Flink add dataSource"); } }
掌握了自定义数据源的使用有助于实际开发,比如可以写个实时读取MySql数据源的工具!
Transformation
Transform也可称为Operator,翻译为中文为“算子”,其实就是数据转换操作。下面讲解Flink中的几种数据转换操作,先从流式处理,即DataStream 操作讲起。批处理与之类似。
1)Map
Map就是映射,顾名思义,就是将输入数据进行转换操作。
map算子的输入参数是一个MapFunction,我们只要实现它重写其中的map函数即可
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception; }
比如将商品数据流中的每个商品价格翻倍:
SingleOutputStreamOperator<Product> map = dataStreamSource.map(new MapFunction<Product, Product>() {
@Override public Product map(Product product) throws Exception {
product.price = product.price * 2; return product; } }); map.print();
对于简单的转换操作我们也可以直接使用lambda 表达式,比如.map(e -> 2 * e);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> mySource = env.addSource(new MyDataSource()); SingleOutputStreamOperator<Integer> res = mySource.map(e -> 2 * e);//把数据源中的每个元素放大2倍 res.print();
2)FlatMap
FlatMap意指扁平化的map,即将每个元素map后的数据打散,重新组成一个“宽”的集合。和JDK8中的flatMap本质一样。
FlatMap的输入参数是一个FlatMapFunction,只要重写其flatMap方法即可,value是输入数据,out是输出数据收集器
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception; }
dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//接收一个字符串(Wordcount中表示一行数据),输出一个2元组 @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] splits = value.split("\\s"); for (String word : splits) {
out.collect(new Tuple2<String, Integer>(word, 1)); } } })
FlatMap和Map的区别在第一篇快速入门案例中已讲解,此处不再赘述。
3)Filter
对元素进行过滤,重写FilterFunction的filter实现过滤逻辑,简单的过滤逻辑可以直接使用lambda表达式
public interface FilterFunction<T> extends Function, Serializable {
boolean filter(T value) throws Exception; }
例如过滤价格超过100的商品
SingleOutputStreamOperator<Product> res = mySource.filter(new FilterFunction<Product>() {
@Override public boolean filter(Product product) throws Exception {
if (product.price >= 100) {
return true; } return false; } }) res.print();
4)KeyBy
根据指定的key对流数据元素进行分区,底层基于hash算法,hashCode相同的key被分到同一个分区,即分到下游算子并行节点中的一个。比如快速入门案例中,flatMap之后的数据按照单词分组,即按照二元组数据的第一个字段:Tuple2.f0 分组
DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource .flatMap(new Splitter()) .keyBy(value -> value.f0)
keyBy的参数是KeySelector
,前一个泛型表示来源数据类型,后一个泛型表示从原数据中提取出来的key的类型
再比如根据商品的品牌来分组
KeyedStream<Product, String> keyByedProd = productStream.keyBy(new KeySelector<Product, String>() {
@Override public String getKey(Product product) throws Exception {
return product.brand; } }); keyByedProd.print();
简写:.keyBy(product-> product.brand)
5)Reduce
reduce俗称“约减”,就是将元素进行聚合处理。常见的sum、min、max、count、average等聚合操作都可以使用原生的reduce实现。reduce算子的入参是ReduceFunction,value1表示前一个元素,value2表示后一个元素,reduce方法是具体的数据处理逻辑。reduce操作实质上就是不断地将数据源中两个值合并为同一类型的一个值,reduce函数连续应用于输入数据流中的所有值,直到只剩下一个值(聚合之后的结果)。
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception; }
比如统计各个品牌商品的总价
SingleOutputStreamOperator<Product> reduceRes = productStream.keyBy(new KeySelector<Product, String>() {
@Override public String getKey(Product product) throws Exception {
return product.brand; } }).reduce(new ReduceFunction<Product>() {
@Override public Product reduce(Product product1, Product product2) throws Exception {
product2.price = (product1.price + product2.price); return product2; } }); reduceRes.print();
6)Aggregation
Flink中支持对数据流的各种聚合操作,并封装了很多聚集函数。像min、max、sum等聚集函数都可以应用于 KeyedStream获得聚合结果。聚合算子参数如果是int类型,则表示聚合字段的下标(从0开始)。比如快速入门案例中对二元组数据求和:.sum(1)表示求二元组中第二个字段(单词计数)的和。如果是string类型,则表示聚合字段名,通常是一个pojo对象的public属性。
KeyedStream.sum(0) KeyedStream.sum("field0") KeyedStream.min(1) KeyedStream.min("field1") KeyedStream.max(2) KeyedStream.max("field2") KeyedStream.minBy(3) KeyedStream.minBy("field3") KeyedStream.maxBy(4) KeyedStream.maxBy("field4")
7)Split和Select
Split是根据指定条件将数据流拆分为两个或多个流,可以单独处理每个数据流。Select是从拆分的流中选择特定的流。select和split一般结合使用,正如keyBy和聚集函数一起使用一样。实现这样的需求:按照商品价格比如100元为界将商品分为优品(>100)和良品。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); List<Product> products = new ArrayList<>(); products.add(new Product("A", "阿迪", 990)); products.add(new Product("B", "安踏", 90)); products.add(new Product("C", "耐克", 880)); products.add(new Product("D", "特步", 80)); DataStreamSource<Product> streamSource = executionEnvironment.fromCollection(products); SplitStream<Product> splitStream = streamSource.split(new OutputSelector<Product>() {
@Override public Iterable<String> select(Product product) {
List<String> list = new ArrayList<>();//使用list作为临时数据结构存储标签 if (product.getPrice() > 100) {
list.add("优品"); } else {
list.add("良品"); } return list; } }); DataStream<Product> superiorProducts = splitStream.select("优品"); DataStream<Product> acceptedProducts = splitStream.select("良品"); DataStream<Product> allProducts = splitStream.select("良品","优品"); superiorProducts.print("优品"); //启动计算任务 executionEnvironment.execute("Stream operator"); }
控制台输出“优品”的数据:
优品:1> Product{
name='C', brand='耐克', price=880.0} 优品:4> Product{
name='A', brand='阿迪', price=990.0}
有分流操作,那么与之对应的必然有合流操作。Flink中合流操作有2种:Union和Connect。
8)Union
Union函数表示将两个或多个数据类型相同的流组合在一起,即求并集。
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream1 = executionEnvironment.fromCollection(Arrays.asList("a", "b", "c", "d")); DataStreamSource<String> dataStream2 = executionEnvironment.fromCollection(Arrays.asList("1", "2", "3", "4")); DataStream<String> union = dataStream2.union(dataStream1); union.print();
控制台输出合并之后的数据:
3> b 4> c 1> d 1> 3 4> 2 2> a 3> 1 2> 4
9)connect和CoMap
两个datastream连接后转变成connectedstreams,即:datastream,datastream->connectedstreams。与union不同的是connect不要求被连接的两个流数据类型相同。两个流虽然被connect到了同一个流中,但是合并之后的流内部依然保持各自的数据格式不变,相互独立。connect通常和coMap一起使用,coMap对connect之后的流做数据处理。
实际应用中一个数据流过来可能先根据元素的某种特征分开处理,到了一定阶段又需要合并处理,此时就需要用到分流和合流操作。
实现这样的需求:连接一个二元组类型的数据流和一个Product的数据流。并分别对连接后的数据流做map操作。
还是沿用之前的例子:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); List<Product> products = new ArrayList<>(); products.add(new Product("A", "阿迪", 999)); products.add(new Product("B", "安踏", 99)); products.add(new Product("C", "耐克", 888)); products.add(new Product("D", "特步", 89)); DataStreamSource<Product> streamSource = executionEnvironment.fromCollection(products); SplitStream<Product> splitStream = streamSource.split(new OutputSelector<Product>() {
@Override public Iterable<String> select(Product product) {
List<String> list = new ArrayList<>(); if (product.getPrice() > 100) {
list.add("优品"); } else {
list.add("良品"); } return list; } }); DataStream<Product> superiorProducts = splitStream.select("优品"); DataStream<Product> acceptedProducts = splitStream.select("良品"); //先将优品数据转换为2元组类型 DataStream<Tuple2<String, Double>> superiorProductsStream = superiorProducts.map(new MapFunction<Product, Tuple2<String, Double>>() {
@Override public Tuple2<String, Double> map(Product product) throws Exception {
return new Tuple2<>(product.getName(), product.getPrice()); } }); //连接二元组数据流和Product数据类型,并做算子操作,都转换为3元组 SingleOutputStreamOperator<Object> operator = superiorProductsStream.connect(acceptedProducts).map(new CoMapFunction<Tuple2<String, Double>, Product, Object>() {
@Override public Object map1(Tuple2 value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "优品"); } @Override public Object map2(Product value) throws Exception {
return new Tuple3<>(value.getName(), value.getPrice(), "良品"); } }); operator.print("coMap"); //启动计算任务 executionEnvironment.execute("Stream operator"); }
控制台输出如下内容,说明案例中不同数据类型的流连接(connect)、处理(coMap)成功。
coMap:1> (A,999.0,优品) coMap:2> (C,888.0,优品) coMap:4> (D,89.0,良品) coMap:3> (B,99.0,良品)
观察上面的介绍的几种操作可以总结一些规律。
比如keyBy操作总是和聚集函数一起使用、split通常和select一起使用、connect和coMap一起使用。datastream split后得到splitstream,再select之后又转换为datastream ;同样的,datastream connect之后得到connectedstreams,再经coMap操作后又转换为datastream 。union合并的两个流数据类型必须相同,合并过程不涉及流类型的转换。而connect不要求数据流的元素类型相同。union操作可以操作多个流,connect操作只能操作两个流。
上述介绍的 DataStream(流处理) 数据转换操作中,有些也适合DataSet(批处理)。比如 Map、FlatMap、Reduce、Filter 等。
当然DataSet也有一些特有算子。
比如在DataStream中分区是 KeyBy,而DataSet中是GroupBy。这在快速入门案例中已经演示,不再赘述。
DataSet有个first(n)方法可以返回DataSet中前 n个元素,比如:env.readTextFile(inputPath).first(2);返回数据集中前2个元素。
Flink数据类型
前文中介绍Flink中算子的使用时提到了数据类型,下面简单介绍一下Flink中所支持的数据类型。
Flink应用程序处理的是由数据对象组成的连续不断的数据流。这些数据对象需要被序列化和反序列化,以便能够通过网络传输以及从检查点、保存点、状态后端存储读取。为了明确应用程序所处理的数据类型,Flink底层提供了一套完备的数据类型信息,并且为每一种类型提供了序列化器、反序列化器以及比较器。
此外,Flink还提供了类型提取系统,自动分析函数的输入类型和输出类型,以获得对应的序列化器和反序列化器。在使用lambda函数或者泛型类型时,需显式指定类型信息。
Flink DataStream里的元素类型支持JAVA和Scala中的所有基本类型,像Int、Long、Double、String等。此外还支持Tuple元组类型、Java简单对象(pojo)、scala样例类以及一些集合类型,比如Java的ArrayList、HashMap、Enum等。
Flink的每个函数都提供了对应的Rich版本。富函数相比普通的函数可以获取flink运行时上下文、生命周期方法。生命周期方法中通常可以做一些初始化及收尾操作,比如连接数据库、关闭数据库连接。
Sink
sink,顾名思义,下沉,在Flink中意指数据输出、数据落地的意思。最简单的数据输出方式就是打印到控制台,调用datastream的print()方法即可,print就是一种sink操作。对于不同的sink方式,Flink提供了各种内置的输出格式。
除了基本的输入输出数据源外,flink目前还支持下列第三方组件作为数据源。
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- JDBC (sink)
本节介绍几种常用的数据输出方式。
1)普通文件、socket
writeAsText()/TextOutputFormat:将元素按行写入字符串。字符串通过调用每个元素的toString()方法获得。
writeAsCsv(…)/CsvOutputFormat:将数据以逗号分隔的形式写入文件。换行符和字段分隔符可配置。每个字段的值来自对象的toString()方法。
writeUsingOutputFormat() / FileOutputFormat:自定义文件输出格式,支持自定义对象到字节的转换。
writeToSocket:根据指定格式(Serialization Schema)将元素写入网络套接字。
举例如下:
public class SinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1,输出结果全部写出到一个文件,否则开发环境会使用默认并行度分区,分区数为当前机器逻辑cpu核数 env.setParallelism(1); //准备数据源 List<Tuple2<String, Integer>> list = new ArrayList<>(); list.add(new Tuple2<>("A", 100)); list.add(new Tuple2<>("B", 200)); list.add(new Tuple2<>("C", 300)); list.add(new Tuple2<>("D", 400)); DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromCollection(list); dataStreamSource.print(); //除了路径参数是必填外,还可以通过指定第二个参数来定义输出模式 dataStreamSource.writeAsText("d://sink-text.txt", FileSystem.WriteMode.OVERWRITE); //如果想要将输出结果全部写出到一个文件,可以单独设置算子的并行度为 1 dataStreamSource.writeAsCsv("d://sink-csv.txt", FileSystem.WriteMode.OVERWRITE, "\n", ",").setParallelism(1); //自定义的输出格式,writeAsText/writeAsCsv底层调用的都是该方法 dataStreamSource.writeUsingOutputFormat(new TextOutputFormat(new Path("d://sink-file.txt"), "UTF-8")); //以字符串的形式输出到socket服务器 dataStreamSource.map(t -> t.f0 + ":" + t.f1 + "\r\n").writeToSocket("192.168.244.131", 9999, new SimpleStringSchema()); env.execute("sink demo"); } }
测试socket输出时,先在linux服务器上使用nc -lk 9999 模拟socket服务器开启监听。
引入依赖
<dependency> <groupId>org.apache.flink
groupId> <artifactId>flink-connector-kafka-0.11_2.11
artifactId> <version>1.11.0
version>
dependency>
需求:实现Flink消费kafka消息队列的消息,转换处理后再次输出到kafka中。
实体类Product
public class Product {
private String name; private String brand; private double price; //省略get/set }
通过linux命令行创建2个topic,一个由flink消费,另一个由flink写入。
bin/kafka-topics.sh --create \ --bootstrap-server 192.168.244.131:9092 \ --replication-factor 1 \ --partitions 1 \ --topic flink-stream-in-topic
bin/kafka-topics.sh --create \ --bootstrap-server 192.168.244.131:9092 \ --replication-factor 1 \ --partitions 1 \ --topic flink-stream-out-topic
查看topic: bin/kafka-topics.sh --list --bootstrap-server 192.168.244.131:9092
启动一个消费者,接收flink的输出
bin/kafka-console-consumer.sh --bootstrap-server 192.168.244.131:9092 --topic flink-stream-out-topic
启动一个生产者,向flink应用程序监听的topic发送消息
bin/kafka-console-producer.sh --topic flink-stream-in-topic --bootstrap-server 192.168.244.131:9092
在生产者端输入json串:{"name":"跑鞋","brand":"Nike","price":1000}
Flink应用程序集成Kafka:
public class KafkaSink {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.244.131:9092"); props.put("zookeeper.connect", "192.168.244.131:2181"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value 反序列化 props.put("auto.offset.reset", "latest"); //从kafka中消费数据 DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "flink-stream-in-topic", //kafka topic new SimpleStringSchema(), // String序列化 props)).setParallelism(1);//并行度一般不超过kafka topic分区数 dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 //对数据进行业务处理 SingleOutputStreamOperator<String> streamOperated = dataStreamSource.map(new MapFunction<String, String>() {
@Override public String map(String value) throws Exception {
Product product = JSON.parseObject(value, Product.class); //将商品价格翻倍 product.setPrice(product.getPrice()*2); return JSON.toJSONString(product); } }); //将处理完的数据再次发送到kafka中 streamOperated.addSink(new FlinkKafkaProducer011<>( "flink-stream-out-topic", new SimpleStringSchema(), props)).setParallelism(1); env.execute("kafka data source"); } }
运行flink应用后,kafka消费者端将收到处理后的数据:
{"brand":"Nike","name":"跑鞋","price":2000.0}
3)redis
Redis Connector 用于向 Redis 发送数据。可以使用三种不同的方法与不同类型的 Redis 环境进行通信
- 单 Redis 服务器
- Redis 集群
- Redis Sentinel(哨兵)
不同模式主要是Config类的不同,本例展示了单机模式下Flink写入redis
引入redis连接器依赖
<dependency> <groupId>org.apache.bahir
groupId> <artifactId>flink-connector-redis_2.11
artifactId> <version>1.0
version>
dependency>
编写flink应用代码
public class RedisSinkDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源,简单起见,这里使用本地集合数据 List<Tuple2<String, String>> list = new ArrayList<>(); list.add(new Tuple2<>("A", "apple")); list.add(new Tuple2<>("B", "bird")); list.add(new Tuple2<>("C", "cat")); list.add(new Tuple2<>("D", "dog")); DataStreamSource<Tuple2<String, String>> dataStreamSource = env.fromCollection(list); //单机Redis配置,这里只简单配置ip/端口,还支持其它配置比如maxTotal、maxIdle、timeout FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build(); //数据写入redis dataStreamSource.addSink(new RedisSink<>(redisConf, new RedisMapper<Tuple2<String, String>>() {
@Override public RedisCommandDescription getCommandDescription() {
//指定redis命令,这里只演示最简单的设置字符串key return new RedisCommandDescription(RedisCommand.SET); } @Override public String getKeyFromData(Tuple2<String, String> data) {
//提取要存到redis的key return data.f0; } @Override public String getValueFromData(Tuple2<String, String> data) {
//提取要存到redis的value return data.f1; } })); env.execute("redis data sink"); } }
通过redis Cli 查看写入的数据:
127.0.0.1:6379> get A "apple" 127.0.0.1:6379> get B "bird" 127.0.0.1:6379> get C "cat"
Redis 集群 配置:
FlinkJedisClusterConfig config = new FlinkJedisClusterConfig.Builder() .setNodes(new HashSet<InetSocketAddress>( Arrays.asList(new InetSocketAddress("host1", 6379), new InetSocketAddress("host2", 6379)))).build();
Redis Sentinels 配置:
FlinkJedisSentinelConfig sentinelConfig = new FlinkJedisSentinelConfig.Builder() .setMasterName("master") .setSentinels(new HashSet<>(Arrays.asList("sentinel1", "sentinel2"))) .setPassword("12345") .setDatabase(1).build();
4)JDBC
Flink官方提供了JDBC连接器,只要引入连接器和mysql驱动即可。
引入依赖
<dependency> <groupId>org.apache.flink
groupId> <artifactId>flink-connector-jdbc_2.11
artifactId> <version>1.11.2
version>
dependency> <dependency> <groupId>mysql
groupId> <artifactId>mysql-connector-java
artifactId> <version>5.1.49
version>
dependency>
flink应用代码:
public class JDBCSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源 List<Product> list = new ArrayList<>(); list.add(new Product("跑鞋", "耐克", 998)); list.add(new Product("短裤", "李宁", 119)); list.add(new Product("袜子", "耐克", 68)); list.add(new Product("西服", "海澜之家", 1000)); DataStreamSource<Product> dataStreamSource = env.fromCollection(list); String url = "jdbc:mysql://localhost:3306/flink_data?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT"; String sql = "insert into t_product(name, brand, price) values (?,?,?)"; dataStreamSource.addSink(JdbcSink.sink(sql, new JdbcStatementBuilder<Product>() {
@Override public void accept(PreparedStatement ps, Product product) throws SQLException {
ps.setString(1, product.getName()); ps.setString(2, product.getBrand()); ps.setDouble(3, product.getPrice()); } }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl(url) .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("") .build())); env.execute("jdbc data sink"); } }
运行完毕,查看数据库
use flink_data; select * from t_product;

5)自定义
除了Flink官方提供的第三方连接器外,我们也可以自定义 Sink 来满足各种输出需求。自定义的 Sink需要直接或者间接实现 SinkFunction 接口,一般直接继承抽象的富函数RichSinkFunction,重写其open、close、invoke方法。相比于SinkFunction ,富函数提供了操作生命周期的相关方法。
需求:实现一个自定义的sink,将数据输出到Mysql数据库。
MysqlSink.java
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class MysqlSink extends RichSinkFunction<Product> {
private PreparedStatement stmt; private Connection conn; @Override public void open(Configuration parameters) throws Exception {
super.open(parameters); String url = "jdbc:mysql://localhost:3306/flink_data?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT&autoReconnect=true"; String sql = "insert into t_product(name, brand, price) values (?,?,?)"; Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(url, "root", ""); stmt = conn.prepareStatement(sql); } @Override public void close() throws Exception {
super.close(); if (stmt != null) {
stmt.close(); } if (conn != null) {
conn.close(); } } @Override public void invoke(Product product, Context context) throws Exception {
stmt.setString(1, product.getName()); stmt.setString(2, product.getBrand()); stmt.setDouble(3, product.getPrice()); stmt.executeUpdate(); } }
flink应用代码
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; public class MysqlSinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源 List<Product> list = new ArrayList<>(); list.add(new Product("笔记本", "联想", 3998)); list.add(new Product("硬盘", "希捷", 219)); list.add(new Product("CPU", "Intel", 668)); list.add(new Product("显示器", "飞利浦", 1400)); DataStreamSource<Product> dataStreamSource = env.fromCollection(list); dataStreamSource.addSink(new MysqlSink()); env.execute("mysql data sink"); } }
运行flink应用程序,查看mysql数据库数据是否写入成功。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/224829.html原文链接:https://javaforall.net
