整合Flume和Kafka完成实时数据采集

整合Flume和Kafka完成实时数据采集需要注意:参考的网站要与你的kafka的版本一致,因为里面的字段会不一致例如:http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html#kafka-sink这是1.6版本的,如果需要查看1.9版本的直接就将1.6.0改为1.9.0即可#avro-memory-kafka.confavro-memory-kafka.sources=avro-sourceavro-memory-kafka.sinks=kafka-.

大家好,又见面了,我是你们的朋友全栈君。

在这里插入图片描述

需要注意:参考的网站要与你的kafka的版本一致,因为里面的字段会不一致
例如:http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html#kafka-sink
这是1.6版本的,如果需要查看1.9版本的直接就将1.6.0改为1.9.0即可

# avro-memory-kafka.conf
avro-memory-kafka.sources = avro-source
avro-memory-kafka.sinks = kafka-sink
avro-memory-kafka.channels = memory-channel

# Describe/configure the source
avro-memory-kafka.sources.avro-source.type = avro
avro-memory-kafka.sources.avro-source.bind = hadoop000
avro-memory-kafka.sources.avro-source.port = 44444

# Describe the sink
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
# batchSize 当达到5个日志才会处理,所以消费者出现的消息会慢
avro-memory-kafka.sinks.kafka-sink.batchSize = 5
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

# Use a channel which buffers events in memory
avro-memory-kafka.channels.memory-channel.type = memory

# Bind the source and sink to the channel
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
flume-ng agent \
--name avro-memory-kafka \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-memory-kafka.conf \
-Dflume.root.logger=INFO,console
flume-ng agent  \
--name exec-memory-avro \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console

启动消费者:
kafka-console-consumer.sh –zookeeper hadoop000:2181 –topic hello_topic

向data.log写入数据,发现消费者出现消息,成功

[hadoop@hadoop000 data]$ echo hellospark1111 >> data.log
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152379.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Cortex-A53架构(记笔记的方法)

    1.前言一颗芯片最主要的就是CPU核了,处理CPUCore之外,还存在很多其他IP,包括Graphical、Multimedia、MemoryController、USBController等等。ARMproducts列出了主要产品,其中Architecture和Processors需要重点关注。Architecture扩展的四大领域:SecurityExtensio…

    2022年4月13日
    64
  • Brocade博科光纤交换机zone配置

    Brocade博科光纤交换机zone配置1、规划交换机端口用途DS6520B-A94存储模块1-195存储模块2-168DB1网卡1-169DB2网卡1-1DS6520B-B94存储模块1-295存储模块2-268DB1网卡1-269DB2网卡1-2…

    2022年5月21日
    44
  • dmesg的使用「建议收藏」

    dmesg的使用「建议收藏」1.dmesg命令–>用来显示开机信息,kernel会将开机信息存储在ringbuffer中。开机时来不及查看信息,可利用dmesg来查看。开机信息亦保存在/var/log/dmesg2.【dmesg命令作用】:有时候屏幕上的启动信息一闪而过,我们无法查看到具体信息,又或者服务器在电信机房,更看不到开机启动信息。这时候linux提供了dmesg这条命令。在命令行下…

    2025年7月8日
    6
  • Idea激活码最新教程2023.2.8版本,永久有效激活码,亲测可用,记得收藏

    Idea激活码最新教程2023.2.8版本,永久有效激活码,亲测可用,记得收藏Idea 激活码教程永久有效 2023 2 8 激活码教程 Windows 版永久激活 持续更新 Idea 激活码 2023 2 8 成功激活

    2025年5月29日
    5
  • fastjson List转JSONArray以及JSONArray转List「建议收藏」

    fastjson List转JSONArray以及JSONArray转List「建议收藏」1.fastjson List转JSONArrayList<T>list=newArrayList<T>();JSONArrayarray=JSONArray.parseArray(JSON.toJSONString(list));2.fastjson JSONArray转ListJSONArrayarray=newJSONArray();List&…

    2022年6月15日
    41
  • ioctl() FIONREAD 检测socket是否有数据可读

    ioctl() FIONREAD 检测socket是否有数据可读先看看FIONREAD的作用FIONREAD:Getthenumberofbytesintheinputbuffer获取接收缓存中数据的字节数项目中用来判断tcpsocket是否有数据接收到,但是出现了一个问题,对于用于accept的socket即调用listen()之后的socket,用FIONREAD,判断的时候报错,ioctl()返回-1,错误码是2…

    2022年7月23日
    8

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号