springboot 整合kafka

本文介绍 Spring Boot 整合 Kafka 的完整步骤:配置本地 host 文件、引入 spring-kafka 依赖、在 application.yml 中配置 Kafka,以及生产者发送消息与消费者消费消息的示例代码。

1.kafka集群配置的话需要在本地host文件里配置
如下:

192.168.5.11 kafka1
192.168.5.12 kafka2
192.168.5.13 kafka3
192.168.5.14 kafka4
192.168.5.15 kafka5
192.168.5.16 kafka6

2.先引依赖

 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 

在这里插入图片描述
这是对应的版本,千万不要引错版本,否则会报找不到类(ClassNotFoundException)这种异常
3.在application.yml中进行配置kafka




kafka: bootstrap-servers: 192.168.5.11:9092,192.168.5.12:9092,192.168.5.13:9092,192.168.5.14:9092,192.168.5.15:9092 producer: retries: 0 batch-size: 16384 buffer-memory:  key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1 consumer: enable-auto-commit: false auto-commit-interval: 100ms key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 group-id: test-group-id listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false 

4.可自行配置 KafkaConfig,也可以让 Spring Boot 自动装配

@Configuration @EnableKafka public class KafkaConfig { 
    @Value("${spring.kafka.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.retries}") private int retries; @Value("${spring.kafka.producer.batch-size}") private int batchSize; @Value("${spring.kafka.producer.properties.linger.ms}") private int linger; @Value("${spring.kafka.producer.buffer-memory}") private int bufferMemory; @Bean public ProducerFactory<String, Object> producerFactory() { 
    return new DefaultKafkaProducerFactory<>(producerConfig()); } @Bean public Map<String, Object> producerConfig() { 
    Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { 
    return new KafkaTemplate<String, Object>(producerFactory()); } @Bean//通过bean创建(bean的名字为initialTopic) public NewTopic initialTopic() { 
    return new NewTopic("test",3, (short) 1 ); } @Bean //创建一个kafka管理类,相当于rabbitMQ的管理类rabbitAdmin,没有此bean无法自定义的使用adminClient创建topic public KafkaAdmin kafkaAdmin() { 
    Map<String, Object> props = new HashMap<>(); //配置Kafka实例的连接地址 //kafka的地址,不是zookeeper props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); KafkaAdmin admin = new KafkaAdmin(props); return admin; } @Bean //kafka客户端,在spring中创建这个bean之后可以注入并且创建topic,用于集群环境,创建对个副本 public AdminClient adminClient() { 
    return AdminClient.create(kafkaAdmin().getConfig()); } } 

5.注入kafkaTemplate,生产者发送消息

 @Slf4j public class TestHandlerKafka { 
    @Autowired private KafkaTemplate<String, Object> kafkaTemplate; private static final String TOPIC_NOTIFY = "test"; @Autowired private KafkaSendResultHandler producerListener; public ResponseStatusList producer() { 
    ResponseStatusList responseStatusList = new ResponseStatusList(); try { 
    //发送消息前配置回调 kafkaTemplate.setProducerListener(producerListener); String key = "test"; ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(TOPIC_NOTIFY, key, JSON.toJSONString(entity)); SendResult<String, Object> stringObjectSendResult = send.get(); log.info("发送消息成功,", stringObjectSendResult); responseStatusList.setLocalDate(new Date()); responseStatusList.setStatusCode("0"); responseStatusList.setStatusString("正常"); return responseStatusList; } catch (Exception e) { 
    log.error(Status.KAFKA_SEND_ERROR.getMsg(), e); responseStatusList.setLocalDate(new Date()); responseStatusList.setStatusCode("1"); responseStatusList.setStatusString("其他未知错误"); return responseStatusList; } } } 

6.可进行简单的消费

@Component public class KafkaConsumer { 
    private static final String TOPIC_NOTIFY = "test"; // 消费监听 @KafkaListener(topics = { 
   TOPIC_NOTIFY}) public void onMessage1(ConsumerRecord<?, ?> record){ 
    // 消费的哪个topic、partition的消息,打印出消息内容 System.err.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); } } 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/227152.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午9:50
下一篇 2026年3月16日 下午9:50


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号