kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉之前的csdn找不回来了,决定重新注册一个。望支持~~~为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。画了个简图:/***@ClassName:RiskPartitioner*@authorDHing**/publicclassRiskPartitionerimpleme…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

 画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉

/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

 

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/184515.html原文链接:https://javaforall.net

(0)
上一篇 2022年10月8日 下午4:46
下一篇 2022年10月8日 下午4:46


相关推荐

  • Windows系统好用的文本编辑器[通俗易懂]

    Windows系统好用的文本编辑器[通俗易懂]文章目录Windows系统好用的文本编辑器建议notepad3notepad++EmEditorsublimetext3vscodeWindows系统好用的文本编辑器win上默认的编辑器中文名叫[记事本],英文是[notepad],功能很弱…不知道为什么有的人说功能强大,明明撤回功能都只能返回一步的。目前win上有很多好用的编辑器,我只是用了几种,推荐顺便分析一下.链接:https://pan.baidu.com/s/1ZTfl6JtU11_1zuBnspoqYQ提取码:kpth通用优

    2022年5月4日
    582
  • JMM内存模型介绍「建议收藏」

    JMM内存模型介绍「建议收藏」一、JMM的定义1.什么是JMM《Java虚拟机规范》中曾试图定义一种“Java内存模型”(JavaMemoryModel简称JMM)来屏蔽各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果。Java内存模型是一种抽象的概念,并不真实存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。JMM是围绕原子性,有序性、可见性展开。2.主内存与工作内存Java内存模型的主要目的是定义程

    2025年7月14日
    6
  • sqlserver语句创建表格_创建表的sql语句外键

    sqlserver语句创建表格_创建表的sql语句外键今天介绍一下如何使用SQLServer语句创建表并添加数据首先先了解一下表的模式,在数据库中根据模式进行分组避免表名称的冲突在SQLServer2014中直接新建表是默认的前缀dbo而命名其他的模式需要使用SQLServer语句进行创建下面将一步一步的进行演示,首先是创建一个数据库然后创建模式在后面使用根据创建的模式或者使用默认的模式名,进行创建表,语句如下图下面解释一下句子的意思看一下新建好的表后面介绍如何在新表里面添加数据…

    2022年8月31日
    4
  • 智谱开源GLM-4.5工具调用超越Claude Opus 4.1,成本仅1.4%

    智谱开源GLM-4.5工具调用超越Claude Opus 4.1,成本仅1.4%

    2026年3月12日
    2
  • pycharm激活码2021 3月最新注册码

    pycharm激活码2021 3月最新注册码,https://javaforall.net/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

    2022年3月15日
    54
  • Kimi-Audio:月之暗面开源的高性能语音处理工具包

    Kimi-Audio:月之暗面开源的高性能语音处理工具包

    2026年3月12日
    1

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号