C# 4.0中的協變和逆變
在上一篇文章中,我們實現了一個簡單的爬蟲,并指出了這種方式的缺陷。現在,我們就來看一下,如何使用C# 4.0中所引入的“協變和逆變”特性來改進這種消息執行方式,這也是我認為在“普適Actor模型”中最合適的做法。這次,我們動真格的了,我們會一條一條地改進前文提出的缺陷。
協變和逆變
在以前的幾篇文章中,我們一直掛在嘴邊的說法便是消息于Actor類型的“耦合”太高。例如在簡單的爬蟲實現中,Crawler接受的消息為Crawl(Monitor, string),它的第一個參數為Monitor類型。但是在實際應用中,這個參數很可能需要是各種類型,唯一的“約束”只是它必須能夠接受一個ICrawlResponseHandler類型的消息,這樣我們就能把抓取的結果傳遞給它。至于操作Crawler對象的是Monitor還是Robot,還是我們單元測試時動態創建的Mock對象(這很重要),Crawler一概不會關心。
但就是這個約束,在以前的實現中,我們必須讓這個目標繼承Actor< ICrawlResponseHandler>,這樣它也就無法接受其他類型的消息了。例如Monitor還要負責一些查詢操作我們該怎么辦呢?幸運的是,在.NET 4.0(C# 4.0)中,我們只需要讓這個目標實現這樣一個接口即可:
- public interface IPort< out T>
- {
- void Post(Action< T> message);
- }
瞅到out關鍵字了沒?事實上,還有一個東西您在這里還沒有看到,這便是Action委托在.NET 4.0中的簽名:
- public delegate void Action< in T>(T obj);
就在這樣一個簡單的示例中,協變和逆變所需要的in和out都出現了。這意味著如果有兩個類型Parent和Child,其中Child是Parent的子類(或Parent接口的實現),那么實現了IPort< Child>的對象便可以自動賦值給IPort< Parent>類型的參數或引用1。使用代碼來說明問題可能會更清楚一些:
- public class Parent
- {
- public void ParentMethod() { };
- }
- public class Child : Parent { }
- static void Main(string[] args)
- {
- IPort< Child> childPort = new ChildPortType();
- IPort< Parent> parentPort = childPort; // 自動轉化
- parentPort.Post(p => p.ParentMethod()); // 可以接受Action< Parent>類型作為消息
- }
這意味著,我們可以把ICrawlRequestHandler和ICrawlResponseHandler類型寫成下面的形式:
- internal interface ICrawlRequestHandler
- {
- void Crawl(IPort< ICrawlResponseHandler> collector, string url);
- }
- internal interface ICrawlResponseHandler
- {
- void Succeeded(IPort< ICrawlRequestHandler> crawler, string url, string content, List< string> links);
- void Failed(IPort< ICrawlRequestHandler> crawler, string url, Exception ex);
- }
如今,Monitor和Crawler便可以寫成如下模樣:
- internal class Crawler : Actor< Action< Crawler>>, IPort< Crawler>, ICrawlRequestHandler
- {
- protected override void Receive(Action< Crawler> message) { message(this); }
- #region ICrawlRequestHandler Members
- void ICrawlRequestHandler.Crawl(IPort< ICrawlResponseHandler> collector, string url)
- {
- try
- {
- string content = new WebClient().DownloadString(url);
- var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast< Match>();
- var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
- collector.Post(m => m.Succeeded(this, url, content, links));
- }
- catch (Exception ex)
- {
- collector.Post(m => m.Failed(this, url, ex));
- }
- }
- #endregion
- }
- public class Monitor : Actor< Action< Monitor>>, IPort< Monitor>, ICrawlResponseHandler
- {
- protected override void Receive(Action< Monitor> message) { message(this); }
- #region ICrawlResponseHandler Members
- void ICrawlResponseHandler.Succeeded(IPort< ICrawlRequestHandler> crawler,
- string url, string content, List< string> links) { ... }
- void ICrawlResponseHandler.Failed(IPort< ICrawlRequestHandler> crawler,
- string url, Exception ex) { ... }
- #endregion
- private void DispatchCrawlingTasks(IPort< ICrawlRequestHandler> reusableCrawler)
- {
- if (this.m_readyToCrawl.Count < = 0)
- {
- this.WorkingCrawlerCount--;
- }
- var url = this.m_readyToCrawl.Dequeue();
- reusableCrawler.Post(c => c.Crawl(this, url));
- while (this.m_readyToCrawl.Count > 0 &&
- this.WorkingCrawlerCount < this.MaxCrawlerCount)
- {
- var newUrl = this.m_readyToCrawl.Dequeue();
- IPort< ICrawlRequestHandler> crawler = new Crawler();
- crawler.Post(c => c.Crawl(this, newUrl));
- this.WorkingCrawlerCount++;
- }
- }
- }
Monitor的具體實現和上篇文章區別不大,您可以參考文章末尾給出的完整代碼,并配合前文的分析來理解,這里我們只關注被標紅的兩行代碼。
在第一行中我們創建了一個Crawler類型的對象,并把它賦值給IPort< ICrawlerRequestHandler>類型的變量中。請注意,Crawler對象并沒有實現這個接口,它只是實現了IPort< Crawler>及ICrawlerRequestHandler。不過由于IPort< T>支持協變,于是IPort< Crawler>被安全地轉換成了IPort< ICrawlerRequestHandler>對象。
第二行中再次發生了協變:ICrawlRequestHandler.Crawel的第一個參數需要IPort< ICrawlResponseHandler>類型的對象,但是this是Monitor類型的,它并沒有實現這個接口。不過,和上面描述的一樣,由于IPort< T>支持協變,因此這樣的類型轉化是安全的,允許的。于是在Crawler類便可以操作一個“抽象”,而不是具體的Monitor類型來辦事了。
神奇不?但就是這么簡單。
“內部”消息控制
在上一篇文章中,我們還提出了Crawler實現的另一個缺點:沒有使用異步IO。WebClient本身的DownloadStringAsync方法可以進行異步下載,但是如果在異步完成的后續操作(如分析鏈接)會在IO線程池中運行,這樣我們就很難對任務所分配的運算能力進行控制。我們當時提出,可以把后續操作作為消息發送給Crawler本身,也就是進行“內部”消息控制——可惜的是,我們當時無法做到。不過現在,由于Crawler實現的是IPort< Crawler>接口,因此,我們可以把Crawler內部的任何方法作為消息傳遞給自身,如下:
- internal class Crawler : Actor< Action< Crawler>>, IPort< Crawler>, ICrawlRequestHandler
- {
- protected override void Receive(Action< Crawler> message) { message(this); }
- #region ICrawlRequestHandler Members
- public void Crawl(IPort< ICrawlResponseHandler> collector, string url)
- {
- WebClient client = new WebClient();
- client.DownloadStringCompleted += (sender, e) =>
- {
- if (e.Error == null)
- {
- this.Post(c => c.Crawled(collector, url, e.Result));
- }
- else
- {
- collector.Post(c => c.Failed(this, url, e.Error));
- }
- };
- client.DownloadStringAsync(new Uri(url));
- }
- private void Crawled(IPort< ICrawlResponseHandler> collector, string url, string content)
- {
- var matches = Regex.Matches(content, @"href=""(http://[^""]+)""").Cast< Match>();
- var links = matches.Select(m => m.Groups[1].Value).Distinct().ToList();
- collector.Post(c => c.Succeeded(this, url, content, links));
- }
- #endregion
- }
我們準備了一個private的Crawled方法,如果抓取成功了,我們會把這個方法的調用封裝在一條消息中重新發給自身。請注意,這是個私有方法,因此這里完全是在做“內部”消息控制。
開啟抓取任務
在上一篇文章中,我們為Monitor添加了一個Start方法,它的作用是啟動URL。我們知道,對單個Actor來說消息的處理是線程安全的,但是這個前提是使用“消息”傳遞的方式進行通信,如果直接調用Start公有方法,便會破壞這種線程安全特性。不過現在的Monitor已經不受接口的限制,可以自由接受任何它可以執行的消息,因此我們只要對外暴露一個Crawl方法即可:
- public class Monitor : Actor< Action< Monitor>>, IPort< Monitor>,
- ICrawlResponseHandler,
- IStatisticRequestHandelr
- {
- ...
- public void Crawl(string url)
- {
- if (this.m_allUrls.Contains(url)) return;
- this.m_allUrls.Add(url);
- if (this.WorkingCrawlerCount < this.MaxCrawlerCount)
- {
- this.WorkingCrawlerCount++;
- IPort< ICrawlRequestHandler> crawler = new Crawler();
- crawler.Post(c => c.Crawl(this, url));
- }
- else
- {
- this.m_readyToCrawl.Enqueue(url);
- }
- }
- }
于是我們便可以向Monitor發送消息,讓其抓取特定的URL:
- string[] urls =
- {
- "http://www.cnblogs.com/dudu/",
- "http://www.cnblogs.com/TerryLee/",
- "http://www.cnblogs.com/JeffreyZhao/"
- };
- Random random = new Random(DateTime.Now.Millisecond);
- Monitor monitor = new Monitor(10);
- foreach (var url in urls)
- {
- var urlToCrawl = url;
- monitor.Post(m => m.Crawl(urlToCrawl));
- Thread.Sleep(random.Next(1000, 3000));
- }
上面的代碼會每隔1到3秒發出一個抓取請求。由于我們使用了消息傳遞的方式進行通信,因此對于Monitor來說,這一切都是線程安全的。我們可以隨時隨地為Monitor添加抓取任務。
接受多種消息(協議)
我們再觀察一下Monitor的簽名:
class Monitor : Actor< Action< Monitor>>, IPort< Monitor>, ICrawlResponseHandler
可以發現,如今的Monitor已經和它實現的協議沒有一對一的關系了。也就是說,它可以添加任意功能,可以接受任意類型的消息,我們只要讓它實現另一個接口即可。于是乎,我們再要一個“查詢”功能2:
- public interface IStatisticRequestHandelr
- {
- void GetCrawledCount(IPort< IStatisticResponseHandler> requester);
- void GetContent(IPort< IStatisticResponseHandler> requester, string url);
- }
- public interface IStatisticResponseHandler
- {
- void ReplyCrawledCount(int count);
- void ReplyContent(string url, string content);
- }
為了讓Monior支持查詢,我們還需要為它添加這樣的代碼:
- public class Monitor : Actor< Action< Monitor>>, IPort< Monitor>,
- ICrawlResponseHandler,
- IStatisticRequestHandelr
- {
- ...
- #region IStatisticRequestHandelr Members
- void IStatisticRequestHandelr.GetCrawledCount(IPort< IStatisticResponseHandler> requester)
- {
- requester.Post(r => r.ReplyCrawledCount(this.m_urlContent.Count));
- }
- void IStatisticRequestHandelr.GetContent(IPort< IStatisticResponseHandler> requester, string url)
- {
- string content;
- if (!this.m_urlContent.TryGetValue(url, out content))
- {
- content = null;
- }
- requester.Post(r => r.ReplyContent(url, content));
- }
- #endregion
- }
最后,我們來嘗試著使用這個“查詢”功能。首先,我們編寫一個測試用的TestStatisticPort類:
- public class TestStatisticPort : IPort< IStatisticResponseHandler>, IStatisticResponseHandler
- {
- private IPort< IStatisticRequestHandelr> m_statisticPort;
- public TestStatisticPort(IPort< IStatisticRequestHandelr> statisticPort)
- {
- this.m_statisticPort = statisticPort;
- }
- public void Start()
- {
- while (true)
- {
- Console.ReadLine();
- this.m_statisticPort.Post(s => s.GetCrawledCount(this));
- }
- }
- #region IPort< IStatisticResponseHandler> Members
- void IPort< IStatisticResponseHandler>.Post(Action< IStatisticResponseHandler> message)
- {
- message(this);
- }
- #endregion
- #region IStatisticResponseHandler Members
- void IStatisticResponseHandler.ReplyCrawledCount(int count)
- {
- Console.WriteLine("Crawled: {0}", count);
- }
- void IStatisticResponseHandler.ReplyContent(string url, string content) { ... }
- #endregion
- }
當調用Start方法時,控制臺將會等待用戶敲擊回車鍵。當按下回車鍵時,TestStatisticPort將會向Monitor發送一個IStatisticRequestHandler.GetCrawledCount消息。Monitor回復之后,屏幕上便會顯示當前已經抓取成功的URL數目。例如,我們可以編寫如下的測試代碼:
- static void Main(string[] args)
- {
- var monitor = new Monitor(5);
- monitor.Post(m => m.Crawl("http://www.cnblogs.com/"));
- TestStatisticPort testPort = new TestStatisticPort(monitor);
- testPort.Start();
- }
隨意敲擊幾下回車,結果如下:
總結
如今的做法,兼顧了強類型檢查,并使用C# 4.0中的協變和逆變特性,把上一篇文章中提出的問題解決了,不知您是否理解了這些內容?只可惜,我們在C# 3.0中還沒有協變和逆變。因此,我們還必須思考一個適合C# 3.0的做法。
順便一提,由于F#不支持協變和逆變,因此本文的做法無法在F#中使用。
注1:關于協變和逆變特性,我認為腦袋兄的這篇文章講的非常清楚——您看得頭暈了?是的,剛開始了解協變和逆變,以及它們之間的嵌套規則時我也頭暈,但是您在掌握之后就會發現,這的確是一個非常有用的特性。
注2:不知您是否發現,與之前internal的Crawl相關接口不同,Statistic相關接口是public的。我們在使用接口作為消息時,也可以通過這種辦法來控制哪些消息是可以對外暴露的。這也算是一種額外的收獲吧。
【編輯推薦】