flume整合kafka

flume整合kafka整合流程Flume发送数据到Kafka上主要是通过KafkaSink来实现的,主要步骤如下:1.启动Zookeeper和Kafka这里启动一个单节点的Kafka作为测试:#启动ZookeeperzkServer.shstart#启动kafkabin/kafka-server-start.shconfig/server.properties2.创建主题创建一个主题flume-kafka,之后Flume收集到的数据都会发到这个主题上:#…

大家好,又见面了,我是你们的朋友全栈君。

整合流程

Flume 发送数据到 Kafka 上主要是通过 KafkaSink 来实现的,主要步骤如下:

1. 启动Zookeeper和Kafka

这里启动一个单节点的 Kafka 作为测试:

# 启动Zookeeper
zkServer.sh start

# 启动kafka
bin/kafka-server-start.sh config/server.properties

2. 创建主题

创建一个主题 flume-kafka,之后 Flume 收集到的数据都会发到这个主题上:

# 创建主题
bin/kafka-topics.sh --create \
--zookeeper hadoop001:2181 \
--replication-factor 1   \
--partitions 1 --topic flume-kafka

# 查看创建的主题
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list

3. 启动kafka消费者

启动一个消费者,监听我们刚才创建的 flume-kafka 主题:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka

4. 配置Flume

新建配置文件 exec-memory-kafka.properties,文件内容如下。这里我们监听一个名为 kafka.log 的文件,当文件内容有变化时,将新增加的内容发送到 Kafka 的 flume-kafka 主题上。

a1.sources = s1
a1.channels = c1
a1.sinks = k1                                                                                         

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /tmp/kafka.log
a1.sources.s1.channels=c1 

#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka地址
a1.sinks.k1.brokerList=hadoop001:9092
#设置发送到Kafka上的主题
a1.sinks.k1.topic=flume-kafka
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1     

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100   

5. 启动Flume

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
--name a1 -Dflume.root.logger=INFO,console

6. 测试

向监听的 /tmp/kafka.log 文件中追加内容,查看 Kafka 消费者的输出:

flume整合kafka

可以看到 flume-kafka 主题的消费端已经收到了对应的消息:

flume整合kafka

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152401.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月23日 上午11:16
下一篇 2022年6月23日 上午11:16


相关推荐

  • AIC和BIC准则详解

    AIC和BIC准则详解很多参数估计问题均采用似然函数作为目标函数,当训练数据足够多时,可以不断提高模型精度,但是以提高模型复杂度为代价,同时带来一个机器学习中非常普遍的问题——过拟合。所以,模型选择问题在模型复杂度与模型对数据集描述能力(即似然函数)之间寻求最佳平衡。人们提出许多信息准则,通过加入模型复杂度的惩罚项来避免过拟合问题,此处我们介绍一下常用的两个模型选择方法:1.赤池信息准则(AkaikeInformationCriterion,AIC)AIC是衡量统计模型拟合优良性的一种标准,由日本统计学家赤池弘次在

    2022年5月23日
    82
  • linux命令行安装gcc_linux用yum安装gcc

    linux命令行安装gcc_linux用yum安装gcc目前,GCC可以用来编译C/C++、FORTRAN、JAVA、OBJC、ADA等语言的程序,可根据需要选择安装支持的语言。下面由学习啦小编为大家整理了linux下安装gcc命令的方法,希望大家喜欢!linux下安装gcc命令1下载在GCC网站上或者通过网上搜索可以查找到下载资源。目前GCC的最新版本为4.2.1。可供下载的文件一般有两种形式:gcc-4.1.2.tar.gz和gcc-4.1.2…

    2022年10月13日
    6
  • 非常详细的Fiddler工具使用说明(包含APP抓包)

    非常详细的Fiddler工具使用说明(包含APP抓包)阅读目录1.Fiddler抓包简介    1).字段说明    2).Statistics请求的性能数据分析    3).Inspectors查看数据内容    4).AutoResponder允许拦截制定规则的请求    5).Filters请求过滤规则    6).Timeline请求响应时间2.Fiddler设置解密HTTPS的网络数据3.Fiddler抓取…

    2022年5月7日
    59
  • rsyslog日志服务器_centos7发送全部日志

    rsyslog日志服务器_centos7发送全部日志rsyslog日志服务详解原文出处:http://blog.51cto.com/6638225/1862902内容:1、rsyslog日志服务简介2、rsyslog的配置详解3、实现日志服务器收集日志及last、lastb、dmseg命令的使用4、实现日志存储在mysql中一、rsyslog日志服务简介​日志的概念好理解,日志作用可用于排障和追溯审计的等​…

    2026年3月10日
    4
  • Delphi 跨平台_delphi调用api接口

    Delphi 跨平台_delphi调用api接口DELPHI是怎么实现跨平台的?

    2022年4月21日
    212
  • github是什么,有什么用

    github是什么,有什么用写在前面 关于 github 的文章我已经写了两篇了 关于 github 个人网站搭建和上传的内容 这篇是对前两篇的一个总结 这里也会讲一些背景知识 和常见问题之类的相关内容 github 是什么 嗯 有什么奇怪的东西乱入了 画风有点不对实际上 这个问题在 github 的网站首页说的很清楚了 1 github 是一个基于 git 的 web 协作社区 它有多种机

    2026年3月18日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号