探究Visual Studio 2010中Parallel的使用
之前51cto曾經(jīng)報道過關于Visual Studio 2010中Parallel類實現(xiàn)并行計算,本文我們主要分析如何利用Parallel.For和Parallel.ForEach函數(shù)來并行化for循環(huán)和foreach循環(huán)。實際上,Parallel.For和Parallel.ForEach函數(shù)主要是針對“并行數(shù)據(jù)”的并行化操作,所謂并行數(shù)據(jù),就是整個數(shù)據(jù)集中數(shù)據(jù)單元是相互獨立的,可以同時進行處理。
在實際開發(fā)中,我們遇到的可以并行處理的不僅包括“并行數(shù)據(jù)”,還包括可以同時進行的“并行邏輯”。所謂“并行邏輯”,就是相互獨立,可以同時執(zhí)行的多個任務。比如,程序員陳良喬每天早上要做兩件事情:燒水洗臉和鍛煉身體。這兩件事情就是相互獨立可以并行的,也就是說他在燒水的時候可以同時鍛煉身體。在以前的單核時代,CPU在同一時間只能完成一件事情,那么陳良喬只能先燒水后鍛煉,或者是先鍛煉后燒水,這導致他上班總是遲到。
進入多核時代,CPU可以在同一時間完成多件事情了,借助.Net Framework 4.0中的Parallel類,我們可以方便地處理“并行邏輯”。現(xiàn)在,程序員陳良喬可以一邊鍛煉一邊燒水,再也沒有遲到過了。他逢人便說:“Parallel真是個好東西!自從用了它,我腰也不酸了,背也不疼了,編程更有勁兒了”
使用Parallel.Invoke處理并行邏輯
跟Parallel.For函數(shù)相似,Parallel.Invoke也是Parallel類的一個靜態(tài)函數(shù),它可以接受一個Action[]類型的對象作為參數(shù),這個對象,就是我們要執(zhí)行的任務。系統(tǒng)會根據(jù)代碼運行的硬件環(huán)境,主要是CPU運算核心的個數(shù),自動地進行線程的創(chuàng)建和分配。這有些類似于我們所熟悉的多線程開發(fā),通過為每個線程指定一個線程函數(shù)而讓多個任務同時進行,只是Parallel.Invoke函數(shù)簡化了線程的創(chuàng)建和分配等繁瑣的動作,我們只需要提供核心的線程函數(shù)就可以了。下面我們來看一個實際的例子。在上文中,我們介紹了程序員陳良喬起床的例子,在以前的單核時代,他起床大約是這個樣子的:
- // 串行式起床
- private static void GetUp()
- {
- Start("GetUp");
- // 先燒水
- boil();
- // 后鍛煉
- exercise();
- End("GetUp");
- }
- // 鍛煉
- private static void exercise()
- {
- Console.WriteLine("Exercise");
- Thread.Sleep(2000);
- Console.WriteLine("Finish Exercise");
- }
- // 燒水
- private static void boil()
- {
- Console.WriteLine("Boil");
- Thread.Sleep(3000);
- Console.WriteLine("Finish Boil");
- }
在單核時代,CPU在同一時間只能做一件事情,所以他只能先燒水,后鍛煉,這樣顯然會耽誤時間。一天,他又因為這事而遲到了,老板罵道,“你是豬啊,你不會用Parallel.Invoke一邊燒水一邊鍛煉啊?”于是,有了下面的并行式起床:
- // 并行式起床
- private static void ParallelGetUp()
- {
- Start("ParallelGetUp");
- // 在燒水的同時,鍛煉身體
- var steps = new Action[] { () => boil(), () => exercise() };
- Parallel.Invoke(steps);
- End("ParallelGetUp");
- }
通過Parallel.Invoke函數(shù),我們將一些相互獨立的任務同時執(zhí)行,實現(xiàn)了“并行邏輯”,也大大地提高了應用程序的性能和效率。從下面的截圖中,我們可以明顯地看出兩種方式的差別。串行方式所耗費的時間,是兩個步驟的時間總和,而并行方式所耗費的時間,大約是單個任務的耗時最長的哪一個。
#p#
對Parallel.Invoke進行控制
Parallel.Invoke提供了一個重載版本,它可以接受一個ParallelOptions對象作為參數(shù),對Parallel.Invoke的執(zhí)行進行控制。通過這個對象,我們可以控制并行的最大線程數(shù),各個任務是否取消執(zhí)行等等。例如,在一個智能化的家中,系統(tǒng)會判斷主人是否離開房間,如果主人離開了房間,則自動關閉屋子里的各種電器。利用Parallel.Invoke我們可以實現(xiàn)如下:
- public static void PInvokeCancel()
- {
- // 創(chuàng)建取消對象
- CancellationTokenSource cts = new CancellationTokenSource();
- // 利用取消對象,創(chuàng)建ParallelOptions
- ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
- // 設置最大線程數(shù)
- pOption.MaxDegreeOfParallelism = 2;
- // 創(chuàng)建一個守護監(jiān)視進程
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine("Cancellation in 5 sec.");
- Thread.Sleep(5000);
- // 取消,結束任務的執(zhí)行
- cts.Cancel();
- Console.WriteLine("Canceled requested");
- });
- try
- {
- // 以ParallelOptions作為參數(shù),
- // 調用Parallel.Invoke
- Parallel.Invoke(pOption, () => ShutdownLights(pOption.CancellationToken),
- () => ShutdownComputer(pOption.CancellationToken));
- //輸出執(zhí)行結果
- Console.WriteLine("Lights and computer are tuned off.");
- }
- catch (Exception e)
- {
- Console.WriteLine(e.Message);
- }
- }
- private static void ShutdownLights(CancellationToken token)
- {
- while (!token.IsCancellationRequested)
- {
- Console.WriteLine("Light is on. " );
- Thread.Sleep(1000);
- }
- }
- private static void ShutdownComputer(CancellationToken token)
- {
- while (!token.IsCancellationRequested)
- {
- Console.WriteLine("Computer is on." );
- Thread.Sleep(1000);
- }
- }
除了這種方式之外,ParallelOptions更多地應用在取消任務隊列中還未來得及執(zhí)行的任務。當我們限制了最大并發(fā)線程數(shù)的時候,如果需要通過Parallel.Invoke執(zhí)行的任務較多,則有可能部分任務在隊列中排隊而得不到及時的執(zhí)行,如果到了一定的條件這些任務還沒有執(zhí)行,我們可能取消這些任務。一個恰當?shù)默F(xiàn)實生活中的例子就是火車站買票。火車站買票的人很多,但是售票的窗口有限,當?shù)搅讼掳鄷r間后,窗口就不再售票了,也就是剩下的售票任務需要取消掉。我們可以用下面的代碼來模擬這樣一個場景:
- public static void PInvokeCancel()
- {
- // 創(chuàng)建取消對象
- CancellationTokenSource cts = new CancellationTokenSource();
- // 利用取消對象,創(chuàng)建ParallelOptions
- ParallelOptions pOption = new ParallelOptions() { CancellationToken = cts.Token };
- // 設置最大線程數(shù),也就相當于20個售票窗口
- pOption.MaxDegreeOfParallelism = 20;
- // 創(chuàng)建一個守護監(jiān)視進程
- // 當?shù)较掳鄷r間后就取消剩下的售票活動
- Task.Factory.StartNew(() =>
- {
- Console.WriteLine("Cancellation in 5 sec.");
- Thread.Sleep(5000);
- // 取消,結束任務的執(zhí)行
- cts.Cancel();
- Console.WriteLine("Canceled requested");
- });
- try
- {
- // 創(chuàng)建售票活動
- Action[] CustomerServices = CreateCustomerService(1000);
- // 以ParallelOptions作為參數(shù),
- // 調用Parallel.Invoke
- Parallel.Invoke(pOption, CustomerServices);
- }
- catch (Exception e)
- {
- // 當任務取消后,拋出一個異常
- Console.WriteLine(e.Message);
- }
- }
- // 創(chuàng)建售票的活動
- static Action[] CreateCustomerService(int n)
- {
- Action[] result = new Action[n];
- for (int i = 0; i < n; i++)
- {
- result[i] = () =>
- {
- Console.WriteLine("Customer Service {0}", Task.CurrentId);
- // 模擬售票需要的時間
- Thread.Sleep(2000);
- };
- }
- return result;
- }
#p#
并行任務之間的同步
有時候我們在處理并行任務的時候,各個任務之間需要同步,也就是同時執(zhí)行的并行任務,需要在共同到達某一個狀態(tài)的后再一共繼續(xù)執(zhí)行。我們可以舉一個現(xiàn)實生活中的例子。陳良喬,賈瑋和單春暉是好朋友,他們相約到電影院看《建國大業(yè)》。他們三個住在不同的地方,為了能一起買票進電影院,他們約好先在電影院門口的KFC會合,然后再一起進電影院。這其中就涉及到一個同步的問題:他們需要先在KFC會合。他們是從家里分別到KFC的,但是需要在KFC進行同步,等到三個人都到齊后在完成后后繼的動作,進電影院看電影。
為了完成并行任務之間的同步,.NET Framework中提供了一個類Barrier。顧名思義,Barrier就像一個關卡或者是剪票口一樣,通過Barrier類,我們可以管理并行任務的執(zhí)行,完成他們之間的同步。Barrier類的使用非常簡單,我們只需要在主線程中聲明一個Barrier對象,同時指明需要同步的任務數(shù)。然后,在需要進行同步的地方調用Barrier類的SignalAndWait函數(shù)就可以了。 當一個并行任務到達SignalAndWait后,它會暫停執(zhí)行,等待所有并行任務都到達同步點之后再繼續(xù)往下執(zhí)行。下面我們以一個實際的例子,來看看如何利用Barrier類完成看電影的同步問題。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace ParallelBarrier
- {
- class Program
- {
- // 用于同步的Barrier對象
- static Barrier sync;
- static void Main(string[] args)
- {
- // 創(chuàng)建Barrier對象,這里我們需要同步
- // 任務有三個
- sync = new Barrier(3);
- // 開始執(zhí)行并行任務
- var steps = new Action[] { () => gotothecinema("陳良喬", TimeSpan.FromSeconds(5) ),
- () => gotothecinema("賈瑋", TimeSpan.FromSeconds(2) ),
- () => gotothecinema("單春暉", TimeSpan.FromSeconds(4) )};
- Parallel.Invoke(steps);
- Console.ReadKey();
- }
- // 任務
- static void gotothecinema(string strName, TimeSpan timeToKFC )
- {
- Console.WriteLine("[{0}] 從家里出發(fā)。", strName);
- // 從家里到KFC
- Thread.Sleep(timeToKFC);
- Console.WriteLine("[{0}] 到達KFC。", strName);
- // 等待其他人到達
- sync.SignalAndWait();
- // 同步后,進行后繼動作
- Console.WriteLine("[{0}] 買票進電影院。", strName);
- }
- }
- }
在這段代碼中,我們首先創(chuàng)建了Barrier對象,因為在這里需要同步的任務有三個,所以創(chuàng)建Barrier對象時是的參數(shù)是3。然后就是使用Parallel.Invoke執(zhí)行并行任務。我們在并行任務gotothecinema中設置了一個同步點,在這里我們調用Barrier對象的SignalAndWait函數(shù),它表示當前任務已經(jīng)到達同步點并同時等待其他任務到達同步點。當所有任務都到達同步點之后,再繼續(xù)往下執(zhí)行。運行上面的程序,我們可以獲得這樣的輸出:
#p#
更復雜的任務之間的同步
我們在使用Barrier進行并行任務之間的同步時,有這樣一個缺陷,我們需要預先知道所有需要同步的并行任務的數(shù)目,如果這個數(shù)目是隨機的,就無法使用Barrier進行任務之間的同步了。并行任務數(shù)目不定這種情況很常見。我們還是來看上文中看電影的例子,每場進電影院看電影的觀眾數(shù)目是不固定的,那么退場的觀眾也是不固定的,甚至還有中途退場的。當所有觀眾都退場后,我們需要打掃電影院的衛(wèi)生。這里需要的同步的就是所有觀眾都退場。針對這種數(shù)目不定的多個并行任務,.NET Framework提供了CountdownEvent這個類來進行任務之間的同步。
就像它的名字一樣,CountdownEvent基于這樣一個簡單的規(guī)則:當有新的需要同步的任務產(chǎn)生時,就調用AddCount增加它的計數(shù),當有任務到達同步點是,就調用Signal函數(shù)減小它的計數(shù),當CountdownEvent的計數(shù)為零時,就表示所有需要同步的任務已經(jīng)完成,可以開始下一步任務了。下面我們利用CountdownEvent來模擬一下觀眾進場立場的情景。
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace CountdownEventDemo
- {
- // 觀眾類,用來表示一位觀眾
- class Customer
- {
- public Customer(int nID)
- {
- m_nID = nID;
- }
- // 觀眾的ID
- public int m_nID;
- }
- class Program
- {
- static void Main(string[] args)
- {
- // 創(chuàng)建CountdownEvent同步對象
- using (var countdown = new CountdownEvent(1))
- {
- // 產(chǎn)生一個隨機數(shù),表示觀眾的數(shù)目
- Random countRandom = new Random(DateTime.Now.Millisecond);
- int nCount = countRandom.Next(10);
- // 構造每一位觀眾看電影的任務
- Action[] seeafilm = new Action[ nCount ];
- for (int i = 0; i < nCount; i++)
- {
- // 構造Customer對象,表示觀眾
- Customer currentCustomer = new Customer( i+1 );
- seeafilm[i] = () =>
- {
- // 觀眾進場
- countdown.AddCount();
- Console.WriteLine("觀眾 {0} 進場。", currentCustomer.m_nID);
- // 模擬看電影的時間
- Thread.Sleep(countRandom.Next(3000,6000));
- // 觀眾退場
- countdown.Signal();
- Console.WriteLine("觀眾 {0} 退場。", currentCustomer.m_nID);
- };
- }
- //并行執(zhí)行任務
- Parallel.Invoke( seeafilm );
- // 在此同步,最后CountdownEvent的計數(shù)變?yōu)榱?
- countdown.Signal();
- countdown.Wait();
- }
- Console.WriteLine("所有觀眾退場,開始打掃衛(wèi)生。");
- Console.ReadKey();
- }
在這段代碼中,我們使用CountdownEvent進行隨機個數(shù)任務之間的同步。最后,我們可以得到這樣的輸出。
通過Parallel.Invoke函數(shù),我們可以輕松地將相互獨立的任務并行執(zhí)行,同時通過Barrier和CountdownEvent類進行任務之間的同步。這種并行計算的開發(fā)方式,比以前那種基于線程的并行計算開發(fā)方式簡便很多,解放了程序員的腦袋,讓他們可以把更多的腦力放到業(yè)務邏輯問題的解決之上。使用Parallel類,多快好省地開發(fā)并行計算應用程序。
【編輯推薦】