kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉之前的csdn找不回来了,决定重新注册一个。望支持~~~为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。画了个简图:/***@ClassName:RiskPartitioner*@authorDHing**/publicclassRiskPartitionerimpleme…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

 画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉

/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

 

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/184515.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 消息中间件有哪些

    消息中间件有哪些1 学习中间件的方式和技巧理解中间件在项目架构中的作用 以及各中间件的底层实现 可以使用一些类比的生活概念去理解中间件 使用一些主流图或者脑图的方式去梳理各个中间件在架构中的作用 尝试使用 Java 技术去实现中间件 静下来去思考中间件在项目中设计和使用的原因 如何找到对应的替代方案 尝试编写博客总结类同中间件技术的对比和使用场景 学会查看中间件的源码以及开源项目和博客 2 消息中间件的应用场景跨系统数据传递 高并发的流量削峰 数据的分发和异步处理 大数据分析与传递 分布式事务比

    2026年2月3日
    2
  • 基于Redis6.2.6版本部署Redis Cluster集群

    基于Redis6.2.6版本部署Redis Cluster集群基于 Redis6 2 6 版本部署 RedisCluster 集群 1 Redis6 2 6 简介以及环境规划在 Redis6 x 版本中主要增加了多线程的新特性 多线性对于高并发场景是非常有必要的 Redis6 x 新特性如下 多线程 IO 重新设计了客户端缓存功能 RESP3 协议支持 SSLACL 权限控制提升了 RDB 日志加载速度发布官方的 Redis 集群代理模块 RedisCluster 集群原理可以查看之前发布的文章 环境规划 IP 主机名端口号

    2025年10月3日
    3
  • matlab interp1db,matlab – Matlab interp1图出现数据偏移 – 堆栈内存溢出

    matlab interp1db,matlab – Matlab interp1图出现数据偏移 – 堆栈内存溢出本质上,我正在尝试使用Matlab的interp1方法平滑图像分割中的轮廓线。不幸的是,interp1表现不佳,可能是因为我使用不正确。我的插值代码如下:y2=interp1(x,y,’nearest’);然后,我尝试将原始x值对y2以及原始函数作图(请参见附图)。plot(x,y2,’x’);我认为插值可以使原始函数平滑(在某种程度上可以做到),但是interp1方法似乎偏离…

    2022年6月7日
    30
  • Linux 于 shell 变数 $#,$@,$0,$1,$2 含义解释:

    Linux 于 shell 变数 $#,$@,$0,$1,$2 含义解释:

    2022年1月12日
    56
  • 递归和迭代

    递归和迭代一.递归(Recursion)1.递归:以相似的方式重复自身的过程2.递归在程序中表现为:在函数的定义中直接或间接调用函数自身3.递归和循环:(1)递归是有去(递去)有回(归来),因为存在终止

    2022年7月4日
    19

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号