1.kafka集群配置的话需要在本地host文件里配置
如下:
192.168.5.11 kafka1 192.168.5.12 kafka2 192.168.5.13 kafka3 192.168.5.14 kafka4 192.168.5.15 kafka5 192.168.5.16 kafka6
2.先引依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

这是对应的版本,千万不要引错版本,负责回报找不到类这种异常
3.在application.yml中进行配置kafka
kafka: bootstrap-servers: 192.168.5.11:9092,192.168.5.12:9092,192.168.5.13:9092,192.168.5.14:9092,192.168.5.15:9092 producer: retries: 0 batch-size: 16384 buffer-memory: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1 consumer: enable-auto-commit: false auto-commit-interval: 100ms key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 group-id: test-group-id listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false
4.可配置kafkaconfig也可让springboot自动装配
@Configuration @EnableKafka public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.retries}") private int retries; @Value("${spring.kafka.producer.batch-size}") private int batchSize; @Value("${spring.kafka.producer.properties.linger.ms}") private int linger; @Value("${spring.kafka.producer.buffer-memory}") private int bufferMemory; @Bean public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfig()); } @Bean public Map<String, Object> producerConfig() {
Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<String, Object>(producerFactory()); } @Bean//通过bean创建(bean的名字为initialTopic) public NewTopic initialTopic() {
return new NewTopic("test",3, (short) 1 ); } @Bean //创建一个kafka管理类,相当于rabbitMQ的管理类rabbitAdmin,没有此bean无法自定义的使用adminClient创建topic public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>(); //配置Kafka实例的连接地址 //kafka的地址,不是zookeeper props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); KafkaAdmin admin = new KafkaAdmin(props); return admin; } @Bean //kafka客户端,在spring中创建这个bean之后可以注入并且创建topic,用于集群环境,创建对个副本 public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfig()); } }
5.注入kafkaTemplate,生产者发送消息
@Slf4j public class TestHandlerKafka {
@Autowired private KafkaTemplate<String, Object> kafkaTemplate; private static final String TOPIC_NOTIFY = "test"; @Autowired private KafkaSendResultHandler producerListener; public ResponseStatusList producer() {
ResponseStatusList responseStatusList = new ResponseStatusList(); try {
//发送消息前配置回调 kafkaTemplate.setProducerListener(producerListener); String key = "test"; ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(TOPIC_NOTIFY, key, JSON.toJSONString(entity)); SendResult<String, Object> stringObjectSendResult = send.get(); log.info("发送消息成功,", stringObjectSendResult); responseStatusList.setLocalDate(new Date()); responseStatusList.setStatusCode("0"); responseStatusList.setStatusString("正常"); return responseStatusList; } catch (Exception e) {
log.error(Status.KAFKA_SEND_ERROR.getMsg(), e); responseStatusList.setLocalDate(new Date()); responseStatusList.setStatusCode("1"); responseStatusList.setStatusString("其他未知错误"); return responseStatusList; } } }
6.可进行简单的消费
@Component public class KafkaConsumer {
private static final String TOPIC_NOTIFY = "test"; // 消费监听 @KafkaListener(topics = {
TOPIC_NOTIFY}) public void onMessage1(ConsumerRecord<?, ?> record){
// 消费的哪个topic、partition的消息,打印出消息内容 System.err.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); } }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/227152.html原文链接:https://javaforall.net
