Java并行編程:從并行任務(wù)集獲取反饋
在并行任務(wù)啟動(dòng)后,強(qiáng)制性地從并行任務(wù)得到反饋。
假想有一個(gè)程序,可以發(fā)送批郵件,還使用了多線程機(jī)制。你想知道有多少郵件成功發(fā)送嗎?你想知道在實(shí)際發(fā)送過程期間,這個(gè)批處理工作的實(shí)時(shí)進(jìn)展嗎?
要實(shí)現(xiàn)多線程的這種反饋,我們可以使用Callable接口。此接口的工作方式基本上與Runnable相同,但是執(zhí)行方法(call())會(huì)返回一個(gè)值,該值反映了執(zhí)行計(jì)算的結(jié)果。
- package com.ricardozuasti;
- import java.util.concurrent.Callable;
- public class FictionalEmailSender implements Callable<Boolean>{
- private String to;
- private String subject;
- private String body;
- public FictionalEmailSender(String to, String subject, String body){
- this.to = to;
- this.subject = subject;
- this.body = body;
- }
- @Override
- public Boolean call() throws InterruptedException {
- // 在0~0.5秒間模擬發(fā)送郵件
- Thread.sleep(Math.round(Math.random()*0.5*1000));
- // 假設(shè)我們有80%的幾率成功發(fā)送郵件
- if(Math.random()>0.2){
- return true;
- }else{
- return false;
- }
- }
- }
注意:Callable接口可用于返回任意數(shù)據(jù)類型,因此我們的任務(wù)可以返回我們需要的任何信息。
現(xiàn)在,我們使用一個(gè)線程池ExecutorService來發(fā)送郵件,由于我們的任務(wù)是以Callable接口實(shí)現(xiàn)的,我們提交執(zhí)行的每個(gè)新任務(wù),都會(huì)得到一個(gè)Future引用。注意我們要使用直接的構(gòu)造器創(chuàng)建ExecutorService,而不是使用來自Executors的工具方法創(chuàng)建。這是因?yàn)槭褂弥付怲hreadPoolExecutor提供了一些方法可以派上用場(chǎng)。
- package com.ricardozuasti;
- import java.util.concurrent.Future;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.ArrayList;
- import java.util.List;
- public class Concurrency2 {
- public static void main(String[] args){
- try{
- ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 1,
- TimeUnit.SECONDS, new LinkedBlockingQueue());
- List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(9000);
- // 發(fā)送垃圾郵件, 用戶名假設(shè)為4位數(shù)字
- for(int i=1000; i<10000; i++){
- futures.add(executor.submit(new FictionalEmailSender(i+"@sina.com",
- "Knock, knock, Neo", "The Matrix has you...")));
- }
- // 提交所有的任務(wù)后,關(guān)閉executor
- System.out.println("Starting shutdown...");
- executor.shutdown();
- // 每秒鐘打印執(zhí)行進(jìn)度
- while(!executor.isTerminated()){
- executor.awaitTermination(1, TimeUnit.SECONDS);
- int progress = Math.round((executor.getCompletedTaskCount()
- *100)/executor.getTaskCount());
- System.out.println(progress + "% done (" +
- executor.getCompletedTaskCount() + " emails have been sent).");
- }
- // 現(xiàn)在所有郵件已發(fā)送完, 檢查futures, 看成功發(fā)送的郵件有多少
- int errorCount = 0;
- int successCount = 0;
- for(Future<Boolean> future : futures){
- if(future.get()){
- successCount++;
- }else{
- errorCount++;
- }
- }
- System.out.println(successCount + " emails were successfully sent, but " +
- errorCount + " failed.");
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- }
執(zhí)行這個(gè)類,輸出結(jié)果如下:
- Starting shutdown...
- 1% done (118 emails have been sent).
- 2% done (232 emails have been sent).
- 3% done (358 emails have been sent).
- 5% done (478 emails have been sent).
- 6% done (587 emails have been sent).
- 7% done (718 emails have been sent).
- 9% done (850 emails have been sent).
- 10% done (969 emails have been sent).
- ……
所有的任務(wù)都由ExecutorService提交,我們開始它的關(guān)閉(防止提交新任務(wù))并使用一個(gè)循環(huán)(實(shí)時(shí)場(chǎng)景,可能你會(huì)繼續(xù)做其它的事情)來等待,直至所有任務(wù)都被執(zhí)行完成、計(jì)算和打印當(dāng)前每次迭代的進(jìn)度。
注意,你可以存儲(chǔ)executor引用,也可以在任意時(shí)間從其它線程查詢它的計(jì)算結(jié)果和報(bào)告進(jìn)程進(jìn)度。
最后,使用Future集合引用,我們得到ExecutorService提交的每個(gè)Callable接口,通知成功發(fā)送的郵件數(shù)量和發(fā)送失敗的郵件數(shù)量。
此結(jié)構(gòu)不但易于使用,還使得相關(guān)性得到清晰的隔離,在調(diào)度程序和實(shí)際任務(wù)之間提供了一個(gè)預(yù)定義的通信機(jī)制。
原文鏈接:http://blog.csdn.net/chszs/article/details/7418880
【編輯推薦】