Flume与Kafka对接「建议收藏」

Flume与Kafka对接「建议收藏」引言flume为什么要与kafka对接?我们都知道flume可以跨节点进行数据的传输,那么flume与sparkstreaming对接不好吗?主要是flume对接到kafka的topic,可以给多个consumergroup去生成多条业务线。虽然flume中的channelselector中的副本策略也可以做多给多个sink传输数据,但是每个channelselector都是很消耗资源的。文章目录一、flume采集的数据发往一个topic二、flume采集的数据发往多个topic总结.

大家好,又见面了,我是你们的朋友全栈君。

引言
flume为什么要与kafka对接?
我们都知道flume可以跨节点进行数据的传输,那么flume与spark streaming对接不好吗?主要是flume对接到kafka的topic,可以给多个consumer group去生成多条业务线。虽然flume中的channel selector中的副本策略也可以给多个sink传输数据,但是每个channel selector都是很消耗资源的。其次,kafka也可以起到一个消峰的作用


一、flume采集的数据发往一个topic

这里为了方便测试,我采用的是netcat source、memory channel、kafka sink,当然你也可以采用你自己想要的方式配置flume,只需要根据官方文档修改对应的source和channel即可。

necat-flume-kafka.conf的配置文件如下:

#Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = wjt
a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.kafka.producer.linger.ms = 1 

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

其中你只需要修改sink中的topic和brokerList即可,当然你也可以增加其他的配置
1、启动kafka消费者
在这里插入图片描述
2、启动flume
在这里插入图片描述
3、启动netcat的客户端并发送几条数据
在这里插入图片描述
4、观察到kafka consumer很快就消费到了数据
在这里插入图片描述

二、flume采集的数据发往多个topic

如果数据有多种类型,比如点赞数据、评论数据、喜欢数据等等,是不是就要发往不同的topic去分析数据,这时候就需要用到flume的拦截器来做分类。
flume可以给event加上头信息,结合channel selector来发往不同的sink。
在flume官方文档可以看到:
在这里插入图片描述
意思是:如果你的event的头信息(k-v类型)包含一个topic字段,那么这个event将会被发送到对应的topic,并覆盖你配置的kafka.topic

拦截器的代码:

package wjt.demo;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** * @description: * @author: wanjintao * @time: 2020/8/29 11:45 */
public class myInterceptor implements Interceptor { 
   

    //声明一个存放事件的集合
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() { 
   

        //初始化存放事件的集合
        addHeaderEvents = new ArrayList<>();

    }

    //单个事件拦截
    @Override
    public Event intercept(Event event) { 
   

        //1. 获取事件中的头信息
        Map<String, String> headers = event.getHeaders();

        //2. 获取事件中的body信息
        String body = new String(event.getBody());

        //3. 根据body中是否有“Hello”来决定是否添加头信息
        if (body.contains("hello")) { 
   

            //4. 有hello添加“wan”头信息
            headers.put("topic", "www1");

        } else { 
   

            //4. 没有hello添加“tao”头信息
            headers.put("topic", "www2");

        }

        return event;
    }

    //批量事件拦截
    @Override
    public List<Event> intercept(List<Event> events) { 
   

        //1. 清空集合
        addHeaderEvents.clear();

        //2. 遍历events
        for (Event event : events) { 
   

            //3. 给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));

        }

        //4. 返回结果
        return addHeaderEvents;
    }

    @Override
    public void close() { 
   

    }

    public static class Builder implements Interceptor.Builder { 
   

        @Override
        public Interceptor build() { 
   
            return new myInterceptor();
        }

        @Override
        public void configure(Context context) { 
   

        }
    }

}

你只需要修改单个事件拦截的代码即可,我这里是如果数据包含hello,将会给事件加上header(topic,www1),反之则给事件加上header(topic,www2),打包上传至flume/lib目录下

netcat-flume-typekafka.conf的配置文件:

#Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = wjt.demo.myInterceptor$Builder

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = wjt
a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.kafka.producer.linger.ms = 1 

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

你只需要将a1.sources.r1.interceptors.i1.type的值改为你上面的拦截器的全类名$Builder即可
1、先启动consumer1和consumer2(flume启动顺序都是先启动服务端在启动客户端)
在这里插入图片描述
在这里插入图片描述
2、启动flume
在这里插入图片描述
3、启动netcat客户端
在这里插入图片描述
4、观察consumer消费的topic可以看到,www1只接受到了包含hello的数据,www2只接受到了没有包含hello的数据
在这里插入图片描述
在这里插入图片描述

总结

很多时候flume官方文档可以帮助我们解决很多自己想要的业务场景,我们要更多地去查看官方文档

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/152405.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 理解BN层「建议收藏」

    理解BN层「建议收藏」https://www.cnblogs.com/king-lps/p/8378561.html转载

    2022年10月14日
    2
  • Java中的快捷键大全「建议收藏」

    Java中的快捷键大全「建议收藏」1.常用快捷键(1)Ctrl+Space说明:内容助理。提供对方法,变量,参数,javadoc等得提示,应运在多种场合,总之需要提示的时候可先按此快捷键。注:避免输入法的切换设置与此设置冲突(2)Ctrl+Shift+Space说明:变量提示(3)Ctrl+/说明:添加/消除//注释,在eclipse2.0中,消除注释为Ctrl+\(4)Ctrl+Shift+/

    2022年7月8日
    18
  • intellij idea破解2019(2019年科目二考试全过程视频)

    本来看网上已经有写的不错的教程,结果用起来的时候发现有一些问题,首先是版本号的问题,另外是文件路径问题,还有就是碰到的修改hosts没有权限问题,还是想着记录一下,方便需要的童鞋使用。如果发现什么问题,请及时联系我。本文参考自:https://www.jianshu.com/p/3c87487e7121https://blog.csdn.net/qq_17213067/article/de…

    2022年4月16日
    43
  • 商标注册_企业软件

    商标注册_企业软件开发软件时,当用到商业用途时,注册码与激活码就显得很重要了。现在的软件激活成功教程技术实在在强了,各种国内外大型软件都有注册机制,但同时也不断地被激活成功教程。下面发的只是一个常用版本,发出源码被破就更容易了,但我们学习的是技术。当然也为以后自己的软件不会被轻易激活成功教程。第一步。根据卷标,CPU序列号,生成机器码//取得设备硬盘的卷标号       publicstaticstringG

    2022年9月1日
    2
  • 神经网络——最易懂最清晰的一篇文章「建议收藏」

    神经网络是一门重要的机器学习技术。它是目前最为火热的研究方向–深度学习的基础。学习神经网络不仅可以让你掌握一门强大的机器学习方法,同时也可以更好地帮助你理解深度学习技术。  本文以一种简单的,循序的方式讲解神经网络。适合对神经网络了解不多的同学。本文对阅读没有一定的前提要求,但是懂一些机器学习基础会更好地帮助理解本文。  神经网络是一种模拟人脑的神经网络以期能够实现类人工智能的机器学习技…

    2022年4月12日
    35
  • MLP多层感知机(人工神经网络)原理及代码实现

    MLP多层感知机(人工神经网络)原理及代码实现一、多层感知机(MLP)原理简介多层感知机(MLP,MultilayerPerceptron)也叫人工神经网络(ANN,ArtificialNeuralNetwork),除了输入输出层,它中间可以有多个隐层,最简单的MLP只含一个隐层,即三层的结构,如下图:从上图可以看到,多层感知机层与层之间是全连接的(全连接的意思就是:上一层的任何一个神经元与下一层的所有神经元都有连接)。多层感知机最底层…

    2022年6月17日
    107

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号