无锁队列Disruptor

无锁队列Disruptor1 何为队列听到队列相信大家对其并不陌生 在我们现实生活中队列随处可见 去超市结账 你会看见大家都会一排排的站得好好的 等待结账 为什么要站得一排排的 你想象一下大家都没有素质 一窝蜂的上去结账 不仅让这个超市崩溃 还会容易造成各种踩踏事件 当然这些事其实在我们现实中也是会经常发生 当然在计算机世界中 队列是属于一种数据结构 队列采用的 FIFO firstinfirst 新元素 等待进入队列的元素 总是被插入到尾部 而读取的时候总是从头部开始读取 在计算中队列一般用来做排队 如线程池的等待排

1.何为队列

听到队列相信大家对其并不陌生,在我们现实生活中队列随处可见,去超市结账,你会看见大家都会一排排的站得好好的,等待结账,为什么要站得一排排的,你想象一下大家都没有素质,一窝蜂的上去结账,不仅让这个超市崩溃,还会容易造成各种踩踏事件,当然这些事其实在我们现实中也是会经常发生。

当然在计算机世界中,队列是属于一种数据结构,队列采用的FIFO(first in firstout),新元素(等待进入队列的元素)总是被插入到尾部,而读取的时候总是从头部开始读取。在计算中队列一般用来做排队(如线程池的等待排队,锁的等待排队),用来做解耦(生产者消费者模式),异步等等。

2.jdk中的队列

当然ArrayBlockingQueue,也有自己的弊端,就是性能比较低,为什么jdk会增加一些无锁的队列,其实就是为了增加性能,很苦恼,又需要无锁,又需要有界,这个时候恐怕会忍不住说一句你咋不上天呢?但是还真有人上天了。

3.Disruptor

Disruptor就是上面说的那个天,Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,并且是一个开源的并发框架,并获得2011Duke’s程序框架创新奖。能够在无锁的情况下实现网络的Queue并发操作,基于Disruptor开发的系统单线程能支撑每秒600万订单。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在内部集成了Disruptor用来替代jdk的队列,以此来获得高性能。

3.1为什么这么牛逼?

上面已经把Disruptor吹出了花了,你肯定会产生疑问,他真的能有这么牛逼吗,我的回答是当然的,在Disruptor中有三大杀器:

3.1.1锁和CAS

我们ArrayBlockingQueue为什么会被抛弃的一点,就是因为用了重量级lock锁,在我们加锁过程中我们会把锁挂起,解锁后,又会把线程恢复,这一过程会有一定的开销,并且我们一旦没有获取锁,这个线程就只能一直等待,这个线程什么事也不能做。

CAS(compare and swap),顾名思义先比较在交换,一般是比较是否是老的值,如果是的进行交换设置,大家熟悉乐观锁的人都知道CAS可以用来实现乐观锁,CAS中没有线程的上下文切换,减少了不必要的开销。 这里使用JMH,用两个线程,每次1一次调用,在我本机上进行测试,代码如下:

@BenchmarkMode({ 
   Mode.SampleTime}) @OutputTimeUnit(TimeUnit.MILLISECONDS) @Warmup(iterations=3, time = 5, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations=1,batchSize = ) @Threads(2) @Fork(1) @State(Scope.Benchmark) public class Myclass { 
    Lock lock = new ReentrantLock(); long i = 0; AtomicLong atomicLong = new AtomicLong(0); @Benchmark public void measureLock() { 
    lock.lock(); i++; lock.unlock(); } @Benchmark public void measureCAS() { 
    atomicLong.incrementAndGet(); } @Benchmark public void measureNoLock() { 
    i++; } } 

而我们的Disruptor中使用的就是CAS,他利用CAS进行队列中的一些下标设置,减少了锁的冲突,提高了性能。

另外对于jdk中其他的无锁队列也是使用CAS,原子类也是使用CAS。

3.1.2伪共享

为什么CPU会有L1、L2、L3这样的缓存设计?主要是因为现在的处理器太快了,而从内存中读取数据实在太慢(一个是因为内存本身速度不够,另一个是因为它离CPU太远了,总的来说需要让CPU等待几十甚至几百个时钟周期),这个时候为了保证CPU的速度,就需要延迟更小速度更快的内存提供帮助,而这就是缓存。对这个感兴趣可以把电脑CPU拆下来,自己把玩一下。

缓存行是万能的吗?NO,因为他依然带来了一个缺点,我在这里举个例子说明这个缺点,可以想象有个数组队列,ArrayQueue,他的数据结构如下:

class ArrayQueue{ 
    long maxSize; long currentIndex; } 
class LhsPadding { 
    protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding { 
    protected volatile long value; } class RhsPadding extends Value { 
    protected long p9, p10, p11, p12, p13, p14, p15; } 

其中的Value就被其他一些无用的long变量给填充了。这样你修改Value的时候,就不会影响到其他变量的缓存行。

最后顺便一提,在jdk8中提供了@Contended的注解,当然一般来说只允许Jdk中内部,如果你自己使用那就得配置Jvm参数 -RestricContentended = fase,将限制这个注解置位取消。很多文章分析了ConcurrentHashMap,但是都把这个注解给忽略掉了,在ConcurrentHashMap中就使用了这个注解,在ConcurrentHashMap每个桶都是单独的用计数器去做计算,而这个计数器由于时刻都在变化,所以被用这个注解进行填充缓存行优化,以此来增加性能。

在这里插入图片描述

3.1.3RingBuffer

在Disruptor中采用了数组的方式保存了我们的数据,上面我们也介绍了采用数组保存我们访问时很好的利用缓存,但是在Disruptor中进一步选择采用了环形数组进行保存数据,也就是RingBuffer。在这里先说明一下环形数组并不是真正的环形数组,在RingBuffer中是采用取余的方式进行访问的,比如数组大小为 10,0访问的是数组下标为0这个位置,其实10,20等访问的也是数组的下标为0的这个位置。

实际上,在这些框架中取余并不是使用%运算,都是使用的&与运算,这就要求你设置的大小一般是2的N次方也就是,10,100,1000等等,这样减去1的话就是,1,11,111,就能很好的使用index & (size -1),这样利用位运算就增加了访问速度。 如果在Disruptor中你不用2的N次方进行大小设置,他会抛出buffersize必须为2的N次方异常。

当然其不仅解决了数组快速访问的问题,也解决了不需要再次分配内存的问题,减少了垃圾回收,因为我们0,10,20等都是执行的同一片内存区域,这样就不需要再次分配内存,频繁的被JVM垃圾回收器回收。

在这里插入图片描述

自此三大杀器已经说完了,有了这三大杀器为Disruptor如此高性能垫定了基础。接下来还会在讲解如何使用Disruptor和Disruptor的具体的工作原理。

3.2Disruptor怎么使用

下面举了一个简单的例子:

ublic static void main(String[] args) throws Exception { 
    // 队列中的元素 class Element { 
    @Contended private String value; public String getValue() { 
    return value; } public void setValue(String value) { 
    this.value = value; } } // 生产者的线程工厂 ThreadFactory threadFactory = new ThreadFactory() { 
    int i = 0; @Override public Thread newThread(Runnable r) { 
    return new Thread(r, "simpleThread" + String.valueOf(i++)); } }; // RingBuffer生产工厂,初始化RingBuffer的时候使用 EventFactory<Element> factory = new EventFactory<Element>() { 
    @Override public Element newInstance() { 
    return new Element(); } }; // 处理Event的handler EventHandler<Element> handler = new EventHandler<Element>() { 
    @Override public void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException { 
    System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence); // Thread.sleep(); } }; // 阻塞策略 BlockingWaitStrategy strategy = new BlockingWaitStrategy(); // 指定RingBuffer的大小 int bufferSize = 8; // 创建disruptor,采用单生产者模式 Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy); // 设置EventHandler disruptor.handleEventsWith(handler); // 启动disruptor的线程 disruptor.start(); for (int i = 0; i < 10; i++) { 
    disruptor.publishEvent((element, sequence) -> { 
    System.out.println("之前的数据" + element.getValue() + "当前的sequence" + sequence); element.setValue("我是第" + sequence + "个"); }); } } 

在Disruptor中有几个比较关键的: ThreadFactory:这是一个线程工厂,用于我们Disruptor中生产者消费的时候需要的线程。 EventFactory:事件工厂,用于产生我们队列元素的工厂,在Disruptor中,他会在初始化的时候直接填充满RingBuffer,一次到位。 EventHandler:用于处理Event的handler,这里一个EventHandler可以看做是一个消费者,但是多个EventHandler他们都是独立消费的队列。 WorkHandler:也是用于处理Event的handler,和上面区别在于,多个消费者都是共享同一个队列。 WaitStrategy:等待策略,在Disruptor中有多种策略,来决定消费者获消费时,如果没有数据采取的策略是什么?下面简单列举一下Disruptor中的部分策略

BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。

BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu

LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可以减少lock加锁次数.

LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛异常。

YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu

EventTranslator:实现这个接口可以将我们的其他数据结构转换为在Disruptor中流通的Event。

3.3工作原理

上面已经介绍了CAS,减少伪共享,RingBuffer三大杀器,介绍下来说一下Disruptor中生产者和消费者的整个流程。

3.3.1生产者

对于生产者来说,可以分为多生产者和单生产者,用ProducerType.Single,和ProducerType.MULTI区分,多生产者和单生产者来说多了CAS,因为单生产者由于是单线程,所以不需要保证线程安全。

在disruptor中通常用disruptor.publishEvent和disruptor.publishEvents()进行单发和群发。

在disruptor发布一个事件进入队列需要下面几个步骤:

首先获取RingBuffer中下一个在RingBuffer上可以发布的位置,这个可以分为两类:

从来没有写过的位置

获取位置之后会进行cas进行抢占,如果是单线程就不需要。

接下来调用我们上面所介绍的EventTranslator将第一步中RingBuffer中那个位置的event交给EventTranslator进行重写。

进行发布,在disruptor还有一个额外的数组用来记录当前ringBuffer所在位置目前最新的序列号是多少,拿上面那个0,10,20举例,写到10的时候这个avliableBuffer就在对应的位置上记录目前这个是属于10,有什么用呢后面会介绍。进行发布的时候需要更新这个avliableBuffer,然后进行唤醒所有阻塞的生产者。

对了不知道大家对上述流程是不是很熟悉呢,对的那就是类似我们的2PC,两阶段提交,先进行RingBuffer的位置锁定,然后在进行提交和通知消费者。

3.3.1消费者

对于消费者来说,上面介绍了分为两种,一种是多个消费者独立消费,一种是多个消费者消费同一个队列,这里介绍一下较为复杂的多个消费者消费同一个队列,能理解这个也就能理解独立消费。 在我们的disruptor.strat()方法中会启动我们的消费者线程以此来进行后台消费。在消费者中有两个队列需要我们关注,一个是所有消费者共享的进度队列,还有个是每个消费者独立消费进度队列。 1.对消费者共享队列进行下一个Next的CAS抢占,以及对自己消费进度的队列标记当前进度。 2.为自己申请可读的RingBuffer的Next位置,这里的申请不仅仅是申请到next,有可能会申请到比Next大的一个范围,阻塞策略的申请的过程如下:

4.如果收缩完了之后比当前next要小,则继续循环申请。 5.交给handler.onEvent()处理

4.Log4j中的Disruptor

下面的图展现了Log4j使用Disruptor,ArrayBlockingQueue以及同步的Log4j吞吐量的对比,可以看见使用了Disruptor完爆了其他的,当然还有更多的框架使用了Disruptor,这里就不做介绍了。

在这里插入图片描述

最后

本文介绍了传统的阻塞队列的缺点,后文重点吹逼了下Disruptor,以及他这么牛逼的原因,以及具体的工作流程。

如果以后有人问你叫你设计一个高效无锁队列,需要怎么设计?相信你能从文章中总结出答案,如果对其有疑问或者想和我交流思路,可以加入我的Java学习群: ,和我一起讨论。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/221989.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午4:49
下一篇 2026年3月17日 下午4:49


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号