成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

基于okhttp和RxJava封裝的自動重連的WebSocket

系統
RxWebSocket是一個基于okhttp和RxJava封裝的WebSocket客戶端,此庫的核心特點是 除了手動關閉WebSocket(就是RxJava取消訂閱),WebSocket在異常關閉的時候(onFailure,發生異常,如WebSocketException等等)。

[[414380]]

想了解更多內容,請訪問:

51CTO和華為官方合作共建的鴻蒙技術社區

https://harmonyos.51cto.com

一. 概述

RxWebSocket是一個基于okhttp和RxJava封裝的WebSocket客戶端,此庫的核心特點是 除了手動關閉WebSocket(就是RxJava取消訂閱),WebSocket在異常關閉的時候(onFailure,發生異常,如WebSocketException等等),會自動重連,永不斷連.其次,對WebSocket做的緩存處理,同一個URL,共享一個WebSocket.

由于是基于RxJava封裝,所以帶來了無限可能,可以和RxBinding,Rxlifecycle一起使用,方便對WebSocket的管理.

效果圖


項目已經上傳Jcenter,依賴方法:

  1. //本項目 
  2. compile 'io.github.dzsf:RxWebSocket:1.0.0' 

二. 使用方法

0.初始化,可以也忽略直接使用.

如果你想使用自己的okhttpClient:

  1. OkHttpClient yourClient = new OkHttpClient(); 
  2. RxWebSocketUtil.getInstance().setClient(yourClient); 

 是否打印日志:

  1. RxWebSocketUtil.getInstance().setShowLog(BuildConfig.DEBUG); 

1.獲取一個WebSocket,接收消息,多種方式:

  1. RxWebSocketUtil.getInstance().getWebSocketInfo(url) 
  2.         .subscribe(new Action1<WebSocketInfo>() { 
  3.             @Override 
  4.             public void call(WebSocketInfo webSocketInfo) { 
  5.                 mWebSocket = webSocketInfo.getWebSocket(); 
  6.                 Log.d("MainActivity", webSocketInfo.getString()); 
  7.                 Log.d("MainActivity""ByteString:" + webSocketInfo.getByteString()); 
  8.             } 
  9.         }); 
  10.  
  11. mWebSocket.send("hello word"); 
  12.  
  13. //get StringMsg 
  14. RxWebSocketUtil.getInstance().getWebSocketString(url) 
  15.         .subscribe(new Action1<String>() { 
  16.             @Override 
  17.             public void call(String s) { 
  18.             } 
  19.         }); 
  20. // get ByteString 
  21. RxWebSocketUtil.getInstance().getWebSocketByteString(url) 
  22.         .subscribe(new Action1<ByteString>() { 
  23.             @Override 
  24.             public void call(ByteString byteString) { 
  25.             } 
  26.         }); 
  27. //get WebSocket 
  28. RxWebSocketUtil.getInstance().getWebSocket(url) 
  29.         .subscribe(new Action1<WebSocket>() { 
  30.             @Override 
  31.             public void call(WebSocket webSocket) { 
  32.             } 
  33.         }); 
  34. // 帶timeout的WebSocket,當在指定時間內沒有收到消息,就重連WebSocket.為了適配小米平板. 
  35. RxWebSocketUtil.getInstance().getWebSocketInfo(url,10, TimeUnit.SECONDS) 
  36.         .subscribe(new Action1<WebSocketInfo>() { 
  37.             @Override 
  38.             public void call(WebSocketInfo webSocketInfo) { 
  39.             } 
  40.         }); 

2.發送消息:

  1. //用WebSocket的引用直接發 
  2. mWebSocket.send("hello word"); 
  3.  
  4. //url 對應的WebSocket 必須打開,否則報錯 
  5. RxWebSocket.send(sendUrl, "hello"); 
  6. RxWebSocket.send(sendUrl, ByteString.EMPTY); 
  7.  
  8. //異步發送,若WebSocket已經打開,直接發送,若沒有打開,打開一個WebSocket發送完數據,直接關閉. 
  9. RxWebSocket.asyncSend(sendUrl, "hello"); 
  10. RxWebSocket.asyncSend(sendUrl, ByteString.EMPTY); 

3.關閉WebSocket:

項目是依托RxJava實現的,所以關閉WebSocket的方法也就是在適當的時候注銷 Observable,項目里的demo里,寫了一個簡單的lifecycle,將Observable生命綁定到Activity的onDestroy,自動注銷.代碼細節請看demo,因為內部實現了同一個URL的WebSocket共享機制,所以當外部所有持有這個URL的Observable都注銷后,這個WebSocket連接就會自動關閉.請看原理解析部分

  1. //注意取消訂閱,有多種方式,比如 rxlifecycle 
  2. mSubscription = RxWebSocketUtil.getInstance().getWebSocketInfo(url) 
  3.         .subscribe(new Action1<WebSocketInfo>() { 
  4.             @Override 
  5.             public void call(WebSocketInfo webSocketInfo) { 
  6.                 mWebSocket = webSocketInfo.getWebSocket(); 
  7.  
  8.                 if (webSocketInfo.isOnOpen()) { 
  9.                     Log.d("MainActivity"" on WebSocket open"); 
  10.                 } else { 
  11.                     String string = webSocketInfo.getString(); 
  12.  
  13.                     if (string != null) { 
  14.                         Log.d("MainActivity", string); 
  15.                         textview.setText(Html.fromHtml(string)); 
  16.                     } 
  17.  
  18.                     ByteString byteString = webSocketInfo.getByteString(); 
  19.  
  20.                     if (byteString != null) { 
  21.                         Log.d("MainActivity"
  22.                             "webSocketInfo.getByteString():" + 
  23.                             byteString); 
  24.                     } 
  25.                 } 
  26.             } 
  27.         }); 
  28.  
  29. //注銷 
  30. if (mSubscription != null) { 
  31.     mSubscription.unsubscribe(); 
  32.  
  33. //lifecycle注銷,詳情看demo 
  34. RxWebSocketUtil.getInstance().getWebSocketString(url) 
  35.             .compose(this.<String>bindOnActivityEvent(ActivityEvent.onDestory)) 
  36.             .subscribe(new Action1<String>() { 
  37.                 @Override 
  38.                 public void call(String s) { 
  39.                 } 
  40.             }); 

三. 原理解析

1. 首先需要將okhttp的WebSocket包裝成Observable,由于需要將WebSocket,Stringmsg,ByteString等信息一同發送給觀察者所以先構建一個WebSocketInfo類,將信息封裝:

  1. public class WebSocketInfo { 
  2.     private WebSocket mWebSocket; 
  3.     private String mString; 
  4.     private ByteString mByteString; 
  5.     private boolean onOpen; 
  6.     //其他省略 

onOpen字段主要用來判斷當前的這個WebSocketInfo是否是當WebSocket打開時發送的消息(onOpen),這時,Stringmsg和ByteString都是null.

2. 將WebSocketInfo包裝成Observable發出:

  1. private final class WebSocketOnSubscribe implements Observable.OnSubscribe<WebSocketInfo> { 
  2.     private String url; 
  3.     private WebSocket webSocket; 
  4.     private WebSocketInfo startInfo; 
  5.     private WebSocketInfo stringInfo; 
  6.     private WebSocketInfo byteStringInfo; 
  7.  
  8.     public WebSocketOnSubscribe(String url) { 
  9.         this.url = url; 
  10.         startInfo = new WebSocketInfo(true); 
  11.         stringInfo = new WebSocketInfo(); 
  12.         byteStringInfo = new WebSocketInfo(); 
  13.     } 
  14.  
  15.     @Override 
  16.     public void call(final Subscriber<?super WebSocketInfo> subscriber) { 
  17.         if (webSocket != null) { 
  18.             //降低重連頻率 
  19.             if (!"main".equals(Thread.currentThread().getName())) { 
  20.                 SystemClock.sleep(2000); 
  21.             } 
  22.         } 
  23.  
  24.         initWebSocket(subscriber); 
  25.     } 
  26.  
  27.     private void initWebSocket( 
  28.         final Subscriber<?super WebSocketInfo> subscriber) { 
  29.         webSocket = client.newWebSocket(getRequest(url), 
  30.                 new WebSocketListener() { 
  31.                     @Override 
  32.                     public void onOpen(final WebSocket webSocket, 
  33.                         Response response) { 
  34.                         if (showLog) { 
  35.                             Log.d("RxWebSocketUtil", url + " --> onOpen"); 
  36.                         } 
  37.  
  38.                         webSocketMap.put(url, webSocket); 
  39.                         AndroidSchedulers.mainThread().createWorker().schedule(new Action0() { 
  40.                                 @Override 
  41.                                 public void call() { 
  42.                                     if (!subscriber.isUnsubscribed()) { 
  43.                                         subscriber.onStart(); 
  44.                                         startInfo.setWebSocket(webSocket); 
  45.                                         subscriber.onNext(startInfo); 
  46.                                     } 
  47.                                 } 
  48.                             }); 
  49.                     } 
  50.  
  51.                     @Override 
  52.                     public void onMessage(WebSocket webSocket, String text) { 
  53.                         if (!subscriber.isUnsubscribed()) { 
  54.                             stringInfo.setWebSocket(webSocket); 
  55.                             stringInfo.setString(text); 
  56.                             subscriber.onNext(stringInfo); 
  57.                         } 
  58.                     } 
  59.  
  60.                     @Override 
  61.                     public void onMessage(WebSocket webSocket, ByteString bytes) { 
  62.                         if (!subscriber.isUnsubscribed()) { 
  63.                             byteStringInfo.setWebSocket(webSocket); 
  64.                             byteStringInfo.setByteString(bytes); 
  65.                             subscriber.onNext(byteStringInfo); 
  66.                         } 
  67.                     } 
  68.  
  69.                     @Override 
  70.                     public void onFailure(WebSocket webSocket, Throwable t, 
  71.                         Response response) { 
  72.                         if (showLog) { 
  73.                             Log.e("RxWebSocketUtil"
  74.                                 t.toString() + 
  75.                                 webSocket.request().url().uri().getPath()); 
  76.                         } 
  77.  
  78.                         if (!subscriber.isUnsubscribed()) { 
  79.                             subscriber.onError(t); 
  80.                         } 
  81.                     } 
  82.  
  83.                     @Override 
  84.                     public void onClosing(WebSocket webSocket, int code, 
  85.                         String reason) { 
  86.                         webSocket.close(1000, null); 
  87.                     } 
  88.  
  89.                     @Override 
  90.                     public void onClosed(WebSocket webSocket, int code, 
  91.                         String reason) { 
  92.                         if (showLog) { 
  93.                             Log.d("RxWebSocketUtil"
  94.                                 url + " --> onClosed:code= " + code); 
  95.                         } 
  96.                     } 
  97.                 }); 
  98.         subscriber.add(new MainThreadSubscription() { 
  99.                 @Override 
  100.                 protected void onUnsubscribe() { 
  101.                     webSocket.close(3000, "手動關閉"); 
  102.                 } 
  103.             }); 
  104.     } 

實現一個WebSocketOnSubscribe 將WebSocket的回調轉化成subscriber調用.發送給Observable下游.在onOpen時調用 subscriber.onStart(),并且發送一個onOpen的WebSocketInfo.在subscriber注銷的時候關閉WebSocket.在call方法最上面有個SystemClock.sleep(2000),這個主要是為了降低在斷連的時候的重連頻率,將在下面講到.

包裝成Observable:

  1. Observable.create(new WebSocketOnSubscribe(url)) 
  2.             .subscribeOn(Schedulers.io()) 
  3.             .observeOn(AndroidSchedulers.mainThread()); 

3. 實現自動重連:

  1. Observable.create(new WebSocketOnSubscribe(url)) 
  2.             //自動重連 
  3.             .timeout(timeout, timeUnit).retry() 
  4.             .subscribeOn(Schedulers.io()) 
  5.             .observeOn(AndroidSchedulers.mainThread()); 

RxJava retry操作符,很完美的實現了這個功能,當上游發出Throwable的時候,retry將錯誤吃掉,并重新調用 onSubscribe的call方法,也就是WebSocketOnSubscribe的call,就會重新初始化一個WebSocket連接,達到重連的目的,如果一直沒有網絡,這個retry的調用頻率非常高,所以在call方法里面,當是重連的時候,就SystemClock.sleep(2000),休眠2秒,這樣重連的頻率就是2秒重連一次. 當然在retry上面還有一個timeout操作符.當subscriber.onNext()在指定時間間隔里沒有調用,就發出一個timeoutException,讓retry重連WebSocket.這個主要是為了適配部分國產機型,當WebSocket發生連接異常時,不會及時發出錯誤,如小米平板.在每次重連都會把原來的WebSocket關閉.

4. 實現同一個URL的WebSocket共享

  1. Observable.create(new WebSocketOnSubscribe(url)) 
  2.         //自動重連 
  3.         .timeout(timeout, timeUnit) 
  4.         .retry() 
  5.         //共享 
  6.         .doOnUnsubscribe(new Action0() { 
  7.             @Override 
  8.             public void call() { 
  9.                 observableMap.remove(url); 
  10.                 webSocketMap.remove(url); 
  11.  
  12.                 if (showLog) { 
  13.                     Log.d("RxWebSocketUtil""注銷"); 
  14.                 } 
  15.             } 
  16.         }) 
  17.         .doOnNext(new Action1<WebSocketInfo>() { 
  18.             @Override 
  19.             public void call(WebSocketInfo webSocketInfo) { 
  20.                 if (webSocketInfo.isOnOpen()) { 
  21.                     webSocketMap.put(url, webSocketInfo.getWebSocket()); 
  22.                 } 
  23.             } 
  24.         }) 
  25.         .share() 
  26.         .subscribeOn(Schedulers.io()) 
  27.         .observeOn(AndroidSchedulers.mainThread()); 

實現共享功能,主要是為了防止一個URL的WebSocket,建立多個連接,這個主要是由RxJava的share操作符實現,share操作符,使得一個Observable可以有多個subscriber,當有多個subscriber時,當所有的subscriber都取消訂閱,這個Observable才會取消訂閱.

getWebSocketInfo()方法完整代碼:

  1. public Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) { 
  2.     Observable<WebSocketInfo> observable = observableMap.get(url); 
  3.     if (observable == null) { 
  4.         observable = Observable.create(new WebSocketOnSubscribe(url)) 
  5.                 //自動重連 
  6.                 .timeout(timeout, timeUnit) 
  7.                 .retry() 
  8.                 //共享 
  9.                 .doOnUnsubscribe(new Action0() { 
  10.                     @Override 
  11.                     public void call() { 
  12.                         observableMap.remove(url); 
  13.                         webSocketMap.remove(url); 
  14.                         if (showLog) { 
  15.                             Log.d("RxWebSocketUtil""注銷"); 
  16.                         } 
  17.                     } 
  18.                 }) 
  19.                 .doOnNext(new Action1<WebSocketInfo>() { 
  20.                     @Override 
  21.                     public void call(WebSocketInfo webSocketInfo) { 
  22.                         if (webSocketInfo.isOnOpen()) { 
  23.                             webSocketMap.put(url, webSocketInfo.getWebSocket()); 
  24.                         } 
  25.                     } 
  26.                 }) 
  27.                 .share() 
  28.                 .subscribeOn(Schedulers.io()) 
  29.                 .observeOn(AndroidSchedulers.mainThread()); 
  30.         observableMap.put(url, observable); 
  31.     } else { 
  32.         observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable); 
  33.     } 
  34.     return observable; 

doOnUnsubscribe作用:在Observable注銷,即 WebSocket關閉時,移除map中的緩存的Observable和WebSocket.

doOnNext作用: 判斷接收到的WebSocketInfo是否是WebSocket在onOpen的時候發的,然后將其緩存起來.作用就是:如果有一個相同的URL訂閱Observable,就從緩存中取,這個時候我們應該把一個WebSocket的onOpen事件也發給這個訂閱者:

  1. //使用merge操作符,將onOpen事件發給訂閱者 
  2. observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable); 

這樣的話,同一個URL的WebSocket,不管在什么地方什么時間訂閱,都能收到一個onOpen事件,外部表現的就像一個新的WebSocket.

getWebSocketInfo方法的幾種變體:

  1. /** 
  2.  * default timeout: 30 days 
  3.  * <p> 
  4.  * 若忽略小米平板,請調用這個方法 
  5.  * </p> 
  6.  */ 
  7. public Observable<WebSocketInfo> getWebSocketInfo(String url) { 
  8.     return getWebSocketInfo(url, 30, TimeUnit.DAYS); 
  9.  
  10. public Observable<String> getWebSocketString(String url) { 
  11.     return getWebSocketInfo(url) 
  12.             .map(new Func1<WebSocketInfo, String>() { 
  13.                 @Override 
  14.                 public String call(WebSocketInfo webSocketInfo) { 
  15.                     return webSocketInfo.getString(); 
  16.                 } 
  17.             }) 
  18.             .filter(new Func1<String, Boolean>() { 
  19.                 @Override 
  20.                 public Boolean call(String s) { 
  21.                     return s != null
  22.                 } 
  23.             }); 
  24.  
  25. public Observable<ByteString> getWebSocketByteString(String url) { 
  26.     return getWebSocketInfo(url) 
  27.             .map(new Func1<WebSocketInfo, ByteString>() { 
  28.                 @Override 
  29.                 public ByteString call(WebSocketInfo webSocketInfo) { 
  30.                     return webSocketInfo.getByteString(); 
  31.                 } 
  32.             }) 
  33.             .filter(new Func1<ByteString, Boolean>() { 
  34.                 @Override 
  35.                 public Boolean call(ByteString byteString) { 
  36.                     return byteString != null
  37.                 } 
  38.             }); 
  39.  
  40. public Observable<WebSocket> getWebSocket(String url) { 
  41.     return getWebSocketInfo(url) 
  42.             .map(new Func1<WebSocketInfo, WebSocket>() { 
  43.                 @Override 
  44.                 public WebSocket call(WebSocketInfo webSocketInfo) { 
  45.                     return webSocketInfo.getWebSocket(); 
  46.                 } 
  47.             }); 

5 . send信息到服務端

上面已經講到WebSocketInfo包含了WebSocket,所以在訂閱后,就可以拿到這個WebSocket引用就可以WebSocket.send發送消息到服務端.當然我們的RxWebSocketUtil已經將開啟的WebSocket已經緩存.所以我們也可以這樣發消息:

  1. /** 
  2.  * 如果url的WebSocket已經打開,可以直接調用這個發送消息. 
  3.  * 
  4.  * @param url 
  5.  * @param msg 
  6.  */ 
  7. public void send(String url, String msg) { 
  8.     WebSocket webSocket = webSocketMap.get(url); 
  9.     if (webSocket != null) { 
  10.         webSocket.send(msg); 
  11.     } else { 
  12.         throw new IllegalStateException("The WebSokcet not open"); 
  13.     } 
  14.  
  15. /** 
  16.  * 如果url的WebSocket已經打開,可以直接調用這個發送消息. 
  17.  * 
  18.  * @param url 
  19.  * @param byteString 
  20.  */ 
  21. public void send(String url, ByteString byteString) { 
  22.     WebSocket webSocket = webSocketMap.get(url); 
  23.     if (webSocket != null) { 
  24.         webSocket.send(byteString); 
  25.     } else { 
  26.         throw new IllegalStateException("The WebSokcet not open"); 
  27.     } 

當指定的URL的WebSocket沒有打開會直接報錯.

異步發送消息到服務端

  1. /** 
  2.  * 不用關心url 的WebSocket是否打開,可以直接發送 
  3.  * 
  4.  * @param url 
  5.  * @param msg 
  6.  */ 
  7. public void asyncSend(String url, final String msg) { 
  8.     getWebSocket(url) 
  9.             .first() 
  10.             .subscribe(new Action1<WebSocket>() { 
  11.                 @Override 
  12.                 public void call(WebSocket webSocket) { 
  13.                     webSocket.send(msg); 
  14.                 } 
  15.             }); 
  16.  
  17.  
  18. /** 
  19.  * 不用關心url 的WebSocket是否打開,可以直接發送 
  20.  * 
  21.  * @param url 
  22.  * @param byteString 
  23.  */ 
  24. public void asyncSend(String url, final ByteString byteString) { 
  25.     getWebSocket(url) 
  26.             .first() 
  27.             .subscribe(new Action1<WebSocket>() { 
  28.                 @Override 
  29.                 public void call(WebSocket webSocket) { 
  30.                     webSocket.send(byteString); 
  31.                 } 
  32.             }); 

這兩種發送方式,你不用關心URL的WebSocket是否打開,可以直接發送.實現思路也很簡單,getWebSocket(url)會獲取到Observable,或者是從緩存中取,或者是重新開啟一個WebSocket,但你都不需要關心,經過first操作符后,如果是從緩存取的Observable,就注銷的當前的Observable,當是新開的WebSocket,注銷掉當前的subscriber后,就沒有其他subscriber了,這個新開的WebSocket就會關閉(share操作符作用).

想了解更多內容,請訪問:

51CTO和華為官方合作共建的鴻蒙技術社區

https://harmonyos.51cto.com

 

責任編輯:jianghua 來源: 鴻蒙社區
相關推薦

2023-12-11 07:12:21

心跳檢測重連機制服務端

2021-02-26 12:37:39

WebSocketOkHttp連接

2017-05-25 11:49:30

Android網絡請求OkHttp

2024-03-19 08:45:45

WebSocketSpring應用開發

2021-12-07 10:23:27

鴻蒙HarmonyOS應用

2021-11-09 09:30:52

OkHttp面試Android

2014-09-01 10:22:11

Node.js技術架構

2020-03-19 10:13:13

OkHttpWebSocket

2024-04-03 15:40:14

WebSocketWeb應用Spring

2018-04-20 09:36:23

NettyWebSocket京東

2017-03-22 13:20:07

RxJavaSingleCompletable

2024-11-04 08:00:00

Netty客戶端

2024-08-07 08:22:27

2024-12-23 06:00:00

TCPC#網絡

2014-03-25 14:21:18

WebSocket實時

2012-06-04 14:41:16

Win7連網

2024-01-26 08:41:55

Fluent工具包高并發

2025-06-20 02:11:00

2021-10-25 10:30:12

JavaScript開發 代碼

2024-03-21 08:34:49

Vue3WebSocketHTTP
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品成人在线观看 | 日韩午夜网站 | 一区二区在线不卡 | 亚洲一区二区三区视频在线 | 精品区一区二区 | 日韩欧美天堂 | 国产成人在线一区 | 99精品一区二区三区 | 一区二区三区四区在线 | 精品视频一区二区三区在线观看 | 欧美日韩视频在线 | 99久久精品国产一区二区三区 | 国产美女特级嫩嫩嫩bbb片 | 国产精品自拍视频 | 色综合天天天天做夜夜夜夜做 | 中文一级片 | 国产精品日韩一区二区 | 激情久久网 | 欧美视频精品 | 亚洲精品久久 | 日韩在线中文 | 国产精品久久久久久久久久 | 久久国产精品久久国产精品 | 7777奇米影视 | 国产成人精品一区二区三区四区 | 欧美三级在线 | 亚洲成人免费观看 | 日日摸夜夜添夜夜添精品视频 | 亚洲成人一区 | 精品无码久久久久久国产 | 国产精品a久久久久 | 亚洲人人 | 美美女高清毛片视频免费观看 | 欧美视频免费在线 | 日韩1区 | 国产免费观看视频 | 91在线精品视频 | 在线播放国产一区二区三区 | 夜夜爽99久久国产综合精品女不卡 | 亚洲毛片在线 | 成人亚洲性情网站www在线观看 |