kafka应用场景有哪些_kafka顺序性的消费

kafka应用场景有哪些_kafka顺序性的消费序在学习一门新技术之前,我们需要先去了解一下这门技术的具体应用场景,使用它能够做什么,能够达到什么目的,学习kafka的初衷是用作消息队列;但是还可以使用KafkaStream进行一些实时的流计算,多用于大数据处理;也可以做日志收集汇总、网站活动跟踪等任务。消息队列kafka可以很好的替代一些传统的消息系统,kafka具有更好的吞吐量,内置的分区使kaf…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

在学习一门新技术之前,我们需要先去了解一下这门技术的具体应用场景,使用它能够做什么,能够达到什么目的,学习kafka的初衷是用作消息队列;但是还可以使用Kafka Stream进行一些实时的流计算,多用于大数据处理;也可以做日志收集汇总、网站活动跟踪等任务。

消息队列

kafka可以很好的替代一些传统的消息系统,kafka具有更好的吞吐量,内置的分区使kafka具有更好的容错和伸缩性,这些特性使它可以替代传统的消息系统,成为大型消息处理应用的首选方案。

场景:异步、解耦、削峰填谷

  1. 生成订单:给不同的产品业务线分配同一个topic的不同partition,用户下单后根据订单类型发送到对应的partition
  2. 消息通知:用户登录后计算积分
  • 消息生产者

    public static void main(String[] args) throws Exception {
      Properties prop = new Properties();
      prop.put("bootstrap.servers", "127.0.0.1:9092");
      prop.put("acks", "all");
      prop.put("retries", "0");
      // 缓冲区大小
      prop.put("batch.size", "10");
      prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
      for (int i = 0; i < 101; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("my_topics", "value_" + i);
        // 阻塞到消息发送完成
        producer.send(record).get();
      }
      // 刷新缓冲区,发送到分区,并清空缓冲区
      // producer.flush();
      // 关闭生产者,会阻塞到缓冲区内的数据发送完
      producer.close();
      // producer.close(Duration.ofMillis(1000));
    }
    

    生产者发送消息是先将消息放到缓冲区,当缓冲区存满之后会自动flush,或者手动调用flush()方法

  • 消息消费者

    public static void main(String[] args) {
      Properties properties = new Properties();
      properties.put("bootstrap.servers", "127.0.0.1:9092");
      properties.put("group.id", "cc_consumer");
      properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
      // 指定topic
      consumer.subscribe(Arrays.asList("my_topics"));
      // 指定topic的partition
      // TopicPartition partition0 = new TopicPartition("my_topics", 10);
      // consumer.assign(Arrays.asList(partition0));
      try {
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.toString());
          }
        }
      } finally {
        consumer.close(Duration.ofMillis(2000));
      }
    }
    

流计算

[todo]

日志收集

应用程序的日志可以通过log4j收集日志信息,并将日志直接打到kafka中:客户端—>应用—>kafka

SpringBoot中默认使用的是logback,所以要在引入SpringBoot的jar包时排除掉logback的jar包

日志消息发送有同步和异步两种方式,由KafkaAppender中的syncSend属性决定,默认为true(同步)

> <Kafka name="KAFKA-LOGGER" topic="cc_log_test" syncSend="false">
>
  • pom.xml
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- springboot 1.3.x之前版本是log4j,之后版本都是log4j2 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
  • log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="off">
    <Properties>
    </Properties>
    <Appenders>
        <Console name="STDOUT" target="SYSTEM_OUT">
            <PatternLayout pattern="%d %p %c{1.} %t %m%n"/>
        </Console>
	      <!--kafka topic-->
        <Kafka name="KAFKA-LOGGER" topic="my_topics">
          	<!--JsonLayout:日志格式为json,方便在ES中处理-->
            <JsonLayout/>
          	<!--kafka server的ip:port-->
            <Property name="bootstrap.servers">127.0.0.1:9092</Property>
            <Property name="retries">3</Property>
            <Property name="linger.ms">1000</Property>
            <Property name="buffer.memory">10485760</Property>
        </Kafka>
        <Async name="ASYNC-KAFKA-LOGGER">
            <AppenderRef ref="KAFKA-LOGGER"/>
            <LinkedTransferQueue/>
        </Async>
    </Appenders>
    <Loggers>
      	<!--日志级别大于info都会被记录到Kafka-->
        <Logger name="cc.kevinlu.springbootkafka.controller.MessageController" level="info"
                additivity="false">
            <AppenderRef ref="KAFKA-LOGGER"/>
        </Logger>
        <!-- Root表示所有Logger用Root中的Appender打印日志  -->
        <Root level="info">
            <AppenderRef ref="STDOUT"/>
        </Root>
    </Loggers>
</Configuration>
  • code
@GetMapping("/log")
public String sendLog() {
  for (int i = 0; i < 10; i++) {
    log.info("kafka log i = " + i);
  }
  return "success";
}
  • consumer视图

image-20200419032218971

网站活动跟踪

  1. 前端Nodejs控制

    Node接入kafka需要使用kafka-node库,下面是网上的例子

    var kafka = require('kafka-node'),
        Producer = kafka.Producer,
        client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
    /**
     * 定义生产类
     * partitionerType 定义
     * 0:默认模式 只产生数据在第一个分区
     * 1:随机分配,在分区个数内,随机产生消息到各分区
     * 2:循环分配,在分区个数内,按顺序循环产生消息到各分区
    */   
    var producerOption = {
        requireAcks: 1,
        ackTimeoutMs: 100,
        partitionerType: 0 //默认为第一个分区
    };
    var producer = new Producer(client,producerOption);
    /**
     * TOPIC的创建需要在命令行进行创建,以便指定分区个数以及备份个数
     * PS:kafka-node的创建topic不行,不能创建分区
     * 产生消息,如果不指定partition
     * 则根据 partitionerType 的值来指定发送数据到哪个分区
     * 我们创建的topic-test-one只有一个分区,所以只能产生数据到第1个分区(下标0),否则不会生产数据
     */
    function getPayloads(){
        return [
            {topic:"topic-test-one",messages:JSON.stringify({"name":"jack","age":"120"}),partition:0}
        ];
    }
    
    producer.on("ready",function(){
        setInterval(function(){
            producer.send(getPayloads(),function(err,data){
                if(!err){
                    console.log("send message complete!data:"+JSON.stringify(data),new Date());
                }
            });
         },1000);
    });
    
    producer.on('error', function (err) {console.log("send message error!\r\n"+err);})
    
  2. 后端日志控制

    后端也可以使用log4j的日志系统来完成,拦截所有需要监控的api请求,使用log4j输出日志到kafka队列中,和上述日志收集方法相同。若同一个应用中需要通过日志输出到kafka的多个topic中,可以使用log4j的Marker标记来区分,配置如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration status="off">
        <Properties>
        </Properties>
        <Appenders>
            <Console name="STDOUT" target="SYSTEM_OUT">
                <PatternLayout pattern="%d %p %c{1.} %t %m%n"/>
            </Console>
          	<!-- 日志收集 -->
            <Kafka name="KAFKA-LOGGER" topic="cc_log_test" syncSend="false">
                <JsonLayout/>
                <Property name="bootstrap.servers">127.0.0.1:9092</Property>
                <Property name="retries">3</Property>
                <Property name="linger.ms">1000</Property>
                <Property name="buffer.memory">10485760</Property>
                <Filters>
                  	<!-- 通过Marker过滤消息 -->
                    <MarkerFilter marker="Kafka" onMatch="ACCEPT" onMismatch="DENY"/>
                </Filters>
            </Kafka>
          	<!-- 轨迹跟踪 -->
            <Kafka name="KAFKA-TRACK-LOGGER" topic="cc_test1" syncSend="false">
                <JsonLayout/>
                <Property name="bootstrap.servers">127.0.0.1:9092</Property>
                <Property name="retries">3</Property>
                <Property name="linger.ms">1000</Property>
                <Property name="buffer.memory">10485760</Property>
                <Filters>
                  	<!-- 通过Marker过滤消息 -->
                    <MarkerFilter marker="Track" onMatch="ACCEPT" onMismatch="DENY"/>
                </Filters>
            </Kafka>
            <Async name="ASYNC-KAFKA-LOGGER">
                <AppenderRef ref="KAFKA-LOGGER"/>
                <AppenderRef ref="KAFKA-TRACK-LOGGER"/>
                <LinkedTransferQueue/>
            </Async>
        </Appenders>
        <Loggers>
            <Logger name="cc.kevinlu.springbootkafka.controller" level="info"
                    additivity="false">
                <AppenderRef ref="KAFKA-LOGGER"/>
                <AppenderRef ref="KAFKA-TRACK-LOGGER"/>
            </Logger>
            <Root level="info">
                <AppenderRef ref="STDOUT"/>
            </Root>
        </Loggers>
    </Configuration>
    
    private final static Marker KAFKA_MARKER       = MarkerManager.getMarker("Kafka");
    private final static Marker KAFKA_TRACK_MARKER = MarkerManager.getMarker("Track");
    
    @GetMapping("/log")
    public String sendLog() {
      // 轨迹跟踪
      log.info(KAFKA_TRACK_MARKER, "send async message!");
      for (int i = 0; i < 10; i++) {
        // 日志收集
        log.info(KAFKA_MARKER, "kafka log i = {}", i);
      }
      return "success";
    }
    
  3. 前端+后端组合

    后端提供API供前端传递轨迹,后端接收到请求之后将消息同步到kafka中。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/181994.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • word的样式设置在哪_word怎么设置目录

    word的样式设置在哪_word怎么设置目录一般自己写文档就用typora了,便捷美观,但是在工作上又不得不用word写文档,我对审美、格式比较有强迫症,有的小公司没有形成自己的文档规范,或者所谓的规范也只是写好了格式的文档,你往里面填内容就可

    2022年8月2日
    6
  • Idea激活码永久有效Idea2020.3.2激活码教程-持续更新,一步到位

    Idea激活码永久有效Idea2020.3.2激活码教程-持续更新,一步到位Idea激活码永久有效2020.3.2激活码教程-Windows版永久激活-持续更新,Idea激活码2020.3.2成功激活

    2022年6月17日
    66
  • 关于pdb文件

    关于pdb文件关于pdb文件当程序在VS上编译时,程序所依赖的所有动态链接库(dll文件)也会被编译,编译过程中每个dll都会产生一个pdb文件,又称为“符号文件”,是一个存储数据的信息文件,其包含dll库在编译过程的某些调试信息,例如程序中所用到的全局变量、局部变量、函数名以及他们的入口地址等。当使用VS调试程序时,会默认加载你的程序以及程序依赖的dll库产生的所有pdb文件,但是结…

    2022年6月2日
    60
  • 湖南省中职学业水平考试复习试题(语文)

    湖南省中职学业水平考试复习试题(语文)语文文化科题库选择题1.下列选项中的词语书写有错误的一项是(B)A.湿润脑髓B.锐智自栩C.大度丰富D.蛮横磕头2.下列选项中的惯用词语,使用不得体的一项是(C)A.学生给一位刚刚病愈后的老师写的信,最后的致敬语是“敬祝痊安”。B.有位海外游子给其祖父写信,落款是“XX顿首”。C.有位长辈给侄儿写信说:“此事望你钧裁。”D.给朋友写信,末…

    2025年11月9日
    5
  • java 二维数组 数据库_java 二维数组如何存入数据库

    java 二维数组 数据库_java 二维数组如何存入数据库usingSystem;usingSystem.Linq;usingSystem.Text;usingSystem.Windows.Forms;usingSystem.Xml;usingSystem.Xml.Serialization;usingSystem.IO;namespaceWindowsFormsApplication1{publicpartialclassForm…

    2022年5月16日
    39
  • Python面试题之基础篇(一)「建议收藏」

    PHP中文网为大家准备了Python面试题,本文是一些基础问题。例如:为什么学习Python、通过什么途径学习Python、谈谈对Python和其他语言的区别、简述解释型和编译型编程语言,等等。

    2022年1月18日
    102

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号