数据收集之Fluentd

数据收集之FluentdFluentd 是一个完全免费且开源的日志收集系统 性能敏感的部分用 C 语言编写 插件部分用 Ruby 编写 500 多种插件 只需很少的系统资源即可轻松实现 LogEverythin 一般叫 Fluentd 为 td agent Fluentd 与 td agent 关系 td agent 是 Fluentd 的稳定发行包 Fluentd 与 Flume 关系 是两个类似工具 都可用于数据采集 Fluentd 的 In

Fluentd

Fluentd是一个完全免费且开源的日志收集系统,性能敏感的部分用C语言编写,插件部分用Ruby编写,500多种插件,只需很少的系统资源即可轻松实现”Log Everything”。一般叫Fluentd为td-agent。

Fluentd与td-agent关系:td-agent是Fluentd的稳定发行包。

Fluentd与Flume关系:是两个类似工具,都可用于数据采集。Fluentd的Input/Buffer/Output类似于Flume的Source/Channel/Sink。

Fluentd主要组成部分

fluentd_struct

Fluentd 主要由Input输出、Buffer缓冲、Output输出三大部分组成。这三大部分都是以插件的形式存在。当然还有其他辅助插件如Filter、Formatter等用于数据处理或格式化。

快速部署

Fluentd官网下载对应版本并安装。

curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | sh

启动Fluentd

service td-agent start

默认情况下,/etc/td-agent/td-agent.conf配置文件给了一个测试配置。可接收HTTP Post数据,并将其路由并写入到/var/log/td-agent/td-agent.log日志文件中。

curl -X POST -d 'json={"json":"Hello Fluentd"}' http://localhost:8888/debug /var/log/td-agent/td-agent.log收到的内容 debug: { 
  "json":"Hello Fluentd"}

Fluentd插件

Fluentd有七种类型的插件:输入(Input),分析器(Parser),过滤器(Filter),输出(Output),格式化(Formatter),存储(Storage),缓冲(Buffer)。

  • Input
    输入插件。内置的有tail、http、tcp、udp等。

  • Parser
    解析器。可自定义解析规则,如解析nginx日志。

  • Filter
    Filter插件,可过滤掉事件,或增加字段,删除字段。

  • Output
    输出插件。内置的有file、hdfs、s3、kafka、elasticsearch、mongoDB、stdout等。

  • Formatter
    Formatter插件。可自定义输出格式如json、csv等。

  • Storage
    Storage插件可将各状态保存在文件或其他存储中,如Redis、MongoDB等。

  • Buffer
    Buffer缓冲插件。缓冲插件由输出插件使用。在输出之前先缓冲,然后以如Kafka Producer Client的方式批量提交。有file、memory两种类型。flush_interval参数决定了提交的间隔,默认60秒刷新一次。

同一 Fluentd Agent同步不同类型日志到对应Kaka Topic

在/etc/td-agent目录创建配置文件browseFile_browseTopic.conf,内容如下:

 input file  <source> #input类型tail 类似于tail -F 命令 @type tail #日志文件路径 path /data/log/browse-*.log #偏移量路径 记录上一次读取的位置 td-agent用户需要写权限 pos_file /data/td-agent-data/pos/browse.pos #tag: match tag 把Event路由到不同output tag browse 
   
   #以json格式解析 @ 
   type json 
   
  source>  output kafka  #匹配tag 
   
   #输出到kafka @ 
   type kafka 
   #kafka brokers brokers node1: 
   6667,node2: 
   6667,node3: 
   6667 
   #kafka topic default_topic browse_log 
   #kafka data type output_data_ 
   type json 
   #kafka ack required_acks - 
   1 
   #kafka flush 间隔 flush_interval 
   5s 
   #缓冲 
   #缓冲类型 file buffer_ 
   type file 
   #缓存的文件路径 buffer_path /data/td-agent-data/buffer/browse_buffer 
  

在/etc/td-agent目录创建配置文件clickFile_clickTopic.conf,内容如下:

 input file  <source> #input类型tail 类似于tail -F 命令 @type tail #日志文件路径 path /data/log/click-*.log #偏移量路径 记录上一次读取的位置 td-agent用户需要写权限 pos_file /data/td-agent-data/pos/click.pos #tag: match tag 把Event路由到不同output tag click 
   
   #以json格式解析 @ 
   type json 
   
  source>  output kafka  #匹配tag 
   
   #输出到kafka @ 
   type kafka 
   #kafka brokers brokers node1: 
   6667,node2: 
   6667,node3: 
   6667 
   #kafka topic default_topic click_log 
   #kafka data type output_data_ 
   type json 
   #kafka ack required_acks - 
   1 
   #kafka flush 间隔 flush_interval 
   5s 
   #缓冲 
   #缓冲类型 file buffer_ 
   type file 
   #缓存的文件路径 buffer_path /data/td-agent-data/buffer/click_buffer 
  

在/etc/td-agent目录创建配置文件td-agent1.conf,内容如下:

#用户浏览日志 @include /etc/td-agent/browseFile_browseTopic.conf #用户点击日志 @include /etc/td-agent/clickFile_clickTopic.conf

启动td-agent

td-agent -c /etc/td-agent/td-agent1.conf -o /var/log/td-agent/td-agent1.log

向用户浏览日志日志文件追加数据

for i in `seq 1 1000`; do aa=`date "+%Y-%m-%d %H:%M:%S"` browse='{ 
  "userID":1,"productID":1,"event_time":12333,"log_type":"browse_log","now_date":"'"${aa}"'"}' echo ${browse} >> browse-.log echo ${browse} >> browse-.log sleep 1 done

向用户点击日志日志文件追加数据

click='{ 
  "userID":1,"productID":1,"event_time":12333,"log_type":"click_log"}' for i in `seq 1 1000`;do echo ${click} >> click-.log;sleep 1;done

结果: kafka topic browse_log 收到的日志

td-agent-browse.png

结果:kafka topic click_log 收到的日志

td-agent-click.png

同一 Fluentd Agent同步同一输入到不同输出

在/etc/td-agent目录创建配置文件 td-agent2.conf,内容如下:

配置input  #配置input 为tail @type tail #日志文件路径 path /data/log/test-*.log #偏移量路径 pos_file /data/td-agent-data/pos/test.pos #tag tag test 
   
     #原样输出 @ 
    type none 
    配置output 多个输出 
   
     #copy output plugin 将event拷贝到多个output @ 
    type copy 
    
      #输出到文件 @ 
     type file path / 
     data/log/testBackup.log 
     
    
      #输出到控制台 @ 
     type stdout 
     
    
      #输出到kafka @ 
     type kafka brokers node1: 
     6667,node2: 
     6667,node3: 
     6667 default_topic testTopic3 
     
   

启动td-agent

td-agent -c /etc/td-agent/td-agent2.conf -o /var/log/td-agent/td-agent2.log

向test-1.log追加日志

for i in `seq 1 1000`;do echo "Fluent is logging ....">> test-1.log;sleep 1;done

结果:本地目录

td-agent-local.png

结果:标准输出

td-agent-stdout.png

结果:kafka

td-agent-kafka.png

监控

进程监控

一个td-agent有两个进程:父进程、子进程。确保这两个进程存在。

ps w -C ruby -C td-agent --no-heading 81565 ? Sl 0:00 /opt/td-agent/embedded/bin/ruby /usr/sbin/td-agent --log /var/log/td-agent/td-agent.log --use-v1-config 81570 ? Sl 0:02 /opt/td-agent/embedded/bin/ruby -Eascii-8bit:ascii-8bit /usr/sbin/td-agent --log /var/log/td-agent/td-ag

指标监控

在td-agent1.conf配置文件增加如下配置:

#监控 <source> type monitor_agent bind 0.0.0.0 port 24220 
  source>

通过HTTP访问 http://node3:24220/api/plugins.json

{ "plugins":[ { "plugin_id":"object:d72aa8", "plugin_category":"input", "type":"tail", "config":{ "@type":"tail", "path":"/data/log/browse-*.log", "pos_file":"/data/td-agent-data/pos/browse.pos", "tag":"browse" }, "output_plugin":false, "retry_count":null }, { "plugin_id":"object:10351c8", "plugin_category":"input", "type":"tail", "config":{ "@type":"tail", "path":"/data/log/click-*.log", "pos_file":"/data/td-agent-data/pos/click.pos", "tag":"click" }, "output_plugin":false, "retry_count":null }, { "plugin_id":"object:d6c4b4", "plugin_category":"input", "type":"monitor_agent", "config":{ "type":"monitor_agent", "bind":"0.0.0.0", "port":"24220" }, "output_plugin":false, "retry_count":null }, { "plugin_id":"object:f94124", "plugin_category":"output", "type":"kafka", "config":{ "@type":"kafka", "brokers":"node1:6667,node2:6667,node3:6667", "default_topic":"browse_log", "output_data_type":"json", "required_acks":"-1", "flush_interval":"5s", "buffer_type":"file", "buffer_path":"/data/td-agent-data/buffer/browse_buffer" }, "output_plugin":true, "retry_count":0, "retry":{ } }, { "plugin_id":"object:ff62e8", "plugin_category":"output", "type":"kafka", "config":{ "@type":"kafka", "brokers":"node1:6667,node2:6667,node3:6667", "default_topic":"click_log", "output_data_type":"json", "required_acks":"-1", "flush_interval":"5s", "buffer_type":"file", "buffer_path":"/data/td-agent-data/buffer/click_buffer" }, "output_plugin":true, "retry_count":0, "retry":{ } } ] }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/228991.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午5:43
下一篇 2026年3月16日 下午5:44


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号