环境
ZooKeeper、Kafka。
如果使用 docker-compose 部署 Kafka 集群,可以参考我的这篇文章 docker-compose安装Kafka集群
依赖
pom.xml 依赖文件如下:springboot 版本是 2.3.0.RELEASE 。
<dependencies> <dependency> <groupId>org.springframework.boot
groupId> <artifactId>spring-boot-starter
artifactId>
dependency>
<dependency> <groupId>org.springframework.kafka
groupId> <artifactId>spring-kafka
artifactId> <version>2.4.0.RELEASE
version>
dependency>
<dependency> <groupId>org.projectlombok
groupId> <artifactId>lombok
artifactId>
dependency>
<dependency> <groupId>org.springframework.boot
groupId> <artifactId>spring-boot-starter-test
artifactId> <scope>test
scope>
dependency>
dependencies>
配置
可以通过Java类配置和配置文件配置两种方式来配置 Kafka。
Java 类配置
配置主题(KafkaAdmin)
创建配置类 KafkaTopicConfiguration 代码如下:该配置可选,通常会事先通过 Kafka 提供的脚本创建主题。
/ * kafka 主题配置类 * * @author Leo * @create 2020/12/31 15:57 / @Configuration public class KafkaTopicConfiguration {
/ * 创建 KafkaAmin,可以自动检测集群中是否存在topic,不存在则创建 * @return */ @Bean public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); return new KafkaAdmin(props); } @Bean public NewTopic newTopic() {
// 创建 topic,指定 名称、分区数、副本数 return new NewTopic("hello-kafka-test-topic", 3, (short) 2); } }
配置生产者
创建配置类KafkaProducerConfiguration 代码如下:里面涉及基本配置和自定义分区器、拦截器、事务等配置。
/ * kafka 生产者配置类 * * @author Leo * @create 2020/12/31 15:09 / @Configuration public class KafkaProducerConfiguration {
/ * 不包含事务 producerFactory * @return */ public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>(); //kafka 集群地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); //重试次数 props.put(ProducerConfig.RETRIES_CONFIG, 3); //应答级别 //acks=0 把消息发送到kafka就认为发送成功 //acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 //acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 props.put(ProducerConfig.ACKS_CONFIG, "all"); //KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); //批量处理的最大大小 单位 byte props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); //发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka props.put(ProducerConfig.LINGER_MS_CONFIG, 1000); //生产者可用缓冲区的最大值 单位 byte props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, ); //每条消息最大的大小 props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ); //客户端ID props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-kafka"); //Key 序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //Value 序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //消息压缩:none、lz4、gzip、snappy,默认为 none。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); //自定义分区器 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); return new DefaultKafkaProducerFactory<>(props); } / * 包含事务 producerFactory * @return */ public ProducerFactory<String, String> producerFactoryWithTransaction() {
DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = (DefaultKafkaProducerFactory<String, String>) producerFactory(); //设置事务Id前缀 defaultKafkaProducerFactory.setTransactionIdPrefix("tx"); return defaultKafkaProducerFactory; } / * 不包含事务 kafkaTemplate * @return */ @Bean("kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory()); } / * 包含事务 kafkaTemplate * @return */ @Bean("kafkaTemplateWithTransaction") public KafkaTemplate<String, String> kafkaTemplateWithTransaction() {
return new KafkaTemplate<>(producerFactoryWithTransaction()); } / * 以该方式配置事务管理器:就不能以普通方式发送消息,只能通过 kafkaTemplate.executeInTransaction 或 * 在方法上加 @Transactional 注解来发送消息,否则报错 * @param producerFactory * @return */ // @Bean // public KafkaTransactionManager
kafkaTransactionManager(ProducerFactory
producerFactory) {
// return new KafkaTransactionManager<>(producerFactory); // } }
配置消费者
创建配置类KafkaConsumerConfiguration代码如下:
/ * kafka 消费者配置类 * * @author Leo * @create 2020/12/31 15:09 / @Slf4j @Configuration public class KafkaConsumerConfiguration {
@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); //设置 consumerFactory factory.setConsumerFactory(consumerFactory()); //设置是否开启批量监听 factory.setBatchListener(false); //设置消费者组中的线程数量 factory.setConcurrency(1); return factory; } / * consumerFactory * @return */ public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>(); //kafka集群地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); //自动提交 offset 默认 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //自动提交的频率 单位 ms props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //批量消费最大数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); //消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup"); //session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ); //请求超时 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, ); //Key 反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //Value 反序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //当kafka中没有初始offset或offset超出范围时将自动重置offset //earliest:重置为分区中最小的offset //latest:重置为分区中最新的offset(消费分区中新产生的数据) //none:只要有一个分区不存在已提交的offset,就抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //设置Consumer拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName()); return new DefaultKafkaConsumerFactory<>(props); } / * 消费异常处理器 * @return */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() {
return new ConsumerAwareListenerErrorHandler() {
@Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
//打印消费异常的消息和异常信息 log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId()); return null; } }; } }
配置文件配置
以配置文件的方式来进行配置的话,所有配置都在application.yml中,配置内容如下:
spring: application: name: hello-kafka kafka: listener: #设置是否批量消费,默认 single(单条),batch(批量) type: single # 集群地址 bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # 生产者配置 producer: # 重试次数 retries: 3 # 应答级别 # acks=0 把消息发送到kafka就认为发送成功 # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 acks: all # 批量处理的最大大小 单位 byte batch-size: 4096 # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka buffer-memory: # 客户端ID client-id: hello-kafka # Key 序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer # Value 序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息压缩:none、lz4、gzip、snappy,默认为 none。 compression-type: gzip properties: partitioner: #指定自定义分区器 class: top.zysite.hello.kafka.partitioner.MyPartitioner linger: # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka ms: 1000 max: block: # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms ms: 6000 # 消费者配置 consumer: # 默认消费者组 group-id: testGroup # 自动提交 offset 默认 true enable-auto-commit: false # 自动提交的频率 单位 ms auto-commit-interval: 1000 # 批量消费最大数量 max-poll-records: 100 # Key 反序列化类 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value 反序列化类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset # latest:重置为分区中最新的offset(消费分区中新产生的数据) # none:只要有一个分区不存在已提交的offset,就抛出异常 auto-offset-reset: latest properties: interceptor: classes: top.zysite.hello.kafka.interceptor.MyConsumerInterceptor session: timeout: # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 ms: request: timeout: # 请求超时 ms:
这里采用Java类配置。在 SpringBoot 启动类上添加注解@EnableKafka启用KafkaTemplate:
@SpringBootApplication @EnableKafka public class HelloKafkaApplication {
public static void main(String[] args) {
SpringApplication.run(HelloKafkaApplication.class, args); } }
自定义分区器
可以看到上述配置包含自定义分区器的配置,自定义分区器需实现Partitioner接口并在生产者端进行配置,代码如下:
/ * 自定义分区器 * * @author Leo * @create 2021/5/26 13:40 / public class MyPartitioner implements Partitioner {
/ * 分区策略核心方法 * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//具体分区逻辑,这里全部发送到0号分区 return 0; } @Override public void close() {
} @Override public void configure(Map<String, ?> configs) {
} }
配置了之后所有的消息都会发送到 0 号分区。(这里只是为了演示)
消费者拦截器
可以看到上述配置也包含了消费者拦截器的配置,消费者拦截器需实现ConsumerInterceptor接口,代码如下:
/ * 消费者拦截器 * * @author Leo * @create 2021/5/27 16:30 / @Slf4j public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
/ * KafkaConsumer 会在 poll 方法返回之前调用该方法,可以在该方法中对消息进行过滤 * @param records * @return */ @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println(" before interceptor: " + records.count() + ""); Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(); //遍历每个topic、partition for (TopicPartition topicPartition : records.partitions()) {
//获取特定topic、partition下的消息列表 List<ConsumerRecord<String, String>> recordList = records.records(topicPartition); //过滤 List<ConsumerRecord<String, String>> filteredList = recordList.stream() .filter(record -> !record.value().contains("filter")).collect(Collectors.toList()); //放入新的消息记录里 newRecords.put(topicPartition, filteredList); } ConsumerRecords<String, String> filteredRecords = new ConsumerRecords<>(newRecords); System.out.println(" after interceptor: " + filteredRecords.count() + ""); //返回过滤后的消息记录 return filteredRecords; } / * 提交完offset之后调用该方法 * @param offsets */ @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (!offsets.isEmpty()) {
offsets.forEach(((topicPartition, offsetAndMetadata) -> {
log.info("partition : " + topicPartition + ", offset : " + offsetAndMetadata); })); } } @Override public void close() {
} @Override public void configure(Map<String, ?> configs) {
} }
有消费者拦截器,自然也有生产者拦截器,只需要实现ProducerInterceptor接口并在生产者端进行配置即可,这里只演示消费者拦截器。
测试
为了方便看到效果,这里统一测试生产者和消费者。
创建生产者服务类
创建生产消息服务类KafkaProducerService代码如下:
/ * kafka 生产服务 * * @author Leo * @create 2020/12/31 16:06 / @Slf4j @Service public class KafkaProducerService {
@Qualifier("kafkaTemplate") @Resource private KafkaTemplate<String, String> kafkaTemplate; @Qualifier("kafkaTemplateWithTransaction") @Resource private KafkaTemplate<String, String> kafkaTemplateWithTransaction; / * 发送消息(同步) * @param topic 主题 * @param key 键 * @param message 值 */ public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
//可以指定最长等待时间,也可以不指定 kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message); //指定key,kafka根据key进行hash,决定存入哪个partition // kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS); //存入指定partition // kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS); } / * 发送消息并获取结果 * @param topic * @param message * @throws ExecutionException * @throws InterruptedException */ public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
SendResult<String, String> result = kafkaTemplate.send(topic, message).get(); log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message); log.info("The partition the message was sent to: " + result.getRecordMetadata().partition()); } / * 发送消息(异步) * @param topic 主题 * @param message 消息内容 */ public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); //添加回调 future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override public void onFailure(Throwable throwable) {
log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message); } @Override public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("sendMessageAsync success! topic: {}, message: {}", topic, message); } }); } / * 可以将消息组装成 Message 对象和 ProducerRecord 对象发送 * @param topic * @param key * @param message * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
// 组装消息 Message msg = MessageBuilder.withPayload(message) .setHeader(KafkaHeaders.MESSAGE_KEY, key) .setHeader(KafkaHeaders.TOPIC, topic) .setHeader(KafkaHeaders.PREFIX,"kafka_") .build(); //同步发送 kafkaTemplate.send(msg).get(); // 组装消息 // ProducerRecord
producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS); } / * 以事务方式发送消息 * @param topic * @param key * @param message */ public void sendMessageInTransaction(String topic, String key, String message) {
kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {
@Override public Object doInOperations(KafkaOperations<String, String> kafkaOperations) {
kafkaOperations.send(topic, key, message); //出现异常将会中断事务,消息不会发送出去 throw new RuntimeException("exception"); } }); } }
创建消费者服务类
创建消费消息服务类KafkaConsumerService代码如下:
/ * kafka 消费服务 * * @author Leo * @create 2020/12/31 16:06 / @Slf4j @Service public class KafkaConsumerService {
/ * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * @param message 消息 */ @KafkaListener(id = "consumerSingle", topics = "hello-kafka-test-topic") public void consumerSingle(String message) {
log.info("consumerSingle ====> message: {}", message); } /* @KafkaListener(id = "consumerBatch", topicPartitions = { @TopicPartition(topic = "hello-batch1", partitions = "0"), @TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4")) })*/ / * 批量消费消息 * @param messages */ @KafkaListener(id = "consumerBatch", topics = "hello-batch") public void consumerBatch(List<ConsumerRecord<String, String>> messages) {
log.info("consumerBatch =====> messageSize: {}", messages.size()); log.info(messages.toString()); } / * 指定消费异常处理器 * @param message */ @KafkaListener(id = "consumerException", topics = "hello-kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler") public void consumerException(String message) {
throw new RuntimeException("consumer exception"); } / * 验证ConsumerInterceptor * @param message */ @KafkaListener(id = "interceptor", topics = "consumer-interceptor") public void consumerInterceptor(String message) {
log.info("consumerInterceptor ====> message: {}", message); } }
创建 Junit 测试类
使用 Junit 整合 SpringBoot 来测试:
@RunWith(SpringRunner.class) @SpringBootTest class HelloKafkaApplicationTests {
@Resource private KafkaProducerService kafkaProducerService; @Test void testSendMessageSync() throws Exception {
String topic = "hello-kafka-test-topic"; String key = "key1"; String message = "firstMessage"; kafkaProducerService.sendMessageSync(topic, key, message); } @Test public void testSendMessageGetResult() throws Exception {
String topic = "hello-kafka-test-topic"; String key = "key"; String message = "helloSendMessageGetResult"; kafkaProducerService.sendMessageGetResult(topic, key, message); kafkaProducerService.sendMessageGetResult(topic, null, message); } @Test public void testSendMessageAsync() {
String topic = "hello-kafka-test-topic"; String message = "firstAsyncMessage"; kafkaProducerService.sendMessageAsync(topic, message); } @Test public void testMessageBuilder() throws Exception {
String topic = "hello-kafka-test-topic"; String key = "key1"; String message = "helloMessageBuilder"; kafkaProducerService.testMessageBuilder(topic, key, message); } / * 测试事务 */ @Test public void testSendMessageInTransaction() {
String topic = "hello-kafka-test-topic"; String key = "key1"; String message = "helloSendMessageInTransaction"; kafkaProducerService.sendMessageInTransaction(topic, key, message); } / * 测试批量消费 * @throws Exception */ @Test public void testConsumerBatch() throws Exception {
//写入多条数据到批量topic:hello-batch String topic = "hello-batch"; for(int i = 0; i < 20; i++) {
kafkaProducerService.sendMessageSync(topic, null, "batchMessage" + i); } } / * 测试消费者拦截器 * @throws Exception */ @Test public void testConsumerInterceptor() throws Exception {
String topic = "consumer-interceptor"; for(int i = 0; i < 2; i++) {
kafkaProducerService.sendMessageSync(topic,null, "normalMessage" + i); } kafkaProducerService.sendMessageSync(topic, null, "filteredMessage"); kafkaProducerService.sendMessageSync(topic, null, "filterMessage"); } }
测试结果
先启动应用程序,然后依次执行 Junit 中的测试方法:
同步发送消息
执行方法testSendMessageSync,可以看到日志打印:

消费日志:

发送消息并获取消息发往的分区
执行方法testSendMessageGetResult,可以看到日志打印:

由于配置了自定义分区器,可以看到消息都发往了 0 号分区,如果没有配置自定义分区器,并且主题包含多个分区的话,正常情况下多条消息不会全部发往同一分区。(可以自行测试)
异步发送消息
异步发送消息无法通过 Junit 来测试,因为 Junit 方法执行完就结束了,没法看到成功或失败的回调打印。(可以通过System.in.read来阻塞,或则 Thread.sleep)。这里就不演示了。
MessageBuilder 只是以不同的形式来组装消息,可以自行测试。
以事务的形式发送消息
KafkaTemplate封装了方法executeInTransaction方法,可以让我们以事务的形式发送消息。
执行方法testSendMessageInTransaction,可以看到:

抛出了异常,通过异常说明和没有该消息的消费日志,可以证明该消息并没有发送到 Kafka。
测试批量消费
批量消费需要修改上述消费者端的配置:将配置类KafkaConsumerConfiguration中的配置稍作修改,往 setBatchListener 方法传入 true,表示开启批量监听。
@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); //设置 consumerFactory factory.setConsumerFactory(consumerFactory()); //设置是否开启批量监听 factory.setBatchListener(true); //设置消费者组中的线程数量 factory.setConcurrency(1); return factory; }
先将批量消费监听器@KafkaListener注解注释掉并停止 SpringBoot 应用程序,再执行testConsumerBatch方法往主题中写入 20 条消息。
//@KafkaListener(id = "consumerBatch", topics = "hello-batch") public void consumerBatch(List<String> messages) {
log.info("consumerBatch =====> messageSize: {}", messages.size()); log.info(messages.toString()); }

写入消息之后启动 SpringBoot ,可以看到批量消费的日志打印:

消费者拦截器
将批量监听改回单条,即 setBatchListener 传入 false。执行方法testConsumerInterceptor,日志打印:

可以看到绿框中的消息可以被正常消费,红框中的消息被拦截器过滤了,没有消费,因为消息内容包含filter,这是前面自定义消费者拦截器的逻辑。
源码
源码地址:https://github.com/leo/hello-kafka
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/202067.html原文链接:https://javaforall.net
