如何正確的使用Java事件通知
通過實現觀察者模式來提供 Java 事件通知(Java event notification)似乎不是件什么難事兒,但這過程中也很容易就掉進一些陷阱。本文介紹了我自己在各種情形下,不小心制造的一些常見錯誤。
Java 事件通知
讓我們從一個最簡單的 Java Bean 開始,它叫StateHolder,里面封裝了一個私有的 int 型屬性 state 和常見的訪問方法:
- public class StateHolder {
- private int state;
- public int getState() {
- return state;
- }
- public void setState( int state ) {
- this.state = state;
- }
- }
現在假設我們決定要 Java bean 給已注冊的觀察者廣播一條 狀態已改變 事件。小菜一碟!!!定義一個最簡單的事件和監聽器簡直擼起袖子就來……
- // change event to broadcast
- public class StateEvent {
- public final int oldState;
- public final int newState;
- StateEvent( int oldState, int newState ) {
- this.oldState = oldState;
- this.newState = newState;
- }
- }
- // observer interface
- public interface StateListener {
- void stateChanged( StateEvent event );
- }
接下來,我們需要在 StateHolder 的實例里注冊 StatListeners。
- public class StateHolder {
- private final Set listeners = new HashSet<>();
- [...]
- public void addStateListener( StateListener listener ) {
- listeners.add( listener );
- }
- public void removeStateListener( StateListener listener ) {
- listeners.remove( listener );
- }
- }
***一個要點,需要調整一下StateHolder#setState這個方法,來確保每次狀態有變時發出的通知,都代表這個狀態真的相對于上次產生變化了:
- public void setState( int state ) {
- int oldState = this.state;
- this.state = state;
- if( oldState != state ) {
- broadcast( new StateEvent( oldState, state ) );
- }
- }
- private void broadcast( StateEvent stateEvent ) {
- for( StateListener listener : listeners ) {
- listener.stateChanged( stateEvent );
- }
- }
搞定了!要的就是這些。為了顯得專(zhuang)業(bi)一點,我們可能還甚至為此實現了測試驅動,并為嚴密的代碼覆蓋率和那根表示測試通過的小綠條而洋洋自得。而且不管怎么樣,這不就是我從網上那些教程里面學來的寫法嗎?
那么問題來了:這個解決辦法是有缺陷的……
#p#
并發修改
像上面那樣寫 StateHolder 很容易遇到并發修改異常(ConcurrentModificationException),即使僅僅限制在一個單線程里面用也不例外。但究竟是誰導致了這個異常,它又為什么會發生呢?
- java.util.ConcurrentModificationException
- at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
- at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
- at com.codeaffine.events.StateProvider.broadcast(StateProvider.java:60)
- at com.codeaffine.events.StateProvider.setState(StateProvider.java:55)
- at com.codeaffine.events.StateProvider.main(StateProvider.java:122)
乍一看這個錯誤堆棧包含的信息,異常是由我們用到的一個 HashMap 的 Iterator 拋出的,可在我們的代碼里沒有用到任何的迭代器,不是嗎?好吧,其實我們用到了。要知道,寫在 broadcast 方法里的 for each 結構,實際上在編譯時是會被轉變成一個迭代循環的。
因為在事件廣播過程中,如果一個監聽器試圖從 StateHolder 實例里面把自己移除,就有可能導致 ConcurrentModificationException。所以比起在原先的數據結構上進行操作,有一個解決辦法就是我們可以在這組監聽器的快照(snapshot)上進行迭代循環。
這樣一來,“移除監聽器”這一操作就不會再干擾事件廣播機制了(但要注意的是通知還是會有輕微的語義變化,因為當 broadcast 方法被執行的時候,這樣的移除操作并不會被快照體現出來):
- private void broadcast( StateEvent stateEvent ) {
- Set snapshot = new HashSet<>( listeners );
- for( StateListener listener : snapshot ) {
- listener.stateChanged( stateEvent );
- }
- }
但是,如果 StateHolder 被用在一個多線程的環境里呢?
#p#
同步
要再多線程的環境里使用 StateHolder ,它就必須是線程安全的。不過這也很容易實現,給我們類里面的每個方法加上 synchronized 就搞定了,不是嗎?
- public class StateHolder {
- public synchronized void addStateListener( StateListener listener ) { [...]
- public synchronized void removeStateListener( StateListener listener ) { [...]
- public synchronized int getState() { [...]
- public synchronized void setState( int state ) { [...]
現在我們讀寫操作 一個 StateHolder 實例的時候都有了內置鎖(Intrinsic Lock) 做保證,這使得公有方法具有了原子性,也確保了正確的狀態對不同的線程都可見。任務完成!
才怪……盡管這樣的實現是線程安全的,但一旦程序要調用它,就需要承擔死鎖的風險。
設想一下如下這種情形:線程 A 改變了 StateHolder 的狀態 S,在向各個監聽器(listener)廣播這個狀態 S 的時候,線程 B 視圖訪問狀態 S ,然后被阻塞。如果 B 持有了一個對象的同步鎖,這個對象又是關于狀態 S的,并且本來是要廣播給眾多監聽器當中的某一個的,這種情況下我們就會遇到一個死鎖。
這就是為什么我們要縮小狀態訪問的同步性,在一個“保護通道”里面來廣播這個事件:
- public class StateHolder {
- private final Set listeners = new HashSet<>();
- private int state;
- public void addStateListener( StateListener listener ) {
- synchronized( listeners ) {
- listeners.add( listener );
- }
- }
- public void removeStateListener( StateListener listener ) {
- synchronized( listeners ) {
- listeners.remove( listener );
- }
- }
- public int getState() {
- synchronized( listeners ) {
- return state;
- }
- }
- public void setState( int state ) {
- int oldState = this.state;
- synchronized( listeners ) {
- this.state = state;
- }
- if( oldState != state ) {
- broadcast( new StateEvent( oldState, state ) );
- }
- }
- private void broadcast( StateEvent stateEvent ) {
- Set snapshot;
- synchronized( listeners ) {
- snapshot = new HashSet<>( listeners );
- }
- for( StateListener listener : snapshot ) {
- listener.stateChanged( stateEvent );
- }
- }
- }
上面這段代碼是在之前的基礎上稍加改進來實現的,通過使用 Set 實例作為內部鎖來提供合適(但也有些過時)的同步性,監聽者的通知事件在保護塊之外發生,這樣就避免了一種死等的可能。
注意: 由于系統并發操作的天性,這個解決方案并不能保證變化通知按照他們產生的順序依次到達監聽器。如果觀察者一側對實際狀態的準確性有較高要求,可以考慮把 StateHolder 作為你事件對象的來源。
如果事件順序這在你的程序里顯得至關重要,有一個辦法就是可以考慮用一個線程安全的先入先出(FIFO)結構,連同監聽器的快照一起,在 setState 方法的保護塊里緩沖你的對象。只要 FIFO 結構不是空的,一個獨立的線程就可以從一個不受保護的區域塊里觸發實際事件(生產者-消費者模式),這樣理論上就可以不必冒著死鎖的危險還能確保一切按照時間順序進行。我說理論上,是因為到目前為止我也還沒親自這么試過。。
鑒于前面已經實現的,我們可以用諸如 CopyOnWriteArraySet 和 AtomicInteger 來寫我們的這個線程安全類,從而使這個解決方案不至于那么復雜:
- public class StateHolder {
- private final Set listeners = new CopyOnWriteArraySet<>();
- private final AtomicInteger state = new AtomicInteger();
- public void addStateListener( StateListener listener ) {
- listeners.add( listener );
- }
- public void removeStateListener( StateListener listener ) {
- listeners.remove( listener );
- }
- public int getState() {
- return state.get();
- }
- public void setState( int state ) {
- int oldState = this.state.getAndSet( state );
- if( oldState != state ) {
- broadcast( new StateEvent( oldState, state ) );
- }
- }
- private void broadcast( StateEvent stateEvent ) {
- for( StateListener listener : listeners ) {
- listener.stateChanged( stateEvent );
- }
- }
- }
既然 CopyOnWriteArraySet 和 AtomicInteger 已經是線程安全的了,我們不再需要上面提到的那樣一個“保護塊”。但是等一下!我們剛剛不是在學到應該用一個快照來廣播事件,來替代用一個隱形的迭代器在原集合(Set)里面做循環嘛?
這或許有些繞腦子,但是由 CopyOnWriteArraySet 提供的 Iterator(迭代器)里面已經有了一個“快照“。CopyOnWriteXXX 這樣的集合就是被特別設計在這種情況下大顯身手的——它在小長度的場景下會很高效,而針對頻繁迭代和只有少量內容修改的場景也做了優化。這就意味著我們的代碼是安全的。
隨著 Java 8 的發布,broadcast 方法可以因為Iterable#forEach 和 lambdas表達式的結合使用而變得更加簡潔,代碼當然也是同樣安全,因為迭代依然表現為在“快照”中進行:
- private void broadcast( StateEvent stateEvent ) {
- listeners.forEach( listener -> listener.stateChanged( stateEvent ) );
- }
#p#
異常處理
本文的***介紹了如何處理拋出 RuntimeExceptions 的那些損壞的監聽器。盡管我總是嚴格對待 fail-fast 錯誤機制,但在這種情況下讓這個異常得不到處理是不合適的。尤其考慮到這種實現經常在一些多線程環境里被用到。
損壞的監聽器會有兩種方式來破壞系統:***,它會阻止通知向觀察者的傳達過程;第二,它會傷害那些沒有準備處理好這類問題的調用線程。總而言之它能夠導致多種莫名其妙的故障,并且有的還難以追溯其原因,
因此,把每一個通知區域用一個 try-catch 塊來保護起來會顯得比較有用。
- private void broadcast( StateEvent stateEvent ) {
- listeners.forEach( listener -> notifySafely( stateEvent, listener ) );
- }
- private void notifySafely( StateEvent stateEvent, StateListener listener ) {
- try {
- listener.stateChanged( stateEvent );
- } catch( RuntimeException unexpected ) {
- // appropriate exception handling goes here...
- }
- }
總結
綜上所述,Java 的事件通知里面有一些基本要點你還是必須得記住的。在事件通知過程中,要確保在監聽器集合的快照里做迭代,保證事件通知在同步塊之外,并且在合適的時候再安全地通知監聽器。
但愿我寫的這些讓你覺得通俗易懂,最起碼尤其在并發這一節不要再被搞得一頭霧水。如果你發現了文章中的錯誤或者有其它的點子想分享,盡管在文章下面的評論里告訴我吧。