Flink的sink实战之四:自定义

Flink的sink实战之四:自定义

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

Flink官方提供的sink服务可能满足不了我们的需要,此时可以开发自定义的sink,文本就来一起实战;

全系列链接

  1. 《Flink的sink实战之一:初探》
  2. 《Flink的sink实战之二:kafka》
  3. 《Flink的sink实战之三:cassandra3》
  4. 《Flink的sink实战之四:自定义》

继承关系

  1. 在正式编码前,要先弄清楚对sink能力是如何实现的,前面我们实战过的print、kafka、cassandra等sink操作,核心类的继承关系如下图所示:
    在这里插入图片描述
  2. 可见实现sink能力的关键,是实现RichFunction和SinkFunction接口,前者用于资源控制(如open、close等操作),后者负责sink的具体操作,来看看最简单的PrintSinkFunction类是如何实现SinkFunction接口的invoke方法:
@Override
public void invoke(IN record) {
  writer.write(record);
}
  1. 现在对sink的基本逻辑已经清楚了,可以开始编码实战了;

内容和版本

本次实战很简单:自定义sink,用于将数据写入MySQL,涉及的版本信息如下:

  1. jdk:1.8.0_191
  2. flink:1.9.2
  3. maven:3.6.0
  4. flink所在操作系统:CentOS Linux release 7.7.1908
  5. MySQL:5.7.29
  6. IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinksinkdemo文件夹下,如下图红框所示:
在这里插入图片描述

数据库准备

请您将MySQL准备好,并执行以下sql,用于创建数据库flinkdemo和表student:

create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

编码

  1. 使用《Flink的sink实战之二:kafka》中创建的flinksinkdemo工程;
  2. 在pom.xml中增加mysql的依赖:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.11</version>
</dependency>
  1. 创建和数据库的student表对应的实体类Student.java:
package com.bolingcavalry.customize;

public class Student {
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  1. 创建自定义sink类MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> {

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        //准备数据库相关实例
        buildPreparedStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();

        try{
            if(null!=preparedStatement) {
                preparedStatement.close();
                preparedStatement = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }

        try{
            if(null!=connection) {
                connection.close();
                connection = null;
            }
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();
    }

    /**
     * 准备好connection和preparedStatement
     * 获取mysql连接实例,考虑多线程同步,
     * 不用synchronize是因为获取数据库连接是远程操作,耗时不确定
     * @return
     */
    private void buildPreparedStatement() {
        if(null==connection) {
            boolean hasLock = false;
            try {
                hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) {
                    Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                }

                if(null!=connection) {
                    preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                }
            } catch (Exception e) {
                //生产环境慎用
                e.printStackTrace();
            } finally {
                if(hasLock) {
                    reentrantLock.unlock();
                }
            }
        }
    }
}
  1. 上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本mysql驱动对应的driver和uri的写法与以前5.x版本的区别;
  2. 创建任务类StudentSink.java,用来创建一个flink任务,里面通过ArrayList创建了一个数据集,然后直接addSink,为了看清DAG,调用disableChaining方法取消了operator chain:
package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //并行度为1
        env.setParallelism(1);

        List<Student> list = new ArrayList<>();
        list.add(new Student("aaa", 11));
        list.add(new Student("bbb", 12));
        list.add(new Student("ccc", 13));
        list.add(new Student("ddd", 14));
        list.add(new Student("eee", 15));
        list.add(new Student("fff", 16));

        env.fromCollection(list)
            .addSink(new MySQLSinkFunction())
            .disableChaining();

        env.execute("sink demo : customize mysql obj");
    }
}
  1. 在flink web页面提交任务,并设置任务类:
    在这里插入图片描述
  2. 任务完成后,DAG图显示任务和记录数都符合预期:
    在这里插入图片描述
  3. 去检查数据库,发现数据已写入:

在这里插入图片描述

至此,自定义sink的实战已经完成,希望本文能给您一些参考;

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…
https://github.com/zq2599/blog_demos

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/2630.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 看了这篇文章觉得MySQL读写分离这么简单「建议收藏」

    点赞多大胆,就有多大产!有支持才有动力!微信搜索公众号【达摩克利斯之笔】获取更多资源,文末有二维码!前言​  Mysql优化那篇文章有朋友留言说就这么点?,深深刺痛了晓添的心,感觉知识深度被小看了,痛定思痛决定发布读写分离,分表分库优化文章,其实这系列文章也在Mysql优化的计划之内,最近较忙断断续续写的有点难受,到今天才跟大家见面,篇幅有限这篇我们来说说基于Mycat实现读写分离,话不多…

    2022年4月13日
    60
  • java判断集合list是否为空

    java判断集合list是否为空方法有二 其一为 if list null amp amp list size gt 0 判断 list 是否为空 且 list 集合中包含的元素个数小于等于 0 个 其二为 这是最常用的方法 简单便捷使用 isEmpty 方法 isEmpty 判断 list 集合有没有元素 如果有元素返回 false 没有返回 true 如果集合本身设置为 null 则会报

    2025年8月24日
    3
  • [奶奶看了都会]京东自动签到薅羊毛-完整教程

    [奶奶看了都会]京东自动签到薅羊毛-完整教程又到了节假日的时间了,每逢节假日必须得搞事情。最近北京疫情管的比较严,楼主去小区旁边的小公园散步,都要出示核酸证明了。。。上一次说到用脚本完成京东自动签到领京豆:[奶奶看了都会]教你用脚本薅京东签到羊毛这个只能领到自动签到任务的豆子而已,还有好多京豆任务都没做了,导致咱白白损失了一波豆豆?所以今天嘛,我们就把京豆的任务都做一遍,把京豆全给领了?手机抓包为了获取到京豆签到的接口,需要在手机京东APP上抓包,这就需要用到手机抓包的技术了楼主对着网上的教程实践了一波,搞了一整天之后,得到的结论是An

    2025年11月25日
    4
  • Python字符串大小比较

    Python字符串大小比较这个问题对于有编程经验的人来说,是个非常简单的问题;但是对于初学者来说,可能是个头疼的问题,所以以此记录一下。

    2022年6月21日
    47
  • python淘宝抢购脚本_Python 实现毫秒级淘宝、京东、天猫等秒杀抢购脚本「建议收藏」

    python淘宝抢购脚本_Python 实现毫秒级淘宝、京东、天猫等秒杀抢购脚本「建议收藏」本篇文章主要介绍了Python通过selenium实现毫秒级自动抢购的示例代码,通过扫码登录即可自动完成一系列操作,抢购时间精确至毫秒,可抢加购物车等待时间结算的,也可以抢聚划算的商品。该思路可运用到其他任何网站,京东,天猫,淘宝均可使用,且不属于外挂或者软件之类,只属于一个自动化点击工具。#!/usr/bin/envpython#-*-coding:utf-8-*-#2019/0…

    2022年4月30日
    54
  • 如何正确安装Oracle:Oracle11g安装教程

    如何正确安装Oracle:Oracle11g安装教程前言之前安装的过程中存在隐患问题,所以导致了我把它狠心的卸载了,今天就正确的安装上我们的Oracle。怎么卸载?卸载请点这里下面我们就来看一看具体的实施步骤吧!首先开水烫毛,将脏器取出,放上葱姜蒜等香料…下…锅…不好意思,走错片场了下载没有安装包,等我给你下载呐?好吧,这次就帮你一次吧!官方下地址:甲骨文官网如果你不想忍受英文的肆虐,那么直接点下面的连接吧!win3…

    2022年7月25日
    16

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号