springboot整合kafka入门

springboot整合kafka入门springboot整合kafka入门kafka基本概念本机安装kafka测试安装kafka(mac下)本机测试kafkaspringboot整合kafka(IDEA)测试kafka基本概念producer:生产者,负责发布消息到kafkacluster(kafka集群)中。生产者可以是web前端产生的pageview,或者是服务器日志,系统CPU、memory等。consumer:消费者,每个consumer属于一个特定的consuergroup(可为每个consumer指定group

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

kafka基本概念

在这里插入图片描述
producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。

consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,要指定消费者接受的消息的topic,该消费者只会接受该topic的消息。

topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。

broker: kafka集群包含一个或多个服务器,这些服务器就叫做broker。

本机安装kafka测试

安装kafka(mac下)

kafka下载:官网下载 kafka_2.13-2.7.0.tgz,直接解压即可。

本机测试kafka

1、进入到kafka的解压目录,输入命令启动zookeeper:

./bin/zookeeper-server-start.sh config/zookeeper.properties

Jetbrains全家桶1年46,售后保障稳定

打开另一个终端输入命令启动kafka:

./bin/kafka-server-start.sh config/server.properties 

2、服务启起来后,可以创建生产者和消费者了。
再打开另一个终端输入命令创建生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

broker-list: 参数指定生产者所使用的broker
localhost: 9092 参数表示broker,这个broker为本机(127.0.0.1),且使用的端口是kafka的默认端口号是9092
topic: 参数表示生产者生产的消息的topic 为 “test_topic”

最后再打开另一个终端创建消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

bootstrap-server: 是指定consumer从哪里(broker)取出消息
topic: 指定消费者consumer取出的 topic 为“test_topic”的消息。
from-beginning: Kafka实际环境有可能会出现Consumer全部宕机,虽然基于Kafka的高可用特性,消费者群组中的消费者可以实现再均衡,所有Consumer不处理数据的情况很少,但是还是有可能会出现,此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。基于消费者订阅模式默认是无法实现的,因为只能订阅最新发送的数据。通过消费者命令行可以实现,只要在命令行中加上–from-beginning即可

3、都创建完了可以通过生产者输入消息,消费者来接收并显示消息,效果图如下:
在这里插入图片描述

springboot整合kafka(IDEA)

注意: kafka要是部署在服务器的话,本机就 要和服务器之间能ping通。

1、创建springboot项目:
在这里插入图片描述在这里插入图片描述2、创建两个类,分别为生产者和消费者
项目目录结构:
在这里插入图片描述
配置文件application.yml:(一般项目自动生成的是applicaiton.properties,但为了书写简便,改成yml)

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092 #服务器的ip及端口,可以写多个,服务器之间用“:”间隔
    producer: #生产者配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: #消费者配置
      group-id: test #设置消费者的组id
      enable-auto-commit: true
# auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

springboot启动类入口,KafkaStudyApplication.java:

package com.study.kafka.kafka_study;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaStudyApplication { 
   

    public static void main(String[] args) { 
   
        SpringApplication.run(KafkaStudyApplication.class, args);
    }

}

TestKafkaProducerController.java:(生产者)

package com.study.kafka.kafka_study;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController     //定义这是一个控制器,可以通过浏览器访问
@RequestMapping("/kafka")
public class TestKafkaProducerController { 
   

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

	//当在浏览器上输入http://localhost:8080/kafka/send?msg=abc,就会发送abc到服务器上去让消费者接收,msg对应下面的String msg
    @RequestMapping("/producerSend")
    public String send(String msg){ 
   
        kafkaTemplate.send("test_topic", msg); //使用kafka模板发送信息
        return "success";
    }

}

TestConsumer.java:(消费者)

package com.study.kafka.kafka_study;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TestConsumer { 
   

    /** * 定义此消费者接收topic为“test_topic”的消息,监听服务器上的kafka是否有相关的消息发过来 * @param record record变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息 * */
    @KafkaListener(topics = "test_topic")
    public void listen (ConsumerRecord<?, ?> record) throws Exception { 
   
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

测试

1、运行KafkaStudyApplication.java之后,终端上输入消息时,不仅终端上(服务器)运行的测试消费者能收到,IDEA上的程序也能收到。
在这里插入图片描述在这里插入图片描述2、在浏览器上输入http://localhost:8080/kafka/producerSend?msg=web world31231,不仅IDEA上的消费者能收到,在终端(服务器)上运行的测试消费者也能收到:(其中8080是tomcat服务器的端口,springboot默认下带的是tomcat)
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/232030.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • C++的this指针

    C++的this指针C++的this指针当你进入一个房子后,你可以看见桌子、椅子、地板等,但是房子你是看不到全貌了。对于一个类的实例来说,你可以看到它的成员函数、成员变量,但是实例本身呢?this是一个指针,它时时刻刻指向你这个实例本身。C++在初始化对象时,每个对象中的数据成员都会得到系统分配的自己独立的存储空间。对于成员函数来说,一个函数的代码段在内存中只有一份,同一个类中的不同对象在调用自己的成…

    2022年5月16日
    37
  • Vue Router Tab「建议收藏」

    Vue Router Tab「建议收藏」介绍VueRouterTab是基于Vue.js和VueRouter的路由页签组件,用来实现多页签页面的管理。官网演示包含的功能✅响应路由变化来打开或切换页签✅页签过多鼠标滚轮滚动✅页签拖拽排序✅支持页签打开、切换、关闭、刷新、重置等操作✅Iframe页签嵌入外部网站✅组件个性化设置:过渡效果、自定义插槽、页签右键菜单✅多语言支持✅缓存控制:页签规则、页签是否缓存、最大缓存数、是否复用组件等✅动态页签信息:标题、图标、提示✅初始页签数据,进入页

    2022年7月27日
    13
  • 如何在阿里云服务器部署程序并用域名直接访问

    如何在阿里云服务器部署程序并用域名直接访问闲来无事,买了一个最便宜的阿里云服务器来学习,一年三百多,适合新手了解程序等。一般买服务器只有公网的IP地址,也就是类似10.205.25.32这种形式的。如何想用域名(例如www.baidu.com)直接访问的你网站,可以在阿里云直接再买个域名,将域名解析绑定ip地址。有人想知道怎么解析域名,我这里补充一下域名相关内容1.域名:…

    2022年6月18日
    31
  • 机器学习-数据归一化方法(Normalization Method)「建议收藏」

    机器学习-数据归一化方法(Normalization Method)「建议收藏」我的个人微信公众号:Microstrong微信公众号ID:MicrostrongAI公众号介绍:Microstrong(小强)同学主要研究机器学习、深度学习、计算机视觉、智能对话系统相关内容,分享在学习过程中的读书笔记!期待您的关注,欢迎一起学习交流进步!知乎专栏:https://zhuanlan.zhihu.com/Microstrong个人博客:https://blog.csd…

    2022年6月23日
    25
  • Alex 的 Hadoop 菜鸟教程: 第9课 zookeeper 介绍和使用

    Alex 的 Hadoop 菜鸟教程: 第9课 zookeeper 介绍和使用看了之前的教程,会发现多处出现zookeeper,比如hadoop的autofailover得用zookeeper,Hbase的RegionServer也得用zookeeper。其实不止hadoop,包括现在小有名气的Storm用的也是zookeeper。那么zookeeper究竟是做什么用的?

    2022年5月31日
    37
  • 计算机发展史上代表性的人物,计算机发展史最具影响力人物「建议收藏」

    计算机发展史上代表性的人物,计算机发展史最具影响力人物「建议收藏」1.冯·诺依曼 1903-1957开创了现代计算机理论,其体系结构沿用至今,而且他早在40年代就已预见到计算机建模和仿真技术对当代计算机将产生的意义深远的影响2.蒂姆·伯纳斯·李  1955-互联网之父蒂姆·伯纳斯·李是万维网的发明人,也是万维网联盟(World Wide Web Consortium)的发起人。1990年,他在日内瓦的欧洲粒子物理实验室里开发出了世界上第一个网页浏览器。3.罗伯特…

    2022年10月18日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号