kafuka生产者和消费者及配置

kafuka生产者和消费者及配置#kafka生产者配置#kafka集群kafka.bootstrap.servers=ip:端口#发送端确认模式kafka.acks=all#发送失败重试次数kafka.retries=10#批处理条数kafka.batch.size=16384#延迟统一收集,产生聚合,然后批量发送kafka.linger.ms=100#批处理缓冲区kafka.buffer.memo…

大家好,又见面了,我是你们的朋友全栈君。

#kafka 生产者配置
#kafka 集群
kafka.bootstrap.servers=ip:端口
#发送端确认模式
kafka.acks=all
#发送失败重试次数
kafka.retries =10
#批处理条数
kafka.batch.size=16384
#延迟统一收集,产生聚合,然后批量发送
kafka.linger.ms=100
#批处理缓冲区
kafka.buffer.memory=33554432
#key 序列化
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value序列化
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
#消费端 集群
kafka.bootstrap.servers=IP:端口
#一个用于跟踪调查的ID ,最好同group.id相同
kafka.client.id=MesSystem
#Consumer归属的组ID
kafka.group.id=debtorInfo
#限制每回返回的最大数据条数
kafka.max.poll.records=1000
#是否自动提交
kafka.enable.auto.commit=false
#自动提交的频率
kafka.auto.commit.interval.ms=1000
#会话的超时限制
kafka.session.timeout.ms=15000
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

//生产者
KafkaProducerUtils.send("topics", json.toString());//往kafka中存入消息
//KafkaProducerUtils 工具类
package com.tera.util;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;

public class KafkaProducerUtils {
    //把KafkaProducer对象放到本地线程中
    private static ThreadLocal<KafkaProducer> local = new ThreadLocal<KafkaProducer>();
    private static Properties props;
    private static KafkaProducer<String, String> producer;
    static {

        props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertyUtil.getProperty("kafka.bootstrap.servers"));
        props.put(ProducerConfig.ACKS_CONFIG, PropertyUtil.getProperty("kafka.acks"));
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.retries")));
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.batch.size")));
        props.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.linger.ms")));
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.parseInt(PropertyUtil.getProperty("kafka.buffer.memory")));
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertyUtil.getProperty("kafka.key.serializer"));
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertyUtil.getProperty("kafka.value.serializer"));
        producer = new KafkaProducer<String, String>(props);
        
    }
    
    static class SendCallback implements Callback {
        ProducerRecord<String, String> record;
        int sendSeq = 0;

        public SendCallback(ProducerRecord record, int sendSeq) {
            this.record = record;
            this.sendSeq = sendSeq;
        }
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            //send success
            if (null == e) {
                String meta = "send----topic:" + recordMetadata.topic() + ", partition:"
                        + recordMetadata.topic() + ", offset:" + recordMetadata.offset();
                System.out.println("send message success, record:" + record.toString() + ", meta:" + meta);
                
                System.out.println("value==========="+record.value());
                return;
            }
            //send failed
            System.out.println("send message failed, seq:" + sendSeq + ", record:" + record.toString() + ", errmsg:" + e.getMessage());
           
        }
    }

    /**
     * 发送消息到kafka
     * @param topicName
     * @param key
     * @param value
     */
    public static void send(String topicName,String value) throws Exception {
        if(StringUtils.isNullOrEmpty(topicName)){
            throw new Exception("参数错误,topicName不能为空");
        }
//        RecordMetadata recordMetadata =  producer.send(new ProducerRecord<String, String>(topicName,null,value)).get();
//        System.out.println("topic---"+recordMetadata.topic()+"--hasTimestamp---"+recordMetadata.hasTimestamp()+"--hasOffset"+
//        		recordMetadata.hasOffset()+"--partition--"+recordMetadata.partition()+"---"+recordMetadata.serializedKeySize()+"--"+recordMetadata.serializedValueSize()
//        		+"-----all--"+recordMetadata.toString()
//        		);
        ProducerRecord record= new ProducerRecord<String, String>(topicName,null,value);
       producer.send(record,new SendCallback(record,0));
        producer.flush();
    }
    /**
     * 发送消息到kafka
     * @param topicName
     * @param key
     * @param value
     */
    public static void sendBatch(String topicName,List<String> list) throws Exception {
        if(StringUtils.isNullOrEmpty(topicName)){
            throw new Exception("参数错误,topicName不能为空");
        }
        if(list==null || list.size() ==0){
            throw new Exception("参数错误,list不能为空");
        }
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            for (String value : list){
                producer.send(new ProducerRecord<String, String>(topicName,null,value));
            }
            producer.close();


    }


    public static void main(String[] args) {
        KafkaProducerUtils kafkaProducerUtils = new KafkaProducerUtils();
        try {
            kafkaProducerUtils.send("withdrawaldev","123");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
//消费者
@Autowired
private DefaultKafkaConsumerFactory consumerFactory; 
Consumer consumer = consumerFactory.createConsumer();
consumer.subscribe(Arrays.asList("t_message_log"));
ConsumerRecords<Integer, String> records = null;
records = consumer.poll(100);
 for (ConsumerRecord<Integer, String> record : records) {	
		value = record.value();//数据
			        	
 }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/136688.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • docker 查看redis 版本「建议收藏」

    dockerexec-itcontainer-name-or-idredis-server-v

    2022年4月17日
    106
  • pycharm社区版安装教程 2019_pycharm安装教程2020社区版

    pycharm社区版安装教程 2019_pycharm安装教程2020社区版首先进入JetBrain的官网(国内正常访问):https://www.jetbrains.com/第一眼看到的界面如下图所示:然后找到我们的Pycharm专题页:进入Pycharm的专题页面之后,点击下载按钮(这里有两个按钮,点任何一个都行):然后进入到真正的下载页面你会发现有两个版本的Pycharm,一个是Professional版本(收费),另外一个是Community版本是永久免费的,而且后续升级什么的也都是免费的,我们下载这个就行了,Comm…

    2022年8月28日
    1
  • android studio不能输入中文_Android模拟器

    android studio不能输入中文_Android模拟器很多Android项目的运行都需要用到中文输入法,在一年前的AndroidStudio需要导入输入法apk安装,现在新版的AndroidStudio免去了这个麻烦,下面就教大家如何设置谷歌拼音输入法。

    2022年8月30日
    1
  • Android hybrid_android混合开发

    Android hybrid_android混合开发关于混合开发常问道的问题:Android如何嵌套h5页面?h5一般调用哪些Android哪些接口功能?Android如何调用网页(js)功能?问题1.ndroid如何嵌套h5页面答案:当我们用vue开发完项目,执行nmprunbuild打包生产dist目录,如何嵌套在Android框架中创建网页存放文件夹,在Android工程res下面添加assets文件夹,把dist目录内容拷贝到assets下。找到Android项目中.xml布局文件,添加webview组件及设置web

    2022年9月22日
    0
  • Java类加载机制与Tomcat类加载器架构

    Java类加载机制与Tomcat类加载器架构Java类加载机制类加载器虚拟机设计团队把类加载阶段中的“通过一个类的全限定名来获取描述此类的二进制字节流”这个动作放到Java虚拟机外部去实现,以便让应用程序自己决定如何去获取所需要的类。实现这个动作的代码模块称为“类加载器”。类加载器可以说是Java语言的一项创新,也是Java语言流行的重要原因之一,它最初是为了满足JavaApplet的需求而开发出来的。虽然目前JavaA

    2022年5月20日
    36
  • matlab double类型数据_timestamp是什么数据类型

    matlab double类型数据_timestamp是什么数据类型matlab中读取图片后保存的数据是uint8类型(8位无符号整数,即1个字节),以此方式存储的图像称作8位图像,相比较matlab默认数据类型双精度浮点double(64位,8个字节)可以节省存储空间。详细来说imread把灰度图像存入一个8位矩阵,当为RGB图像时,就存入8位RGB矩阵中。例如,彩色图像像素大小是400*300(高*宽),则保存的数据矩阵为400*300*3,其中每个颜…

    2022年9月17日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号