Flume-Kafka-Flume对接Kafka以及Kafka数据分类传输

Flume-Kafka-Flume对接Kafka以及Kafka数据分类传输Flume对接KafkaFlume日志采集组件;Flume对接kafka主要是为了通过kafka的topic功能,动态的增加或者减少接收的节点,并且Flume要对接多个节点是需要多个channel和sink的会导致内存不够的情况。那么可以实现的场景就是Flume采集日志文件,通过kafka给多给业务线使用。1)配置flume(flume-kafka.conf)#definea1.sources=r1a1.sinks=k1a1.channels=c1#sourcea1

大家好,又见面了,我是你们的朋友全栈君。

Flume 对接 Kafka

Flume日志采集组件;Flume对接kafka主要是为了通过kafka的topic功能,动态的增加或者减少接收的节点,并且Flume要对接多个节点是需要多个channel和sink的会导致内存不够的情况。

那么可以实现的场景就是Flume采集日志文件,通过kafka给多给业务线使用。

1)配置 flume(flume-kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动消费者
kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
  1. 进入 flume 根目录下,启动 flume
bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4)启动nc发送数据

[bd@hadoop113 ~]$ nc localhost 44444
hello
OK
word
OK

结果如下
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
hello
word

Kafka数据分类

依据Kafka Sink的配置

Property Name Default Description
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here.

在消息头中携带了topic字段的话,该消息就会被发送到topic字段对应的topic去。

那么在flume接收到消息之后,可以通过拦截器为topic加上header,即可将其进行分类。

Flume拦截器如下:

public class JudgeTestStringInterceptor implements Interceptor { 
   

    // 声明一个存放事件的List
    private List<Event> allEvents;

    public void initialize () { 
   

        // 初始化
        allEvents = new ArrayList<Event>();
    }

    /** * 单个事件拦截 * @param event * @return */
    public Event intercept (Event event) { 
   

        // 1、获取事件中的头信息
        Map<String, String> headers = event.getHeaders();

        // 2、获取事件中的body信息
        String body = new String(event.getBody());

        // 3、根据body中是否有“test”来决定添加怎样的头信息
        // 有的话添加<topic, first>没有则添加<topic, second>
        if (body.contains("test")) { 
   
            headers.put("topic", "first");
        } else { 
   
            headers.put("topic", "second");
        }

        return event;
        // 如果返回null则认为该事件无用,将会被过滤
    }

    /** * 批量事件拦截 * @param list * @return */
    public List<Event> intercept (List<Event> list) { 
   

        // 1、清空集合
        allEvents.clear();

        // 2、遍历event
        for (Event event : list) { 
   
            // 3、给每个事件添加头信息
            allEvents.add(intercept(event));
        }

        return allEvents;
    }

    public void close () { 
   

    }

    // 定义一个Builder对象
    public static class Builder implements Interceptor.Builder { 
   

        public Interceptor build () { 
   

            return new JudgeTestStringInterceptor();
        }

        public void configure (Context context) { 
   

        }
    }
}

配置文件type-kafka.conf如下:

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.starnet.interceptor.JudgeTestStringInterceptor$Builder

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume,两个消费者以及nc之后结果如下:

[bd@hadoop113 ~]$ nc localhost 44444
test
OK
hello
OK
word
OK


[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
test

[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic second
hello
word
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152396.html原文链接:https://javaforall.net

(0)
上一篇 2022年6月23日 上午11:36
下一篇 2022年6月23日 上午11:36


相关推荐

  • pycharm批量注释代码_pycharm批量缩进快捷键

    pycharm批量注释代码_pycharm批量缩进快捷键我们使用pycharm的时候,会遇到写注释的情况,单独一行还没事,直接加个#就可以解决问题,但是需要注释掉多行的代码的时候,我们如果,还是一个人一个敲#,就会很费时间,下面介绍一下pycharm里面批量注释的方法。当我们想要注释掉多行代码时,只需要Ctrl+a选中这几行代码,然后继续**Ctrl+/**就可以完成注释,取消注释也是同样的方法。…

    2022年8月25日
    7
  • win10强制删除文件夹(“你需要来自XXX的权限才能对此文件夹进行更改”的解决方法)

    win10强制删除文件夹(“你需要来自XXX的权限才能对此文件夹进行更改”的解决方法)win10强制删除文件夹(“你需要来自XXX的权限才能对此文件夹进行更改”的解决方法)使用命令行方式删除文件夹(这是目前可行的方式)第一步:以管理员账户打开powershell第二步:定位到要删除的文件夹所在目录第三步:给要删除的文件夹赋本机管理员Administrator权限第四步:修改对文件/文件夹的访问权限(赋删除权限)第五步:强制删除文件使用命令行方式删除文件夹(这是目前可行的方式)第一步:以管理员账户打开powershell快捷键win+x调出如下界面,点击WindowsPowerS

    2022年5月29日
    49
  • 若干道Swift面试题

    1,说说你认识的Swift是什么?Swift是苹果于2014年WWDC(苹果开发者大会)发布的新开发语言,可与Objective-C共同运行于MACOS和iOS平台,用于搭建基于苹果平台的应用程序。

    2021年12月23日
    61
  • MATLAB中plot函数的用法

    MATLAB中plot函数的用法使用 plot 绘制二维图像本文转自 http blog sina com cn s blog d8f783c90102 html 以及 https blog csdn net alvern zhang article details 51153058MATL 中 plot 函数常常被用于绘制各种二维图像 其用法也是多种多样 本文仅介绍 plot 函数的基本用法 使用 plot 函数绘制二维点图和线

    2026年3月19日
    2
  • QeePHP的ACL设置

    QeePHP的ACL设置又开始用 qeephp 开发了 个人感觉这个框架越用越觉得强大 完全不亚于 python 的 django 框架 唯一的遗憾是 qeephp 文档的完整性是如此之差 没办法 国内的 不扯没用的 进入正题 今天用了 ACL 访问控制 就一个字 方便 默认情况下 所有的访问控制规则都写在 acl yaml php 中 该文件的结构也很简单 控制器名称 这里一定要顶格 nbsp all

    2026年3月18日
    2
  • Google资深工程师深度讲解Go语言-channel 通道 (十)

    Google资深工程师深度讲解Go语言-channel 通道 (十)

    2022年2月16日
    57

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号