Flume 如何自定义 Mysql Sink?

Flume 如何自定义 Mysql Sink?前言本文隶属于专栏 1000 个问题搞定大数据技术体系 该专栏为笔者原创 引用请注明来源 不足和错误之处请在评论区帮忙指出 谢谢 本专栏目录结构和参考文献请见 1000 个问题搞定大数据技术体系正文场景描述官方提供的 sink 类型已经很多 但是有时候并不能满足实际开发当中的需求 此时我们就需要根据实际需求自定义某些 sink 如 需要把接受到的数据按照规则进行过滤之后写入到某张 mysql 表中 所以此时需要我们自己实现 MySQLSink 自定义 MysqlSink 步骤 1 根据官方说明自

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

正文

场景描述

官方提供的sink类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些sink。 如:需要把接受到的数据按照规则进行过滤之后写入到某张mysql表中,所以此时需要我们自己实现MySQLSink。 

自定义 Mysql Sink 步骤

  • 1、根据官方说明自定义 MysqlSink 需要继承 AbstractSink 类并实现 Configurable
  • 2、实现对应的方法
    • configure(Context context)
      • 初始化context
    • start()
      • 启动准备操作
    • process()
      • 从channel获取数据,然后解析之后,保存在mysql表中
    • stop()
      • 关闭相关资源

实践

  1. 创建 mysql 数据库以及 mysql 数据库表
--创建一个数据库 CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8 ; --创建一个表,用户保存拉取目标表位置的信息 CREATE TABLE mysqlsource.flume2mysql ( id int(11) NOT NULL AUTO_INCREMENT, createTime varchar(64) NOT NULL, content varchar(255) NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; 
  1. 构建maven工程,添加依赖
 <properties> <flume.version>1.9.0 
     flume.version> <mysql.version>8.0.24 
      mysql.version>  
       properties> <dependencies> <dependency> <groupId>org.apache.flume 
        groupId> <artifactId>flume-ng-core 
         artifactId> <version>${flume.version} 
          version>  
           dependency> <dependency> <groupId>mysql 
            groupId> <artifactId>mysql-connector-java 
             artifactId> <version>${mysql.version} 
              version>  
               dependency> <dependency> <groupId>org.apache.commons 
                groupId> <artifactId>commons-lang3 
                 artifactId> <version>3.12.0 
                  version>  
                   dependency>  
                    dependencies> 
  1. 定义 MysqlSink 类
package com.shockang.study.bigdata.flume; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Date; / * 自定义MysqlSink */ public class MysqlSink extends AbstractSink implements Configurable { 
    private String mysqlurl = ""; private String username = ""; private String password = ""; private String tableName = ""; Connection con = null; @Override public Status process() { 
    Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try { 
    Event event = ch.take(); if (event != null) { 
    //获取body中的数据 String body = new String(event.getBody(), "UTF-8"); //如果日志中有以下关键字的不需要保存,过滤掉 if (body.contains("delete") || body.contains("drop") || body.contains("alert")) { 
    status = Status.BACKOFF; } else { 
    //存入Mysql SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String createtime = df.format(new Date()); PreparedStatement stmt = con.prepareStatement("insert into " + tableName + " (createtime, content) values (?, ?)"); stmt.setString(1, createtime); stmt.setString(2, body); stmt.execute(); stmt.close(); status = Status.READY; } } else { 
    status = Status.BACKOFF; } txn.commit(); } catch (Throwable t) { 
    txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally { 
    txn.close(); } return status; } / * 获取配置文件中指定的参数 * * @param context */ @Override public void configure(Context context) { 
    mysqlurl = context.getString("mysqlurl"); username = context.getString("username"); password = context.getString("password"); tableName = context.getString("tablename"); } @Override public synchronized void start() { 
    try { 
    //初始化数据库连接 con = DriverManager.getConnection(mysqlurl, username, password); super.start(); System.out.println("finish start"); } catch (Exception ex) { 
    ex.printStackTrace(); } } @Override public synchronized void stop() { 
    try { 
    con.close(); } catch (SQLException e) { 
    e.printStackTrace(); } super.stop(); } } 
  1. 测试

① 程序打成jar包,上传jar包到flume的lib目录下

② 配置文件准备

vim mysqlsink.conf 
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #配置source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/bigdata/flumeData/data.log a1.sources.r1.channels = c1 #配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #配置sink a1.sinks.k1.channel = c1 a1.sinks.k1.type = com.shockang.study.bigdata.flume.MysqlSink a1.sinks.k1.mysqlurl=jdbc:mysql://node1:3306/mysqlsource?useSSL=false a1.sinks.k1.username=root a1.sinks.k1.password= a1.sinks.k1.tablename=flume2mysql 

③ 启动flume配置

flume-ng agent -n a1 -c /opt/bigdata/flume/myconf -f /opt/bigdata/flume/myconf/mysqlsink.conf -Dflume.root.logger=info,console 

④ 最后向文件中添加数据,观察mysql表中的数据

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220310.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午8:46
下一篇 2026年3月17日 下午8:46


相关推荐

  • PCI与PCIe学习一——硬件篇[通俗易懂]

    PCI与PCIe学习一——硬件篇[通俗易懂]文章转载自:点击打开链接最近在学习驱动开发过程中涉及到PCI相关知识,在网上看了很多文章,良莠不齐,我总结一下比较好的文章分享给大家,那就从源头开始说起。PCI总线和设备树是X86硬件体系内很重要的组成部分,几乎所有的外围硬件都以这样或那样的形式连接到PCI设备树上。虽然Intel为了方便各种IP的接入而提出IOSF总线,但是其主体接口(primaryinterface)…

    2022年6月15日
    51
  • ubuntu 20.04 安装中文输入法_如何在ubuntu中安装中文输入法

    ubuntu 20.04 安装中文输入法_如何在ubuntu中安装中文输入法在Ubuntu系统中,无论是写文档还是在程序中写注释,都经常需要用到中文输入法。本文简单介绍了三种输入法框架,然后详细介绍了在Ubuntu20.04系统中,IBus框架和Fcitx框架支持的中文输入法的配置和安装。……

    2026年4月13日
    3
  • haxm intel庐_如何开启Intel HAXM功能「建议收藏」

    haxm intel庐_如何开启Intel HAXM功能「建议收藏」1.启用BIOS中的Intel(R)VirtualizationTechnology选项2.设置成功后,在控制台中输入scqueryintelhaxm。出现下图即为成功3.启动androidSDK,在Extras目录的最下边,勾选IntelHAXM项,并下载4.下载完成后,打开目录:Sdk\extras\intel\Hardware_Accelerated_Execution_…

    2022年6月28日
    29
  • 使用axios上传文件+参数

    使用axios上传文件+参数数据格式采用FormData请求头设置:Content-Type:multipart/form-data;boundary=—-WebKitFormBoundaryVCFSAonTuDbVCoAN例:letfile=所选取的文件letformData=newFormData();formData.append(‘paramId’,1)formData.append(…

    2022年6月14日
    74
  • android 简单的登录

    android 简单的登录

    2022年1月8日
    47
  • 食品生物技术学计算机吗,食品生物技术「建议收藏」

    食品生物技术学计算机吗,食品生物技术「建议收藏」三、教学任务食品生物技术系主要承担本科生的课程如下:生物化学、微生物学、食品营养与卫生学、食品生物技术、实验设计与数据处理、综合性实验课等课程。承担生物化工和食品科学专业研究生高等生物化学、高等微生物学、实验动物学、现代生物技术等课程。四、主要研究方向与内容  食品生物技术系主要从事与食品生物技术方向的教学、科研及甜菜分子生物学方向的科研、研究生培养工作。1.食品分子营养与安全1.1食品分子营养学…

    2022年7月11日
    18

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号