kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉之前的csdn找不回来了,决定重新注册一个。望支持~~~为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。画了个简图:/***@ClassName:RiskPartitioner*@authorDHing**/publicclassRiskPartitionerimpleme…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

 画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉

/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

 

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/184515.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 远程桌面由于以下原因之一无法连接到远程计算机

    远程桌面由于以下原因之一无法连接到远程计算机由于计算机与远程服务器之间设置端口防护,阻断了远程连接默认的3389端口,出现以下问题,需要修改远程连接端口:1、登录远程连接服务器,修改注册表。1):开始-&gt;附件-&gt;运行/或快捷键win+R组合键,win就是键盘上的windows系统图标键。找到运行对话框。2):在对话框中输入regedit命令,打开注册表编辑器3):按照…

    2022年5月14日
    135
  • 隐私政策_隐私政策一定要加同意吗

    隐私政策_隐私政策一定要加同意吗隐私政策

    2022年4月20日
    78
  • angularjs子组件向父组件传值_react子组件传值

    angularjs子组件向父组件传值_react子组件传值Angular父组件给子组件传值的方法与注意事项

    2022年9月4日
    3
  • RPM安装命令总结[通俗易懂]

    RPM安装命令总结[通俗易懂]在Linux操作系统下,几乎所有的软件均通过RPM进行安装、卸载及管理等操作。RPM的全称为RedhatPackageManager,是由Redhat公司提出的,用于管理Linux下软件包的软件。Linux安装时,除了几个核心模块以外,其余几乎所有的模块均通过RPM完成安装。RPM有五种操作模式,分别为:安装、卸载、升级、查询和验证。1)用RPM安装软件包,最简单的

    2022年6月12日
    29
  • SQL数据库字符串与时间相互转换「建议收藏」

    SQL数据库字符串与时间相互转换「建议收藏」SQL数据库字符串与时间相互转换,字符串转日期时间格式,日期时间格式转字符串格式。

    2022年6月2日
    36
  • c语言中strstr函数怎么实现_c语言strstr函数怎么写

    c语言中strstr函数怎么实现_c语言strstr函数怎么写首先,我们来看strstr函数的使用可见,strstr函数是用来查找字串的一个函数。因为字符串中有“cde”子串,所以代码运行起来结果就是”cdef”。现在,我们已经将strstr函数的作用大概了解了一下,下面开始函数的模拟实现。首先,在cplusplus中搜索strstr可以了解到她返回的类型及参数相关链接:strstr-C++Reference(cplusplus.com)接下来,我们分析实现思路直到s1到了下图的位置,两者相等当*s1==*s2时进入while循环,s1+

    2022年10月9日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号