有些图可能无法显示了,可以见这里:https://www.yuque.com/simonalong/jishu/qhdcb2
一、背景
1.来源
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内部的内存队列的延迟问题,而不是分布式队列。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。
2.应用背景和介绍
二、传统队列问题
首先这里说的队列也仅限于Java内部的消息队列
| 队列 | 有界性 | 锁 | 结构 | 队列类型 |
|---|---|---|---|---|
| ArrayBlockingQueue | 有界 | 加锁 | 数组 | 阻塞 |
| LinkedBlockingQueue | 可选 | 加锁 | 链表 | 阻塞 |
| ConcurrentLinkedQueue | 无界 | 无锁 | 链表 | 非阻塞 |
| LinkedTransferQueue | 无界 | 无锁 | 链表 | 阻塞 |
| PriorityBlockingQueue | 无界 | 加锁 | 堆 | 阻塞 |
| DelayQueue | 无界 | 加锁 | 堆 | 阻塞 |
1.伪共享概念
共享
缓存行
刚刚说的缓存失效其实指的是Cache line的失效,也就是缓存行,Cache是由很多个Cache line 组成的,每个缓存行大小是32~128字节(通常是64字节)。我们这里假设缓存行是64字节,而java的一个Long类型是8字节,这样的话一个缓存行就可以存8个Long类型的变量,如下图所示:

一个缓存对应的缓存行的结构图
cpu 每次从主内存中获取数据的时候都会将相邻的数据存入到同一个缓存行中。假设我们访问一个Long内存对应的数组的时候,如果其中一个被加载到内存中,那么对应的后面的7个数据也会被加载到对应的缓存行中,这样就会非常快的访问数据。
伪共享
刚我们说了缓存的失效其实就是缓存行的失效,缓存行失效的原理是什么,这里又涉及到一个MESI协议(缓存一致性协议),我们这里不介绍这个,我们只需知道这个会在一个缓存中的数据变化的时候会将其他所有存储该缓存的缓存(其实是缓存行)都失效,感兴趣的可以看下附录部分,首先我们用Disruptor中很经典的讲解伪共享的图来讲解下:

2.ArrayBlockingQueue 的伪共享问题
刚我们已经讲了伪共享的问题,那么ArrayBlockingQueue的这个伪共享问题存在于哪里呢,分析下核心的部分源码
public void put(E e) throws InterruptedException {
checkNotNull(e); final ReentrantLock lock = this.lock; //获取当前对象锁 lock.lockInterruptibly(); try {
while (count == items.length) //阻塞并释放锁,等待notFull.signal()通知 notFull.await(); //将数据放入数组 enqueue(e); } finally {
lock.unlock(); } }
private void enqueue(E x) {
final Object[] items = this.items; //putIndex 就是入队的下标 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //加锁 lock.lockInterruptibly(); try {
while (count == 0) //阻塞并释放对象锁,并等待notEmpty.signal()通知 notEmpty.await(); //在数据不为空的情况下 return dequeue(); } finally {
lock.unlock(); } }
private E dequeue() {
final Object[] items = this.items; //takeIndex 是出队的下标 E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

这三个变量很容易放到同一个缓存行中,为此专门用一个伪共享检测工具进行检测,目前检测伪共享的工具只有Intel的Intel Vtune 目前刚发现有mac os 版本,但是经过测试发现,该工具无法分析macOs 的处理器配置,用的时候发现如下错误“无法检测到支持的处理器配置”,这个可以遗留给其他同学,工具的安装和使用方式,可以查看附录中的另外的一个连接。
三、高性能原理
- 引入环形的数组结构:数组元素不会被回收,避免频繁的GC,
- 无锁的设计:采用CAS无锁方式,保证线程的安全性
- 属性填充:通过添加额外的无用信息,避免伪共享问题
- 元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增
1.环形数组结构
其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。
2.生产和消费模式
多生产者——生产
假设现在又两个生产者,开始写数据,通过CAS竞争,w1得到的34的空间,w2得到了78的空间,其中6是代表已被写入或者没有被消费的数据。

多生产者——消费
绿色代表已经写OK的数据
假设三个生产者在写中,还没有置位AvailableBuffer,那么消费者可获取的消费下标只能获取到6,然后等生产者都写OK后,通知到消费者,消费者继续重复上面的步骤。如下图

消费者常见的等待
BusySpinWaitStrategy: 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。 BlockingWaitStrategy: 使用锁和条件变量。CPU资源的占用少,延迟大,默认等待策略。 SleepingWaitStrategy: 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。 YieldingWaitStrategy: 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。 PhasedBackoffWaitStrategy: 上面多种策略的综合,CPU资源的占用少,延迟大
3.牛逼的下标指针
RingBuffer的指针(Sequence)属于一个volatile变量,同时也是我们能够不用锁操作就能实现Disruptor的原因之一,而且通过缓存行补充,避免伪共享问题。 该所谓指针是通过一直自增的方式来获取下一个可写或者可读数据,该数据是Long类型,不用担心会爆掉。有人计算过: long的范围最大可以达到9223372036854775807,一年365 * 24 * 60 * 60 = 31536000秒,每秒产生1W条数据,也可以使用292年。
class LhsPadding{
//缓存行补齐, 提升cache缓存命中率 protected long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding{
protected volatile long value; } class RhsPadding extends Value{
//缓存行补齐, 提升cache缓存命中率 protected long p9, p10, p11, p12, p13, p14, p15; } public class Sequence extends RhsPadding{
... }
四、用法
用法很简单,一共三个角色:生产者,消费者,disruptor对象
1.简单用法
disruptor对象
disruptor 就两个构造方法
public Disruptor( final EventFactory<T> eventFactory, // 数据实体构造工厂 final int ringBufferSize, // 队列大小,必须是2的次方 final ThreadFactory threadFactory, // 线程工厂 final ProducerType producerType, // 生产者类型,单个生产者还是多个 final WaitStrategy waitStrategy){
// 消费者等待策略 ... } public Disruptor( final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory){
... }
生产处理
生产者这里没有固定的对象,只是需要获取放置数据的位置,然后进行publish
public void send(String data){
RingBuffer<MsgEvent> ringBuffer = this.disruptor.getRingBuffer(); //获取下一个放置数据的位置 long next = ringBuffer.next(); try{
MsgEvent event = ringBuffer.get(next); event.setValue(data); }finally {
//发布事件 ringBuffer.publish(next); } }
消费处理
消费处理可以有如下几种
public EventHandlerGroup<T> handleEventsWith(final EventHandler
super T>... handlers){
... } public EventHandlerGroup<T> handleEventsWith(final EventProcessor... processors){
... } public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories){
... } public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers){
... }
简单用例
//消费者 public class MsgConsumer implements EventHandler<MsgEvent>{
private String name; public MsgConsumer(String name){
this.name = name; } @Override public void onEvent(MsgEvent msgEvent, long l, boolean b) throws Exception {
System.out.println(this.name+" -> 接收到信息: "+msgEvent.getValue()); } } //生产者处理 public class MsgProducer {
private Disruptor disruptor; public MsgProducer(Disruptor disruptor){
this.disruptor = disruptor; } public void send(String data){
RingBuffer<MsgEvent> ringBuffer = this.disruptor.getRingBuffer(); long next = ringBuffer.next(); try{
MsgEvent event = ringBuffer.get(next); event.setValue(data); }finally {
ringBuffer.publish(next); } } public void send(List<String> dataList){
dataList.stream().forEach(data -> this.send(data)); } } //触发测试 public class DisruptorDemo {
@Test public void test(){
Disruptor<MsgEvent> disruptor = new Disruptor<>(MsgEvent::new, 1024, Executors.defaultThreadFactory()); //定义消费者 MsgConsumer msg1 = new MsgConsumer("1"); MsgConsumer msg2 = new MsgConsumer("2"); MsgConsumer msg3 = new MsgConsumer("3"); //绑定配置关系 disruptor.handleEventsWith(msg1, msg2, msg3); disruptor.start(); // 定义要发送的数据 MsgProducer msgProducer = new MsgProducer(disruptor); msgProducer.send(Arrays.asList("nihao","hah")); } }
输出(消费没有固定顺序):
1 -> 接收到信息: nihao 3 -> 接收到信息: nihao 3 -> 接收到信息: hah 2 -> 接收到信息: nihao 2 -> 接收到信息: hah 1 -> 接收到信息: hah
2.其他用法
上面主要介绍了多消费统一消费,但是在生产者模型中是有很多种,如下,一对一,一对多,多对多,多对一

生产者配置
其中生产模式中的单生产者模式和多生产模式,这里主要是通过一个枚举:ProduceType来区分,建议,多个生产者用多生产者模式,性能会好点。
消费者配置
消费者模式这里分为两种:
统一消费像上面简单用法中运用即可,对于分组消费,用函数 handleEventsWithWorkerPool 即可
/ * 分组处理 handleEventWithWorkerPool */ @Test public void test1(){
Disruptor<MsgEvent> disruptor = new Disruptor(MsgEvent::new, 1024, Executors.defaultThreadFactory()); disruptor.handleEventsWithWorkerPool(new MyWorkHandler("work1"), new MyWorkHandler("work2")); disruptor.start(); MsgProducer msgProducer = new MsgProducer(disruptor); msgProducer.send(Arrays.asList("aaa","bbb")); }
输出:
work1 : MsgEvent(value=bbb) work2 : MsgEvent(value=aaa) work1 : MsgEvent(value=cc) work2 : MsgEvent(value=dd)
消费顺序配置
在消费配置中,这里可以有很多种消费方式,比如:
1.消费者的顺序消费
/ * 测试顺序消费 * 每一条消息的消费者1和3消费完毕后,消费者2再进行消费 */ @Test public void test2(){
MsgConsumer msg1 = new MsgConsumer("1"); MsgConsumer msg2 = new MsgConsumer("2"); MsgConsumer msg3 = new MsgConsumer("3"); Disruptor<MsgEvent> disruptor = new Disruptor(MsgEvent::new, 1024, Executors.defaultThreadFactory()); disruptor.handleEventsWith(msg1, msg3).then(msg2); disruptor.start(); MsgProducer msgProducer = new MsgProducer(disruptor); msgProducer.send(Arrays.asList("aaa", "bbb", "ccc", "ddd")); }
输出(里面的是根据每一条消息的消费者顺序):
1 -> 接收到信息: aaa 3 -> 接收到信息: aaa 1 -> 接收到信息: bbb 1 -> 接收到信息: ccc 2 -> 接收到信息: aaa 3 -> 接收到信息: bbb 3 -> 接收到信息: ccc 3 -> 接收到信息: ddd 1 -> 接收到信息: ddd 2 -> 接收到信息: bbb 2 -> 接收到信息: ccc 2 -> 接收到信息: ddd
2.消费分为多个支线,而且也有消费顺序问题
/ * 测试多支线消费 * 消费者1和消费者3一个支线,消费者2和消费者4一个支线,消费者3和消费者4消费完毕后,消费者5再进行消费 */ @Test public void test3(){
MsgConsumer msg1 = new MsgConsumer("1"); MsgConsumer msg2 = new MsgConsumer("2"); MsgConsumer msg3 = new MsgConsumer("3"); MsgConsumer msg4 = new MsgConsumer("4"); MsgConsumer msg5 = new MsgConsumer("5"); //支线:消费者1和消费者3 disruptor.handleEventsWith(msg1, msg3); //支线:消费者2和消费者4 disruptor.handleEventsWith(msg2, msg4); //消费者3和消费者4执行完之后,指向消费者5 disruptor.after(msg3, msg4).handleEventsWith(msg5); disruptor.start(); MsgProducer msgProducer = new MsgProducer(disruptor); msgProducer.send(Arrays.asList("aaa", "bbb", "ccc", "ddd")); }
1 -> 接收到信息: aaa 2 -> 接收到信息: aaa 2 -> 接收到信息: bbb 3 -> 接收到信息: aaa 3 -> 接收到信息: bbb 4 -> 接收到信息: aaa 4 -> 接收到信息: bbb 5 -> 接收到信息: aaa 1 -> 接收到信息: bbb 5 -> 接收到信息: bbb
五、常见问题
下面介绍下一些常见问题。
1.disruptor应该如何用才能发挥最大功效?
disruptor原本就是事件驱动的设计,其整个架构跟普通的多线程很不一样。比如一种用法,将disruptor作为业务处理,中间带I/O处理,这种玩法比多线程还慢;相反,如果将disruptor做业务处理,需要I/O时采用nio异步调用,不阻塞disruptor消费者线程,等到I/O异步调用回来后在回调方法中将后续处理重新塞到disruptor队列中,可以看出来,这是典型的事件处理架构,确实能在时间上占据优势,加上ringBuffer固有的几项性能优化,能让disruptor发挥最大功效。
2.如果buffer常常是满的怎么办?
一种是把buffer变大,另一种是从源头解决producer和consumer速度差异太大问题,比如试着把producer分流,或者用多个disruptor,使每个disruptor的load变小。
3. 什么时候使用disruptor?
如果对延迟的需求很高,可以考虑使用。
六、参考:
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/201385.html原文链接:https://javaforall.net
