Flume架构以及应用介绍

Flume架构以及应用介绍在具体介绍本文内容之前 先给大家看一下 Hadoop 业务的整体开发流程 从 Hadoop 的业务开发流程图中可以看出 在大数据的业务处理过程中 对于数据的采集是十分重要的一步 也是不可避免的一步 从而引出我们本文的主角 Flume 本文将围绕 Flume 的架构 Flume 的应用 日志采集 进行详细的介绍 一 Flume 架构介绍 1 Flume 的概念 flume 是分布式的日志

 # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1
 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100

3>通过channel将source与sink连接起来

 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

启动agent的shell操作:

 flume-ng agent -n a1 -c ../conf -f ../conf/example.file -Dflume.root.logger=DEBUG,console 
Property Name Default Description channels – type – The component type name, needs to be netcat bind – 日志需要发送到的主机名或者Ip地址,该主机运行着netcat类型的source在监听 port – 日志需要发送到的端口号,该端口号要有netcat类型的source在监听 

a) 编写配置文件:

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.80.80 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

b) 启动flume agent a1 服务端

flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console

c) 使用telnet发送数据

telnet 192.168.80.80 44444 big data world!(windows中运行的)
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 192.168.80.80 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

b) 启动flume agent a1 服务端

flume-ng agent -n a1 -c ../conf -f ../conf/netcat.conf -Dflume.root.logger=DEBUG,console

c) 使用telnet发送数据

telnet 192.168.80.80 44444 big data world!(windows中运行的)
Property Name Default Description channels – type – The component type name, needs to be spooldir. spoolDir – Spooling Directory Source监听的目录 fileSuffix .COMPLETED 文件内容写入到channel之后,标记该文件 deletePolicy never 文件内容写入到channel之后的删除策略: never or immediate fileHeader false Whether to add a header storing the absolute path filename. ignorePattern ^$ Regular expression specifying which files to ignore (skip) interceptors – 指定传输中event的head(头信息),常用timestamp

Spooling Directory Source的两个注意事项:

①If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing. 即:拷贝到spool目录下的文件不可以再打开编辑 ②If a file name is reused at a later time, Flume will print an error to its log file and stop processing. 即:不能将具有相同文件名字的文件拷贝到这个目录下

a) 编写配置文件:

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/datainput a1.sources.r1.fileHeader = true a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

b) 启动flume agent a1 服务端

flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console

c) 使用cp命令向Spooling Directory 中发送数据

 cp datafile /usr/local/datainput (注:datafile中的内容为:big data world!)
[root@hadoop80 datainput]# ls datafile.COMPLETED

案例4:Spooling Directory Source:监听一个指定的目录,即只要应用程序向这个指定的目录中添加新的文件,source组件就可以获取到该信息,并解析该文件的内容,然后写入到channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:hdfs Channel:file (相比于案例3的两个变化)

a) 编写配置文件:

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /usr/local/datainput a1.sources.r1.fileHeader = true a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp # Describe the sink # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

b) 启动flume agent a1 服务端

flume-ng agent -n a1 -c ../conf -f ../conf/spool.conf -Dflume.root.logger=DEBUG,console

c) 使用cp命令向Spooling Directory 中发送数据

 cp datafile /usr/local/datainput (注:datafile中的内容为:big data world!)

a) 编写配置文件:

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /usr/local/log.file # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

b)在hive中建立外部表—–hdfs://hadoop80:9000/dataoutput的目录,方便查看日志捕获内容

hive> create external table t1(infor string) > row format delimited > fields terminated by '\t' > location '/dataoutput/'; OK Time taken: 0.284 seconds

c) 启动flume agent a1 服务端

flume-ng agent -n a1 -c ../conf -f ../conf/exec.conf -Dflume.root.logger=DEBUG,console

d) 使用echo命令向/usr/local/datainput 中发送数据

 echo big data > log.file
hive> select * from t1; OK big data Time taken: 0.086 seconds

e)使用echo命令向/usr/local/datainput 中在追加一条数据

echo big data world! >> log.file
hive> select * from t1; OK big data big data world! Time taken: 0.511 seconds

总结Exec source:Exec source和Spooling Directory Source是两种常用的日志采集的方式,其中Exec source可以实现对日志的实时采集,Spooling Directory Source在对日志的实时采集上稍有欠缺,尽管Exec source可以实现对日志的实时采集,但是当Flume不运行或者指令执行出错时,Exec source将无法收集到日志数据,日志会出现丢失,从而无法保证收集日志的完整性。

Property Name Default Description channels – type – The component type name, needs to be avro bind – 日志需要发送到的主机名或者ip,该主机运行着ARVO类型的source port – 日志需要发送到的端口号,该端口要有ARVO类型的source在监听

1)编写配置文件

# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = avro a1.sources.r1.bind = 192.168.80.80 a1.sources.r1.port = 4141 # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://hadoop80:9000/dataoutput a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 10 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S a1.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in file a1.channels.c1.type = file a1.channels.c1.checkpointDir = /usr/flume/checkpoint a1.channels.c1.dataDirs = /usr/flume/data # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

b) 启动flume agent a1 服务端

flume-ng agent -n a1 -c ../conf -f ../conf/avro.conf -Dflume.root.logger=DEBUG,console

c)使用avro-client发送文件

flume-ng avro-client -c ../conf -H 192.168.80.80 -p 4141 -F /usr/local/log.file

注:log.file文件中的内容为:

[root@hadoop80 local]# more log.file big data big data world!

通过上面的几个案例,我们可以发现:flume配置文件的书写是相当灵活的—-不同类型的Source、Channel和Sink可以自由组合!

如有问题,欢迎留言指正!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203923.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午9:17
下一篇 2026年3月19日 下午9:17


相关推荐

  • js中clearInterval无效,以及setInterval中断后重新执行

    js中clearInterval无效,以及setInterval中断后重新执行clearInterva 失效的原因 setInterval 每执行一次 则返回一个唯一 id 所以 setInterval 执行了 n 次 那么需要调用 clearInterva 也是 nci 出现 clearInterva 失效的情况 请查看每次调用 setInterval 是否都 clearInterva 了

    2026年3月16日
    3
  • Android P Preview1 兼容要点[通俗易懂]

    AndroidPPreview1,昨天3.8发布,兼容Preview1主要改下targetSdkVersion=28compileSdkVersion为前一个版本(比如27)在AndroidStudio3.2下能完整支持AndroidPPreview1AndroidPPreview1要点1.1约束调用非API接口:比如用JNI、反射来调用一下系统类、方法    Android…

    2022年4月15日
    47
  • 特色网站收集(332个)「建议收藏」

    特色网站收集(332个)「建议收藏」1.del.icio.ushttp://del.icio.us/在线收藏夹.域名很有创意.影响也很广.支持从浏览器导出和导出到浏览器.2.Anonymousehttp://anonymouse.org

    2022年7月3日
    45
  • eclipse字体大小调整

    eclipse字体大小调整Eclipse 字体有两处 一处是控制台的字体 一处是主窗口 这里分别介绍控制台和主窗口字体的调节方法 nbsp nbsp nbsp Window gt Preferences gt General gt Appearance gt ColorsandFon gt Basic gt TextFont gt Edit 调节控制条字体大小 nbsp nbsp nbsp Window

    2026年3月18日
    2
  • 空指针异常主要原因以及解决方案

    空指针异常主要原因以及解决方案空指针异常产生的主要原因如下 1 当一个对象不存在时又调用其方法会产生异常 obj method obj 对象不存在 2 当访问或修改一个对象不存在的字段时会产生异常 obj method method 方法不存在 3 字符串变量未初始化 4 接口类型的对象没有用具体的类初始化 比如 Lista 会报错 Lista newArrayList 则不会报错了当

    2026年3月19日
    2
  • SpringBoot整合TKmybatis

    SpringBoot整合TKmybatisSpringBoot 整合 TKmybatis 前言 最近公司在用 tkmybatis 于是乎去看了一下 挺好用的 所以在这里记录一下其用法 一什么是 TKmybatis 就我个人的理解而言 tkmybatis 就是一个框架或者说工具 其在 mybatis 的基础上进行了再次封装 使得我们可以不用写简单而重复的 CRUD 代码 又一次解放了生产力 如果涉及到多表查询 需要自己写 sql 哦 因为 tkmybat

    2026年3月16日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号