前言
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系
正文
场景描述
官方提供的 Sink 类型已经很多,但有时仍不能满足实际开发中的需求,此时就需要根据实际需求自定义 Sink。例如:需要把接收到的数据按照规则过滤之后写入某张 MySQL 表中,此时就需要我们自己实现一个 MySQLSink。
自定义 Mysql Sink 步骤
- 1、根据官方说明,自定义 MysqlSink 需要继承 AbstractSink 类并实现 Configurable 接口
- 2、实现对应的方法
- configure(Context context)
- 读取 Context 中的配置参数(数据库 URL、用户名、密码、表名)
- start()
- 执行启动准备操作(如建立数据库连接)
- process()
- 在事务中从 Channel 获取事件,解析并按规则过滤后保存到 MySQL 表中
- stop()
- 关闭相关资源
- configure(Context context)
实践
- 创建 MySQL 数据库以及数据表
-- Create the database. utf8mb4 (not the 3-byte utf8) so that any UTF-8 event body can be stored.
-- Note: MySQL only treats "-- " as a comment when whitespace follows the dashes.
CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8mb4;

-- Create the table that stores the events written by MysqlSink
-- (one row per event: insert time + event body).
CREATE TABLE IF NOT EXISTS mysqlsource.flume2mysql (
    id         INT         NOT NULL AUTO_INCREMENT,
    createTime VARCHAR(64) NOT NULL,
    -- TEXT instead of VARCHAR(255): in strict mode an over-long log line would make every
    -- insert of that event fail, and the sink would keep rolling back and retrying it.
    content    TEXT        NOT NULL,
    PRIMARY KEY (id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
- 构建maven工程,添加依赖
<properties>
    <flume.version>1.9.0</flume.version>
    <mysql.version>8.0.24</mysql.version>
</properties>

<dependencies>
    <!-- Already on the Flume agent's classpath at runtime, so it must not be bundled into the sink jar. -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- JDBC driver used by MysqlSink; it must also be present in Flume's lib directory at runtime. -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- NOTE(review): not referenced by MysqlSink; can likely be removed. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.12.0</version>
    </dependency>
</dependencies>
- 定义 MysqlSink 类
package com.shockang.study.bigdata.flume; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.Date; / * 自定义MysqlSink */ public class MysqlSink extends AbstractSink implements Configurable {
private String mysqlurl = ""; private String username = ""; private String password = ""; private String tableName = ""; Connection con = null; @Override public Status process() {
Status status = null; // Start transaction Channel ch = getChannel(); Transaction txn = ch.getTransaction(); txn.begin(); try {
Event event = ch.take(); if (event != null) {
//获取body中的数据 String body = new String(event.getBody(), "UTF-8"); //如果日志中有以下关键字的不需要保存,过滤掉 if (body.contains("delete") || body.contains("drop") || body.contains("alert")) {
status = Status.BACKOFF; } else {
//存入Mysql SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String createtime = df.format(new Date()); PreparedStatement stmt = con.prepareStatement("insert into " + tableName + " (createtime, content) values (?, ?)"); stmt.setString(1, createtime); stmt.setString(2, body); stmt.execute(); stmt.close(); status = Status.READY; } } else {
status = Status.BACKOFF; } txn.commit(); } catch (Throwable t) {
txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally {
txn.close(); } return status; } / * 获取配置文件中指定的参数 * * @param context */ @Override public void configure(Context context) {
mysqlurl = context.getString("mysqlurl"); username = context.getString("username"); password = context.getString("password"); tableName = context.getString("tablename"); } @Override public synchronized void start() {
try {
//初始化数据库连接 con = DriverManager.getConnection(mysqlurl, username, password); super.start(); System.out.println("finish start"); } catch (Exception ex) {
ex.printStackTrace(); } } @Override public synchronized void stop() {
try {
con.close(); } catch (SQLException e) {
e.printStackTrace(); } super.stop(); } }
- 测试
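① 将程序打成 jar 包,上传到 Flume 的 lib 目录下;同时确保 MySQL JDBC 驱动(mysql-connector-java-8.0.24.jar)也在该目录中(或打成包含依赖的 jar 包),否则连接数据库时会报 No suitable driver found。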
② 配置文件准备
vim mysqlsink.conf
# Components of agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: follow the log file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/bigdata/flumeData/data.log
a1.sources.r1.channels = c1

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink: the custom MySQL sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = com.shockang.study.bigdata.flume.MysqlSink
a1.sinks.k1.mysqlurl = jdbc:mysql://node1:3306/mysqlsource?useSSL=false
a1.sinks.k1.username = root
# Fill in the MySQL password for the user above
a1.sinks.k1.password =
a1.sinks.k1.tablename = flume2mysql
③ 启动flume配置
flume-ng agent \
  --name a1 \
  --conf /opt/bigdata/flume/myconf \
  --conf-file /opt/bigdata/flume/myconf/mysqlsink.conf \
  -Dflume.root.logger=info,console
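④ 最后向被监控的文件 /opt/bigdata/flumeData/data.log 追加数据(例如 echo "hello flume" >> /opt/bigdata/flumeData/data.log),然后查询 mysqlsource.flume2mysql 表,观察数据是否写入;包含 delete、drop、alert 关键字的行会被过滤,不会入库。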
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220310.html原文链接:https://javaforall.net
