VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > Python基础教程 >
  • C#教程之并行编程(Parallel Framework)

前言

并行编程:通过编码方式利用多核或多处理器称为并行编程,多线程概念的一个子集。

并行处理:把正在执行的大量的任务分割成小块,分配给多个同时运行的线程。多线程的一种。

并行编程分为如下几个结构

1.并行的LINQPLINQ

2.Parallel

3.任务并行结构

4.并发集合

5.SpinLockSpinWait

这些是.NET 4.0引入的功能,一般被称为PFX(Parallel Framework,并行框架)

Parallel类和任务并行结构称为TPL(Task Parallel Library,任务并行库)

 

并行框架(PFX)

1.并行框架基础

当前CPU技术达到瓶颈,而制造商将关注重点转移到提高内核技术上,而标准单线程代码并不会因此而自动提高运行速度。
利用多核提升程序性能通常需要对计算密集型代码进行一些处理:
1.将代码划分成块。
2.通过多线程并行执行这些代码块。
3.结果变为可用后,以线程安全和高性能的方式整合这些结果。
传统多线程结构虽然实现功能,但难度颇高且不方便,特别是划分和整理的步骤(本质问题是:多线程同时使用相同数据时,出于线程安全考虑进行锁定的常用策略会引发大量竞争)。
并行框架(Parallel Framework)专门用于在这些应用场景中提供帮助。

2.并行框架组成

PFX:高层由两个数据并行API组成:PLINQ或Parallel类。底层包含任务并行类和一组另外的结构为并行编程提供帮助。

 

基础并行语言集成查询(PLINQ)

语言集成查询(Language Integrated Query,LINQ)提供了一个简捷的语法来查询数据集合。而这种由一个线程顺序处理数据集合的方式我们称为顺序查询(sequential query)

并行语言集成查询(Parallel LINQ)LINQ并行版。它将顺序查询转换为并行查询,在内部使用任务,将集合中数据项的处理工作分散到多个CPU上,以并发处理多个数据项。

PLINQ将自动并行化本地的LINQ查询System.Linq.ParallelEnumerable类(它定义在System.Core.dll中,需要引用System.Linq)公开了所有标准LINQ操作符的并行版本。这些所有方法是依据System.Linq.ParallelQuery<T>扩展而来。

1.LINQ to PLINQ

要让LINQ查询调用并行版本,必须将自己的顺序查询(基于IEnumerable或IEnumerable<T>)转换成并行查询(基于ParallelQuery或ParallelQuery<T>),使用ParallelEnumerableAsParallel方法实现,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             ParallelQuery parallelQuery =
 7                 from n in numbers.AsParallel()//转换为并行
 8                 where n > 3
 9                 select n;
10             foreach (var item in parallelQuery)
11             {
12                 Console.WriteLine(item);
13             }
14             Console.ReadKey();
15         }
16     }

结果如下:使用Enumerable.Range生成的集合是顺序的,但是经过并行查询后顺序被打乱。

2.PLINQ to LINQ

 将执行并行查询的操作切换回执行顺序查询(并不常用),通过ParalleIEnumerableAsSequential实现。此时操作只由一个线程执行。

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             IEnumerable<int> enumerable = numbers.AsParallel().AsSequential().Where(c => c > 3);
 7             foreach (var item in enumerable)
 8             {
 9                 Console.WriteLine(item);
10             }
11             Console.ReadKey();
12         }
13     }

3.整合结果集(ForAll)

通常,一个LINQ查询的结果数据是让某个线程执行一个foreach来处理,此时只有一个线程遍历查询的所有结果,如果希望以并行方式处理查询结果,通过ParalleIEnumerableForAll方法处理查询,如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             IEnumerable<int> numbers = Enumerable.Range(1, 1000);
 6             (from n in numbers.AsParallel() where n > 3 select n).ForAll((d) =>
 7              {
 8                  d = d + 1000;
 9                  Console.WriteLine(d);//Console在此回损害性能,因为内部回对线程进行同步,此处因演示所以暂且一用
10              });
11             Console.ReadKey();
12         }
13     }

 执行结果如下:

 

解析PLINQ

1.PLINQ执行模型

如图所示:

2.异常处理

PLINQ的报错将以AggregateException形式被重抛,其中InnerExceeptions属性包含一个或多个真正异常,示例可看 异步编程(async&await)内的异常处理部分。

3.PLINQ结果的排序

 并行化查询当整理结果时不能保持初始化数据的原始次序。如果要保持序列的原始序列,可以通过在AsParallel之后调用AsOrdered来强制实现:

1             IEnumerable<int> numbers = Enumerable.Range(1, 10000);
2             var enumerable = numbers.AsParallel().Where(c => c > 3);

调用AsOrdered时,因为PLINQ要保持跟踪每个元素的原始位置,会导致性能损失。

调用AsUnordered,可以在后续的查询中低效AsOrdered产生的副作用,允许查询从调用AsUnordered时起更高效的执行。

4.PLINQ存在的局限与限制

1.若要使PLINQ发挥作用,必须具有一定数量的计算密集型工作可分配给工作者线程。大多数的LINQ to Objects查询执行速度很快,不仅没有必要并行化,而且划分、整理和协调额外线程的开销实际上会降低执行速度。而且查询若调用了非线程安全的方法,PLINQ的结果有可能不正确。

2.PLINQ能够并行化的内容还有些限制,以下查询运算符防止查询被并行化,除非源元素位于他们的元素索引位置:Take、TakeWhile、Skip和SkipWhileSelect、SelectMany和ElementAt的索引版本。

3.以下查询运算符是并行化的,但所使用的复杂划分策略有时可能比顺序处理的速度还要低:Join、GroupBy、GroupJonin、Distinct、Union、Intersect和Except。

5.PLINQ的结果

和普通LINQ查询一样,PLINQ查询也是延迟求值的。意味着执行只在开始使用时触发。但是列举结果集时和普通顺序查询有区别:

顺序查询:完全由使用者从输入序列中“拉取”每个元素。

并行查询:通常使用独立线程来获取序列中的元素,时间上比使用者需要它们时要提前,再通过查询链并行处理元素后将结果保存在一块缓存中,以便使用者按需取用。

注意:过早暂停结果列举,查询处理器也会暂停或结束,目的是不浪费CPU的时间或内存。在调用AsParallel之后调用WithMergeOptions可以调节PLINQ的缓冲行为。

6.如何使用PLINQ

为何优化将LINQ都并行化是不可取的,因为LINQ能解决大多数问题,执行速度也很快,因此无法从并行化中收益。

一种更好的方式是找出CPU密集的瓶颈,然后考虑通过LINQ的形式表达(这类重构,LINQ往往会使代码量变少,而且增强可读性)。

PLINQ十分适用于易并行问题。他还可以很好地处理结构化的阻塞任务。

PLINQ不适于镜像制作,因为将数百万元素整理为一个输出序列将带来瓶颈,相反将元素写入一个数组或托管内存块中,然后使用Parallel类或任务并行管理多线程是更好的选择。

 

Parallel类

Parallel类是对线程的一个很好的抽象。该类位于System.Threading.Tasks命名空间中,提供了数据和任务并行性

PFX通过Parallel类中的三个静态方法,提供了一种基本形式的结构化并行机制:

1.Parallel.Invoke

Parallel.Invoke用于并行执行一组委托,示例如下:

1         static void Main(string[] args)
2         {
3             Parallel.Invoke(
4                 () => Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}"),
5                 () => Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId}")
6                 );
7             Console.ReadKey();
8         }

执行结果

Parallel.Invoke方法并行执行一组Action委托,然后等待它们完成。

1 public static void Invoke(params Action[] actions);

示例看起来像是创建和等待两个Task对象的一种捷径。但两者存在重要的区别:
如果传入一个包含数据量非常大的委托数组时,Parallel.Invoke方法仍然能高效工作,这是因为在底层,Parallel.Invoke方法是将大量元素划分成较小的块,分配给底层的Task执行,而不是每个委托创建一个独立Task

2.Parallel.For

Parallel.For执行C# for循环的并行化等价循环,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             //顺序循环
 6             {
 7                 for (int i = 0; i < 10; i++)
 8                 {
 9                     Test(i);
10                 }
11             }
12             Console.WriteLine("并行化for开始");
13             //顺序执行转换为并行化
14             {
15                 Parallel.For(0, 10, i => Test(i));
16             }
17             //顺序执行转换为并行化(更简单的方式)
18             {
19                 Parallel.For(0, 10, Test);
20             }
21             Console.ReadKey();
22         }
23         static void Test(int i)
24         {
25             Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{i}");
26         }
27     }

结果如下:

3.Parallel.ForEach

Parallel.ForEach执行C# foreach循环的并行化等价循环,示例如下:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" };
 6             //顺序循环
 7             {
 8                 foreach (string num in data)
 9                 {
10                     Test(num);
11                 }
12             }
13             Console.WriteLine("并行化foreach开始");
14             //顺序执行转换为并行化
15             {
16                 Parallel.ForEach(data, num => Test(num));
17             }
18             Console.ReadKey();
19         }
20         static void Test(string str)
21         {
22             Console.WriteLine($"当前线程Id:{Thread.CurrentThread.ManagedThreadId},输出结果为:{str}");
23         }
24     }

执行结果:

注意:以上三个方法都会引发阻塞直到所有工作完成为止。和PLINQ一样,在出现未处理的异常之后,余下的工作者在它们当前的迭代之后停止,而一场将被抛回给调用者,并封装在一个AggregateException中。

4.索引&跳出(ParallelLoopState)

有时迭代索引很有用处,但是切忌不可像顺序循环的用法使用共享变量(循环内i++)的方式使用,因为共享变量值在并行上下文中是线程不安全的

同样的,因为并行ForForEach中的循环体是一个委托,所以无法使用break语句提前退出循环,必须调用ParallelLoopState对象上的BreakStop方法。

ForEach为例,ForEach重载的其中之一如下,它包含Acton的其中有三个参数(TSourec=子元素,ParallelLoopState=并行循环状态,long=索引):

1 public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)

所以,想要得到索引和提前跳出的正确方式如示例:

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine