首页 > Python基础教程 >
-
C#教程之并行编程(Parallel Framework)
前言
并行编程:通过编码方式利用多核或多处理器称为并行编程,多线程概念的一个子集。
并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。多线程的一种。
并行编程分为如下几个结构:
1.并行的LINQ或PLINQ
2.Parallel类
3.任务并行结构
4.并发集合
5.SpinLock和SpinWait
这些是.NET 4.0引入的功能,一般被称为PFX(Parallel Framework,并行框架)。
Parallel类和任务并行结构称为TPL(Task Parallel Library,任务并行库)。
并行框架(PFX)
1.并行框架基础
当前CPU技术达到瓶颈,而制造商将关注重点转移到提高内核技术上,而标准单线程代码并不会因此而自动提高运行速度。
利用多核提升程序性能通常需要对计算密集型代码进行一些处理:
1.将代码划分成块。
2.通过多线程并行执行这些代码块。
3.结果变为可用后,以线程安全和高性能的方式整合这些结果。
传统多线程结构虽然实现功能,但难度颇高且不方便,特别是划分和整理的步骤(本质问题是:多线程同时使用相同数据时,出于线程安全考虑进行锁定的常用策略会引发大量竞争)。
而并行框架(Parallel Framework)专门用于在这些应用场景中提供帮助。
2.并行框架组成
PFX:高层由两个数据并行API组成:PLINQ或Parallel类。底层包含任务并行类和一组另外的结构为并行编程提供帮助。
基础并行语言集成查询(PLINQ)
语言集成查询(Language Integrated Query,LINQ)提供了一个简捷的语法来查询数据集合。而这种由一个线程顺序处理数据集合的方式我们称为顺序查询(sequential query)。
并行语言集成查询(Parallel LINQ)是LINQ的并行版。它将顺序查询转换为并行查询,在内部使用任务,将集合中数据项的处理工作分散到多个CPU上,以并发处理多个数据项。
PLINQ将自动并行化本地的LINQ查询,System.Linq.ParallelEnumerable类(它定义在System.Core.dll中,需要引用System.Linq)公开了所有标准LINQ操作符的并行版本。这些所有方法是依据System.Linq.ParallelQuery<T>扩展而来。
1.LINQ to PLINQ
要让LINQ查询调用并行版本,必须将自己的顺序查询(基于IEnumerable或IEnumerable<T>)转换成并行查询(基于ParallelQuery或ParallelQuery<T>),使用ParallelEnumerable的AsParallel方法实现,如示例:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 IEnumerable<int> numbers = Enumerable.Range(1, 1000); 6 ParallelQuery parallelQuery = 7 from n in numbers.AsParallel()//转换为并行 8 where n > 3 9 select n; 10 foreach (var item in parallelQuery) 11 { 12 Console.WriteLine(item); 13 } 14 Console.ReadKey(); 15 } 16 }
结果如下:使用Enumerable.Range生成的集合是顺序的,但是经过并行查询后顺序被打乱。
2.PLINQ to LINQ
将执行并行查询的操作切换回执行顺序查询(并不常用),通过ParalleIEnumerable的AsSequential实现。此时操作只由一个线程执行。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 IEnumerable<int> numbers = Enumerable.Range(1, 1000); 6 IEnumerable<int> enumerable = numbers.AsParallel().AsSequential().Where(c => c > 3); 7 foreach (var item in enumerable) 8 { 9 Console.WriteLine(item); 10 } 11 Console.ReadKey(); 12 } 13 }
3.整合结果集(ForAll)
通常,一个LINQ查询的结果数据是让某个线程执行一个foreach来处理,此时只有一个线程遍历查询的所有结果,如果希望以并行方式处理查询结果,通过ParalleIEnumerable的ForAll方法处理查询,如示例:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 IEnumerable<int> numbers = Enumerable.Range(1, 1000); 6 (from n in numbers.AsParallel() where n > 3 select n).ForAll((d) => 7 { 8 d = d + 1000; 9 Console.WriteLine(d);//Console在此回损害性能,因为内部回对线程进行同步,此处因演示所以暂且一用 10 }); 11 Console.ReadKey(); 12 } 13 }
执行结果如下:
解析PLINQ
1.PLINQ执行模型
如图所示:
2.异常处理
PLINQ的报错将以AggregateException形式被重抛,其中InnerExceeptions属性包含一个或多个真正异常,示例可看 异步编程(async&await)内的异常处理部分。
3.PLINQ结果的排序
并行化查询当整理结果时不能保持初始化数据的原始次序。如果要保持序列的原始序列,可以通过在AsParallel之后调用AsOrdered来强制实现:
1 IEnumerable<int> numbers = Enumerable.Range(1, 10000); 2 var enumerable = numbers.AsParallel().Where(c => c > 3);
调用AsOrdered时,因为PLINQ要保持跟踪每个元素的原始位置,会导致性能损失。
调用AsUnordered,可以在后续的查询中低效AsOrdered产生的副作用,允许查询从调用AsUnordered时起更高效的执行。
4.PLINQ存在的局限与限制
1.若要使PLINQ发挥作用,必须具有一定数量的计算密集型工作可分配给工作者线程。大多数的LINQ to Objects查询执行速度很快,不仅没有必要并行化,而且划分、整理和协调额外线程的开销实际上会降低执行速度。而且查询若调用了非线程安全的方法,PLINQ的结果有可能不正确。
2.PLINQ能够并行化的内容还有些限制,以下查询运算符防止查询被并行化,除非源元素位于他们的元素索引位置:Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt的索引版本。
3.以下查询运算符是并行化的,但所使用的复杂划分策略有时可能比顺序处理的速度还要低:Join、GroupBy、GroupJonin、Distinct、Union、Intersect和Except。
5.PLINQ的结果
和普通LINQ查询一样,PLINQ查询也是延迟求值的。意味着执行只在开始使用时触发。但是列举结果集时和普通顺序查询有区别:
顺序查询:完全由使用者从输入序列中“拉取”每个元素。
并行查询:通常使用独立线程来获取序列中的元素,时间上比使用者需要它们时要提前,再通过查询链并行处理元素后将结果保存在一块缓存中,以便使用者按需取用。
注意:过早暂停结果列举,查询处理器也会暂停或结束,目的是不浪费CPU的时间或内存。在调用AsParallel之后调用WithMergeOptions可以调节PLINQ的缓冲行为。
6.如何使用PLINQ
为何优化将LINQ都并行化是不可取的,因为LINQ能解决大多数问题,执行速度也很快,因此无法从并行化中收益。
一种更好的方式是找出CPU密集的瓶颈,然后考虑通过LINQ的形式表达(这类重构,LINQ往往会使代码量变少,而且增强可读性)。
PLINQ十分适用于易并行问题。他还可以很好地处理结构化的阻塞任务。
PLINQ不适于镜像制作,因为将数百万元素整理为一个输出序列将带来瓶颈,相反将元素写入一个数组或托管内存块中,然后使用Parallel类或任务并行管理多线程是更好的选择。
Parallel类
Parallel类是对线程的一个很好的抽象。该类位于System.Threading.Tasks命名空间中,提供了数据和任务并行性。
PFX通过Parallel类中的三个静态方法,提供了一种基本形式的结构化并行机制:
1.Parallel.Invoke
Parallel.Invoke:用于并行执行一组委托,示例如下:
1 static void Main(string[] args) 2 { 3 Parallel.Invoke( 4 () => Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}"), 5 () => Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}") 6 ); 7 Console.ReadKey(); 8 }
执行结果
Parallel.Invoke方法并行执行一组Action委托,然后等待它们完成。
1 public static void Invoke(params Action[] actions);
示例看起来像是创建和等待两个Task对象的一种捷径。但两者存在重要的区别:
如果传入一个包含数据量非常大的委托数组时,Parallel.Invoke方法仍然能高效工作,这是因为在底层,Parallel.Invoke方法是将大量元素划分成较小的块,分配给底层的Task执行,而不是每个委托创建一个独立Task。
2.Parallel.For
Parallel.For:执行C# for循环的并行化等价循环,示例如下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //顺序循环 6 { 7 for (int i = 0; i < 10; i++) 8 { 9 Test(i); 10 } 11 } 12 Console.WriteLine("并行化for开始"); 13 //顺序执行转换为并行化 14 { 15 Parallel.For(0, 10, i => Test(i)); 16 } 17 //顺序执行转换为并行化(更简单的方式) 18 { 19 Parallel.For(0, 10, Test); 20 } 21 Console.ReadKey(); 22 } 23 static void Test(int i) 24 { 25 Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}"); 26 } 27 }
结果如下:
3.Parallel.ForEach
Parallel.ForEach:执行C# foreach循环的并行化等价循环,示例如下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" }; 6 //顺序循环 7 { 8 foreach (string num in data) 9 { 10 Test(num); 11 } 12 } 13 Console.WriteLine("并行化foreach开始"); 14 //顺序执行转换为并行化 15 { 16 Parallel.ForEach(data, num => Test(num)); 17 } 18 Console.ReadKey(); 19 } 20 static void Test(string str) 21 { 22 Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{str}"); 23 } 24 }
执行结果:
注意:以上三个方法都会引发阻塞直到所有工作完成为止。和PLINQ一样,在出现未处理的异常之后,余下的工作者在它们当前的迭代之后停止,而一场将被抛回给调用者,并封装在一个AggregateException中。
4.索引&跳出(ParallelLoopState)
有时迭代索引很有用处,但是切忌不可像顺序循环的用法使用共享变量(循环内i++)的方式使用,因为共享变量值在并行上下文中是线程不安全的。
同样的,因为并行For或ForEach中的循环体是一个委托,所以无法使用break语句提前退出循环,必须调用ParallelLoopState对象上的Break或Stop方法。
以ForEach为例,ForEach重载的其中之一如下,它包含Acton的其中有三个参数(TSourec=子元素,ParallelLoopState=并行循环状态,long=索引):
1 public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
所以,想要得到索引和提前跳出的正确方式如示例:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine