大数据——Flume+Kafka+Flume整合模式

大数据——Flume+Kafka+Flume整合模式创建kafka主题#启动kafka服务kafka-server-start.sh/opt/software/kafka280cala212/conf/kraft/server.properites#创建主题#topic主题名test01#partitions分区数1#replication-factor备份数量1kafka-topics.sh–create–topictest01–partitions1–replication-factor1…

大家好,又见面了,我是你们的朋友全栈君。

大数据——Flume+Kafka+Flume整合模式

创建kafka主题

#启动kafka服务
kafka-server-start.sh /opt/software/kafka280scala212/conf/kraft/server.properites

#创建主题
#topic主题名test01    
#partitions分区数1 
#replication-factor备份数量1
kafka-topics.sh --create --topic test01 --partitions 1 --replication-factor 1 --bootstrap-server 192.168.131.200:9092

#查看主题
kafka-topics.sh --list --bootstrap-server 192.168.131.200:9092

创建flume配置文件(采用KafkaSink作为kafka生产者)

#创建并编辑文件名为flume_kafka01.conf配置文件
vim /root/flume/flume_kafka01.conf

#创建flume 的三大组件sources channels sinks
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#这里选用的是taildir类型的source,支持断点续采
a1.sources.s1.type = taildir

#需要侦听的文件,支持多目录侦听
a1.sources.s1.filegroups = f1
#侦听前缀为prolog的文件
a1.sources.s1.filegroups.f1 = /root/flume_log/prolog*
#断点记录保存文件路径
a1.sources.s1.positionFile = /opt/software/fluem190/data/taildir/tail_prolog_01.json
#设置采集批量
a1.sources.s1.batchSize = 10

a1.channels.c1.type = file
a1.channels.c1.file.checkpointDir = /opt/software/flume190/mydata/checkpoint04
a1.channels.c1.file.capacity = 1000
a1.channels.c1.file.transactionCapacity = 100
#transactionCapacity 默认值为100,且必须大于100
#transactionCapacity >= batchSize

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.131.200:9092
a1.sinks.k1.kafka.topic = test01
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.linger.ms = 500
a1.sinks.k1.kafka.acks = 1

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

创建flume配置文件(采用KafkaSource作为kafka消费者)

vim /root/flume/kafka_flume01.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 10
a1.sources.si.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.server = 192.168.131.200:9092
a1.sources.s1.topics = test01
a1.sources.s1.kafka.consumer.groupid = first_test
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

a1.channels.c1.type = file 
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint05
a1.channels.c1.file.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capaticy = 1000
a1.channels.c1.transactionCapacity = 10

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /kafka_flume/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
sinks.k1.hdfs.roundUnit = minute

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动flume消费者

flume-ng agent -n a1 -c conf/ -f /root/flume/kafka_flume01.conf -Dflume.root.logger=INFO,console

启动flume生产者

flume-ng agent -n a1 -c conf/ -f /root/flume/flume_kafka02.conf -Dflume.root.logger=INFO,console

启动控制台kafka消费者

kafka-console-consumer.sh --bootstrap-server test:9092 --from-beginning --topic kb12_01 --property print.key=true --key-deserializer org.apache.kafka.common.serialization.LongDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152384.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Android Drawable 与 LayerList综合汇总

    Android Drawable 与 LayerList综合汇总

    2022年1月25日
    36
  • mysql 通配符 替换,使用通配符替换的MySQL[通俗易懂]

    mysql 通配符 替换,使用通配符替换的MySQL[通俗易懂]I’mtryingtowriteaSQLupdatetoreplaceaspecificxmlnodewithanewstring:UPDATEtableSETConfiguration=REPLACE(Configuration,”%%ANY_VALUE%%””NEW_DATA”);SothatSDADASbecomesNEW_DATAIsthere…

    2022年7月16日
    11
  • c语言必背100代码,初学者代码大全(c语言必背100代码)[通俗易懂]

    c语言必背100代码,初学者代码大全(c语言必背100代码)[通俗易懂]一个完全入门初学者如何学代码,读代码和写代码,,我想学代码不知道方向谁能给我指明一个方向?1、学代码:前提是你的复有一个比较系统的学习.认真完成每一个课程中的案例.2、读代码:分制两步走:前期能读懂自己写的代码.2113后期能读懂他人写的代码和大致的知道底层的某些源码的含义.多去5261看开发文档(开发文档建议使用官方提供的4102英文版、不要使用中文自己害自己)3、写代码1653:前提是你要有…

    2022年5月18日
    82
  • LoadImage()的使用

    LoadImage()的使用

    2021年12月5日
    40
  • hystrix实现服务降级的3种方式[通俗易懂]

    hystrix实现服务降级的3种方式[通俗易懂]1、hystrix是什么Hystrix是一款开源的容错插件,具有依赖隔离,系统容错降级等功能,这也是其最重要的两种用途,还有请求合并等功能2、为什么要进行隔离在实际工作中,尤其是分布式、微服务越来越普遍的今天,一个服务经常需要调用其他的服务,即RPC调用,而调用最多的方式还是通过http请求进行调用,这里面就有一个问题了,如果调用过程中,因为网络等原因,造成某个服务调用超时,如果没有熔断机制…

    2022年4月30日
    121
  • 服务器的cd驱动器怎么修改盘符,更改dvd驱动器盘符,cd驱动器盘符改「建议收藏」

    服务器的cd驱动器怎么修改盘符,更改dvd驱动器盘符,cd驱动器盘符改「建议收藏」有部分win7系统用户反映说,当他在电脑中安装了虚拟光驱之后,电脑就会产生好多个无效的驱动器盘符,所以就将要将它们删除掉,可以却发现右击的菜单中找不到删除选项,也无法弹出光驱,导致无法删除无效驱动器盘符,这要怎么办呢?接下来给大家分享一下Win7系统删除无效驱动器盘符右键没有删除选项要怎么解决吧!推荐:1、点击win7系统的开始菜单,右击计算机,选择管理选项卡,进入计算机管理;2、打开计算机管理后…

    2022年5月31日
    167

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号