|
今天我們就來談談平行擴展的關鍵組件之一PLINQ(Parallel LINQ)。微軟對PLINQ在Parallel FX中的定位是:PLINQ是TPL(Task Parallel Library)的一個高層應用。由于目前微軟對TPL研發(fā)的時間還比較短,這個社區(qū)預覽版的TPL版本的質量還是比較低的,而且微軟發(fā)布這個版本的目的也是為了更好的獲得開發(fā)社區(qū)的反饋信息,為了讓PLINQ有更高的質量,所以目前PLINQ還是基于ThreadPool的實現(xiàn),而不是基于TPL的API的。不過這只是內部實現(xiàn)不同而已,以后正式發(fā)布的時候PLINQ的對外接口的變更應該不會太大。
如何使用PLINQ?
1 添加System.Threading.dll到引用中
2 通過調用System.Linq.ParallelQuery.ASParallel擴展方法,將數(shù)據(jù)封裝到IParallelEnumerable中。
基于聲明方式的數(shù)據(jù)并行性
調用ASParallel擴展方法可使得編譯器使用System.Linq.ParallelEnumerable版本的查詢運算符,而不是System.Linq.Enumerable的。熟悉LINQ的人都知道,查詢表達式在編譯時都將轉化成對擴展方法的調用。對于LINQ而言,所有的擴展方法都封裝在System.Linq.Enumerable靜態(tài)類中,該類定義的都是針對IEnumerable數(shù)據(jù)源的擴展方法。而對于PLINQ,所有的擴展方法都封裝在System.Linq.ParallelEnumerable靜態(tài)類中,而且該類針對的都是IParallelEnumerable數(shù)據(jù)源的擴展方法,是System.Linq.Enumerable靜態(tài)類擴展方法的鏡像,只不過是通過并行方式對查詢進行評估。IParallelEnumerable接口從IEnumerable接口繼承,所以PLINQ也具有LINQ的延遲執(zhí)行的特點,以及執(zhí)行foreach。
為了讓大家清晰知道系統(tǒng)是如何將使用System.Linq.Enumerable版本的查詢運算符變成System.Linq.ParallelEnumerable版本的,我們先來看看System.Linq.ParallelQuery.ASParallel方法:
public static IParallelEnumerable ASParallel(IEnumerable source)
很顯然這個方法就是將IEnumerable的數(shù)據(jù)源轉化成IParallelEnumerable以使得使用平行版本的運算。這就是平行架構中通過ASParallel的聲明來使用并行使用數(shù)據(jù)的方式,也是PLINQ的編程模型。
所以這種基于聲明方式的數(shù)據(jù)并行性使得從LINQ到PLINQ的轉化非常容易,例如我們有這樣的LINQ代碼片段:
string[] words = new[] { "Welcome", "to", "Beijing" };
(from word in words select Process(word)).ToArray();
我們很容易就可以將其變成PLINQ版本:
string[] words = new[] { "Welcome", "to", "Beijing" };
(from word in words.ASParallel() select Process(word)).ToArray();
當然,如果你是通過查詢操作(就是直接調用靜態(tài)擴展函數(shù)),而不是使用查詢表達式(有時候查詢表達式?jīng)]有提供相應的表達式語句,例如C#3.0中沒有提供Skip和Take相對應的查詢表達式語句,我們只能通過直接調用查詢操作函數(shù))的情況下,將LINQ遷移到PLINQ,我們除了要調用ASParallel方法,還需要將直接調用Enumerable的方法改成對ParallelEnumerable的調用,例如:
IEnumerable data = ...;
var q = Enumerable.Select(Enumerable.OrderBy(
Enumerable.Where(data, (x) => p(x)),(x) => k(x)),(x) => f(x));
foreach (var e in q) a(e);
要使用 PLINQ,必須按如下方式重新編寫該查詢:
IEnumerable data = ...;
var q = ParallelEnumerable.Select(ParallelEnumerable.OrderBy(
ParallelEnumerable.Where(data.ASParallel(), (x) => p(x)),
(x) => k(x)),(x) => f(x));
foreach (var e in q) a(e);
注意:有些查詢運算符是二元的,使用兩個IEnumerable作為輸入?yún)?shù)(例如Join),最左邊數(shù)據(jù)源的類型決定了使用LINQ還是PLINQ,因此你只需要在第一個數(shù)據(jù)源上調用ASParallel便能使查詢并行查詢,例如:
IEnumerable leftData = ..., rightData = ...;
var q = from x in leftData.ASParallel()
join y in rightData on x.a == y.b
select f(x, y);
PLINQ查詢處理模型
1 管道式(Pipelined Processing)
該模型是將查詢線程(運行查詢的線程)和枚舉線程(進行迭代輸出結果的線程)分開,處理器在有元素可用時就運行枚舉將輸出應用于foreach循環(huán)。也就是說不需要等到所有查詢結果完成才進行枚舉輸出,只要查詢結果能產(chǎn)生一個最終輸出結果時就會進行枚舉輸出。簡單說就是邊查詢邊輸出,這個模型的好處就是允許對輸出做更多增量處理,從而減少為了存放結果所需的內存,壞處是由于中間結果需要更多的同步而降低性能。PLINQ缺省的是采用此模型。
2 準動態(tài)(stop-and-go Processing)
在這種模型下啟動枚舉的線程會聯(lián)結所有其他線程來執(zhí)行查詢。在所有查詢結果完成之后才進行枚舉輸出。這種模型的效率比管道式稍微高一些,因為這種模型需要同步的系統(tǒng)開銷減少了。在對查詢進行ToArray,ToList或者排序聚合操作時,系統(tǒng)將自動轉為這種模型處理。因為這些操作都需要產(chǎn)生所有輸出。具體在代碼中是通過調用IParallelEnumerable接口的GetEnumerator的重載方法并且傳遞false參數(shù)來使用這種模型的,該方法如下:
IEnumerable GetEnumerator(bool usePipelining)
3 反轉枚舉(Inverted Enumeration)
該模型會為并行運行的PLINQ提供一個Lambda表達式,集合中的每個元素都運行一次。這是最高效的一種模型,因為它將高成本運算的控制反轉給Lambda函數(shù)了。但注意的是在Lambda函數(shù)中不能使用共享狀態(tài),否則可能會導致系統(tǒng)崩潰,因為PLINQ不知道如何進行并發(fā)同步控制。但有些不同的是,此模型不能簡單使用foreach循環(huán),而必須使用特殊的ForAll API.例如:
string[] words = new[] { "Welcome", "to", "Beijing","OK","Hua","Ying","Ni" ,"2008"};
var lazyBeeQuery = from word in words.ASParallel() select word;
lazyBeeQuery.ForAll<string>(word => { Console.WriteLine(word); });
在我的機器上(雙核)的輸出結果是:
Hua
Welcome
Ying
Ni
2008
to
Beijing
OK
細心的人可能會發(fā)現(xiàn)其順序和數(shù)組的順序不同,這就是PLINQ并行運行的結果,可能在您的機器上可能結果又不同。
同時ASParallel重載方法提供一個參數(shù)來控制查詢的并行度(就是多少個線程被用于查詢),該方法定義如下:
public static IParallelEnumerable ASParallel(
IEnumerable source,int degreeOfParallelism)
如果你希望在使用管道式處理時有一個單獨的線程專門用于枚舉輸出,你可以將degreeOfParallelism參數(shù)賦值為(Enviorment.ProcessCount-1)即可。
輸出結果順序
由于并行的原因輸出結果可能和原有的數(shù)據(jù)在數(shù)據(jù)源中的順序不一樣,例如:
string[] words = new[] { "Welcome", "to", "Beijing","OK","Hua","Ying","Ni" ,"2008"};
var lazyBeeQuery = from word in words.ASParallel() select word;
foreach (string word in lazyBeeQuery)
{
Console.WriteLine(word);
}
這時的輸出結果可能是:
Welcome
Hua
to
Ying
Beijing
Ni
OK
2008
如果我們希望輸出結果和原有的數(shù)據(jù)在數(shù)據(jù)源中的順序保持一致,可以使用ASParallel的帶有ParallelQueryOptions.PreserveOrdering參數(shù)的重載版本,例如上例中就可以改成如下就可以使輸出順序和原有結構一致:
var lazyBeeQuery = from word in words.ASParallel(ParallelQueryOptions.PreserveOrdering) select word;
注意:1 ParallelQueryOptions.PreserveOrdering參數(shù)的使用對ForAll API不起作用(目前是這樣,以后不知道是否會做改動)。
2 使用這個保留順序的選項會影響查詢的性能和擴展能力,因為PLINQ將從邏輯上在末尾增加一個排序操作,而排序是一個無法隨處理器數(shù)量的增加而顯著提高性能的運算符,所以要在必須的時候才用。
并發(fā)異常
在順序執(zhí)行LINQ的時候,任何異常都會停止后續(xù)查詢的運行。但在PLINQ中,由于是并行運行的,某一線程產(chǎn)生了異常,系統(tǒng)會嘗試盡快終止其他線程的運行,在所有線程關閉之后,產(chǎn)生的所有異常將會放到System.Threading.AggregateException中,你可以通過InnerExceptions屬性來得到所有異常只讀集合ReadOnlyCollection。
it知識庫:初識Parallel Extensions之PLINQ,轉載需保留來源!
鄭重聲明:本文版權歸原作者所有,轉載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯(lián)系我們修改或刪除,多謝。