Flume 入门–几种不同的Sinks

Flume 入门–几种不同的Sinks主要介绍几种常见 Flume 的 Sink 汇聚点 1 LoggerSink nbsp 记录 INFO 级别的日志 一般用于调试 前面介绍 Source 时候用到的 Sink 都是这个类型的 Sink 必须配置的属性 属性说明 nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp channel nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp type nbsp nbsp nbsp nbsp nbsp nbsp Thecomponent needstobelog nbsp nbsp

主要介绍几种常见Flume的Sink–汇聚点

1.Logger Sink 

记录INFO级别的日志,一般用于调试。前面介绍Source时候用到的Sink都是这个类型的Sink

必须配置的属性:

案例:前面的例子都是这种类型的Sink

2.File Roll Sink

在本地文件系统中存储事件。每隔指定时长生成文件保存这段时间内收集到的日志信息。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
编写配置文件:
  
#命名Agent a1的组件
  
a1.sources  =  r1
  
a1.sinks  =  k1
  
a1.channels  =  c1
 
  
#描述/配置Source
  
a1.sources.r1.
type  
= http
  
a1.sources.r1.port  = 6666
 
  
#描述Sink
  
a1.sinks.k1.
type  
= file_roll
  
a1.sinks.k1.sink.directory = 
/home/park/work/apache-flume-1
.6.0-bin
/mysink
  
#描述内存Channel
  
a1.channels.c1.
type  
=  memory
  
a1.channels.c1.capacity  =  1000
  
a1.channels.c1.transactionCapacity  =  100
 
  
#为Channle绑定Source和Sink
  
a1.sources.r1.channels  =  c1
  
a1.sinks.k1.channel  =  c1

 启动flume:

1
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template7
.conf --name a1 -Dflume.root.logger=INFO,console

测试:

通过curl命令向目标主机发送请求,就会发现在指定的文件夹下出现记录收集日志的文件

3.Avro Sink

是实现多级流动 和 扇出流(1到多) 扇入流(多到1) 的基础。非常重要 但是需要多台机器

案例1.多级流动  h1流动到h2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
h2:
                
配置配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=avro
                    
a1.sources.r1.bind=0.0.0.0
                    
a1.sources.r1.port=9988
                    
#描述Sink
                    
a1.sinks.k1.
type
=logger
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template8
.conf --name a1 -Dflume.root.logger=INFO,console
 
                 
            
h1:
                
配置配置文件
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=http
                    
a1.sources.r1.port=8888
                    
#描述Sink
                    
a1.sinks.k1.
type
=avro
                    
a1.sinks.k1.
hostname
=192.168.242.138
                    
a1.sinks.k1.port=9988
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.chafile:
///C
:
/Users/park/Desktop/Day01_Flume/
%E6%96%87%E6%A1%A3
/Flume
%201.6.0%20User%20Guide%20%E2%80%94%20Apache%20Flume.htm
#irc-sinknnels=c1
                    
a1.sinks.k1.channel=c1

 启动flume

发送http请求到h1:

1
curl -X POST -d 
'[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' 
http:
//192
.168.242.133:8888

 稍等几秒后,发现h2最终收到了这条消息

案例2:扇出流(h1扇出到h2,h3)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
h2 h3:
                
配置配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=avro
                    
a1.sources.r1.bind=0.0.0.0
                    
a1.sources.r1.port=9988
                    
#描述Sink
                    
a1.sinks.k1.
type
=logger
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template8
.conf --name a1 -Dflume.root.logger=INFO,console
 
            
h1:
                
配置配置文件
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1 k2
                    
a1.channels=c1 c2
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=http
                    
a1.sources.r1.port=8888
                    
#描述Sink
                    
a1.sinks.k1.
type
=avro
                    
a1.sinks.k1.
hostname
=192.168.242.138
                    
a1.sinks.k1.port=9988
                    
a1.sinks.k2.
type
=avro
                    
a1.sinks.k2.
hostname
=192.168.242.135
                    
a1.sinks.k2.port=9988
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
a1.channels.c2.
type
=memory
                    
a1.channels.c2.capacity=1000
                    
a1.channels.c2.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1 c2
                    
a1.sinks.k1.channel=c1 
                    
a1.sinks.k2.channel=c2 

案例3:扇入流()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
m3:
                
编写配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
                    
#描述/配置Source
                    
a1.sources.r1.
type
=avro
                    
a1.sources.r1.bind=0.0.0.0
                    
a1.sources.r1.port=4141
                    
#描述Sink
                    
a1.sinks.k1.
type
=logger
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template
.conf --name a1 -Dflume.root.logger=INFO,console
             
            
m1、m2:
                
编写配置文件:
                    
#命名Agent组件
                    
a1.sources=r1
                    
a1.sinks=k1
                    
a1.channels=c1
 
                    
#描述/配置Source
                    
a1.sources.r1.
type
=http
                    
a1.sources.r1.port=8888
                    
#描述Sink
                    
a1.sinks.k1.
type
=avro
                    
a1.sinks.k1.
hostname
=192.168.242.135
                    
a1.sinks.k1.port=4141
                    
#描述内存Channel
                    
a1.channels.c1.
type
=memory
                    
a1.channels.c1.capacity=1000
                    
a1.channels.c1.transactionCapacity=1000
                    
#为Channel绑定Source和Sink
                    
a1.sources.r1.channels=c1
                    
a1.sinks.k1.channel=c1
                
启动flume:
                    
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template9
.conf --name a1 -Dflume.root.logger=INFO,console
                
m1通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
                    
[root@localhost conf]
# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
                
m2通过curl发送一条http请求,由于默认使用的是jsonHandler,数据格式必须是指定的json格式:
                    
[root@localhost conf]
# curl -X POST -d '[{ "headers" :{"flag" : "c"},"body" : "idoall.org_body"}]' http://0.0.0.0:8888
                
发现m3均能正确收到消息

 

4、HDFS Sink

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#命名Agent组件
    
a1.sources=r1
    
a1.sinks=k1
    
a1.channels=c1
 
    
#描述/配置Source
    
a1.sources.r1.
type
=http
    
a1.sources.r1.port=8888
    
#描述Sink
    
a1.sinks.k1.
type
=hdfs
    
a1.sinks.k1.hdfs.path=hdfs:
//0
.0.0.0:9000
/ppp
    
#描述内存Channel
    
a1.channels.c1.
type
=memory
    
a1.channels.c1.capacity=1000
    
a1.channels.c1.transactionCapacity=1000
    
#为Channel绑定Source和Sink
    
a1.sources.r1.channels=c1
    
a1.sinks.k1.channel=c1
           

 启动flume:

1
.
/flume-ng 
agent --conf ..
/conf 
--conf-
file 
..
/conf/template9
.conf --name a1 -Dflume.root.logger=INFO,console

 测试:通过利用curl给目的主机发送命令,会发现在HDFS中会生成相应的记录文件。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/215686.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午1:40
下一篇 2026年3月18日 下午1:40


相关推荐

  • navacate连接不上mysql_navicat连接mysql失败怎么办

    navacate连接不上mysql_navicat连接mysql失败怎么办Navicat 连接 mysql 数据库时 不断报 1405 错误 下面是针对这个的解决办法 MySQL 服务器正在运行 停止它 如果是作为 Windows 服务运行的服务器 进入计算机管理 gt 服务和应用程序 gt 服务 如果服务器不是作为服务而运行的 可能需要使用任务管理器来强制停止它 创建 1 个文本文件 此处命名为 mysql init txt 并将下述命令置于单一行中 SETPASSW

    2025年7月10日
    8
  • flatMap示例

    flatMap示例flatMap示例什么是flatMap()回顾下面的数据结构,#Stream<String[]>#Stream<Stream<String>>#String[][][[1,2],[3,4],[5,6]]在Java8中,我们可以使用flatMap将上述数据结构转化为一下结构#Stream<String>#String[][1,2,3,4,5,6]为什么要平流处理包含超过一个级别的流,例

    2022年6月1日
    35
  • 通义千问2.5-0.5B-Instruct政府服务:政策解读问答系统部署

    通义千问2.5-0.5B-Instruct政府服务:政策解读问答系统部署

    2026年3月14日
    2
  • 此av非彼”AV”

    此av非彼”AV”作者:王亨 ,R语言中文社区专栏作者,跟着菜鸟一起一步步学习R语言,争做R语言高手。个人公众号:跟着菜鸟一起学R语言(微信ID:learn_R) 最近发现一个特别有意思的…

    2026年2月13日
    3
  • networkmanager配置文件在哪_需要运行networkmanager

    networkmanager配置文件在哪_需要运行networkmanager原文链接http://live.gnome.org/action/login/NetworkManagerConfiguration 本文对了解NetworkManager的使用和开发有很大帮助,澄清了一些NetworkManager中使用的概念,对阅读源代码和修改Bug有很大的帮助。 设置服务(SettingsServices)通过D-Bus服务提供配置给N

    2022年10月4日
    4
  • 实现稀疏矩阵相乘C/C++

    实现稀疏矩阵相乘C/C++实现稀疏矩阵相乘 C C 1 问题描述 已知稀疏矩阵 A m1 n1 和 B m2 n2 求乘积 C m1 n2 A 300 nbsp 7 nbsp nbsp B 4 nbsp 1 nbsp C 1217 nbsp nbsp nbsp 000 1 nbsp nbsp nbsp nbsp 0 nbsp 0 nbsp nbsp nbsp nbsp 0 nbsp nbsp 2 nbsp nbsp nbsp 020 nbsp 0 nbsp nbsp nbsp nbsp 1 1 nbsp nbsp nbsp nbsp 0 nbsp nbsp nbsp 0 nbsp nbsp

    2026年3月18日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号