8.解析Kafka中的 Topic 和 Partition

8.解析Kafka中的 Topic 和 Partition目录 1 什么是 Topic2 什么是 Partition3 Topic 和 Partition 的存储 4 producer 消息分发策略 5 消费者如何消费指定分区消息 1 什么是 TopicKafka 和 ActiveMQ 一样 都是非常优秀的消息订阅 发送的中间件 在 ActiveMQ 中 我们知道它有 Queue 和 Topic 的概念 但是在 Kafk

目录

1.什么是Topic

2.什么是Partition

3.Consumer Group 消费者组

4.Topic 和 Partition 的存储

5.producer消息分发策略

6.消费者如何消费指定分区消息   


        topic 是逻辑上的概念,而
partition
是物理上的概念,每个
partition
对应于一个
log
文件,该 log
文件中存储的就是
producer
生产的数据

1.什么是Topic

        Kafka 和 ActiveMQ 一样,都是非常优秀的消息订阅/发送的中间件。在 ActiveMQ 中,我们知道它有 Queue 和 Topic 的概念,但是在 Kafka 中,只有 Topic 这一个概念(Kafka 消费端通过 group.id 属性可以实现 ActiveMQ 中 Queue 的功能,参见图1)

        在 Kafka 中,Topic 是一个存储消息的逻辑概念,可以理解为是一个消息的集合。每条发送到 Kafka 集群的消息都会自带一个类别,表明要将消息发送到哪个 Topic 上。在存储方面,不同的 Topic 的消息是分开存储的,每个 Topic 可以有多个生产者向他发送消息,也可以有多个消费者去消费同一个Topic中的消息(参见图2

8.解析Kafka中的 Topic 和 Partition

补充:        

        此处Queue涉及到一个消费者组(Consumer Group)的概念。(上图groupid=1处的说明,有点小问题,这个消费者组 groupid 参考本文 3.Consumer Group 消费者组)


8.解析Kafka中的 Topic 和 Partition

2.什么是Partition

        Partition,在 Kafka 中是分区的意思。分区,提高了Kafka的并发,也解决了Topic中数据的负载均衡。即:Kafka 中每个 Topic 可以划分多个分区(每个 Topic 至少有一个分区),同一个 Topic 下的不同分区包含的消息是不同的(分区可以间接理解成数据库的分表操作)。

        每个消息在被添加到分区的时候,都会被分配一个 offset (偏移量),它是消息在当前分区中的唯一编号。Kafka 通过 offset 可以保证消息在分区中的顺序性,但是跨分区是无序的,即 Kafka 只保证在同一个分区内的消息是有序的。

        如下图,我们通过命令(命令如下↓↓↓)创建一个名为 test 的 Topic,并对其进行分区,设置 3 个分区,分别是 test-0、test-1、test-2。每一条消息发送到 broker 的时候,会根据 Partition 的分区规则计算,然后选择将该消息存储到哪一个 Partition。如果 Partition 规则设置合理,那么所有的消息都会均匀的分布在不同的 Partition 中,这样就类似于数据库的分库分表的概念,将数据做了分片处理操作。

8.解析Kafka中的 Topic 和 Partition

问题1:此时你可能会有疑惑,为什么第一个producer会将消息写入 test-0,以此类推,此处涉及到5.producer消息分发策略。请继续往后看。  

创建 Topic 命令如下:

        bin/kafka-topics.sh –create –zookeeper 192.168.204.201:2181,192.168.204.202:2181,192.168.204.203:2181 replication-factor 1 –partitions 3 –topic test

备注:bin/kafka-topics.sh –create   —->kafka自带命令  –create表示创建 topic

          zookeeper xxx.xxx.xxx.xxx:2181  —->zookeeper 集群地址

          replication-factor 1   —->备份数(1个备份)

          –partitions 3   —->kafka分区数(表示分了3个分区)

          –topic test  —->要创建的 topic 的名称

3.Consumer Group 消费者组

        消费者组,由多个 consumer
组成。
消费者组内每个消费者负
责消费不同分区的数据,一个分区只能由一个组内的某一个消费者消费;消费者组之间互不影响。
所有的消费者都属于某个消费者组,即
消费者组是逻辑上的一个订阅者
        比如一个 topic,有2个分区 partition0、partition1。有一个消费者组,组内有2个消费者 customer0、customer1。消费者组中的customer0 和 customer1 只能
【各自】
消费该topic中某个分区的数据,比如customer0消费partition0,customer1消费partition1。
        如果没有消费者组的概念,该topic有2个分区 partition0、partition1,只有一个消费者 customer0,那么 partition0 和 partition1 两个分区的数据都需要customer0 来消费。
消费者组的好处,可以提高消费能力!!!

4.Topic 和 Partition 的存储

本实例,是以192.168.204.201、192.168.204.202、192.168.204.203三台服务器搭建成的Kafka集群,来做介绍的

如下图,表示名称为 test 的 topic已经创建完成。那么 Partition 是如何存储的呢??

8.解析Kafka中的 Topic 和 Partition

        Partition 是以文件的形式存储在文件系统中,如上创建了一个名为 test 的topic,我们定义其有 3 个 partition,既然 partition 是以文件的形式存储,那么这 3 个 partition 在哪里存储着呢?

        我们可以在 kafka 的数据目录(/tmp/kafka-log)下找到,此目录可自行配置。在 /tmp/kafka-log 目录下,我们会看到有 3 个目录:test-0、test-1、test-2。命名规则是 topic_namepartition_id。所在目录如下图所示:

8.解析Kafka中的 Topic 和 Partition

 问题2:此时你可能会有疑惑,为什么 3个分区会随机分配到3台服务器,此时会涉及到多个分区在集群中的分配策略。那么多个分区如何在集群中做到合理的分配?

   答:(1)将所有 N 个Broker 和 i 个 Partition 排序(本例中 N = 3,i = 3)

          (2)将第 i 个 Partition 分配到 ( i % n)个 Broker 上。(这样 test-1 就分配到第一台了,以此类推)

5.producer消息分发策略

       消息是 Kafka 中最基本的数据单元。在 Kafka 中,一条消息由 keyvalue 两部分组成,key 和 value 值都可以为空。

       这里的 key 有什么用呢?当我们在发送一条消息时,我们可以指定这个 key ,那么 producer 则会根据 key 和 partition 机制,来判断当前这条消息应该发送并存储到哪个 partition 中。(此时问题1便得到了解决

       如果 Kafka 中的 key 为 null 该怎么办?默认情况下,Kafka 采用的是 hash 取模的分区算法。如果 key 为 null 的话,则会随机的分配一个分区。这个随机是在这个参数 “metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区,这个值默认情况下是 10 分钟更新一次。

       此外,Kafka 也为我们提供了自定义消息分发策略的入口,我们可以根据自身业务的情况,来自定义消息分发策略。那么如何来实现我们自己的分区策略呢?我们只需要定义一个类,实现 Partitioner 接口,重写它的 partition 方法即可。然后在配置 kafka 的时候,设置使用我们自定义的消息分发策略即可。如何自定义消息分发策略,请参照 4.1 自定义消息分发策略Demo

   5.1 自定义消息分发策略Demo

/ * 1.自定义分区策略 */ public class MyPartition implements Partitioner { Random random = new Random(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取分区列表 List 
   
     partitionInfos = cluster.partitionsForTopic(topic); int partitionNum = 0; if(key == null){ partitionNum = random.nextInt(partitionInfos.size());//随机分区 } else { partitionNum = Math.abs((key.hashCode())/partitionInfos.size()); } System.out.println("当前Key:" + key + "-----> 当前Value:" + value + "----->" + "当前存储分区:" + partitionNum); return partitionNum; } public void close() { } public void configure(Map 
    
      map) { } } 
     
   
/ * SpringBoot 下,添加如下partitioner.class 属性,指定使用自定义MyPartition类即可 */ spring: kafka: properties: partitioner.class: com.report.kafka.partition.MyPartition / * Spring使用 xml 或 注解形式,配置如下属性即可 */ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.report.kafka.partition.MyPartition");

6.消费者如何消费指定分区消息   

       此时,名称为 test 的 topic 有 3 个分区,分别为0、1、2,如果我们想消费分区0中的消息,该如何消费呢?使用Java操作kafka 有  spring-kafka.jar 和 kafka-clients.jar 两种方式。如下对这两种方式分别作了介绍,便可以完成对指定分区消息的消费。

/ * 1.使用 spring-kafka.jar包中的 KafkaTemplate 类型 * 使用 @KafkaListener 注解方式 * 如下:说明消费的是名称为test的topic下,分区 1 中的消息 */ @KafkaListener(topicPartitions = {@TopicPartition(topic = "test",partitions = {"1"})}) / * 2.使用kafka-clients.jar包中的 KafkaConsumer 类型 * 如下:说明消费的是名称为test的topic下,分区 1 中的消息 */ TopicPartition topicPartition = new TopicPartition("test" , 1); KafkaConsumer consumer = new KafkaConsumer(props); consumer.assign(Arrays.asList(topicPartition));

到此处,Topic 和 Partition 的基本使用就介绍完了

如果本文对你有所帮助,那就给我点个赞呗 ^_^

End

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220129.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午9:10
下一篇 2026年3月17日 下午9:11


相关推荐

  • 安卓数据转移到iphone老是中断_安卓换iPhone数据怎么转移?这款神器一键搞定「建议收藏」

    安卓数据转移到iphone老是中断_安卓换iPhone数据怎么转移?这款神器一键搞定「建议收藏」每天12:18准时给大家惊喜!大家好!我是好奇仔,热衷于搜罗和分享各种好用、实用的软件神器和资源,有手机软件、办公软件、APP,还有网站资源……来自:PConline,作者:我爱我家换新手机了,内心当然是巨爽无比了!可是换机时有个步骤却让人觉得有点麻烦,那就是如何将旧手机的资料转移到新手机里去,安卓与iOS又如何互通呢?以前的操作得先将数据导出到电脑,然后可能还需要进行数据格式转换,接…

    2022年5月26日
    193
  • Java中字符串indexof() 的使用方法

    Java中字符串indexof() 的使用方法nbsp Java 中字符串中子串的查找共有四种方法 indexof indexOf 方法返回一个整数值 指出 String 对象内子字符串的开始位置 如果没有找到子字符串 则返回 1 如果 startindex 是负数 则 startindex 被当作零 如果它比最大的字符位置索引还大 则它被当作最大的可能索引 Java 中字符串中子串的查找共有四种方法 如下 1 intindexOf S

    2026年3月19日
    2
  • jqGrid基本用法与示例「建议收藏」

    jqGrid基本用法与示例「建议收藏」转自:https://chuanlu.iteye.com/blog/1953544一、jqGrid的基本用法1、html页面Html代码<!DOCTYPE html 

    2022年7月3日
    59
  • Docker镜像的导入导出

    Docker镜像的导入导出Docker 镜像的导入导出本文介绍 Docker 镜像的导入导出 用于迁移 备份 升级等场景 准备环境如下 CentOS7 0Docker1 18 导入导出命令介绍涉及的命令有 export import save loadsave 命令 dockersave options images images 示例 dockersave

    2026年3月20日
    1
  • UML教程_css餐厅游戏答案

    UML教程_css餐厅游戏答案 UML教程

    2025年8月6日
    5
  • Druid连接池的意义以及使用

    Druid连接池的意义以及使用建立数据库连接耗时耗费资源,一个数据库服务器能够同时建立的连接数也是有限的,在大型的Web应用中,可能同时会有成百上千的访问数据库的请求,如果Web应用程序为每一个客户请求分配一个数据库连接,将导致性能的急剧下降。数据库连接池的意义在于,能够重复利用数据库连接(有点类似线程池的部分意义),提高对请求的响应时间和服务器的性能。连接池中提前预先建立了多个数据库连接对象,然后将连接对象保存到连接池中…

    2022年7月23日
    8

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号