C#并行编程之数据并行Tasks.Parallel类
作者:springsnow
一、并行概念
1、并行编程
在.NET 4中的并行编程是依赖Task Parallel Library(后面简称为TPL) 实现的。在TPL中,最基本的执行单元是task(中文可以理解为"任务"),一个task就代表了你要执行的一个操作。你可以为你所要执行的每一个操作定义一个task,TPL就负责创建线程来执行你所定义的task,并且管理线程。TPL是面向task的,自动的;而传统的多线程是以人工为导向的。
现在已经进入了多核的时代,我们的程序如何更多的利用好硬件cpu,答案是并行处理。在.net4.0之前我们要开发并行的程序是非常的困难,在.net4.0中,在命名空间System.Threading.Tasks提供了方便的并行开发的类库。
2、数据并行
数据并行指的是对源集合或数组的元素同时(即,并行)执行相同操作的场景。 在数据并行操作中,对源集合进行分区,以便多个线程能够同时在不同的网段上操作。
任务并行库 (TPL) 支持通过 System.Threading.Tasks.Parallel 类实现的数据并行。 此类对 for 循环和 foreach 循环提供了基于方法的并行执行。你为Parallel.For 或 Parallel.ForEach 循环编写的循环逻辑与编写连续循环的相似。 无需创建线程或列工作项。 在基本循环中,不需要加锁。TPL 为你处理所有低级别的工作。
Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()方法允许同时调用不同的方法。
二、Parallel.Invoke():并行调用多个任务 。
例1:同时调用2个任务
static void Main(string[] args) { var watch = Stopwatch.StartNew(); Parallel.Invoke(Run1, Run2); watch.Stop(); Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds) } static void Run1() { Console.WriteLine("我是任务一,我跑了3s"); Thread.Sleep(3000); } static void Run2() { Console.WriteLine("我是任务二,我跑了5s"); Thread.Sleep(5000); }
例2:说明并不是每个任务一个线程。
// 定义一个线程局部变量,返回其线程名 ThreadLocal<string> ThreadName = new ThreadLocal<string>(() => { return "Thread" + Thread.CurrentThread.ManagedThreadId; }); // 打印出当前线程名的方法。 Action action = () => { // 如果 ThreadName.IsValueCreated 为true,在这个线程上不是第一次运行这个方法。 bool repeat = ThreadName.IsValueCreated; Console.WriteLine("ThreadName = {0} {1}", ThreadName.Value, repeat ? "(repeat)" : ""); }; // 调用8个方法,你应该会看到一些重复的线程名 Parallel.Invoke(action, action, action, action, action, action, action, action); ThreadName.Dispose();
三、Parallel.For(): for 循环的并行运算
我们知道串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.for它会在底层根据硬件线程的运行状况来充分的使用所有的可利用的硬件线程,注意这里的Parallel.for的步行是1。
在For()方法中,前两个参数定义了循环的开头和结束。示例从0迭代到9。第3个参数是一个 Action<int>委托。整数参数是循环的迭代次数,该参数被传递给Action < int >委托引用的方法。 Parallel.For方法的返回类型是ParallelLoopResult结构,它提供了循环是否结束的信息。
ParallelLoopResult result = Parallel.For(0, 10, i => { Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId); Thread.Sleep(10); }); Console.WriteLine(result.IsCompleted);
首先先写一个普通的循环:
private void NormalFor() { for (var i = 0; i < 10000; i++) { for (var j = 0; j < 1000; j++) { for (var k = 0; k < 100; k++) { DoSomething(); } } } }
再看一个并行的For语句:
private void ParallelFor() { Parallel.For(0, 10000, i => { for (int j = 0; j < 1000; j++) { for (var k = 0; k < 100; k++) { DoSomething(); } } }); }
上面的例子中,只是将最外层的For语句替换成了Parallel.For,Parallel执行速度可以提高近一倍。
四、Parallel.ForEach():foreach 循环的并行运算
private void NormalForeach() { foreach (var file in GetFiles()) { DoSomething(); } } private void ParallelForeach() { Parallel.ForEach(GetFiles(), file => { DoSomething(); }); }
ForEach的使用跟For使用几乎是差不多了,只是在对非泛型的Collection进行操作的时候,需要通过Cast方法进行转换。
ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现。
for (int j = 1; j < 4; j++) { Console.WriteLine("\n第{0}次比较", j); ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); for (int i = 0; i < 3000000; i++) { bag.Add(i); } Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); bag = new ConcurrentBag<int>(); watch = Stopwatch.StartNew(); watch.Start(); Parallel.ForEach(Partitioner.Create(0, 3000000), i => { for (int m = i.Item1; m < i.Item2; m++) { bag.Add(m); } }); Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds); GC.Collect(); }
五、线程局部变量
下面这段代码多次运行每次的结果都不一样,因为total变量是公共的,而我们的程序是多个线程的加,而多个线程之间是不能把数据共享的。
public void NormalParallelTest() { int[] nums = Enumerable.Range(0, 1000000).ToArray(); long total = 0; Parallel.For(0,nums.Length,i=> { total += nums[i]; }); Console.WriteLine("The total is {0}", total); }
其实我们需要的是在每个线程中计算出一个和值,然后再进行累加。我们来看看线程局部变量:
泛型方法Parallel.For<T>的原型:
public static ParallelLoopResult For<TLocal> (int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally );
- TLocal:线程变量的类型;第一个、第二个参数就不必多说了,就是起始值跟结束值。
- localInit:每个线程的线程局部变量初始值的设置;
- body:每次循环执行的方法,其中方法的最后一个参数就是线程局部变量;
- localFinally:每个线程之后执行的方法。
1、Parallel.For中定义局部变量:
从2开始,累加2个,得49.
int[] nums = Enumerable.Range(0, 10).ToArray(); long total = 0; Parallel.For<long>(0, nums.Length, () => { return 2; }, (j, loop, subtotal) =>//1、每次循环执行的方法 { subtotal += nums[j]; Console.WriteLine("主体: thread {1}, task {2},结果:{0}", j+ ":" +nums[j] + "-" + subtotal, Thread.CurrentThread.ManagedThreadId, Task.CurrentId); return subtotal; }, (x) =>//2、每个线程执行之后执行的方法 { Console.WriteLine(" 最终执行:thread {1}, task {2},结果:{0} ", x, Thread.CurrentThread.ManagedThreadId, Task.CurrentId); Interlocked.Add(ref total, x); }); Console.WriteLine("The total is {0}", total);
2、Parallel.Each中定义局部变量:
要注意的是,我们必须要使用ForEach<TSource, TLocal>,因为第一个参数表示的是迭代源的类型,第二个表示的是线程局部变量的类型,其方法的参数跟For是差不多的。
public void ForeachThreadLocalTest() { int[] nums = Enumerable.Range(0, 1000000).ToArray(); long total = 0; Parallel.ForEach<int,long>(nums,()=>0, (member,loopState,subTotal)=>//1、每次循环执行的方法 { subTotal += member; return subTotal; }, (perLocal)=>//2、每个线程执行之后执行的方法 Interlocked.Add(ref total,perLocal) ); Console.WriteLine("The total is {0}", total); }
六、Break、Stop中断与停止线程
在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
- Break“中断”:表示完成当前线程上当前迭代之前的所有线程上的所有迭代,然后退出循环。(比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。)
- Stop“停止”:表示在方便的情况下尽快停止所有迭代。(比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。)
首先我们可以看到在Parallel.For的一个重载方法:
public static ParallelLoopResult For (int fromInclusive, int toExclusive, Action<int, ParallelLoopState > body)
在委托的最后一个参数类型为ParallelLoopState,而ParallelLoopState里面提供给我们两个方法:Break、Stop来终止迭代。
private void StopLoop() { var Stack = new ConcurrentStack<string>(); Parallel.For(0, 10000, (i, loopState ) => { if (i < 1000) Stack.Push(i.ToString()); else { loopState.Stop(); return; } }); Console.WriteLine("Stop Loop Info:\n elements count:{0}", Stack.Count); }
七、Cancel取消循环
在并行的循环中支持通过传递ParallelOptions参数中的CancellationToken进行取消循环的控制,我们可以CancellationTokenSource实例化之后传递给ParallelOptions对象Cancellation值。下面来看个示例:
在For循环的实现代码内部,Parallel类验证CancellationToken 的结果,并取消操作。一旦取消操作,For()方法就抛出个OperationCanceledException类型的异常,这是本例捕获的异常。使用 CancellationTokeri可以注册取消操作时的信息。为此,需要调用Register方法,并传递一个在取消 操作时调用的委托。
var cts = new CancellationTokenSource(); cts.Token.Register(() =>Console.WriteLine("*** token canceled")); // start a task that sends a cancel after 500 ms new Task(() => { Thread.Sleep(500); cts.Cancel(false); }).Start(); try { ParallelLoopResult result = Parallel.For(0, 100, new <strong>ParallelOptions</strong>() { CancellationToken = cts.Token, }, x => { Console.WriteLine("loop {0} started", x); int sum = 0; for (int i = 0; i < 100; i++) { Thread.Sleep(2); sum += i; } Console.WriteLine("loop {0} finished", x); }); } catch (OperationCanceledException ex) { Console.WriteLine(ex.Message); }
八、Handel Exceptions异常处理
在处理并行循环的异常的与顺序循环异常的处理是有所不同的,并行循环里面可能会一个异常在多个循环中出现,或则一个线程上的异常导致另外一个线程上也出现异常。比较好的处理方式就是,首先获取所有的异常最后通过AggregateException来包装所有的循环的异常,循环结束后进行throw。看一段示例代码:
private void HandleNumbers(int[] numbers) { var exceptions = new ConcurrentQueue<Exception>(); Parallel.For(0, numbers.Length, i => { try { if (numbers[i] > 10 && numbers[i] < 20) { throw new Exception(String.Format("numbers[{0}] betwewn 10 to 20",i)); } } catch (Exception e) { exceptions.Enqueue(e); } }); if (exceptions.Count > 0) throw new AggregateException(exceptions); }
测试方法:
public void HandleExceptions() { var numbers = Enumerable.Range(0, 10000).ToArray(); try { this.HandleNumbers(numbers); } catch(AggregateException exceptions) { foreach (var ex in exceptions.InnerExceptions) { Console.WriteLine(ex.Message); } } }
对上面的方法说明下,在HandleNumbers方法中,就是一个小的demo如果元素的值出现在10-20之间就抛出异常。在上面我们的处理方法就是:在循环时通过队列将所有的异常都集中起来,循环结束后来抛出一个AggregateException。
到此这篇关于C#并行编程之数据并行Tasks.Parallel类的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持脚本之家。