kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉之前的csdn找不回来了,决定重新注册一个。望支持~~~为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。画了个简图:/***@ClassName:RiskPartitioner*@authorDHing**/publicclassRiskPartitionerimpleme…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

 画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉

/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

 

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/184515.html原文链接:https://javaforall.net

(0)
上一篇 2022年10月8日 下午4:46
下一篇 2022年10月8日 下午4:46


相关推荐

  • TFW格式简介

    TFW格式简介最近要用到 TFW 格式来自动配准 Tiff 图片 在网上找到了这个 就摘抄下来了 TFW 格式 TFW TIFFWorldFil 格式说明 TFW 文件是关于 TIFF 影像坐标信息的文本文件 ArcInfo Microstation AutoCAD 等均支持该格式的坐标信息文件 此文件定义了影像象素坐标与实际地理坐标的仿射关系 TFW 文件包含相关的 TIFF

    2026年3月19日
    2
  • 锋利的jquery ——学习笔记

    锋利的jquery ——学习笔记1 代码收集 在一个 id 为 table 的表格的 tbody 中 如果每行最后一列中的 checkbox 没有被禁用 则把这行的背景设置为红色 table gt tbody gt tr has td last has checkbox enabled css background red nbsp 先引入 jquery 1 5 1 min js 然后引入自

    2026年3月19日
    1
  • 获取和分析Dump的几种工具简介[通俗易懂]

    获取和分析Dump的几种工具简介[通俗易懂]最近在进一步学习support技能的时候,了解到分析Dump的重要性,经过学习,做一些笔记。一、什么是Dump文件。Dump文件时进程的内存镜像。可以把程序的执行状态保存到Dump文件中。Dump文件分为内核模式Dump和用户模式Dump。其中内核模式Dump是操作系统创建的崩溃转储,例如蓝屏Dump。而在我们调试或Troubleshooting过程中使用的Dump是用户模式Dump,又分为F…

    2022年10月2日
    5
  • CACL联赛第一轮参赛记录

    CACL联赛第一轮参赛记录我是重庆理工大学 Icode 社团成员 我们参加了 CACL 第一赛季的比赛 这是本赛季第一轮关于 波士顿房价预测 的比赛实验记录 这个比赛看似简单 但我们费了不少心血 最终拿到了第六名的成绩 只能寄望于第二轮的比赛拿到更好的排名啦 下面我将分享本次比赛涉及到的知识点 如有不对之处 欢迎指正 一 标准化与归一化归一化对数据的数值范围进行特定缩放 但不改变其数据分布的一种线性特征变换 标准化对

    2026年3月19日
    3
  • linux menuconfig搜索,linux系统menuconfig解析

    linux menuconfig搜索,linux系统menuconfig解析在对linux进行编译,常用的命令是makemenuconfig,使用图形界面来对整个系统进行裁剪;这里主要就makemenuconfig的执行过程进行解析。介绍跟makemenuconfig这个命令相关的文件,包括三类,包括.config,Kconfig,Makefile。为什么不说三个,而说三类呢?因为Kconfig和Makefile是配合使用的,在很多的子目录都存在,而.config只…

    2022年5月30日
    38
  • [HTML] 那个圣诞树,把代码分享给大家

    [HTML] 那个圣诞树,把代码分享给大家<!DOCTYPEHEMLPUBLIC><html><head><metacharset=”utf-8″><style>html,body{width:100%;height:100%;margin:0;padding:0;border:0;}div{margin:0;padding:0;border:0;}.nav{position:absolute;top:.

    2022年7月12日
    19

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号