[Pulsar系列] 10分钟学会Pulsar消息系统概念

[Pulsar系列] 10分钟学会Pulsar消息系统概念ApachePulsar 是一个支持多租户的 高性能的服务与服务之间消息通讯的解决方案 最初由雅虎开发 现在由 Apache 软件基金会管理 Pulsar 在 Yahoo 的生产环境运行了三年多 助力 Yahoo 的主要应用 如 YahooMail YahooFinance YahooSports Flickr Gemini 广告平台和 Yahoo 分布式键值存储系统 Sherpa Kafka 不够好 智联招聘基于 Pulsar 打造企业级事件中心 Pulsar 的主要特性如下 Pulsar 实例原生支持多集群

Apache Pulsar

Pulsar是一个支持多租户的、高性能的服务与服务之间消息通讯的解决方案,最初由雅虎开发,现在由Apache软件基金会管理。

Pulsar在Yahoo的生产环境运行了三年多,助力Yahoo的主要应用,如Yahoo Mail、Yahoo Finance、Yahoo Sports、Flickr、Gemini广告平台和Yahoo分布式键值存储系统Sherpa。

Kafka不够好,智联招聘基于Pulsar打造企业级事件中心。

Pulsar的主要特性如下:

  • Pulsar实例原生支持多集群,能无缝的基于地理位置进行跨集群备份
  • 非常低的消息发布和端到端的延迟
  • 无缝扩展到超过百万个topic
  • 支持Java,Go,Pytho和C++的客户端
  • Topic支持多种订阅模式: 独占(exclusive), 共享(shared)和灾备(failover)
  • 通过Apache BookKeeper提供的持久化消息存储机制保证消息的送达
  • serverless的轻量级计算框架Pulsar Functions提供了原生的流数据处理
  • serverless的连接器框架Pulsar IO构建于 Pulsar Functions之上,能够轻松的将数据从Pulsar中移入和移出
  • 当数据老化时,分层存储将数据从热存储卸载到冷存储(如S3和GCS)

消息队列的使用场景包括异步处理,应用解耦,流量削锋和消息通讯四个场景.

1. 消息系统概念

1.1 Messages

1.2 Producers

  • 同步发送:producer发送每条消息后会等待broker的确认,如果没有收到确认信息,producer会认为发送失败
  • 异步发送:Producer将会把消息放入blocking队列,然后马上返回。 然后客户端在后台将消息发送给broker。如果队列已满( 配置的最大数量),根据传入producer的参数,producer可能阻塞或者直接返回失败。

压缩:消息在发送过程中可以被压缩来节省带宽,pulsar支持LZ4,ZLIB,ZSTD,SNAPPY类型。
批处理:如果启用了批处理,生产者将在单个请求中发送批量消息。批处理大小由最大消息数和最大发布延迟决定。

1.3 Consumers

  • 同步接收:同步接收将会阻塞,直到消息可用
  • 异步接收:异步接收立即返回future值,例如java中的CompletableFuture,一旦新消息可用,它立即完成。

监听:客户端库为consumers提供listener的实现,例如Java客户端,提供MesssageListener接口,实现该接口,一旦接受到新的消息,received方法将被调用。

void received(Consumer<T> consumer,Message<T> msg); 

确认:当一个consumer 成功消费掉一条消息后,那么这个consumer会发送一个确认请求到broker,broker会丢弃这条消息,否则保存这条消息。

消息的确认可以一个接一个,也可以累积一起。 累积确认时,消费者只需要确认最后一条他收到的消息。 所有之前(包含此条)的消息,都不会被再次重发给那个消费者。




否定确认:当consumer 在一定时间内没有成功消费消息,而想再次消费该条消息,那么这个consumer可以发送一个否定确认到broker,然后broker重发这条消息。消息可以一条接一条的否定确认,也可以累积否定确认,这取决于消费订阅模式。在独占和灾备模式,消费者只能否定确认其接收的最后一条消息。在共享模式,消费者可以独立否定确认。
确认超时:当一条消息没有被成功消费,并且您想要触发broker自动重发消息时,您可以采用未确认消息自动重发机制。客户端将在整个AckTimeout时间范围内跟踪未确认的消息,并在指定确认超时时间自动向broker发送重发未确认的消息请求。

在确认超时之前使用否定确认。否定确认以更精确的方式控制单个消息的重发,并在消息处理时间超过确认超时时间后,避免无效的重发消息。

死信(Dead letter)topic:死信topic使您能够在消费者无法成功消费某些消息时消费新消息。在这种机制中,无法消费的消息存储在单独的topic,称为死信topic。您可以决定如何处理死信topic中的消息。
在Java客户端中,可以使用以下例子处理死信topic:

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) .build()) .subscribe(); 

死信topic依赖于消息的重发。您需要确认消息的重发方法:否定确认或确认超时。在确认超时之前使用否定确认。

目前,死信topic仅适用于共享模式。

1.4 Topics

和其他的发布订阅系统一样,Pulsar 中的 topic 是被命名的通道,用做从producer到 consumer传输消息。 Topic的名称是具有明确定义结构的URL:

{ 
   persistent|non-persistent}://tenant/namespace/topic 

persistent/non-persistent:topic的类型,包括持久化和非持久化(默认是持久类型)。topic指定持久化后,所有的消息会持久化到硬盘(这意味着多块硬盘,除非是单机模式的broker)。反之,非持久topic的数据不会存储到硬盘上。

tenant:topic在实例中的租户,租户对于Pulsar的多租户来说是必不可少的,可以分布在多个集群中。

namespace:Topic的管理单元,充当关联topic组的管理机制。 大多数的topic配置在namespace层面生效。 每个tenant可以有多个namespace。

topic:topic名称是自由定义的,在pulsar实例中无特殊意义。












1.4.1 namespace

命名空间是租户内部逻辑上的命名术语。 一个租户可以通过admin API创建多个命名空间。 例如,一个对接多个应用的租户,可以为每个应用创建不同的namespace。 Namespace使得程序可以以层级的方式创建和管理topic。 例如:”my-tenant/app1″ ,它的namespace是app1这个应用,对应的租户是 my-tenant。 你可以在namespace下创建任意数量的topic。

1.4.2 订阅模型
1.4.2.1 Exclusive

Exclusive模式为默认订阅模式。

[Pulsar系列] 10分钟学会Pulsar消息系统概念

1.4.2.2 Failover
1.4.2.3 Shared

Shared模式的限制
有两点需注意,1、不保证消息顺序; 2、不能使用累计确认

Key_shared:
在Key-shared模式下,多个消费者可以关联到同一订阅。消息以分布式在消费者之间传递,具有相同key/orderingKey 的消息仅传递给一个消费者。无论消息被重发多少次,它都发给同一个消费者。当消费者连接或断开连接时,将导致某些消息的key的消费者变更。
[Pulsar系列] 10分钟学会Pulsar消息系统概念




该模式限制:消息必须指定key/orderingKey;不能使用累计确认;该模式目前是测试版,可以在broker.config禁用。

1.5 多topic订阅

当consumer订阅pulsar的topic时,它默认指定订阅了一个topic,例如:persistent://public/default/my-topic。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:

  • 通过最基础的正则表达式(regex),例如 persistent://public/default/finance-.*
  • 通过明确指定的topic列表

通过正则订阅多主题时,所有的主题必须在同一个namespace。

当订阅多主题时,Pulsar客户端会自动调用Pulsar的API来发现匹配表达式或者列表的所有topic,然后全部订阅。 如果此时有暂不存在的topic,那么一旦这些topic被创建,conusmer会自动订阅。

不能保证顺序性
当消费者订阅多topic时,Pulsar所提供对单一topic订阅的顺序保证,就hold不住了。 如果你在使用Pulsar的时候,遇到必须保证顺序的需求,强烈建议不要使用此特性。

下面是多主题订阅在java中的例子:

import java.util.regex.Pattern; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; PulsarClient pulsarClient = // 实例化pulsar客户端 // 订阅一个namespace下的所有topic Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*"); Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer() .topicsPattern(allTopicsInNamespace) .subscriptionName("subscription-1") .subscribe(); // 根据正则订阅一个namespace下的多个topic Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*"); Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer() .topicsPattern(someTopicsInNamespace) .subscriptionName("subscription-1") .subscribe(); 
1.6.1 路由模式

发布到分区主题时,必须指定路由模式。路由模式决定每个消息应该发布到哪个分区,即哪个内部主题。三种路由模式如下:

  • RoundRobinPartition:如果没有key,所有的消息通过轮询方式被路由到不同的分区,以达到最大吞吐量。请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果为message指定了key,分区的producer会把key做hash,然后分配消息到指定的分区。 这是默认的模式。
  • SinglePartition:如果没有key被提供,producer将会随机选择一个分区,把所有的消息发往该分区。 如果为message指定了key,分区的producer会把key做hash,然后分配消息到指定的分区。
  • CustomPartition:使用客制化消息路由实现,可以决定特定的消息进入指定的分区。 用户可以创建客制化的路由模式,通过使用 Java client ,实现MessageRouter接口。

1.7 顺序保证

  • 按key分区:所有拥有相同key的消息有序, 并且会被发送至相同的partition。使用SinglePartition或RoundRobinPartition模式, 每条消息都需要有key。
  • 按producer:来自于相同producer的消息有序,路由策略为SinglePartition, 且每条消息都没有key。
1.7.1 HashingScheme

1.8 非持久topic

non-persistent://tenant/namespace/topic 

非持久topic中,broker会立即发布消息给所有连接的订阅者,而不会在BookKeeper中存储。 如果有一个订阅者断开连接,broker将无法重发这些瞬时消息,订阅者将永远也不能收到这些消息了。 去掉持久化存储的步骤,在某些情况下,使得非持久topic的消息比持久topic稍微变快。但是同时,Pulsar的一些核心优势也丧失掉了。

非持久topic,消息数据仅存活在内存。 如果broker挂掉或者因其他情况不能从内存取到,你的消息数据就可能丢失。 只有在真的确信你的使用场景符合,并且你可以忍受时,才可去使用非持久topic。

默认非持久topic在broker上是开启的。 你可以通过broker的配置关闭。 你可以通过使用pulsar-admin-topics接口管理非持久topic。

1.8.1 性能

非持久消息通常比持久消息更快,因为broker无须持久化消息,当消息被分发给所有订阅者时,会立即发送ack给producer。 非持久topic让producer有更低的发布延迟。

1.8.2 客户端API
PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build(); String npTopic = "non-persistent://public/default/my-topic"; //这里表明是非持久化 String subscriptionName = "my-subscription-name"; Consumer<byte[]> consumer = client.newConsumer() .topic(npTopic) .subscriptionName(subscriptionName) .subscribe(); 

这里还有一个非持久topic的java producer例子:

Producer<byte[]> producer = client.newProducer() .topic(npTopic) .create(); 

1.9 消息保留和到期(retention and expiry)

Pulsar broker默认如下:

  • 立即删除所有已经被cunsumer确认过的的消息
  • 以消息backlog的形式,持久保存所有的未被确认消息

Pulsar有两个特性,让你可以覆盖上面的默认行为:

  • 消息存留让你可以保存consumer确认过的消息
  • 消息过期让你可以给未被确认的消息设置存活时长(TTL) 所有消息保留和到期都在namespace级别进行管理。有关操作方法,请参阅Message retention and expiry cookbook。
    下图说明了这两种概念:
    [Pulsar系列] 10分钟学会Pulsar消息系统概念
    图中第一个是消息存留,存留规则会被用于某namespace下所有的topic,指明哪些消息会被持久存储,即使已经被确认过。 没有被留存规则覆盖的消息将会被删除。 没有留存规则的话,所有被确认的消息都会被删除。
    图中第二个是消息过期,有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)








1.10 重复数据消除(Message deduplication)

消息重复数据消除是在namespace级别处理的。

1.10.1 生产者幂等

消息去重的另外一种方法是确保每条消息仅生产一次。 这种方法通常被叫做生产者幂等。 这种方式的缺点是,把消息去重的工作推给了应用去做。 在Pulsar中,去重被broker处理的,这意味着你不需要修改你的客户端代码。 你只需要做一些管理上的变化(参考Managing message deduplication )。

1.10.2 去重和实际一次语义

消息去重,使Pulsar成为与流处理引擎(SPE)或者其他寻求实际一次处理语义的系统连接的完美消息系统。 消息系统若不提供自动消息去重,则需要SPE或者其他系统保证去重。这意味着严格的消息顺序来自于让程序承担额外的去重工作。 使用Pulsar,严格的顺序保证不会带来任何应用层面的代价。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/215218.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午2:25
下一篇 2026年3月18日 下午2:25


相关推荐

  • pycharm如何全局进行查找一个关键词

    pycharm如何全局进行查找一个关键词PyCharm的FindinPath功能提供了全局查找功能,快捷键为Ctrl+Shift+F。Find则是在当前文件查找,快捷键为Ctrl+F。这两个个功能非常实用。FindinPath的使用:按快捷键Ctrl+Shift+F或从从菜单Edit-》Find-》FindinPath进入全局查找界面。如下图所示,在Texttofind输入要查找的内…

    2022年8月27日
    8
  • socket原理讲解_电感器的作用及原理

    socket原理讲解_电感器的作用及原理1.网络中进程之间如何通信进程通信的概念最初来源于单机系统。由于每个进程都在自己的地址范围内运行,为保证两个相互通信的进程之间既互不干扰又协调一致工作,操作系统为进程通信提供了相应设施,如UNIXBSD有:管道(pipe)、命名管道(namedpipe)软中断信号(signal)UNIXsystemV有:消息(message)、共享存储区(sharedmemory)和信号量(semaphore)等.他们都仅限于用在本机进程之间通信。网间进程通信要解决的是不同主机进程间的相互

    2022年10月10日
    4
  • Nutch 使用总结

    Nutch 使用总结Nutch nbsp 目录结构 nbsp 在 bin 文件夹下存放的是用于命令行运行的文件 Nutch 的配置文件都放在了 conf 下 lib 是一些运行所需要的 jar 文件 plugins 下存放的相应的插件 在 src 文件夹中的是 Nutch 的所有源文件 webapps 文件夹中存放的是 web 运行相关文件 nutch 0 9 war 是 nbsp Nutch 所提供的基于 Tomcat 的应用程序包 1 nbsp 将起始 nbsp URL nbsp 集合注入到 nbsp Nut

    2026年3月18日
    2
  • 宝塔面板部署 OpenClaw(Clawdbot)实战指南:从零到云端 AI 助理

    宝塔面板部署 OpenClaw(Clawdbot)实战指南:从零到云端 AI 助理

    2026年3月13日
    3
  • Hadoop操作HDFS命令「建议收藏」

    Hadoop操作HDFS命令「建议收藏」Hadoop操作HDFS命令如下所示: hadoopfs 查看HadoopHDFS支持的所有命令 hadoopfs–ls 列出目录及文件信息 hadoopfs–lsr 循环列出目录、子目录及文件信息 hadoopfs–puttest.txt/user/sunlightcs 将本地文件系统的test.txt复制到HDFS文件系统的/

    2022年10月4日
    4
  • IDEA左侧的project目录中,看不到项目的文件结构图,项目目录不见了

    IDEA左侧的project目录中,看不到项目的文件结构图,项目目录不见了

    2021年9月30日
    144

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号