一个简单的Parallel.ForEach实现

一个简单的Parallel.ForEach实现在.net的TaskParallelLibrary中有一个很方便的功能Parallel.ForEach,可以实现多任务的并发执行,另外还带着栅栏功能,非常好用。但是这一功能必须需要clr4.0支持(CTP版的不大好用),对于低版本的.net要实现类似功能只有自己写一个了。codeproject上面文章PoorMan’sParallel.ForEachIterator中就有一种简单而…

大家好,又见面了,我是你们的朋友全栈君。

在.net的Task Parallel Library中有一个很方便的功能Parallel.ForEach,可以实现多任务的并发执行,另外还带着栅栏功能,非常好用。但是这一功能必须需要clr4.0支持(CTP版的不大好用),对于低版本的.net要实现类似功能只有自己写一个了。

codeproject上面文章Poor Man’s Parallel.ForEach Iterator中就有一种简单而有效的实现。但作者附录的代码有如下几个问题:

  1. 无法对每个并发任务分别制定不同的线程数
  2. 算法本身有点问题,任务执行完会报错
  3. 不能快速响应异常

针对以上几点,我对那段代码做了一点小改进,代码如下:

static class Parallel
{

    public static void ParallelForEach<T>(this IEnumerable<T> enumerable, Action<T> action, int NumberOfParallelTasks)
    {

        var syncRoot = new object();

        if (enumerable == null) return;

        var enumerator = enumerable.GetEnumerator();

        InvokeAsync<T> del = InvokeAction;

        var seedItemArray = new T[NumberOfParallelTasks];
        var resultList = new List<IAsyncResult>(NumberOfParallelTasks);
        var waitHanles = new List<WaitHandle>(NumberOfParallelTasks);

        for (int i = 0; i < NumberOfParallelTasks; i++)
        {

            lock (syncRoot)
            {

                if (!enumerator.MoveNext())
                    break;
                seedItemArray[i] = enumerator.Current;
            }

            var iAsyncResult = del.BeginInvoke(enumerator, action, seedItemArray[i], syncRoot, i, null, null);
            resultList.Add(iAsyncResult);
            waitHanles.Add(iAsyncResult.AsyncWaitHandle);
        }

        var taskCount = waitHanles.Count;

        for (int i = 0; i < taskCount; i++)
        {

            var index = WaitHandle.WaitAny(waitHanles.ToArray());
            del.EndInvoke(resultList[index]);
            resultList[index].AsyncWaitHandle.Close();
            waitHanles.RemoveAt(index);
            resultList.RemoveAt(index);
        }
    }

    delegate void InvokeAsync<T>(IEnumerator<T> enumerator,
    Action<T> achtion, T item, object syncRoot, int i);

    static void InvokeAction<T>(IEnumerator<T> enumerator, Action<T> action,
            T item, object syncRoot, int i)
    {

        //if (String.IsNullOrEmpty(Thread.CurrentThread.Name))
        // Thread.CurrentThread.Name =
        //String.Format(“Parallel.ForEach Worker Thread No:{0}”, i);

        bool moveNext = true;

        while (moveNext)
        {

            try
            {

                action.Invoke(item);
            }
            catch (Exception)
            {

                throw;
            }

            lock (syncRoot)
            {

                moveNext = enumerator.MoveNext();
                if (moveNext)
                    item = enumerator.Current;
            }
        }
    }
}

整个算法非常简洁,这里就不多介绍了。如果有错误欢迎指正。

转载于:https://www.cnblogs.com/TianFang/archive/2009/06/28/1512588.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/161829.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 【shell案例】学员管理系统「建议收藏」

    【shell案例】学员管理系统「建议收藏」前言学员管理系统涉及到学员的增删改查,这是一个综合性比较强的项目,在所有的编程语言里都会有不同版本的学员信息管理系统,难度适中效果截图学员管理系统源码#!/bin/bash##随机点名册whiletruedo echo”学生随机点名系统” echo”1.添加学员名单” echo”2.遍历学员名单” echo”3.随机点名” echo”4.删除某个学员的信息” echo”5.退出” read-p”请输入你要选择的序号:”num case

    2022年9月21日
    0
  • mysql 主从1146_mysql 主从复制1146错误处理办法

    mysql 主从1146_mysql 主从复制1146错误处理办法错误现象:Replicate_Wild_Ignore_Table:Last_Errno:1146Last_Error:Error’Table’mydb.test1146’doesn’texist’onquery.Defaultdatabase:’mydb’.Query:’insertintotest1146values(‘bigdiao’)’方法一、在slave上重…

    2022年5月6日
    109
  • 调用第三方接口大致流程

    调用第三方接口大致流程下面以风控为例,业务是调用第三方接口获取支付宝报告天机支付宝获取流程:1本质:中转站:前台把参数传给我,我接受参数后传给天机,天机在传给支付宝,最后获取数据,在这个过程中   我们和天机都充当的是中转站的角色。2流程:a前台传客户的基本信息参数    b后台接受参数,传给天机,天机返回淘宝的认证地址链接,后台把链接返回给前台;    c前台打开链接,进入认证页面,进行认…

    2022年6月1日
    48
  • git基本使用(超详细)[通俗易懂]

    git基本使用(超详细)[通俗易懂]git基本使用一:Git是什么?Git是目前世界上最先进的分布式版本控制系统。二:SVN与Git的最主要的区别?1.SVN是集中式版本控制系统,版本库是集中放在中央服务器的,而干活的时候,用的都是自己的电脑,所以首先要从中央服务器哪里得到最新的版本,然后干活,干完后,需要把自己做完的活推送到中央服务器。集中式版本控制系统是必须联网才能工作,如果在局域网还可以,带宽够大,速度够快,如果在互联网下,如果网速慢的话,就纳闷了。2.Git是分布式版本控制系统,那么它就没有中央服务器的,每个人的电脑就是一个

    2022年9月21日
    0
  • eclipse经常卡死的解决方法总结_eclipse运行一段时间后死机

    eclipse经常卡死的解决方法总结_eclipse运行一段时间后死机使用eclipse3.6版本时,每当用alt+/或.来自动补全代码时,eclipse经常会卡死。这是eclipse3.6版本的一个bug,网上有朋友直接选择打补丁,也有人配置eclipse的Contentassist延时加长来解决这个问题。既然是版本问题,还是直接治标好了,升级新版本的eclipse解决这个问题:Help->checkforupdates选择要更新的组件

    2022年10月10日
    0
  • 一篇文章让你了解Hive和HBase的区别

    相信做大数据开发的朋友对hive和HBase一定不会陌生。HBASE想了解更多大数据相关知识可以点击“了解更多”Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。HBase是Hadoop的数据库,一个分布式、可扩展、大数据的存储。单个的从字面意思上或许很难看出二者…

    2022年4月9日
    53

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号