Flume对接Kafka详细过程[通俗易懂]

Flume对接Kafka详细过程[通俗易懂]一、为什么要集成Flume和Kafka一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Storm/SparkStreaming等流式实时处理技术,从而完成日志实时解析的目标。如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。因此数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计

大家好,又见面了,我是你们的朋友全栈君。

一、为什么要集成Flume和Kafka

一般使用 Flume + Kafka 来完成实时流式的日志处理,后面再连接上Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,当数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。

二、flume 与 kafka 的关系及区别

  • Flume
  1. Flume 是一个分布式、高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也可以将数据写到各种数据接受方,用于转发数据。Flume的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具

  2. flume分为sources,channels,sinks三部分,每一部分都可以根据需求定制。

  3. 与kafka相比,flume 可以定制很多数据源,减少开发量,因此做数据采集很好。

  • Kafka
  1. 是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn 的活跃数据,及日志数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。

  2. kafka 是分布式消息中间件,自带存储,提供 push 和 pull 存取数据的功能,是一个非常通用消息缓存的系统,可以有许多生产者和很多的消费者共享多个主题

三、Flume 对接 Kafka(详细步骤)

(1). Kafka作为source端

1. 配置flume

a1.sources = r1  
a1.channels = c1  
a1.sinks = k1  
  
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = centos1:9092
a1.sources.r1.kafka.topics = mytopic
a1.sources.r1.kafka.consumer.group.id = group1
a1.sources.r1.channels=c1  


a1.channels.c1.type=memory  
a1.channels.c1.capacity=1000  
a1.channels.c1.transactionCapacity=100  


a1.sinks.k1.type=logger  
a1.sinks.k1.channel=c1  

2. 启动flume

[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console

3. 启动Kafka producer

[hadoop@master1 ~]# kafka-console-producer.sh --broker-list master1:2181,slave1:2181,slave2:2181 --topic hello

(2). Kafka作为sink端

1. 配置flume

a1.sources = r1
a1.sinks = k1
a1.channels = c1


# netcat 监听端口
a1.sources.r1.type = netcat
a1.sources.r1.bind =master1
a1.sources.r1.port = 10000
a1.sources.r1.channels = c1 
# 一行的最大字节数
a1.sources.r1.max-line-length = 1024000


# channels具体配置
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# KAFKA_sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = hello
a1.sinks.k1.brokerList = master1:9092,slave1:9092,slave2:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

2. 启动zookeeper集群

[hadoop@master1 ~]# zkServer.sh start

3. 启动kafka集群

[hadoop@master1 ~]# kafka-server-start.sh /usr/local/src/kafka/config/server.properties

kafka后台运行:kafka-server-start.sh /usr/local/src/kafka/config/server.properties 1>/dev/null 2>&1 &
在这里插入图片描述

4.创建并查看topic

[hadoop@master1 ~]# kafka-topics.sh --create --zookeeper master1:2181,slave1:2181,slave2:2181 --replication-factor 2 --topic hello --partitions 1

[hadoop@master1 ~]# kafka-topics.sh --list --zookeeper master1:2181,slave1:2181,slave2:2181 

在这里插入图片描述

5. 创建kafka消费者

[hadoop@master1 ~]# kafal-console-consumer.sh --zookeeper master1:2181,slave1:2181,slave2:2181 --topic hello --from-beginning

6. 启动flume

[hadoop@master1 ~]# flume-ng agent -c /usr/local/src/flume/conf -f /usr/local/src/flume/conf/hdfs_skin.conf -n a1 -Dflume.root.logger=DEBUG,console

7. 向flume端口发送消息

[hadoop@master1 ~]# telnet master1 10000

8. 在kafka消费者接收信息

在这里插入图片描述

如有错误,欢迎私信纠正,谢谢支持!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152369.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • ISO IEC 27001 企业信息安全管理要求[通俗易懂]

    ISO IEC 27001 企业信息安全管理要求[通俗易懂]ISO27001和ISO20000认证已经成为企业核心竞争力的重要标志。ISOIEC270012013中文版-制造文档类资源-CSDN下载ISOIEC270012013中文版更多下载资源、学习资料请访问CSDN下载频道.https://download.csdn.net/download/std86021/83901501?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164816627616780255250066%2522%252C%

    2025年6月6日
    2
  • ajax跨域请求jsonp完整示例

    ajax跨域请求jsonp完整示例最经用到jsonp(ajax)的跨域请求,在这分享给大家,有需要用到的一看就能明白。具体步骤如下:1.首先客户端即页面script中调用代码如下:        varcardNumber="***********"; $.ajax({ type:"GET", url:’你请求的服务地址?idCard=’+cardNumber, dataType:…

    2022年6月17日
    103
  • vector初始化方法_vector初始化大小

    vector初始化方法_vector初始化大小vector类为内置数组提供了一种替代表示,与string类一样vector类是随标准C++引入的标准库的一部分 ,为了使用vector我们必须包含相关的头文件  :#include使用vector有两种不同的形式,即所谓的数组习惯和 STL习惯。一、数组习惯用法1. 定义一个已知长度的vector:vectorivec(10);  //类似

    2022年9月18日
    5
  • Folium_个人基础信息介绍

    Folium_个人基础信息介绍folium基础内容介绍1.简介​ folium是js上著名的地理信息可视化库leafet.js为Python提供的接口,通过它,我们可以通过在Python端编写代码操纵数据,来调用leaflet的相关功能,基于内建的osm或自行获取的osm资源和地图原件进行地理信息内容的可视化,以及制作优美的可交互地图,是通过不断添加图层元素来定义一个Map对象,最后以几种方式将Map对象展现出来。​ 而在Map对象的生成形式上,可以在定义所有的图层内容之后,将其保存为html文件在浏览器中独立显示,也可以基于j

    2025年7月3日
    2
  • 建立数据库,建立一个“学生”表student。[通俗易懂]

    建立数据库,建立一个“学生”表student。[通俗易懂]1.建立数据库,建立一个“学生”表student。2.设计思想:首先利用createdatabase语句建立一个数据库,再用createtable语句按要求建立基本表,再按照规则添加数据。3.实验代码及注释:创建数据库mysql>createdatabaseymz;QueryOK,1rowaffected(0.03sec)使用数据库mysql>us…

    2022年7月24日
    24
  • 产品模块化设计_pom设计模式

    产品模块化设计_pom设计模式对于,ecmall的本身自带的模板,可能很多用过的朋友都知道,其实还有许多功能根本无法实现。从根本上来讲,不如Smarty强大,但本人也试过将smarty引入但不是很成功,究其原因,ecmall的模板,并不是单纯地模板,而是在解析时参入了许多其它业务的逻辑,所以,直接以smarty来代替ecmall本身的模板,在widget的开发时会有很大的问题。所以,为了灵活情,本人,在ecmall

    2025年8月11日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号