flume整合kafka

flume整合kafka整合流程Flume发送数据到Kafka上主要是通过KafkaSink来实现的,主要步骤如下:1.启动Zookeeper和Kafka这里启动一个单节点的Kafka作为测试:#启动ZookeeperzkServer.shstart#启动kafkabin/kafka-server-start.shconfig/server.properties2.创建主题创建一个主题flume-kafka,之后Flume收集到的数据都会发到这个主题上:#…

大家好,又见面了,我是你们的朋友全栈君。

整合流程

Flume 发送数据到 Kafka 上主要是通过 KafkaSink 来实现的,主要步骤如下:

1. 启动Zookeeper和Kafka

这里启动一个单节点的 Kafka 作为测试:

# 启动Zookeeper
zkServer.sh start

# 启动kafka
bin/kafka-server-start.sh config/server.properties

2. 创建主题

创建一个主题 flume-kafka,之后 Flume 收集到的数据都会发到这个主题上:

# 创建主题
bin/kafka-topics.sh --create \
--zookeeper hadoop001:2181 \
--replication-factor 1   \
--partitions 1 --topic flume-kafka

# 查看创建的主题
bin/kafka-topics.sh --zookeeper hadoop001:2181 --list

3. 启动kafka消费者

启动一个消费者,监听我们刚才创建的 flume-kafka 主题:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flume-kafka

4. 配置Flume

新建配置文件 exec-memory-kafka.properties,文件内容如下。这里我们监听一个名为 kafka.log 的文件,当文件内容有变化时,将新增加的内容发送到 Kafka 的 flume-kafka 主题上。

a1.sources = s1
a1.channels = c1
a1.sinks = k1                                                                                         

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /tmp/kafka.log
a1.sources.s1.channels=c1 

#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka地址
a1.sinks.k1.brokerList=hadoop001:9092
#设置发送到Kafka上的主题
a1.sinks.k1.topic=flume-kafka
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1     

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100   

5. 启动Flume

flume-ng agent \
--conf conf \
--conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-kafka.properties \
--name a1 -Dflume.root.logger=INFO,console

6. 测试

向监听的 /tmp/kafka.log 文件中追加内容,查看 Kafka 消费者的输出:

flume整合kafka

可以看到 flume-kafka 主题的消费端已经收到了对应的消息:

flume整合kafka

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152401.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 虚拟化和云计算之间,主要是什么关系?

    虚拟化和云计算之间,主要是什么关系?尽管事实上云计算和虚拟化是两种非常不同的技术 但他们仍然常常被等同起来 从行业数据互相关联的角度来说 云计算是极度依赖虚拟化的 事实上 我们可以说虚拟化就是云计算的关键 以下是 4 个关键性特征 让虚拟化支撑着云计算 1 弹性能力许多云计算用户都希望能够快速的部署新服务器和高可扩展性环境 虚拟机让这成为可能 镜像部署变得轻松可行 2 以低成本最大限度利用资源相比在专用服务器上隔离单个的应用程

    2025年6月10日
    0
  • LeetCode – 538. Convert BST to Greater Tree

    LeetCode – 538. Convert BST to Greater Tree

    2022年3月6日
    47
  • Android手机上使用Socks5全局代理-教程+软件

    Android手机上使用Socks5全局代理-教程+软件前言:在Android上使用系统自带的代理,限制灰常大,仅支持系统自带的浏览器。这样像QQ、飞信、微博等这些单独的App都不能使用系统的代理。如何让所有软件都能正常代理呢?ProxyDroid这个软件能帮你解决!使用方法及步骤如下:一、推荐从GooglePlay下载ProxyDroid,目前最新版本是v2.6.6。二、对ProxyDroid进行配置(基本配置:)…

    2022年6月24日
    607
  • 单片机流水灯程序[通俗易懂]

    单片机流水灯程序[通俗易懂]一、流水灯实验1、设计要求P1口接8个发光二极管,烧录程序后发光二极管依次点亮2、硬件要求利用proteusIsis仿真,选择器件,AT89C51、LED-BLUE、RES3、软件设计源程序:/*********************必要变量定义******************/#include<reg51.h>#include<intrns.h>…

    2022年5月1日
    54
  • java环境教程_java环境配置的详细教程(图文)

    java环境教程_java环境配置的详细教程(图文)本篇文章给大家带来的内容是关于java环境配置的详细教程(图文),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。JAVA环境变量的配置:Path,JAVA_HOME,CLASSPATH一、右键我的电脑,属性,高级系统设置,点击环境变量二、然后就会弹出环境变量这个窗口,在系统变量编辑JAVA_HOME,如果没有就新建一个,把jkd的路径添加进去,如图三、配置CASSPATH,如果没有…

    2022年7月7日
    28
  • Java虚拟机(JVM)面试题(2020最新版)

    文章目录Java内存区域说一下JVM的主要组成部分及其作用?说一下JVM运行时数据区深拷贝和浅拷贝说一下堆栈的区别?队列和栈是什么?有什么区别?HotSpot虚拟机对象探秘对象的创建为对象分配内存处理并发安全问题对象的访问定位句柄访问直接指针内存溢出异常Java会存在内存泄漏吗?请简单描述垃圾收集器简述Java垃圾回收机制GC是什么?为什么要GC垃圾回收的优点和原理。并考虑2种回收机制垃圾…

    2022年4月18日
    59

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号