基于 CAS 无锁实现的 Disruptor.NET 居然慢于 BlockingCollection,是真的吗?

基于 CAS 无锁实现的 Disruptor.NET 居然慢于 BlockingCollection,是真的吗?

StackOverflow 有人说自己的 Disruptor.NET 代码比 BlockingCollection 还有慢 2 倍,并且把完整代码贴出,楼下几个老外也的回复说了一堆,但是没研究出个所以然来,讨论到最后甚至说可能你的场景不适合 Disruptor,我对此表示怀疑,BlockingCollection 内部实现比较简单粗暴,必要时就加锁,取数据时用信号量等待添加操作完成,而 Disruptor 是专门针对 CPU 缓存的特性优化过的,内部没有锁只有 CAS 原子操作,而且还考虑到了 false sharing,因此理论上 Disruptor 不会比 BlockingCollection 慢。

 

可是既然实际应用上出现问题,那就要分析下原因了。

把他的代码弄下来看了一下,问题多多啊。

在 Disruptor EventHandler 里面不定时调用 Console.WriteLine ,但是在 BlockingCollection 的 Handler 里面却只是记录了数据, Console.WriteLine 内部可是有锁的,调用的开销很大,如何能取得公平的结果呢?

另外 RingBuffer 的 Size 太小只有 64,严重影响 Disruptor 的表现,实际测试对比下来,应该 1024 或更大。

还有 BlockingCollection 里面的 while (!dataItems.IsCompleted) 写的也有问题,即使 BlockingCollection Producer 在循环中一直做添加操作,BlockingCollection 内部状态也并不是一直在添加状态中,这样导致添加循环还没做完,可是计时器的循环已经提前结束,导致 BlockingCollection 测得时间少于实际时间。

Task.Factory.StartNew(() => {
    while (!dataItems.IsCompleted)
    {

        ValueEntry ve = null;
        try
        {
    ve = dataItems.Take();
    long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    results[ve.Value] = microseconds;

    //Console.WriteLine("elapsed microseconds = " + microseconds);
    //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
        }
        catch (InvalidOperationException) { }
    }
}, TaskCreationOptions.LongRunning);


for (int i = 0; i < length; i++)
{
    var valueToSet = i;

    ValueEntry entry = new ValueEntry();
    entry.Value = valueToSet;

    sw[i].Restart();
    dataItems.Add(entry);

    //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
    //Thread.Sleep(1000);
}

 

然后重新修改了他的代码,实测 Disruptor 10 倍速度于 BlockingCollection (这里插一句题外话,Disruptor .NET 版本的速度全面快于 Java 版本,不少场景下的速度比 Java 版本要快 10 倍,.NET 版是从 Java 移植过来的实现也和 Java 保持一直,是哪些语言特性导致性能差异这么大呢?)。

 

然后我拿着实测的结果和修改后的代码,在 Stackoverflow 上这个问题下面贴上了我的回答:

I read the BlockingCollecton code, You add many Console.WriteLine in Disruptor but no one in BlockingCollection, Console.WriteLine is slow, it have a lock inside.

Your RingBufferSize is too small, this effects performance, shoule be 1024 or larger.

and while (!dataItems.IsCompleted) may have some problem, BlockCollection isn’t always in adding state, it will cause thread ends early.

I have rewrite you code, Disruptor is 10x faster than BlockingCollection with multi producer (10 parallel producet), 2x faster than BlockingCollection with Single producer:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;

namespace DisruptorTest.Ds
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class MyHandler : IEventHandler<ValueEntry>
    {
        public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
        {
        }
    }

    [TestFixture]
    public class DisruptorPerformanceTest
    {
        private volatile bool collectionAddEnded;

        private int producerCount = 10;
        private int runCount = 1000000;
        private int RingBufferAndCapacitySize = 1024;

        [TestCase()]
        public async Task TestBoth()
        {
            for (int i = 0; i < 1; i++)
            {
                foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
                {
                    Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                    RingBufferAndCapacitySize = rs;
                    await DisruptorTest();
                    await BlockingCollectionTest();
                }
            }
        }

        [TestCase()]
        public async Task BlockingCollectionTest()
        {
            var sw = new Stopwatch();
            BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize);

            sw.Start();

            collectionAddEnded = false;

            // A simple blocking consumer with no cancellation.
            var task = Task.Factory.StartNew(() =>
            {
                while (!collectionAddEnded && !dataItems.IsCompleted)
                {
                    //if (!dataItems.IsCompleted && dataItems.TryTake(out var ve))
                    if (dataItems.TryTake(out var ve))
                    {
                    }
                }
            }, TaskCreationOptions.LongRunning);


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        ValueEntry entry = new ValueEntry();
                        entry.Id = i;
                        dataItems.Add(entry);
                    }
                });
            }

            await Task.WhenAll(tasks);

            collectionAddEnded = true;
            await task;

            sw.Stop();

            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }


        [TestCase()]
        public async Task DisruptorTest()
        {
            var disruptor =
                new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingBufferAndCapacitySize, TaskScheduler.Default,
                    producerCount > 1 ? ProducerType.Multi : ProducerType.Single, new BlockingWaitStrategy());
            disruptor.HandleEventsWith(new MyHandler());

            var _ringBuffer = disruptor.Start();

            Stopwatch sw = Stopwatch.StartNew();

            sw.Start();


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        long sequenceNo = _ringBuffer.Next();
                        _ringBuffer[sequenceNo].Id = 0;
                        _ringBuffer.Publish(sequenceNo);
                    }
                });
            }


            await Task.WhenAll(tasks);


            disruptor.Shutdown();

            sw.Stop();
            Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
        }
    }
}
 

BlockingCollectionTest with a shared ValueEntry instance (no new ValueEntry() in for loop)

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:16.962s

    BlockingCollectionTest Time:18.399

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0 DisruptorTest Time:6.101s

    BlockingCollectionTest Time:19.526

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.928s

    BlockingCollectionTest Time:20.25

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.448s

    BlockingCollectionTest Time:20.649

BlockingCollectionTest create a new ValueEntry() in for loop

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:27.374s

    BlockingCollectionTest Time:21.955

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:5.011s

    BlockingCollectionTest Time:20.127

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.877s

    BlockingCollectionTest Time:22.656

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    DisruptorTest Time:2.384s

    BlockingCollectionTest Time:23.567

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/119604.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 8款最好用的Java集成开发工具(IDE)

    8款最好用的Java集成开发工具(IDE)8款最好用的Java集成开发工具(IDE)IDE的选择涉及到很多方面,例如项目性质、团队和企业的偏好等等,但是对于一些基本的需求,每一个好的IDE都是必须满足的,例如它要支持你使用的语言,无论是是Scala、还是Groovy或者是Java8,IDE都应该能完美支持。其次,它还要与控制系统兼容;然后,它还要帮助程序员轻松处理文本;最后,它还要支持可靠快速的调试和测试。Thi

    2022年7月8日
    21
  • 织梦php如何完全卸载,织梦dedecms如何去掉或删除power by dedecms

    织梦php如何完全卸载,织梦dedecms如何去掉或删除power by dedecms做贼心虚——当看到网站页面中出现powerbydedecms,哥的心里总感觉虚得慌。为何在使用dedecms时,自己并不想让别人知道该网站是用dedecms做的呢?是为了网站安全考虑不透露信息,还是不想让人知道你用的仅是开源系统,low逼了一地!一些用wordpress搭建的网站,常看到网页底部有一行字或配小图,大意是:自豪地使用wordpress来进行创作。而作为具备同样功能的dedecms…

    2022年7月13日
    11
  • vue常用组件封装_vue组件全局注册和局部注册

    vue常用组件封装_vue组件全局注册和局部注册项目中肯定会常用的一些基础组件,比如弹窗,toast之类的,要是在每个页面去引入的话那也太麻烦了,还好vue提供了一个全局注册组件的api,即Vue.compoment。在入口文件main.js里import需要的组件,使用Vue.compoment注册即可//src/main.jsimportmodelfrom’@/components/BaseModel’importtoastfrom’@/components/BaseButton’Vue.component(‘BaseMod

    2025年11月20日
    6
  • MySQL 主键详解

    MySQL 主键详解先来看下我们正常的建表代码 CREATETABLEu t idint 11 NOTNULLAUTO INCREMENT user namevarchar 40 NOTNULL passwordvarc 255 NOTNULL ageint 4 NOTNULL PRIMARYKEY id ENGINE InnoDBAUTO INCRE

    2025年9月18日
    4
  • 深度学习笔记三:反向传播(backpropagation)算法[通俗易懂]

    深度学习笔记三:反向传播(backpropagation)算法[通俗易懂]接上一篇的最后,我们要训练多层网络的时候,最后关键的部分就是求梯度啦。纯数学方法几乎是不可能的,那么反向传播算法就是用来求梯度的,用了一个很巧妙的方法。反向传播算法应该是神经网络最基本最需要弄懂的方法了,要是反向传播方法不懂,后面基本上进行不下去。非常推荐的是Howthebackpropagationalgorithmworks在最开始的博客中提过,这本书是这篇笔记用到的教材之

    2022年5月5日
    67
  • 【转载】这才是真正的分布式锁

    【转载】这才是真正的分布式锁

    2021年11月20日
    42

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号