kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉之前的csdn找不回来了,决定重新注册一个。望支持~~~为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。画了个简图:/***@ClassName:RiskPartitioner*@authorDHing**/publicclassRiskPartitionerimpleme…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

 画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉

/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

 

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/184515.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • linux查看全部环境变量_centos7环境变量配置

    linux查看全部环境变量_centos7环境变量配置一、查看环境变量 $env二、查看PATH $echo$PATH三、修改PATH 在Linux里设置环境变量的方法(exportPATH)一般来说,配置交叉编译工具链的时候需要指定编译工具的路径,此时就需要设置环境变量。例如我的mips-linux-gcc编译器在“/opt/au1200_rm/build_tools/bin”目录下,build_tools就是

    2022年10月1日
    2
  • RabbitMQ入门:路由(Routing)

    在上一篇博客《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中,我们认识了fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exchange建立

    2022年2月16日
    69
  • HTML+CSS代码橙色导航菜单

    效果预览:http://hovertree.com/code/texiao/ks63r6aq.htm1<!DOCTYPEhtml>2<htmlxmlns="ht

    2021年12月21日
    62
  • Vue 箭头函数

    Vue 箭头函数箭头函数1.1认识箭头函数传统定义函数:constaaa=function(parse){}对象字面量中定义函数:constobj={ bbb(parse){ }}Es6中箭头函数;constccc=()=>{}箭头函数的参数和返回值参数问题:放入两个参数:constobj=(num1,num2)=>{retu…

    2022年6月24日
    51
  • vue中如何关闭eslint「建议收藏」

    vue中如何关闭eslint「建议收藏」1.在目录中新建vue.config.js2.在新建文件中输入module.exports={lintOnSave:false}就可以关闭啦~

    2022年10月8日
    2
  • 锁定文件失败 未能启动虚拟机_win10无法分析无人应答文件

    锁定文件失败 未能启动虚拟机_win10无法分析无人应答文件首先看一下出现的错误:出现这个错误我也是纠结了好半天,试了网上的方法结果还是没有效果,比如下面的这个方法也不行,不知道是不是我机器的问题:后来误打误撞地把问题解决了,在创建虚拟机的最后一步将勾选的“创建后开启此虚拟机(P)”去的,即不勾选,创建完后再手动启动虚拟机,就可以了,如下图所示:…

    2025年11月14日
    6

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号