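Integrating Kafka with Spring Boot
I. Introduction to Kafka
- Provides message persistence with O(1) time complexity, so access performance stays constant even for terabytes of data.
- High throughput: even on inexpensive commodity hardware, a single machine can handle 100K messages per second.
- Supports message partitioning across Kafka servers and distributed consumption, while guaranteeing message order within each partition.
- Supports both offline and real-time data processing.
- Scale out: supports online horizontal scaling.
- Kafka splits the messages of a topic into multiple partitions that are stored on different nodes, forming a distributed message queue and raising system throughput. Each partition can also be given a number of replicas, which protect the reliability of the data. The partition count and the replica count are specified when the topic is created.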
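The per-partition ordering described above follows from the fact that messages with the same key land in the same partition. The sketch below is a simplified model of that idea. It uses `String.hashCode()` as a stand-in hash, which is an assumption made for illustration (Kafka's own default partitioner uses a different hash function), and the class and method names are hypothetical.

```java
import java.util.List;

/** Illustrative model of key-based partition selection; not Kafka's actual partitioner. */
final class PartitionSketch {

    /** Maps a key to a partition index in the range [0, partitionCount). */
    static int partitionFor(String key, int partitionCount) {
        // Math.floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 8;
        // The same key always maps to the same partition, so its messages stay in order.
        for (String key : List.of("order-1", "order-2", "order-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Messages without a key carry no such guarantee, which is why ordering is only promised within a partition and not across a whole topic.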

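II. Installing and deploying Kafka and ZooKeeper
1. Manual deployment with individual containers
- Step 1: Pull the images
    # Pull the Kafka and ZooKeeper images
    docker pull wurstmeister/kafka
    docker pull zookeeper
- Step 2: Create a container network (used for communication between ZooKeeper and Kafka)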
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network create kafka-network    (create the network)
    e714f435cb1260c2de0e842dfc902d61e6d0a0b15dc73362bee87f94e6
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network ls    (list the existing networks)
    NETWORK ID          NAME                          DRIVER              SCOPE
    cb77844cbdf1        bridge                        bridge              local
    c4df20e19860        compose-files_default         bridge              local
    104a0c8d74f2        compose-files_edgex-network   bridge              local
    24b1b6ec3161        harbor_harbor                 bridge              local
    caee95f8bdda        host                          host                local
    e714f435cb12        kafka-network                 bridge              local
    039be458adff        none                          null                local
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network inspect kafka-network    (show the network details)
    [
        {
            "Name": "kafka-network",
            "Id": "e714f435cb1260c2de0e842dfc902d61e6d0a0b15dc73362bee87f94e6",
            "Created": "2020-09-01T21:08:55.+08:00",
            "Scope": "local",
            "Driver": "bridge",
            "EnableIPv6": false,
            "IPAM": {
                "Driver": "default",
                "Options": {},
                "Config": [
                    {
                        "Subnet": "172.26.0.0/16",
                        "Gateway": "172.26.0.1"
                    }
                ]
            },
            "Internal": false,
            "Attachable": false,
            "Ingress": false,
            "ConfigFrom": {
                "Network": ""
            },
            "ConfigOnly": false,
            "Containers": {},    (no containers attached yet)
            "Options": {},
            "Labels": {}
        }
    ]
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]#
- Step 3: Deploy the ZooKeeper container
    # Create the ZooKeeper container on the kafka-network network created above
    docker run --net=kafka-network --name nick_zookeeper -p 21810:2181 -d docker.io/zookeeper
- Step 4: Deploy the Kafka container
    # Create the Kafka container on the same kafka-network network
    docker run --net=kafka-network --name nick_kafka -p 9092:9092 \
      --link kafka-network \
      -e KAFKA_ZOOKEEPER_CONNECT=172.26.0.2:2181 \
      -e KAFKA_ADVERTISED_HOST_NAME=47.101.72.122 \
      -e KAFKA_ADVERTISED_PORT=9092 \
      -d wurstmeister/kafka
The address 172.26.0.2 used in KAFKA_ZOOKEEPER_CONNECT is the ZooKeeper container's IP on kafka-network, as reported in the Networks section of that container's inspect output:
    "Networks": {
        "kafka-network": {
            ... ...
            "IPAddress": "172.26.0.2",
            ... ...
        }
    }
Check the Containers field of kafka-network again: both the ZooKeeper and Kafka containers have now joined the network.
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network inspect kafka-network
    [
        {
            "Name": "kafka-network",
            ... ...
            "Containers": {
                "cac4fd9ce8ff8f0236e7f6e613da9c819dc8aeade1d63596af02bb0": {
                    "Name": "nick_zookeeper",
                    "EndpointID": "0cf6a426f43bb4fb191c739afb02bb59cb5ada182e795df2",
                    "MacAddress": "02:42:ac:1a:00:02",
                    "IPv4Address": "172.26.0.2/16",
                    "IPv6Address": ""
                },
                "d28ffb2e72b36b1ad4c049bbf5256d578c4fe9d141da05fff3bdb": {
                    "Name": "nick_kafka",
                    "EndpointID": "552db13d5d82cb7e2fadc123e80063dc0cfb4bdde292d7faf051",
                    "MacAddress": "02:42:ac:1a:00:03",
                    "IPv4Address": "172.26.0.3/16",
                    "IPv6Address": ""
                }
            },
            "Options": {},
            "Labels": {}
        }
    ]
- Step 5: Test sending and receiving messages
    # Enter the container
    docker exec -it nick_kafka bash
    # Go to the Kafka bin directory
    cd /opt/kafka_2.13-2.6.0/bin/
    # Start the message producer
    ./kafka-console-producer.sh --broker-list localhost:9092 --topic temptopic
    # Open another terminal, enter the container, and start the message consumer
    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temptopic --from-beginning
Enter the Kafka container and send messages:
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker exec -it nick_kafka bash
    bash-4.4# cd /opt/kafka_2.13-2.6.0/bin
    bash-4.4# ./kafka-console-producer.sh --broker-list localhost:9092 --topic temptopic
    >1
    >2
    >3
    >4
    >5
    >6
Enter the Kafka container and receive messages:
    [root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker exec -it nick_kafka bash
    bash-4.4# cd /opt/kafka_2.13-2.6.0/bin/
    bash-4.4# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temptopic --from-beginning
    1
    2
    3
    4
    5
    6
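The Kafka and ZooKeeper deployment above is a single-host setup with one container per service. Beginners are encouraged to work through it by hand once.
2. Deployment with the docker-compose orchestration tool
With docker-compose, both containers can be installed with a single command.
- Step 1: Write the compose file (saved here as docker-compose-single.yml, the name used by the commands in the following steps)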
    version: '3'
    services:
      zookeeper:
        image: zookeeper
        container_name: nick_zookeeper
        ports:
          - "2181:2181"
        networks:
          - kafka-network
      kafka:
        image: wurstmeister/kafka
        container_name: nick_kafka
        depends_on:
          - zookeeper
        links:
          - zookeeper
        networks:
          - kafka-network
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 5
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9092  # host address and port that clients connect to
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    networks:
      kafka-network:
        driver: bridge
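The two listener settings in the compose file play different roles: KAFKA_LISTENERS is the address the broker binds to inside the container, while KAFKA_ADVERTISED_LISTENERS is the address the broker hands back to clients, so it has to be reachable from wherever the Spring Boot applications run. As a rough illustration, the sketch below pulls the protocol and the host:port out of such a listener string with `java.net.URI`. The class and method names are hypothetical.

```java
import java.net.URI;

/** Illustrative parser for listener strings such as PLAINTEXT://192.168.0.205:9092. */
final class ListenerSketch {

    /** Returns the security protocol part, for example PLAINTEXT. */
    static String protocol(String listener) {
        return URI.create(listener).getScheme();
    }

    /** Returns the "host:port" part, which is what a client would put in bootstrap-servers. */
    static String hostAndPort(String listener) {
        URI uri = URI.create(listener);
        return uri.getHost() + ":" + uri.getPort();
    }

    public static void main(String[] args) {
        String advertised = "PLAINTEXT://192.168.0.205:9092";
        System.out.println(protocol(advertised) + " at " + hostAndPort(advertised));
    }
}
```

The host:port extracted from the advertised listener matches the entries later used in `bootstrap-servers` on the Spring Boot side.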
- Step 2: Start the services with docker-compose -f docker-compose-single.yml up -d
    root@ubuntu:~# docker-compose -f docker-compose-single.yml up -d
    Creating nick_zookeeper ... done
    Creating nick_kafka ... done
    Creating nick_kafka ...
- Step 3: Check the status of the services with docker-compose -f docker-compose-single.yml ps
    root@ubuntu:~# docker-compose -f docker-compose-single.yml ps
    Name             Command                          State   Ports
    --------------------------------------------------------------------------------------------------------------
    nick_kafka       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
    nick_zookeeper   /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
    root@ubuntu:~#
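3. Cluster deployment of ZooKeeper and Kafka
- Step 1: Write the compose file (saved here as docker-compose-cluster.yml, the name used by the commands in the following steps)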
    version: '3'
    services:
      zkwt01:
        image: zookeeper
        restart: always
        container_name: zkwt01
        ports:
          - "2181:2181"
        environment:
          ZOO_MY_ID: 1
          # Hidden pitfall: ";2181" must be appended after port 3888, otherwise ZooKeeper
          # does not serve clients and Kafka cannot connect to it.
          ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
        networks:
          - kafka-network
      zkwt02:
        image: zookeeper
        restart: always
        container_name: zkwt02
        ports:
          - "2182:2181"
        environment:
          ZOO_MY_ID: 2
          ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
        networks:
          - kafka-network
      zkwt03:
        image: zookeeper
        restart: always
        container_name: zkwt03
        ports:
          - "2183:2181"
        environment:
          ZOO_MY_ID: 3
          ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
        networks:
          - kafka-network
      zkwt04:
        image: zookeeper
        restart: always
        container_name: zkwt04
        ports:
          - "2184:2181"
        environment:
          ZOO_MY_ID: 4
          PEER_TYPE: observer
          ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
        networks:
          - kafka-network
      kafkawt01:
        image: wurstmeister/kafka
        restart: always
        container_name: kafkawt01
        depends_on:
          - zkwt01
          - zkwt02
          - zkwt03
          - zkwt04
        networks:
          - kafka-network
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9092  # host address and port that clients connect to
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
      kafkawt02:
        image: wurstmeister/kafka
        restart: always
        container_name: kafkawt02
        depends_on:
          - zkwt01
          - zkwt02
          - zkwt03
          - zkwt04
        networks:
          - kafka-network
        ports:
          - "9093:9092"
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9093  # host address and port that clients connect to
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
      kafkawt03:
        image: wurstmeister/kafka
        restart: always
        container_name: kafkawt03
        depends_on:
          - zkwt01
          - zkwt02
          - zkwt03
          - zkwt04
        networks:
          - kafka-network
        ports:
          - "9094:9092"
        environment:
          KAFKA_BROKER_ID: 3
          KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9094  # host address and port that clients connect to
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    networks:
      kafka-network:
        driver: bridge
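The compose file defines three voting ZooKeeper servers (zkwt01 to zkwt03) plus one observer (zkwt04). Observers serve clients but do not vote, so availability depends on a majority of the voting members only. The sketch below works through that arithmetic. The class and method names are hypothetical and it is meant as a back-of-the-envelope aid, not as a model of ZooKeeper's election protocol.

```java
/** Illustrative majority arithmetic for a ZooKeeper ensemble; observers are not counted. */
final class QuorumSketch {

    /** Smallest number of voting members that forms a majority. */
    static int majority(int votingMembers) {
        return votingMembers / 2 + 1;
    }

    /** How many voting members can fail while a majority is still available. */
    static int toleratedFailures(int votingMembers) {
        return votingMembers - majority(votingMembers);
    }

    public static void main(String[] args) {
        int voting = 3; // zkwt01, zkwt02, zkwt03; the observer zkwt04 is excluded
        System.out.println("majority = " + majority(voting)
                + ", tolerated failures = " + toleratedFailures(voting));
    }
}
```

This is one reason for keeping an odd number of voters: under this arithmetic, a fourth voting member would raise the majority to three without tolerating any additional failure.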
- Step 2: Start the services with docker-compose -f docker-compose-cluster.yml up -d
    root@ubuntu:~# docker-compose -f docker-compose-cluster.yml up -d
    Creating network "root_kafka-network" with driver "bridge"
    Creating kafkawt01 ... done
    Creating zkwt02 ...
    Creating zkwt04 ...
    Creating zkwt01 ...
    Creating kafkawt03 ...
    Creating kafkawt02 ...
    Creating kafkawt01 ...
- Step 3: Check the status of the services with docker-compose -f docker-compose-cluster.yml ps
    root@ubuntu:~# docker-compose -f docker-compose-cluster.yml ps
    Name        Command                          State   Ports
    ---------------------------------------------------------------------------------------------------------
    kafkawt01   start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
    kafkawt02   start-kafka.sh                   Up      0.0.0.0:9093->9092/tcp
    kafkawt03   start-kafka.sh                   Up      0.0.0.0:9094->9092/tcp
    zkwt01      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
    zkwt02      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2182->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
    zkwt03      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2183->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
    zkwt04      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2184->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
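This completes the installation and deployment of ZooKeeper and Kafka.
III. Integrating Kafka with Spring Boot
1. Create the Spring Boot projects (producer and consumer)
- Step 1: Create an empty project (Empty Project) named kafka-project.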

- Step 2: Create a new module for the producer Spring Boot project.
Right-click the kafka-project created above -> New -> Module -> Spring Initializr -> Next

Enter the Group, module name, Java version, package path, and so on.

After clicking Next, add the Maven dependencies, including the Kafka dependency.

Click Next -> Finish to complete the producer Spring Boot project.
Follow the same steps to create the consumer Spring Boot project.

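2. Implement the producer logic
- Step 1: Add the Maven dependencies. This was already done when the dependencies were selected during project creation.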

- Step 2: In the resources directory of the springboot-kafka-producter project, create application.yml and configure the Kafka server addresses and the producer parameters.
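    # Kafka producer configuration: begin
    spring:
      kafka:
        bootstrap-servers: 192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094
        producer:
          # Number of retries when a write fails. When the leader fails, a replica takes over
          # as the new leader, and writes may fail while that happens.
          # With retries set to 0 the producer does not resend. With retries above 0 it resends,
          # and once the replica has fully become the leader the message is not lost.
          retries: 0
          # Maximum size, in bytes, of a batch of records sent together to one partition.
          # The producer accumulates records and sends them in one request.
          batch-size: 16384
          # Total memory, in bytes, the producer may use to buffer records waiting to be sent.
          # No value is given here; Kafka's default for buffer.memory is 33554432 (32 MB).
          buffer-memory:
          # Number of acknowledgments the producer requires the leader to have received before
          # a request is considered complete. This controls how durably records are persisted
          # on the server. Possible values:
          # acks = 0    The producer does not wait for any acknowledgment from the server. The
          #             record is added to the socket buffer immediately and considered sent.
          #             There is no guarantee that the server received it, the retries setting
          #             has no effect (the client generally does not learn of failures), and the
          #             offset returned for each record is always -1.
          # acks = 1    The leader writes the record to its local log and responds without
          #             waiting for full acknowledgment from all replicas. If the leader fails
          #             right after acknowledging, before the replicas have copied the data,
          #             the record is lost.
          # acks = all  The leader waits for the full set of in-sync replicas to acknowledge.
          #             The record is not lost as long as at least one in-sync replica stays
          #             alive. This is the strongest guarantee and is equivalent to acks = -1.
          # Allowed values: all, -1, 0, 1
          acks: 1
          # Serializers for the message key and the message value
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Kafka producer configuration: end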
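For readers more used to the plain Kafka client, the Spring Boot keys above correspond to the client property names `bootstrap.servers`, `retries`, `batch.size`, `acks`, `key.serializer`, and `value.serializer`. The sketch below only collects those settings in a `java.util.Properties` object. It does not create a producer, and the class name is hypothetical. `buffer.memory` is left out because no value is given in the configuration.

```java
import java.util.Properties;

/** Illustrative mapping of the YAML producer settings to Kafka client property names. */
final class ProducerPropsSketch {

    /** Builds the properties a plain Kafka producer would be configured with. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094");
        props.setProperty("retries", "0");        // do not resend failed writes
        props.setProperty("batch.size", "16384"); // batch size in bytes
        props.setProperty("acks", "1");           // wait for the leader only
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```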
- Step 3: Create the topic initialization class KafkaInitialConfiguration. Under com.kafka.eason, add a config package and create the KafkaInitialConfiguration class in it.
    package com.kafka.eason.config;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class KafkaInitialConfiguration {

        // Create a topic named wtopic04 with 8 partitions and a replication factor of 2
        @Bean
        public NewTopic initialTopic() {
            System.out.println("begin to init initialTopic........................");
            return new NewTopic("wtopic04", 8, (short) 2);
        }

        // To change the partition count, change the configured value and restart the project.
        // Changing the partition count does not lose data, but the count can only be
        // increased, never decreased.
        @Bean
        public NewTopic updateTopic() {
            System.out.println("begin to init updateTopic........................");
            return new NewTopic("wtopic04", 10, (short) 2);
        }
    }
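The comments in the configuration class state that a topic's partition count can only grow. A related constraint, assumed here from general Kafka behavior and not stated in this article, is that the replication factor cannot exceed the number of brokers (three in the cluster deployed earlier). The sketch below checks both rules before a change is attempted. The class and method names are hypothetical and it does not call any Kafka API.

```java
/** Illustrative pre-checks for topic changes; performs no Kafka calls. */
final class TopicChangeSketch {

    /** A partition change is acceptable only if the count does not shrink. */
    static boolean isValidPartitionChange(int currentPartitions, int requestedPartitions) {
        return requestedPartitions >= currentPartitions;
    }

    /** A replication factor must be at least 1 and at most the number of brokers. */
    static boolean isValidReplicationFactor(short replicationFactor, int brokerCount) {
        return replicationFactor >= 1 && replicationFactor <= brokerCount;
    }

    public static void main(String[] args) {
        System.out.println("8 -> 10 partitions allowed: " + isValidPartitionChange(8, 10));
        System.out.println("10 -> 8 partitions allowed: " + isValidPartitionChange(10, 8));
        System.out.println("replication factor 2 on 3 brokers allowed: "
                + isValidReplicationFactor((short) 2, 3));
    }
}
```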
- Step 4: Create the producer controller class ProducterController. Under com.kafka.eason, add a controller package and create the ProducterController class in it.
    package com.kafka.eason.controller;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Controller;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;

    /**
     * @author Lynch
     */
    @Controller
    @RequestMapping("/api/kafka/")
    public class ProducterController {

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        // Sends the request parameter to topic wtopic04 and logs the result asynchronously
        @GetMapping("send")
        @ResponseBody
        public boolean send(@RequestParam String message) {
            try {
                ListenableFuture<SendResult<String, Object>> future =
                        kafkaTemplate.send("wtopic04", message);
                future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                    @Override
                    public void onFailure(Throwable throwable) {
                        System.err.println("wtopic04 - producer failed to send message: "
                                + throwable.getMessage());
                    }

                    @Override
                    public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                        System.out.println("wtopic04 - producer sent message successfully: "
                                + stringObjectSendResult.toString());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            return true;
        }

        @GetMapping("test")
        @ResponseBody
        public String test() {
            System.out.println("hello world!");
            return "ok";
        }
    }
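Because the send is asynchronous, the `send` endpoint above returns `true` before the callback has run, so the HTTP response does not tell the caller whether the broker accepted the message. If the response should reflect the outcome, one option is to wait for the future before answering. The sketch below illustrates that idea with the JDK's `CompletableFuture` and a hypothetical `fakeSend` stand-in. It is not the Spring Kafka API, and blocking per request trades throughput for a definite answer.

```java
import java.util.concurrent.CompletableFuture;

/** Illustrative model of waiting for an asynchronous send; fakeSend is a stand-in, not Kafka. */
final class AsyncSendSketch {

    /** Hypothetical asynchronous send that either completes with a result or fails. */
    static CompletableFuture<String> fakeSend(String message, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        } else {
            future.complete("sent:" + message);
        }
        return future;
    }

    /** Waits for the outcome and reports whether the send actually succeeded. */
    static boolean sendAndReport(String message, boolean fail) {
        try {
            fakeSend(message, fail).join();
            return true;
        } catch (RuntimeException e) {
            // join() rethrows failures wrapped in CompletionException, a RuntimeException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("success case: " + sendAndReport("hello", false));
        System.out.println("failure case: " + sendAndReport("hello", true));
    }
}
```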
- Step 5: Modify the main application class and add ComponentScan annotations so that the classes in the config and controller packages are scanned.
    package com.kafka.eason;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;

    @SpringBootApplication
    @ComponentScan("com.kafka.eason.controller")
    @ComponentScan("com.kafka.eason.config")
    public class SpringbootKafkaProducterApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringbootKafkaProducterApplication.class, args);
        }
    }
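3. Implement the consumer logic
- Step 1: Add the Maven dependencies. This was already done when the dependencies were selected during project creation.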

- Step 2: In the resources directory of the springboot-kafka-consumer project, create application.yml and configure the Kafka server addresses and the consumer parameters.
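    server:
      port: 8081
    # Kafka consumer configuration: start
    #============== kafka ===================
    # Kafka server addresses; for a cluster, list several separated by commas
    spring:
      kafka:
        bootstrap-servers: 192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094
        consumer:
          # Default consumer group id. Within Kafka, consumers in the same group do not
          # read the same message; the group is named through group.id.
          group-id: test
          # Where to start when the group has no committed offset: earliest reads from the
          # beginning of the log, latest reads only new messages (older clients called
          # these smallest and largest). earliest is the usual choice here.
          auto-offset-reset: earliest
          # enable.auto.commit: true --> commit offsets automatically
          enable-auto-commit: true
          # If enable.auto.commit is true, how often (in milliseconds) consumer offsets are
          # committed to Kafka automatically. The default is 5000.
          auto-commit-interval: 1000
          # Deserializers for the message key and the message value
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Kafka consumer configuration: end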
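The `auto-offset-reset` setting only matters when the consumer group has no committed offset for a partition. If one exists, consumption resumes from it. The sketch below models that decision. The class and method names are hypothetical, and the log start and end offsets are passed in as plain numbers, not looked up from a broker.

```java
import java.util.OptionalLong;

/** Illustrative model of how a consumer picks its starting offset; not Kafka client code. */
final class OffsetResetSketch {

    /**
     * Returns the committed offset when present; otherwise applies the reset policy:
     * "earliest" starts at the beginning of the log, "latest" at its end.
     */
    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                // Models the "none" policy: fail when there is nothing to resume from
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 42L));
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0L, 42L));
        System.out.println(startingOffset(OptionalLong.of(7L), "earliest", 0L, 42L));
    }
}
```

With `group-id: test` and `earliest`, a first run of the consumer would therefore replay everything already in the topic, while later runs continue from the committed position.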
- Step 3: Create the consumer class ConsumerController. Under com.kafka.eason, add a controller package and create the ConsumerController class in it.
    package com.kafka.eason.controller;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    /**
     * Consumer that listens for messages on topic wtopic04
     *
     * @author Lynch
     */
    @Component
    public class ConsumerController {

        @KafkaListener(topics = "wtopic04")
        public void onMessage(ConsumerRecord<?, ?> record) {
            // insertIntoDb(buffer); // database insert code would go here
            // System.out.println("message: " + message);
            System.out.println("simple consume, record: " + record.topic() + "-"
                    + record.partition() + "-" + record.value());
        }
    }
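The consumer configuration notes that consumers in the same group do not read the same message. That works because each partition of the topic is assigned to exactly one consumer in the group. The sketch below spreads the 10 partitions of wtopic04 over three hypothetical consumer instances in round-robin fashion. It is a simplified illustration and not one of Kafka's actual assignment strategies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative round-robin assignment of partitions to the consumers of one group. */
final class GroupAssignmentSketch {

    /** Assigns every partition index to exactly one consumer. */
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical consumer names; the real group here is "test"
        assign(List.of("consumer-1", "consumer-2", "consumer-3"), 10)
                .forEach((consumer, partitions) ->
                        System.out.println(consumer + " -> " + partitions));
    }
}
```

Starting more consumer instances than there are partitions would leave the extra instances idle, since a partition is never shared within a group.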
- Step 4: Modify the main application class and add a ComponentScan annotation so that the classes in the controller package are scanned.
    package com.kafka.eason;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;

    @SpringBootApplication
    @ComponentScan("com.kafka.eason.controller")
    public class SpringbootKafkaConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(SpringbootKafkaConsumerApplication.class, args);
        }
    }
Published by: 全栈程序员-站长. Please credit the source when reposting: https://javaforall.net/177453.html Original link: https://javaforall.net
