kafka多个分区一个消费_kafka集群节点挂掉

kafka多个分区一个消费_kafka集群节点挂掉之前的csdn找不回来了,决定重新注册一个。望支持~~~为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。画了个简图:/***@ClassName:RiskPartitioner*@authorDHing**/publicclassRiskPartitionerimpleme…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

之前的csdn找不回来了,决定重新注册一个。望支持~~~

为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。

 画了个简图:

kafka多个分区一个消费_kafka集群节点挂掉

/**  
    * @ClassName: RiskPartitioner  
    * @author DHing  
    * 
*/  
    
public class RiskPartitioner implements Partitioner {

	private Logger LOG =  LoggerFactory.getLogger(getClass());
	  
	    /* (非 Javadoc)  
	    *   
	    *   
	    * @param topic
	    * @param key
	    * @param keyBytes
	    * @param value
	    * @param valueBytes
	    * @param cluster
	    * @return  
	    * @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)  
	    *这个方法就决定了消息往哪个分区里面发送
              这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
	    */  
	    
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
		 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
	        int num = partitions.size();
	        int partNum = 0;
	        try {
	          //  partNum = Integer.parseInt((String) key);
	           partNum = new Random().nextInt(255);
	        } catch (Exception e) {
	            partNum = key.hashCode();
	        }
	    return Math.abs(partNum % num);
	}

	@Override
	public void close() {
		
	}

	@Override
	public void configure(Map<String, ?> configs) {
		
	}
}

我们定定义分区过后,需要加入到Config进行生效:

@Configuration
public class KafkaProducerConfig {
	@Value("${kafka.server.producer.urls}")
	private String urls;
	@Value("${kafka.server.producer.key}")
	private String key;
	@Value("${kafka.server.producer.value}")
	private String value;
	
	private String acks;
	
	private String retries;
	
	private String batchSize;
	
	private String 	partitioner;
	
	
	public Properties getProp(){
		 Properties props = new Properties();
	        props.put("bootstrap.servers", urls);
	        props.put("acks", "all");
	        props.put("retries", 0);
	        props.put("batch.size", 16384);
	        props.put("linger.ms", 1);
	        props.put("buffer.memory", 33554432);
	        //自定义分区类
	        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
	        props.put("key.serializer", key);
	        props.put("value.serializer", value);
			return props;
	}
	
	@Bean
	@Qualifier("kafkaProducer")
	public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
		Properties props = new Properties();
//		props.put("bootstrap.servers", urls);
//		props.put("key.serializer", key);
//		props.put("value.serializer", value);
	    props.put("bootstrap.servers", urls);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //自定义分区类
        props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
        props.put("key.serializer", key);
        props.put("value.serializer", value);
		return new KafkaProducer<LongSerializer, StringSerializer>(props);
	}

}

亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)

 

之前的csdn找不回来了,决定重新注册一个。望支持~~~

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/184515.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 程序设计-寻找三数之和为零的三元组(Java)

    程序设计-寻找三数之和为零的三元组(Java)分享一个大牛的人工智能教程。零基础!通俗易懂!风趣幽默!希望你也加入到人工智能的队伍中来!请点击http://www.captainbed.netpackagelive.every.day.Programming;importjava.util.ArrayList;importjava.util.Arrays;/***给定一个包含n个整数的数组nums,判断nums中是否存在三个元素a、b、c,使得a+b+c=0。*找出所有满足条件且不重复的三元组。**@auth

    2022年6月21日
    22
  • datagrip怎么安装(dg专业版注册码)

    !!!重点先上!!!今天在尝试使用DataGrip,第一次下载了最新的2018版本,后来发现注册码无法获取到(呵呵)。。。又迫于急切需要,对于非数据库开发人员,我觉得2017.2版本足够了(因为它刚好可以通过Licenseserver注册),所以就选择了这个版本!对于需要尝新使用更高版本的话,那就忽略本文的教程哈~1、下载2017.2版本(链接)2、安装后正常启动,进入填写注册码页面时,…

    2022年4月14日
    455
  • 启动、关闭ubuntu Linux防火墙

    启动、关闭ubuntu Linux防火墙由于 LInux 原始的防火墙工具 iptables 过于繁琐 所以 ubuntu 默认提供了一个基于 iptable 之上的防火墙工具 ufw sudoufwstatu 检查防火墙的状态 sudoufwversi 防火墙版本 ubuntu 系统默认已安装 ufw 2 启用运行以上两条命令后 防火墙在系统启动时自动开启 关闭所有外部对本机的访问 但本机访问外部正常打开或关闭某个端口 例如 sudoufwallow 允许所有的外部 IP 访问本机的 25 tcp smtp 端口 sudo

    2025年10月27日
    4
  • javaweb-spring-51

    javaweb-spring-51

    2021年5月17日
    93
  • windows net 命令详解「建议收藏」

    windows net 命令详解「建议收藏」综合了WINDOWS98,WINDOWSWORKSTATION和WINDOWSSERVER三个操作系统关于NET命令的解释,希望可以全面一些。先说一些:(1)NET命令是一个命令行命令。(2)管理网络环境、服务、用户、登陆。。。。等本地信息(3)WIN98,WINWORKSTATION和WINNT都内置了NET命令。(4)但WIN98的NET命令和WO

    2022年5月28日
    39
  • SplitContainer容器控件左右Panel大小调整「建议收藏」

    SplitContainer容器控件左右Panel大小调整「建议收藏」1、新建一个Winform窗体,从上图中选择SplitContainer空间,拖拽到Form到上,如下图:2、你会发现,随便点击Panel1或者Panel2,会显示出粗框,但怎么调整两个Panel的大小呢?两个Panel之间的那条线,是选不中的,哈哈,不信可以试试。那么如何才能调整两个Form的大小呢?==》随便单击一个Panel,再按一下Esc,会出现下图:这时,

    2022年7月18日
    95

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号