springboot集成kafka(从部署到实践)

springboot集成kafka(从部署到实践)springboot 集成 kafkakafka 简介 kafka 安装部署 kafka 简介 Kafka 是最初由 Linkedin 公司开发 是一个分布式 分区的 多副本的 多订阅者 基于 zookeeper 协调的分布式日志系统 也可以当做 MQ 系统 常见可以用于 web nginx 日志 访问日志 消息服务等等 Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目 主要应用场景是 日志收集系统和消息系统 Kafka 主要设计目标如下 以时间复杂度为 O 1 的方式提供消息持久化能力 即使对 TB 级以上数

一、kafka简介

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展
  • Kafka将一个主题(topic)内容的消息拆分成多个分区(partition),分布式保存到不同节点,形成一个分布式的消息队列,提升系统的吞吐量。同时每个partition都可以指定副本数量,通过副本保证数据的可靠性。创建topic的时候需要指定分区和副本数量。
    在这里插入图片描述

二、kafka和zookeeper安装部署

1、单容器手动部署方案

  • 步骤1 下载镜像
//下载kafka和zookeeper的镜像 docker pull wurstmeister/kafka docker pull zookeeper 
  • 步骤2 新建容器通信网络(主要用于zookeeper和kafka之间的通信)
[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network create kafka-network (新建网络) e714f435cb1260c2de0e842dfc902d61e6d0a0b15dc73362bee87f94e6 [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network ls (查看已存在的网络列表) NETWORK ID NAME DRIVER SCOPE cb77844cbdf1 bridge bridge local c4df20e19860 compose-files_default bridge local 104a0c8d74f2 compose-files_edgex-network bridge local 24b1b6ec3161 harbor_harbor bridge local caee95f8bdda host host local e714f435cb12 kafka-network bridge local 039be458adff none null local [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network inspect kafka-network (查看网络的详细信息) [ { "Name": "kafka-network", "Id": "e714f435cb1260c2de0e842dfc902d61e6d0a0b15dc73362bee87f94e6", "Created": "2020-09-01T21:08:55.+08:00", "Scope": "local", "Driver": "bridge", "EnableIPv6": false, "IPAM": { "Driver": "default", "Options": {}, "Config": [ { "Subnet": "172.26.0.0/16", "Gateway": "172.26.0.1" } ] }, "Internal": false, "Attachable": false, "Ingress": false, "ConfigFrom": { "Network": "" }, "ConfigOnly": false, "Containers": {}, (连接的容器为空) "Options": {}, "Labels": {} } ] [root@iZuf6heg0pec2tuaaw1ltvZ ~]# 
  • 步骤3 部署zookeeper容器
//使用上面创建的容器网络kafka-network,创建zookeeper容器 docker run --net=kafka-network --name nick_zookeeper -p 21810:2181 -d docker.io/zookeeper 
  • 步骤3 部署kafka容器
//同样使用上面创建的容器网络kafka-network,创建kafka容器 docker run --net=kafka-network --name nick_kafka -p 9092:9092 \ --link kafka-network \ -e KAFKA_ZOOKEEPER_CONNECT=172.26.0.2:2181 \ -e KAFKA_ADVERTISED_HOST_NAME=47.101.72.122 \ -e KAFKA_ADVERTISED_PORT=9092 \ -d wurstmeister/kafka 
"Networks": { "kafka-network": { ... ... "IPAddress": "172.26.0.2", ... ... } } 

再次,查看下kafka-network容器网络的Containers字段,zookeeper和kafka容器都已经加入 kafka-network网络了。

[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network inspect kafka-network [ { "Name": "kafka-network", ... ... "Containers": { "cac4fd9ce8ff8f0236e7f6e613da9c819dc8aeade1d63596af02bb0": { "Name": "nick_zookeeper", "EndpointID": "0cf6a426f43bb4fb191c739afb02bb59cb5ada182e795df2", "MacAddress": "02:42:ac:1a:00:02", "IPv4Address": "172.26.0.2/16", "IPv6Address": "" }, "d28ffb2e72b36b1ad4c049bbf5256d578c4fe9d141da05fff3bdb": { "Name": "nick_kafka", "EndpointID": "552db13d5d82cb7e2fadc123e80063dc0cfb4bdde292d7faf051", "MacAddress": "02:42:ac:1a:00:03", "IPv4Address": "172.26.0.3/16", "IPv6Address": "" } }, "Options": {}, "Labels": {} } ] 
  • 步骤4 测试发送消息和接收消息
// 进入容器 docker exec -it nick_kafka bash // 进入kafka所在目录 cd /opt/kafka_2.13-2.6.0/bin/ // 启动消息发送方 ./kafka-console-producer.sh --broker-list localhost:9092 --topic temptopic // 再开一个终端进入容器,启动消息接收方 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temptopic --from-beginning 

进入kafka容器,发送消息

[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker exec -it nick_kafka bash bash-4.4# cd /opt/kafka_2.13-2.6.0/bin bash-4.4# ./kafka-console-producer.sh --broker-list localhost:9092 --topic temptopic >1 >2 >3 >4 >5 >6 

进入kafka容器,接收消息

[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker exec -it nick_kafka bash bash-4.4# cd /opt/kafka_2.13-2.6.0/bin/ bash-4.4# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temptopic --from-beginning 1 2 3 4 5 6 

上述关于kakfa和zookeeper容器的部署操作,属于单机版单个容器部署,对于初学者建议手动操作一下。

2、docker-compose容器编排工具部署

使用docker-compose容器编排工具,可一键式安装两个容器。

  • 步骤1 首先编写docker-compose.yml
version: '3' services: zookeeper: image: zookeeper container_name: nick_zookeeper ports: - "2181:2181" networks: - kafka-network kafka: image: wurstmeister/kafka container_name: nick_kafka depends_on: - zookeeper links: - zookeeper networks: - kafka-network ports: - "9092:9092" environment: KAFKA_BROKER_ID: 5 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9092 #宿主机监听端口 volumes: - /var/run/docker.sock:/var/run/docker.sock networks: kafka-network: driver: bridge 
  • 步骤2 执行如下命令,启动服务docker-compose -f docker-compose-single.yml up -d
root@ubuntu:~# docker-compose -f docker-compose-single.yml up -d Creating nick_zookeeper ... done Creating nick_kafka ... done Creating nick_kafka ... 
  • 步骤3 执行如下命令,查看docker-compose -f docker-compose-single.yml ps查看服务的运行状态
root@ubuntu:~# docker-compose -f docker-compose-single.yml ps Name Command State Ports -------------------------------------------------------------------------------------------------------------- nick_kafka start-kafka.sh Up 0.0.0.0:9092->9092/tcp nick_zookeeper /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp root@ubuntu:~# 

3、集群部署zookeeper和kafka

  • 步骤1 首先编写
    docker-compose.yml
version: '3' services: zkwt01: image: zookeeper restart: always container_name: zkwt01 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181 #这里有个隐形的坑,需要在3888端口后面加上;2181,否则zookeeper无法对外提供服务,会导致Kafka无法连接上 networks: - kafka-network zkwt02: image: zookeeper restart: always container_name: zkwt02 ports: - "2182:2181" environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181 networks: - kafka-network zkwt03: image: zookeeper restart: always container_name: zkwt03 ports: - "2183:2181" environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181 networks: - kafka-network zkwt04: image: zookeeper restart: always container_name: zkwt04 ports: - "2184:2181" environment: ZOO_MY_ID: 4 PEER_TYPE: observer ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181 networks: - kafka-network kafkawt01: image: wurstmeister/kafka restart: always container_name: kafkawt01 depends_on: - zkwt01 - zkwt02 - zkwt03 - zkwt04 networks: - kafka-network ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9092 #宿主机监听端口 volumes: - /var/run/docker.sock:/var/run/docker.sock kafkawt02: image: wurstmeister/kafka restart: always container_name: kafkawt02 depends_on: - zkwt01 - zkwt02 - zkwt03 - zkwt04 networks: - kafka-network ports: - "9093:9092" environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9093 #宿主机监听端口 volumes: - /var/run/docker.sock:/var/run/docker.sock kafkawt03: image: wurstmeister/kafka restart: always container_name: kafkawt03 depends_on: - zkwt01 - zkwt02 - zkwt03 - zkwt04 networks: - kafka-network ports: - "9094:9092" environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9094 #宿主机监听端口 volumes: - /var/run/docker.sock:/var/run/docker.sock networks: kafka-network: driver: bridge 
  • 步骤2 执行如下命令,启动服务docker-compose -f docker-compose-single.yml up -d
root@ubuntu:~# docker-compose -f docker-compose-cluster.yml up -d Creating network "root_kafka-network" with driver "bridge" Creating kafkawt01 ... done Creating zkwt02 ... Creating zkwt04 ... Creating zkwt01 ... Creating kafkawt03 ... Creating kafkawt02 ... Creating kafkawt01 ... 
  • 步骤3 执行如下命令,查看docker-compose -f docker-compose-single.yml ps查看服务的运行状态
root@ubuntu:~# docker-compose -f docker-compose-cluster.yml ps Name Command State Ports --------------------------------------------------------------------------------------------------------- kafkawt01 start-kafka.sh Up 0.0.0.0:9092->9092/tcp kafkawt02 start-kafka.sh Up 0.0.0.0:9093->9092/tcp kafkawt03 start-kafka.sh Up 0.0.0.0:9094->9092/tcp zkwt01 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp zkwt02 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2182->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp zkwt03 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2183->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp zkwt04 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2184->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp 

至此,zookeeper和kafka的安装部署完成。

三、springboot集成kafka

1、创建springboot项目(生产者/消费者)

  • 步骤1 创建一个空项目enpty project,
    在这里插入图片描述
  • 步骤2 创建new module,新建生产者springboot项目,
    右击上面创建的kafka-project–>New–>Module–>Spring Initializr–>Next
    在这里插入图片描述
    输入Group,模块名称,Java version,包路径等,如下图所示,
    在这里插入图片描述
    点击Next后,添加maven依赖,kafka的依赖关系,如下图所示,
    在这里插入图片描述
    点击Next–>Finish,完成生产者springboot项目创建。
    使用如上同样的操作步骤,创建消费者springboot项目
    在这里插入图片描述








2、实现生产者业务逻辑

  • 步骤1 添加maven依赖,此步骤在上面的操作中添加依赖环节已添加完毕。
    在这里插入图片描述
  • 步骤2 打开springboot-kafka-producter项目里面resources,创建application.yml文件,配置kafka服务地址,以及生产者参数,
# kafka配置生产者 begin # spring: kafka: bootstrap-servers: 192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094 producer: # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败, # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。 retries: 0 # 每次批量发送消息的数量,produce积累到一定数据,一次发送 batch-size: 16384 # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据 buffer-memory:  #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下: #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。 #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。 #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。 #可以设置的值为:all, -1, 0, 1 acks: 1 # 指定消息key和消息体的序列化编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer # kafka配置生产者 end # 
  • 步骤3 创建初始化topic的配置类KafkaInitialConfiguration,在com.kafka.eason目录下面,增加config文件夹,新建KafkaInitialConfiguration类,
package com.kafka.eason.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaInitialConfiguration { // 创建一个名为testTopic的Topic并设置分区数partitions为8,分区副本数replication-factor为2 @Bean public NewTopic initialTopic() { System.out.println("begin to init initialTopic........................"); return new NewTopic("wtopic04",8, (short) 2 ); } // 如果要修改分区数,只需修改配置值重启项目即可 // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小 @Bean public NewTopic updateTopic() { System.out.println("begin to init updateTopic........................"); return new NewTopic("wtopic04",10, (short) 2 ); } } 
  • 步骤4 创建生产者业务逻辑controller类ProducterController,在com.kafka.eason目录下面,增加controller文件夹,新建ProducterController类,
package com.kafka.eason.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Controller; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; / * * * @author Lynch */ @Controller @RequestMapping("/api/kafka/") public class ProducterController { @Autowired private KafkaTemplate 
  
    kafkaTemplate; @GetMapping("send") @ResponseBody public boolean send(@RequestParam String message) { try { ListenableFuture 
   
     > future = kafkaTemplate.send("wtopic04", message); future.addCallback(new ListenableFutureCallback 
    
      >() { @Override public void onFailure(Throwable throwable) { System.err.println("wtopic04 - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult 
     
       stringObjectSendResult) { System.out.println("wtopic04 - 生产者 发送消息成功:" + stringObjectSendResult.toString()); } }); } catch (Exception e) { e.printStackTrace(); } return true; } @GetMapping("test") @ResponseBody public String test(){ System.out.println("hello world!"); return "ok"; } } 
      
     
    
  
  • 步骤5 修改主程序类,增加ComponentScan扫描config和controller目录的文件
package com.kafka.eason; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; @SpringBootApplication @ComponentScan("com.kafka.eason.controller") @ComponentScan("com.kafka.eason.config") public class SpringbootKafkaProducterApplication { public static void main(String[] args) { SpringApplication.run(SpringbootKafkaProducterApplication.class, args); } } 

3、实现消费者业务逻辑

  • 步骤1 添加maven依赖,此步骤在上面的操作中添加依赖环节已添加完毕。
    在这里插入图片描述
  • 步骤2 打开springboot-kafka-consumer项目里面resources,创建application.yml文件,配置kafka服务地址,以及生产者参数,
server: port: 8081 # kafka配置消费者 start # #============== kafka =================== # 指定kafka server的地址,集群配多个,中间,逗号隔开 spring: kafka: bootstrap-servers: 192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094 consumer: # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名 group-id: test # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest auto-offset-reset: earliest # enable.auto.commit:true --> 设置自动提交offset enable-auto-commit: true #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。 auto-commit-interval: 1000 # 指定消息key和消息体的序列化编解码方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # kafka配置消费者 end # 
  • 步骤3 创建消费者业务逻辑controller类ConsumerController,在com.kafka.eason目录下面,增加controller文件夹,新建ConsumerController类,
package com.kafka.eason.controller; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; / * 消费者监听topic=testTopic的消息 * * @author Lynch */ @Component public class ConsumerController { @KafkaListener(topics = "wtopic04") public void onMessage(ConsumerRecord 
   record){ //insertIntoDb(buffer);//这里为插入数据库代码 //System.out.println("message: " + message); System.out.println("简单消费,record:"+record.topic()+"-"+record.partition()+"-"+record.value()); } } 
  • 步骤4 修改主程序类,增加ComponentScan扫描config和controller目录的文件
package com.kafka.eason; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.ComponentScan; @SpringBootApplication @ComponentScan("com.kafka.eason.controller") public class SpringbootKafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(SpringbootKafkaConsumerApplication.class, args); } } 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/177453.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月26日 下午7:29
下一篇 2026年3月26日 下午7:29


相关推荐

  • VHDL快速入门

    VHDL快速入门写在前面 VHDL 是一门硬件语言 没学过硬件语言 挺感兴趣 还可以用在计组的实验中 花了点时间学习整理了一下 VHDL 的基本语法 方便查看 本 blog 所用到的所有图片都引用自 VHDL 语言的基本语法参考文档一 VHDL 语言的基本语法 1 VHDL 语言的表示符 2 VHDL 的数字 2 1 数字型文字 156E2 的意思是 156 times 10210 2102 下划线可以连接数字 2 2 数字基数表示的文字 2 3 字符串型文字 2 4 下标名及下标段名二 VHDL 语言的数据对象

    2026年3月20日
    3
  • “逃离”中国的Manus,可能白跑了

    “逃离”中国的Manus,可能白跑了

    2026年3月15日
    2
  • ai创意插件合集Astute Graphics

    ai创意插件合集Astute GraphicsAstutegraphics是一个AdobeIllustrator的创意插件合集,包含多种常见辅助功能,可以帮你提升平面与矢量设计的效率,AstuteGraphics最新版本也与时俱进的更新了支持CC2017的全系列插件,包含ColliderScribe,DynamicSketch,InkQuest,InkScribe,MirrorMe,Phantasm,Rasterino,VectorScribe,WidthScribe几大插件,插件支持Illustratorcs4/cs5/

    2022年5月30日
    58
  • 【AekdyCoin】求小于等于N的与N互质的数的和

    【AekdyCoin】求小于等于N的与N互质的数的和又向大牛学到了一点。以下内容转大牛文章:ifgcd(n,i)=1thengcd(n,n-i)=1(1反证法:如果存在K!=1使gcd(n,n-i)=k,那么(n-i)%k==0而n%k=0那么必须保证i%k=0k是n的因子,如果i%k=0那么gcd(n,i)=k,矛盾出现;于是问题变的非常简单ANS=N*phi(N)/2i,n-i总是成对

    2022年7月23日
    13
  • arping命令用法

    arping命令用法arping命令使用说明BusyBoxv1.17.3(2011-07-2017:01:30CST)multi-callbinary.Usage:arping[-fqbDUA][-cCNT][-wTIMEOUT][-IIFACE][-sSRC_IP]DST_IPSendARPrequests/repliesOptions: -f Quiton…

    2022年5月27日
    49
  • 设计模式(4)-对象创建型模式-Prototype模式

    设计模式(4)-对象创建型模式-Prototype模式

    2022年1月22日
    51

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号