成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Reactive-MongoDB異步Java Driver解讀

大數據 MongoDB
從3.0 版本開始,MongoDB 開始提供異步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。

[[342496]]

 一、關于 異步驅動

從3.0 版本開始,MongoDB 開始提供異步方式的驅動(Java Async Driver),這為應用提供了一種更高性能的選擇。

但實質上,使用同步驅動(Java Sync Driver)的項目也不在少數,或許是因為先入為主的原因(同步Driver的文檔說明更加的完善),又或者是為了兼容舊的 MongoDB 版本。

無論如何,由于 Reactive 的發展,未來使用異步驅動應該是一個趨勢。

在使用 Async Driver 之前,需要對 Reactive 的概念有一些熟悉。

二、理解 Reactive (響應式)

響應式(Reactive)是一種異步的、面向數據流的開發方式,最早是來自于.NET 平臺上的 Reactive Extensions 庫,隨后被擴展為各種編程語言的實現。

在著名的 Reactive Manifesto(響應式宣言) 中,對 Reactive 定義了四個特征:

  • 及時響應(Responsive):系統能及時的響應請求。
  •  
  • 有韌性(Resilient):系統在出現異常時仍然可以響應,即支持容錯。
  • 有彈性(Elastic):在不同的負載下,系統可彈性伸縮來保證運行。
  • 消息驅動(Message Driven):不同組件之間使用異步消息傳遞來進行交互,并確保松耦合及相互隔離。

在響應式宣言的所定義的這些系統特征中,無一不與響應式的流有若干的關系,于是乎就有了 2013年發起的 響應式流規范(Reactive Stream Specification)。

https://www.reactive-streams.org/

其中,對于響應式流的處理環節又做了如下定義:

  • 具有處理無限數量的元素的能力,即允許流永不結束
  • 按序處理
  • 異步地傳遞元素
  • 實現非阻塞的負壓(back-pressure)

Java 平臺則是在 JDK 9 版本上發布了對 Reactive Streams 的支持。

下面介紹響應式流的幾個關鍵接口:

  • Publisher

Publisher 是數據的發布者。Publisher 接口只有一個方法 subscribe,用于添加數據的訂閱者,也就是 Subscriber。

  • Subscriber

Subscriber 是數據的訂閱者。Subscriber 接口有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到發布者之后,其 onSubscribe(Subscription s) 方法會被調用。

Subscription 表示的是當前的訂閱關系。

當訂閱成功后,可以使用 Subscription 的 request(long n) 方法來請求發布者發布 n 條數據。發布者可能產生3種不同的消息通知,分別對應 Subscriber 的另外3個回調方法。

數據通知:對應 onNext 方法,表示發布者產生的數據。

錯誤通知:對應 onError 方法,表示發布者產生了錯誤。

結束通知:對應 onComplete 方法,表示發布者已經完成了所有數據的發布。

在上述3種通知中,錯誤通知和結束通知都是終結通知,也就是在終結通知之后,不會再有其他通知產生。

  • Subscription

Subscription 表示的是一個訂閱關系。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要注意的是,在 cancel 方法調用之后,發布者仍然有可能繼續發布通知。但訂閱最終會被取消。

這幾個接口的關系如下圖所示:

圖片出處:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

 

MongoDB 的異步驅動為 mongo-java-driver-reactivestreams 組件,其實現了 Reactive Stream 的上述接口。

> 除了 reactivestream 之外,MongoDB 的異步驅動還包含 RxJava 等風格的版本,有興趣的讀者可以進一步了解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下來,通過一個簡單的例子來演示一下 Reactive 方式的代碼風格:

A. 引入依賴

  1. org.mongodb 
  2.    mongodb-driver-reactivestreams 
  3.    1.11.0 

> 引入mongodb-driver-reactivestreams 將會自動添加 reactive-streams, bson, mongodb-driver-async組件

B. 連接數據庫

  1. //服務器實例表List servers =newArrayList(); 
  2. servers.add(newServerAddress("localhost",27018));//配置構建器MongoClientSettings.Builder settingsBuilder =MongoClientSettings.builder();//傳入服務器實例 
  3. settingsBuilder.applyToClusterSettings( 
  4.         builder -> builder.hosts(servers));//構建 Client 實例MongoClient mongoClient =MongoClients.create(settingsBuilder.build()); 

C. 實現文檔查詢

  1. //獲得數據庫對象MongoDatabase database = client.getDatabase(databaseName);//獲得集合MongoCollection collection = database.getCollection(collectionName);//異步返回PublisherFindPublisher publisher = collection.find();//訂閱實現 
  2. publisher.subscribe(newSubscriber(){ 
  3.     @Override 
  4.     publicvoid onSubscribe(Subscription s){ 
  5.         System.out.println("start..."); 
  6.         //執行請求 
  7.         s.request(Integer.MAX_VALUE); 
  8.  
  9.     } 
  10.     @Override 
  11.     publicvoid onNext(Document document){ 
  12.         //獲得文檔 
  13.         System.out.println("Document:"+ document.toJson()); 
  14.     } 
  15.  
  16.     @Override 
  17.     publicvoid onError(Throwable t){ 
  18.         System.out.println("error occurs."); 
  19.     } 
  20.  
  21.     @Override 
  22.     publicvoid onComplete(){ 
  23.         System.out.println("finished."); 
  24.     }}); 

注意到,與使用同步驅動不同的是,collection.find()方法返回的不是 Cursor,而是一個 FindPublisher對象,這是Publisher接口的一層擴展。

而且,在返回 Publisher 對象時,此時并沒有產生真正的數據庫IO請求。真正發起請求需要通過調用 Subscription.request()方法。

在上面的代碼中,為了讀取由 Publisher 產生的結果,通過自定義一個Subscriber,在onSubscribe 事件觸發時就執行 數據庫的請求,之后分別對 onNext、onError、onComplete進行處理。

盡管這種實現方式是純異步的,但在使用上比較繁瑣。試想如果對于每個數據庫操作都要完成一個Subscriber 邏輯,那么開發的工作量是巨大的。

為了盡可能復用重復的邏輯,可以對Subscriber的邏輯做一層封裝,包含如下功能:

  • 使用 List 容器對請求結果進行緩存
  • 實現阻塞等待結果的方法,可指定超時時間
  • 捕獲異常,在等待結果時拋出

代碼如下:

  1. publicclassObservableSubscriberimplementsSubscriber{ 
  2.  
  3.     //響應數據 
  4.     privatefinalList received; 
  5.     //錯誤信息 
  6.     privatefinalList errors; 
  7.     //等待對象 
  8.     privatefinalCountDownLatch latch; 
  9.     //訂閱器 
  10.     privatevolatileSubscription subscription; 
  11.     //是否完成 
  12.     privatevolatileboolean completed; 
  13.  
  14.     publicObservableSubscriber(){ 
  15.         this.received =newArrayList(); 
  16.         this.errors =newArrayList(); 
  17.         this.latch =newCountDownLatch(1); 
  18.     } 
  19.  
  20.     @Override 
  21.     publicvoid onSubscribe(finalSubscription s){ 
  22.         subscription = s; 
  23.     } 
  24.  
  25.     @Override 
  26.     publicvoid onNext(final T t){ 
  27.         received.add(t); 
  28.     } 
  29.  
  30.     @Override 
  31.     publicvoid onError(finalThrowable t){ 
  32.         errors.add(t); 
  33.         onComplete(); 
  34.     } 
  35.  
  36.     @Override 
  37.     publicvoid onComplete(){ 
  38.         completed =true
  39.         latch.countDown(); 
  40.     } 
  41.  
  42.     publicSubscription getSubscription(){ 
  43.         return subscription; 
  44.     } 
  45.  
  46.     publicList getReceived(){ 
  47.         return received; 
  48.     } 
  49.  
  50.     publicThrowable getError(){ 
  51.         if(errors.size()>0){ 
  52.             return errors.get(0); 
  53.         } 
  54.         returnnull; 
  55.     } 
  56.  
  57.     publicboolean isCompleted(){ 
  58.         return completed; 
  59.     } 
  60.  
  61.     /** 
  62.      * 阻塞一定時間等待結果 
  63.      * 
  64.      * @param timeout 
  65.      * @param unit 
  66.      * @return 
  67.      * @throws Throwable 
  68.      */ 
  69.     publicListget(finallong timeout,finalTimeUnit unit)throwsThrowable{ 
  70.         return await(timeout, unit).getReceived(); 
  71.     } 
  72.  
  73.     /** 
  74.      * 一直阻塞等待請求完成 
  75.      * 
  76.      * @return 
  77.      * @throws Throwable 
  78.      */ 
  79.     publicObservableSubscriber await()throwsThrowable{ 
  80.         return await(Long.MAX_VALUE,TimeUnit.MILLISECONDS); 
  81.     } 
  82.  
  83.     /** 
  84.      * 阻塞一定時間等待完成 
  85.      * 
  86.      * @param timeout 
  87.      * @param unit 
  88.      * @return 
  89.      * @throws Throwable 
  90.      */ 
  91.     publicObservableSubscriber await(finallong timeout,finalTimeUnit unit)throwsThrowable{ 
  92.         subscription.request(Integer.MAX_VALUE); 
  93.         if(!latch.await(timeout, unit)){ 
  94.             thrownewMongoTimeoutException("Publisher onComplete timed out"); 
  95.         } 
  96.         if(!errors.isEmpty()){ 
  97.             throw errors.get(0); 
  98.         } 
  99.         returnthis; 
  100.     }} 

借助這個基礎的工具類,我們對于文檔的異步操作就變得簡單多了。

比如對于文檔查詢的操作可以改造如下:

  1. ObservableSubscriber subscriber =newObservableSubscriber(); 
  2. collection.find().subscribe(subscriber);//結果處理 
  3. subscriber.get(15,TimeUnit.SECONDS).forEach( d ->{ 
  4.     System.out.println("Document:"+ d.toJson());}); 

當然,這個例子還有可以繼續完善,比如使用 List 作為緩存,則要考慮數據量的問題,避免將全部(或超量) 的文檔一次性轉入內存。

作者:唐卓章

華為技術專家,多年互聯網研發/架設經驗,關注NOSQL 中間件高可用及彈性擴展,在分布式系統架構性能優化方面有豐富的實踐經驗,目前從事物聯網平臺研發工作,致力于打造大容量高可用的物聯網服務。

本文轉載自微信公眾號「 Mongoing中文社區」,可以通過以下二維碼關注。轉載本文請聯系 Mongoing中文社區公眾號。

 

責任編輯:武曉燕 來源: Mongoing中文社區
相關推薦

2012-06-17 13:26:07

MongoDBJava

2020-05-29 07:20:00

Java8異步編程源碼解讀

2012-03-31 10:59:02

ASP.NET

2020-05-21 09:33:06

Reactive編程模型

2018-08-19 09:15:25

MongoDBGo 微服務

2024-05-06 00:00:00

RefReactive性能

2015-06-15 10:32:44

Java核心源碼解讀

2009-12-10 09:37:31

Linuxdriver編寫思考

2020-07-07 07:00:00

Spring WebFREST APIReactive AP

2009-09-10 14:18:59

Functional F#

2024-07-29 00:01:00

2011-12-21 10:46:17

Java

2012-03-26 10:14:25

JavaJava 8

2011-12-14 10:31:43

2009-11-09 09:56:46

Driver Stud

2023-03-27 10:46:53

SourceMap字符串代碼

2023-09-28 08:41:11

OpenAILLMLangChain

2024-11-26 07:20:25

2023-05-04 08:54:08

Toolformer語言模型

2021-03-22 08:45:30

異步編程Java
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品亚洲永久免费精品 | 欧美一区二区精品 | 亚洲一区在线日韩在线深爱 | 99国产精品久久久久 | 国产精品伦一区二区三级视频 | 国产中文一区二区三区 | 日韩欧美三区 | 4h影视| www.4虎影院| 日日夜夜天天久久 | 中日字幕大片在线播放 | 精品国产伦一区二区三区观看体验 | 激情欧美日韩一区二区 | 欧美日本韩国一区二区三区 | 高清久久久 | 91av在线免费观看 | 久久久青草婷婷精品综合日韩 | 亚洲福利一区二区 | 欧美激情在线精品一区二区三区 | 黑人精品欧美一区二区蜜桃 | 中文字幕一区二区三区乱码在线 | 国产精品1区2区 | 成人免费视频一区二区 | 久久精品国产精品青草 | 九九热这里只有精品在线观看 | 色接久久 | 国产精品一二区 | 国产日韩欧美 | 欧美精品1区2区3区 精品国产欧美一区二区 | 免费精品 | 在线播放精品视频 | 欧美一级在线观看 | 成人国产在线视频 | 欧美大片在线观看 | 国产亚洲精品久久久优势 | 久久一二三区 | 成人av激情| 国产一区二区小视频 | 激情六月丁香 | 国产在线观看不卡一区二区三区 | 91视频www.|