大数据——Flume+Kafka+Flume整合模式

大数据——Flume+Kafka+Flume整合模式创建kafka主题#启动kafka服务kafka-server-start.sh/opt/software/kafka280cala212/conf/kraft/server.properites#创建主题#topic主题名test01#partitions分区数1#replication-factor备份数量1kafka-topics.sh–create–topictest01–partitions1–replication-factor1…

大家好,又见面了,我是你们的朋友全栈君。

大数据——Flume+Kafka+Flume整合模式

创建kafka主题

#启动kafka服务
kafka-server-start.sh /opt/software/kafka280scala212/conf/kraft/server.properites

#创建主题
#topic主题名test01    
#partitions分区数1 
#replication-factor备份数量1
kafka-topics.sh --create --topic test01 --partitions 1 --replication-factor 1 --bootstrap-server 192.168.131.200:9092

#查看主题
kafka-topics.sh --list --bootstrap-server 192.168.131.200:9092

创建flume配置文件(采用KafkaSink作为kafka生产者)

#创建并编辑文件名为flume_kafka01.conf配置文件
vim /root/flume/flume_kafka01.conf

#创建flume 的三大组件sources channels sinks
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#这里选用的是taildir类型的source,支持断点续采
a1.sources.s1.type = taildir

#需要侦听的文件,支持多目录侦听
a1.sources.s1.filegroups = f1
#侦听前缀为prolog的文件
a1.sources.s1.filegroups.f1 = /root/flume_log/prolog*
#断点记录保存文件路径
a1.sources.s1.positionFile = /opt/software/fluem190/data/taildir/tail_prolog_01.json
#设置采集批量
a1.sources.s1.batchSize = 10

a1.channels.c1.type = file
a1.channels.c1.file.checkpointDir = /opt/software/flume190/mydata/checkpoint04
a1.channels.c1.file.capacity = 1000
a1.channels.c1.file.transactionCapacity = 100
#transactionCapacity 默认值为100,且必须大于100
#transactionCapacity >= batchSize

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.131.200:9092
a1.sinks.k1.kafka.topic = test01
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.linger.ms = 500
a1.sinks.k1.kafka.acks = 1

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

创建flume配置文件(采用KafkaSource作为kafka消费者)

vim /root/flume/kafka_flume01.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 10
a1.sources.si.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.server = 192.168.131.200:9092
a1.sources.s1.topics = test01
a1.sources.s1.kafka.consumer.groupid = first_test
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

a1.channels.c1.type = file 
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint05
a1.channels.c1.file.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capaticy = 1000
a1.channels.c1.transactionCapacity = 10

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /kafka_flume/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
sinks.k1.hdfs.roundUnit = minute

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动flume消费者

flume-ng agent -n a1 -c conf/ -f /root/flume/kafka_flume01.conf -Dflume.root.logger=INFO,console

启动flume生产者

flume-ng agent -n a1 -c conf/ -f /root/flume/flume_kafka02.conf -Dflume.root.logger=INFO,console

启动控制台kafka消费者

kafka-console-consumer.sh --bootstrap-server test:9092 --from-beginning --topic kb12_01 --property print.key=true --key-deserializer org.apache.kafka.common.serialization.LongDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152384.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月23日 下午12:00
下一篇 2022年6月23日 下午12:00


相关推荐

  • map用value值找key的两种方法

    map用value值找key的两种方法map用value值找key的两种方法Map中是一个key有且只有一个value.但是一个value可以对应多个key值.只用用特殊方法才能用value值来找key,以下就是用value值找key的两种方法publicstaticvoidmain(String[]args){//TODOAuto-generatedmethodstub…

    2022年7月23日
    17
  • 电脑蓝屏错误代码0x000000ED_0x00000019蓝屏win7

    电脑蓝屏错误代码0x000000ED_0x00000019蓝屏win7相信大家都遇到过电脑蓝屏的情况,而且蓝屏故障是一件非常麻烦的事情,前不久小编电脑出现蓝屏并且提示错误代码是0x000000BE,很多用户可能会直接选择重装系统,其实不用这么麻烦,出现0x000000BE很有可能是硬件设备驱动程序存在BUG或安装不正确引起,具体一起来看看。解决方法:在开机过程中按下f8键进入Windows高级启动菜单,进入安全模式,也许会有改善。再重启电脑,继续按F8键,此时可以选…

    2022年10月8日
    4
  • Lua教程(一):简介、优势和应用场景介绍

    Lua教程(一):简介、优势和应用场景介绍

    2026年3月12日
    2
  • Windows Xp 优化文件 的一段BAT代码

    Windows Xp 优化文件 的一段BAT代码WindowsXp 优化文件的一段 BAT 代码 echoofftitle 优化文件 startclscolo COLS 50LINES 27echo nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp WindowsXp 优化文件 echo nbsp nbsp nbsp nbsp PoweredByThu e

    2026年3月26日
    1
  • SDRAM控制器设计(8)SDRAM控制器仿真验证

    SDRAM控制器设计(8)SDRAM控制器仿真验证到此,简单的可进行读写操作的SDRAM控制器模块就设计好了。接下来,结合仿真模型(镁光官网提供的SDRAM模型)sdr文件,和编写的testbench文件验证所设计的控制器是否正确。testbench如下`timescale1ns/1ns`defineCLK100_PERIOD10modulesdram_control_tb;`include”../src/Sdr…

    2022年7月25日
    15
  • html怎么隐藏播放器_css遮罩

    html怎么隐藏播放器_css遮罩<!DOCTYPEhtml><htmllang=”en”><head><metacharset=”UTF-8″><metaname=”viewport”content=”width=device-width,initial-scale=1.0″><metahttp-equiv=”X-UA-Compatible”content=”ie=edge”><title>视.

    2025年5月26日
    12

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号