初探Disruptor

初探DisruptorDisruptor 简介 Disruptor 是一个用于在线程间通信的高效低延时的开源框架 它被用在了 LMAX 系统中 这个系统是建立在 JVM 平台上 核心是一个业务逻辑处理器 官方号称它能够在一个线程里每秒处理 6 百万订单 Disruptor 的设计似乎和 BlockingQueu 相近 生产者 消费者 但是 Disruptor 能够在无锁的情况下实现 Queue 并发操作 也就是 Disruptor 实际上是

Disruptor简介

Disruptor 是一个用于在线程间通信的高效低延时的开源框架,它被用在了LMAX系统中。这个系统是建立在JVM平台上,核心是一个业务逻辑处理器,官方号称它能够在一个线程里每秒处理6百万订单。
Disruptor的设计似乎和BlockingQueue相近(生产者—消费者),但是 Disruptor能够在无锁的情况下实现Queue并发操作,也就是 Disruptor实际上是非阻塞的。下图是官方给出的和ArrayBlockingQueue对比测试结果:
在这里插入图片描述




从上面可以明显看出: ArrayBlockingQueue的效率比Disruptor低很多。


Disruptor构成

RingBuffer

Producer

Producer即生产者,泛指调用 Disruptor 发布事件(把写入缓冲队列的一个元素定义为一个事件)的用户代码。

Consumer/EventProcessor

Consumer和EventProcessor是一个概念,新的版本中由EventProcessor概念替代了Consumer。

Sequence

Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。

生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。

说明:虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。

Sequence Barrier

用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系

在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。

SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

Wait Strategy

当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

  • BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
  • BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大。
  • SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
  • YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
  • PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大。

Event

在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

EventProcessor

EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。通过把EventProcessor提交到线程池来真正执行,有两类Processor。

其中一类消费者是BatchEvenProcessor。每个BatchEvenProcessor有一个Sequence,来记录自己消费RingBuffer中消息的情况。所以,一个消息必然会被每一个BatchEvenProcessor消费。

另一类消费者是WorkProcessor。每个WorkProcessor也有一个Sequence,多个WorkProcessor还共享一个Sequence用于互斥的访问RingBuffer。一个消息被一个WorkProcessor消费,就不会被共享一个Sequence的其他WorkProcessor消费。这个被WorkProcessor共享的Sequence相当于尾指针。

EventHandler

Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。开发者实现EventHandler,然后作为入参传递给EventProcessor的实例。

在这里插入图片描述

总上来看:Producer生产event数据,EventHandler作为消费者消费event并进行逻辑处理。消费消息的进度通过Sequence来控制。费者之间以及消费者和RingBuffer之间的依赖关系由SequenceBarrier来控制。


Disruptor入门Demo

Disruptor创建过程有四步:

  • 建立一个工厂Event类,用于创建Event类实例对象
  • 建立一个监听事件类,用于处理数据
  • 实例化Disruptor实例,配置相应的参数,编写Disruptor核心组件
  • 编写生产者组件,向Disruptor容器中投递数据

下面以一个具体例子说明:

Evevt类:商品信息

public class OrderEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } } 

工厂Event类

public class OrderEventFactory implements EventFactory 
      
        { public OrderEvent newInstance() { return new OrderEvent(); } } 
      

EventHandler类:

public class OrderEventHandler implements EventHandler 
       
         { public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception { System.out.println("消费的内容是:" + orderEvent.getValue()); } } 
       

Producer类:产生商品

public class OrderEventProducer { private RingBuffer 
        
          ringBuffer; public OrderEventProducer(RingBuffer 
         
           ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data){ //1 在生产者发送消息的时候, 首先从我们的ringBuffer里面 获取一个可用的序号 long sequence = ringBuffer.next(); //0 try { //2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象" OrderEvent event = ringBuffer.get(sequence); //3 进行实际的赋值处理 event.setValue(data.getLong(0)); } finally { //4 提交发布操作 ringBuffer.publish(sequence); } } } 
          
        

执行的Main方法:

 public static void main(String[] args) { int ringBufferSize = 1024 * 1024; ExecutorService executor = Executors.newFixedThreadPool(4); / * 1 eventFactory: 消息(event)工厂对象 * 2 ringBufferSize: 容器的长度 * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler * 4 ProducerType: 单生产者 还是 多生产者 * 5 waitStrategy: 等待策略 */ //1. 实例化disruptor对象 Disruptor 
         
           disruptor = new Disruptor 
          
            ( new OrderEventFactory(), ringBufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy() ); //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系) disruptor.handleEventsWith(new OrderEventHandler()); //3. 启动disruptor disruptor.start(); //4. 获取实际存储数据的容器: RingBuffer RingBuffer 
           
             ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer data = ByteBuffer.allocate(8) ; for(long i=0;i<100;i++){ data.putLong(0,i); producer.sendData(data); } disruptor.shutdown(); executor.shutdown(); } 
            
           
         

参考文章:

  • LMAX-Exchange/disruptor
  • 每秒钟承载600万订单级别的无锁并行计算框架-Disruptor
  • Disruptor(无锁并发框架)-发布
  • 并发框架Disruptor场景应用
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226231.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午7:33
下一篇 2026年3月17日 上午7:33


相关推荐

  • 中间人攻击原理

    中间人攻击原理中间人攻击中间人攻击 Man in the MiddleAttack 简称 MITM 攻击 是一种 间接 的入侵攻击 这种攻击模式是通过各种技术手段将受入侵者控制的一台计算机虚拟放置在网络连接中的两台通信计算机之间 这台计算机就称为 中间人 中间人攻击常见的两种方法 ARP 欺骗 DNS 欺骗 1 DNS 欺骗目标将其 DNS 请求发送到攻击者这里 然后攻击

    2026年3月18日
    2
  • 认识Vue 的 export、export default、import

    认识Vue 的 export、export default、import首先要知道 export import exportdefaul 是什么 nbsp 作为 copy 砖家 具体概念我还真是迷糊 查阅资料 ES6 模块主要有两个功能 export 和 import export 用于对外输出本模块 一个文件可以理解为一个模块 变量的接口 import 用于在一个模块中加载另一个含有 export 接口的模块 也就是说使用 export 命令定义了模块的对外接口以后 其他 JS

    2025年8月14日
    7
  • openstack介绍_openstack开发

    openstack介绍_openstack开发什么是云计算最早提出来是亚马逊公司,发家是靠卖书,最后自己把自己卖书的业务移到互联网上,随着自己公司业务的增加,自己公司内部服务器就不够用了,慢慢就开始做虚拟化,做了虚拟化之后,随着公司组织架构的复

    2022年8月2日
    8
  • Spring @Conditional注解 详细讲解及示例

    Spring @Conditional注解 详细讲解及示例前言 Conditional 是 Spring4 新提供的注解 它的作用是按照一定的条件进行判断 满足条件给容器注册 bean Conditional 的定义 此注解可以标注在类和方法上 Target ElementType TYPE ElementType METHOD Retention RetentionPol RUNTIME Documentedpu

    2026年3月20日
    1
  • GBDT算法梳理_gbdt分类

    GBDT算法梳理_gbdt分类集成算法大致分为两类:Boosting(迭代)和Bagging(装袋)。在前面的博客中,有提到,存在强依赖关系、必须串行生成的序列化方法,代表算法是Boosting,不存在强依赖关系、可同时生成的并行化方法,代表算法有Bagging、RandomForest。其中Boosting集成算法的典型代表算法有Adaboost,GBDT(GradientBoostingDecisionTree),…

    2022年10月12日
    5
  • 看这一篇就够了!Claude Code 接入四大国产编程模型 DeepSeek、GLM、Qwen、Kimi 全指南

    看这一篇就够了!Claude Code 接入四大国产编程模型 DeepSeek、GLM、Qwen、Kimi 全指南

    2026年3月16日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号