– Start
点击此处观看本系列配套视频
package shangbo.kafka.example2; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class App { public static void main(String[] args) { // Producer 配置信息,应该配置在属性文件中 Properties props = new Properties(); //指定要连接的 broker,不需要列出所有的 broker,但建议至少列出2个,以防某个 broker 挂了 props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("enable.idempotence", "true"); // props.put("retries", 3); // 不能手动设置 retries,自动设置为 Integer.MAX_VALUE // props.put("acks", "1"); // 不能手动设置 acks,自动设置为 all // 创建 Producer Producer
producer = new KafkaProducer
(props); // 发送消息 producer.send(new ProducerRecord
("topic0", "message 4"), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception != null) { System.out.println("send message4 failed with " + exception.getMessage()); } else { // offset 是消息在 partition 中的编号,可以根据 offset 检索消息 System.out.println("message4 sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset()); } } }); // producer 需要关闭,放在 finally 里 producer.close(); } }
– 更多参见:Kafka 精萃
– 声 明:转载请注明出处
– Last Edited on 2018-06-13
– Written by ShangBo on 2018-06-13
– End
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/216740.html原文链接:https://javaforall.net
