Pulsar整理

Pulsar整理Pulsar 整理一 Pulsar 简述 1 1 定义 ApachePulsar 是一个分布式 高性能的服务器到服务器的消息解决方案 ApachePulsar 是最初在 Yahoo 创建的开源分布式 pub sub 消息传递系统 现已成为 ApacheSoftwa 的一部分 1 2Pulsar 基础架构概述 Plusar 包含以下几种组件 如下图 架构之间的协作 如下图 1

Pulsar整理

一,Pulsar简述

1.1 定义

Apache Pulsar是一个分布式、高性能的服务器到服务器的消息解决方案。Apache Pulsar是最初在Yahoo创建的开源分布式pub-sub消息传递系统,现已成为Apache Software Foundation的一部分。

1.2 Pulsar基础架构概述

Plusar包含以下几种组件,如下图:

在这里插入图片描述

架构之间的协作,如下图:

在这里插入图片描述

1.3 特点

Pulsar之所以能够称为下一代消息队列,主要是因为以下特性:

  • 线性扩展,能够扩容到成百上千个节点,扩展性强
  • 高吞吐,已经在Yahoo的生产环境中经受了考验,每秒数百万消息
  • 低延迟,在大规模的消息量下依然可以保持低延迟(<5ms)
  • 持久化机制,Pulsar的持久化机制构建在 Apache BookKeeper之上,提供了写与读之间的IO隔离
  • 基于地理位置的复制, 用户只需要配置好可用区,消息就会源源不断的复制到其他可用区,当某一个可用区挂掉或者发生网络分区,pulsar会在之后不断的重试
  • 部署方式的多样化, 既可以运行在裸机,也支持目前例如Docker,K8S的一些容器化方案以及不同的云厂商。
  • Topic支持多种消费模式, exclusive, shared, failover

二,Pulsar安装

单机版安装官网地址: http://pulsar.apache.org/docs/en/standalone/#start-pulsar-standalone

下面介绍的是集群安装

2.1 下载和安装Pulsar

2.1.1 下载方式

1)https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz

2)使用wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz

需要在群集中的每台节点上安装Pulsar,包括运行ZooKeeper和BookKeeper的计算机。

2.1.2 解压缩
tar -xvzf apache-pulsar-2.3.0-bin.tar.gz cd apache-pulsar-2.3.0 

目录包含以下子目录:

目录 包含
bin Pulsar的命令行工具,如pulsar和pulsar-admin
conf Pulsar的配置文件,包括broker配置,ZooKeeper配置等
data ZooKeeper和BookKeeper使用的数据存储目录。
lib Pulsar使用的JAR文件。
logs 安装创建的日志。

2.2 部署Zookeeper集群

参考: https://blog.csdn.net/weixin_/article/details/

启动Zk

./zkServer.sh start 

2.3 初始化集群元数据

初始化群集元数据命令如下:

bin/pulsar initialize-cluster-metadata \ --cluster pulsar-cluster-1 \ --zookeeper hadoop101:2181 \ --configuration-store hadoop101:2181 \ --web-service-url http://hadoop101:8080 \ --web-service-url-tls https://hadoop101:8443 \ --broker-service-url pulsar://hadoop101:6650 \ --broker-service-url-tls pulsar+ssl://hadoop101:6651 

配置解释:

–cluster 集群名称
–zookeeper 集群的本地ZooKeeper连接字符串,指定一个节点地址即可
–configuration-store 整个实例的配置存储连接字符串,指定一个节点地址即可
–web-service-url 集群的Web服务URL
–broker-service-url 代理服务URL,可与集群中的代理进行交互
  • --*-tls 仅当你在实例中使用TLS身份验证时,才需要使用标志。

2.4 部署BookKeeper集群

2.4.1 配置Bookkeeper bookies

1)修改配置文件conf/bookkeeper.conf

zkServers=hadoop101:2181, hadoop102:2181, hadoop103:2181 extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent 
2.4.2 启动bookies
  1. 后台启动:
bin/pulsar-daemon start bookie 
  1. 前台启动:
bin/bookkeeper bookie 
  1. 验证bookie是否正常工作:
bin/bookkeeper shell bookiesanity 

2.5 部署Pulsar broker

2.5.1 配置broker

1)修改配置文件conf/broker.conf

​ 确保zookeeperServers和configurationStoreServers参数。

zookeeperServers= hadoop101:2181, hadoop102:2181, hadoop103:2181 configurationStoreServers= hadoop101:2181, hadoop102:2181, hadoop103:2181 

2)您还需要指定群集名称(与初始化群集元数据时提供的名称相匹配)

clusterName=pulsar-cluster-1 
2.5.2 启动broker

1)前台启动broker:

bin/pulsar broker 

2)后台启动broker:

bin/pulsar-daemon start broker 

2.6 连接到正在运行的集群

修改conf/client.conf

webServiceUrl=http://hadoop101:8080/ brokerServiceurl=pulsar://hadoop101:6650/ 

2.7 使用shell命令行操作

2.7.1 生产数据

1)向test主题生产数据,命令如下:

bin/pulsar-client produce \ persistent://public/default/test \ -n 1 \ -m "Hello Pulsar" 

2)shell命令解析:

bin/pulsar-client produce topic options 
参数 描述 默认值
-f--files 以逗号分隔的文件路径;必须指定-m或-f
-m--messages 要发送的以逗号分隔的消息字符串;必须指定-m或-f
-n--num-produce 发送消息的次数;消息/文件* num-produce的计数应低于1000 1
-r--rate 生成的速率(以每秒消息数为单位);值0表示尽可能快地生成消息 0.0
2.7.2 消费数据

1)消费test主题的数据,命令如下:

bin/pulsar-client consume \ persistent://public/default/test \ -n 100 \ -s "consumer-test" \ -t "Exclusive" 

2)shell命令解析:

bin/pulsar-client consume topic options 
参数 描述 默认
--hex 以十六进制格式显示二进制消息。 false
-n--num-messages 要消耗的消息数量,0表示永久消耗。 0
-r--rate 速率(以每秒消息数为单位)消耗; 值0表示尽可能快地使用消息 0.0
-s--subscription-name 订阅名称
-t--subscription-type 订阅的类型。可能的值:独占,共享,故障转移。 独占

上面的消息成功发布到主题后,看到如下消息为生产消费成功:

----- got message ----- Hello Pulsar 

三,Plusar架构深入

  • Topic(主题)、Subscription(订阅)和Cursors(游标)的基本概念

在这里插入图片描述

3.1 Topic的分区以及分区策略

1)消息存储在topic中。逻辑上topic是一个日志结构,每个消息都在这个日志结构中有一个偏移量。

2)Producer将消息发送到一个指定的topic。

3)Consumer通过订阅来消费topic中的消息。

4)分区:Pulsar将一个主题的数据分布到多台机器上,保证高吞吐量。默认情况下,Pulsar的主题是不进行分区的,但通过命令行工具或API可以很容易的创建分区主题,并指定分区的数量。如下图:

在这里插入图片描述

5)分区策略:单个分区,轮询分区,哈希分区,自定义分区,帮助我们更好的跨分区,跨消费者分布数据。如下图:

在这里插入图片描述

3.2 Subscription(订阅)消费方式:

  • Exclusive(独享):一个订阅只能有一个消息者消费消息

在这里插入图片描述

  • Shared(共享):一个订阅中同时可以有多个消费者,多个消费者共享Topic中的消息

在这里插入图片描述

  • Fail-Over(灾备):一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。

在这里插入图片描述

3.3 偏移量:Cursors(游标)

Cursor是日志中的当前偏移量。Subscription将其Cursor存储至Bookkeeper的Ledger中。这使Cursor跟踪可以像Topic一样进行扩展。(Kafka在0.8版本之前是将消费进度存储到ZK中的,0.8版本之后就将消费进度存储到了Kafka的Topic中, Plusar将消费进度也存储到了BK的ledger中。)

在这里插入图片描述

3.4 Ledger 和 Fragment

Ledger本身就是一个日志。条目(消息或者一组消息)追加到Ledgers,Ledgers追加到一个Topic,一个Topic实际上是一组ledgers流。

Ledger一旦关闭是不可变的,它是最小的删除单元,也就是说我们不能删除单个条目而是去删除整个Ledger。

Fragment是集群中最小的分布单元,被分解为多个。

在这里插入图片描述

四,Pulsar和Kafka的区别

对比 Pulsar Kafka
主题可扩展性 轻松处理百万个主题,随意添加更多Bookies,无需rebalance 不擅长处理大量主题,Broker是绑定存储状态的,集群扩展时必须做rebalance
性能 分层架构,IO隔离读写分离,支持 non-persistent,更高的吞吐量 服务和数据绑定,持久主题
消费模式 流(Stream)模式 :独占和故障切换订阅方式 队列(Queue)模式:共享订阅的方式 主要集中在流(Stream)模式,对单个partition是独占消费。无队列模式
操作更简单 使用专门的Cursor管理,并存在Ledger中 使用偏移量Offset,存在zk中或者_consumer_topic中
消费重复消息删除 可设置启动消息重复删除 无,需要自己逻辑实现
无限的数据保留期 消息只有被所有订阅消费后才会删除,不会丢失数据,但允许设置保留期,保留被消费的数据。支持TTL(TIme To Live) 根据设置的保留期来删除消息,有可能消息未被消费,过期后就删除了。不支持TTL。
  • 对比总结:

Apache Pulsar将高性能的流(Apache Kafka所追求的)和灵活的传统队列(RabbitMQ所追求的)结合到一个统一的消息模型和API中。 Pulsar使用统一的API为用户提供一个支持流和队列的系统,且具有同样的高性能。

五,Java APi

  • 添加依赖
 <dependency> <groupId>org.apache.pulsar 
     groupId> <artifactId>pulsar-client 
      artifactId> <version>2.4.1 
       version>  
        dependency> 

5.1 客户端实例

package com.wcb; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; public class Client { 
    private PulsarClient client; public Client() throws PulsarClientException { 
    client = PulsarClient.builder();//创建客户端 .serviceUrl("pulsar://hadoop101:6650") .build(); } public void Close() throws PulsarClientException { 
    client.close(); } public PulsarClient getPulsarClient(){ 
    return client; } } 

5.2 producer

  • topic2的主题生产数据
package com.wcb; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import java.util.concurrent.TimeUnit; public class PulProducer { 
    private Client client; private Producer<byte[]> producer; public PulProducer(String topic) throws PulsarClientException { 
    client = new Client();//创建pulsar客户端 producer = createProducer(topic);//创建生产者 } private Producer<byte[]> createProducer(String topic) throws PulsarClientException { 
    return client.getPulsarClient().newProducer() .topic(topic) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .sendTimeout(10, TimeUnit.SECONDS) .blockIfQueueFull(true) .create(); } //异步发送数据 public void sendMessage(String message) { 
    producer.sendAsync(message.getBytes()).thenAccept(msgId -> { 
    System.out.printf("Message with ID %s successfully sent", msgId); }); } public void sendOnce(String message) { 
    / * 发送一次就关闭 */ try { 
    producer.send(message.getBytes()); System.out.printf("Message with content %s successfully sent", message); producer.close(); client.Close(); } catch (PulsarClientException e) { 
    // TODO Auto-generated catch block e.printStackTrace(); } } // todo add exceptionally(). public void close(Producer<byte[]> producer){ 
    producer.closeAsync() .thenRun(() -> System.out.println("Producer closed")); } public static void main(String[] args) throws PulsarClientException { 
    PulProducer producer = new PulProducer("topic2"); // producer.sendMessage("Hello World ,lalla"); producer.sendOnce("Hello World ,pulsar"); } } 

5.3 Consumer(消费一次就关闭)

  • 消费主题为topic2的数据(获取一次,就关闭会话
package com.wcb; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class PulConsumer { 
    private Client client; private Consumer consumer; public PulConsumer(String topic, String subscription) throws PulsarClientException { 
    client = new Client();//创建pulsar客户端 consumer = createConsumer(topic, subscription);//创建消费者 } //使用独占方式消费数据 private Consumer createConsumer(String topic, String subscription) throws PulsarClientException { 
    return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription) .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe(); } public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException { 
    /* * 获取一次,就关闭会话 */ // Wait for a message System.out.printf("Start pulsar"); CompletableFuture<Message> msg = consumer.receiveAsync(); // System.out.printf("Message received: %s", new String(msg.get().getData())); String result = "topic is: " + msg.get().getTopicName() + ",data is: " + new String(msg.get().getData()); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledge(msg.get()); consumer.close(); client.Close(); return result; } public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException { 
    PulConsumer consumer = new PulConsumer("topic2", "my-sub"); String reString = consumer.getMessage(); System.out.println(reString); } } 

5.4 Consumer

  • 消费主题为topic2的数据(持续消费)
package com.wcb; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class PulConsumerAll { 
    private Client client; private Consumer consumer; public PulConsumerAll(String topic, String subscription) throws PulsarClientException { 
    client = new Client();//获取客户端 consumer = createConsumer(topic, subscription);//创建消费者 } private Consumer createConsumer(String topic, String subscription) throws PulsarClientException { 
    return client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription) .ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe(); } public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException { 
    /* * 用来异步获取,保持回话 */ do { 
    // Wait for a message CompletableFuture<Message> msg = consumer.receiveAsync(); System.out.printf("Message received: %s", new String(msg.get().getData())); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledge(msg.get()); } while (true); } public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException { 
    PulConsumerAll consumer = new PulConsumerAll("topic2", "my-sub"); consumer.receiveMessage(); } } 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/206974.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午3:04
下一篇 2026年3月19日 下午3:04


相关推荐

  • JVM垃圾回收算法与参数配置

    JVM垃圾回收算法与参数配置引用计数法这是个古老而经典的垃圾收集算法 其核心就是在对象被其他所引用时计数器 1 而当引用失效时 1 但是这种方式有非常严重的问题 无法处理循环引用的情况 还有就是每次进行加减操作比较浪费系统性能 标记清除法分为标记和清除两个阶段进行处理内存中的对象 当然这种方式也有非常大的弊端 就是空间碎片问题 垃圾回收后的空间不连续 不连续的内存空间工作效率低于连续的内存空间 复制算法 java

    2025年9月28日
    5
  • 金现代 (300830.SZ)

    金现代 (300830.SZ)

    2026年3月12日
    3
  • JavaScript之正则表达式的使用方法详细介绍[通俗易懂]

    JavaScript之正则表达式的使用方法详细介绍[通俗易懂]首先必须说明的是,这类文章(js正则表达式)在c站或者整个it类论坛是很多人写过的,而我认为我这篇的不同之处在于更加“小白”化,这也与我一贯的风格有关吧。关于JavaScript正则表达式,其他的文章大多一上来就太过激进,不利于初学者学习(我当粗就是这么被劝退的),这也是我为什么要坚持写这篇文章,希望小白在看了这篇文章后,不管能不能完全掌握JavaScript正则表达式,但至少对JavaScript正则表达式能有一个比较深刻的印象吧。

    2025年8月5日
    7
  • 关于Openclaw的一些见解

    关于Openclaw的一些见解

    2026年3月15日
    2
  • java 线程dump分析_java获取线程dump文件

    java 线程dump分析_java获取线程dump文件Java线程dump分析,可适用IBM的工具:IBMThreadandMonitorDumpAnalyzerforJava使用方法:java-jarjca*.jar。可加必要的VM参数,如:-Xms128m-Xmx128mhttps://www.ibm.com/support/pages/ibm-thread-and-monitor-dump-analyzer-java-tmda…

    2026年4月16日
    4
  • Zookeeper 分布式锁 – 图解 – 秒懂

    Zookeeper 分布式锁 – 图解 – 秒懂疯狂创客圈Java分布式聊天室【亿级流量】实战系列之-26【博客园总入口】文章目录写在前面1.1.分布式锁简介1.1.1.图解:公平锁和可重入锁模型1.1.2.图解:zookeeper分布式锁的原理1.1.3.分布式锁的基本流程1.1.4.加锁的实现1.1.5.释放锁的实现1.1.1.分布式锁的应用场景写在最后疯狂创客圈亿级流量高并发IM实战系…

    2025年8月30日
    6

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号