成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

聊聊 IM 系統重構到 SDK 設計的最佳實踐

開發 前端
如果我們網絡環境發生了變化,比如從家里的 Wi-Fi 切換到了公司的,需要手動刪除下 FE/meta 下的所有文件再次啟動,BE 則是需要重啟一下容器。

SDK 設計

圖片圖片

在之前提到了 cim 在做集成測試的時候遇到的問題,需要提供一個 SDK 來解決,于是我花了一些時間編寫了 SDK,同時也將 cim-client 重構了。

重構后的代碼長這個樣子:

@Bean
    public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor callbackThreadPool,
                              Event event) {
        OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(3, TimeUnit.SECONDS)
                .readTimeout(3, TimeUnit.SECONDS)
                .writeTimeout(3, TimeUnit.SECONDS)
                .retryOnConnectionFailure(true).build();

        return Client.builder()
                .auth(ClientConfigurationData.Auth.builder()
                        .userName(appConfiguration.getUserName())
                        .userId(appConfiguration.getUserId())
                        .build())
                .routeUrl(appConfiguration.getRouteUrl())
                .loginRetryCount(appConfiguration.getReconnectCount())
                .event(event)
                .reconnectCheck(client -> !shutDownSign.checkStatus())
                .okHttpClient(okHttpClient)
                .messageListener(new MsgCallBackListener(msgLogger))
                .callbackThreadPool(callbackThreadPool)
                .build();
    }

配合 springboot 使用時只需要創建一個 Client 即可,這個 Client 里維護了核心的:

  • 長鏈接創建、狀態維護
  • 心跳檢測
  • 超時、網絡異常重連等

同時也提供了簡易的 API 可以直接收發消息:

圖片圖片

這樣在集成到業務代碼中時會更方便。

以前的代碼耦合度非常高,同時因為基礎代碼是 18 年寫的,現在真的沒有眼看了;

重構的過程中使用一些 Java8+ 的一些語法糖精簡了許多代碼,各個模塊間的組織關系也重新梳理,現在會更易維護了。

比如由于創建客戶端需要許多可選參數,于是就提供了 Builder 模式的創建選項:

public interface ClientBuilder {  
  
    Client build();  
    ClientBuilder auth(ClientConfigurationData.Auth auth);  
    ClientBuilder routeUrl(String routeUrl);  
    ClientBuilder loginRetryCount(int loginRetryCount);  
    ClientBuilder event(Event event);  
    ClientBuilder reconnectCheck(ReconnectCheck reconnectCheck);  
    ClientBuilder okHttpClient(OkHttpClient okHttpClient);  
    ClientBuilder messageListener(MessageListener messageListener);  
    ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool);  
}


以上部分 API 的設計借鑒了 Pulsar。

Proxy 優化

除此之外還優化了請求代理,這個 Proxy 主要是用于方便在各個服務中發起 rest 調用,我這里為了輕量也沒有使用 Dubbo、SpringCloud 這類服務框架。

但如果都硬編碼 http client 去請求時會有許多重復冗余的代碼,比如創建連接、請求參數、響應解析、異常處理等。

于是在之前的版本中就提供了一個 ProxyManager 的基本實現:

@Override  
public List<OnlineUsersResVO.DataBodyBean> onlineUsers() throws Exception{  
    RouteApi routeApi = new ProxyManager<>(RouteApi.class, routeUrl, okHttpClient).getInstance();  
  
    Response response = null;  
    OnlineUsersResVO onlineUsersResVO = null;  
    try {  
        response = (Response) routeApi.onlineUser();  
        String json = response.body().string() ;  
        onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);  
  
    }catch (Exception e){  
        log.error("exception",e);  
    }finally {  
        response.body().close();  
    }  
    return onlineUsersResVO.getDataBody();  
}

雖然提供了一些連接管理和參數封裝等基礎功能,但只實現了一半。

從上面的代碼也可以看出序列化都得自己實現,這些代碼完全是冗余的。

經過重構后以上的代碼可以精簡到如下:

// 聲明接口
@Request(method = Request.GET)  
BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception;

// 初始化
routeApi = RpcProxyManager.create(RouteApi.class, routeUrl, okHttpClient);

public Set<CIMUserInfo> onlineUser() throws Exception {  
    BaseResponse<Set<CIMUserInfo>> onlineUsersResVO = routeApi.onlineUser();  
    return onlineUsersResVO.getDataBody();  
}

這個調整之后就非常類似于 Dubbo gRPC 這類 RPC 框架的使用,只需要把接口定義好,就和調用本地函數一樣的簡單。

為了方便后續可能調用一些外部系統,在此基礎上還支持了指定多種請求 method、指定 URL 、返回結果嵌套泛型等。

@Request(url = "sample-request?author=beeceptor")  
EchoGeneric<EchoResponse.HeadersDTO> echoGeneric(EchoRequest message);

@Test  
public void testGeneric() {  
    OkHttpClient client = new OkHttpClient();  
    String url = "http://echo.free.beeceptor.com";  
    Echo echo = RpcProxyManager.create(Echo.class, url, client);  
    EchoRequest request = new EchoRequest();  
    request.setName("crossoverJie");  
    request.setAge(18);  
    request.setCity("shenzhen");  
    // 支持泛型解析
    EchoGeneric<EchoResponse.HeadersDTO> response = echo.echoGeneric(request);  
    Assertions.assertEquals(response.getHeaders().getHost(), "echo.free.beeceptor.com");  
}

支持動態 URL 調用

圖片圖片

還有一個 todo:希望可以將 ProxyManager 交給 Spring 去管理,之前是在每次調用的地方都會創建一個 Proxy 對象,完全沒有必要,代碼也很冗余。

但有網友在實現過程中發現,有個場景的請求地址是動態的,如果是交給 Spring 管理為單例后是沒法修改 URL 地址的,因為這個地址是在創建對象的時候初始化的。

所以我就在這里新增了一個動態 URL 的特性:

EchoResponse echoTarget(EchoRequest message, @DynamicUrl(useMethodEndpoint = false) String url);

Echo echo = RpcProxyManager.create(Echo.class, client);
String url = "http://echo.free.beeceptor.com/sample-request?author=beeceptor";
EchoResponse response = echo.echoTarget(request, url);

在聲明接口的時候使用 @DynamicUrl 的方法參數注解,告訴代理這個參數是 URL。這樣就可以允許在創建  Proxy 對象的時候不指定 URL,而是在實際調用時候再傳入具體的 URL,更方便創建單例了。

集成測試優化

同時還優化了集成測試,支持了 server 的集群版測試。

https://github.com/crossoverJie/cim/blob/4c149f8bda78718e3ecae2c5759aa9732eff9132/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java#L210

@Test  
public void testReconnect() throws Exception {  
    super.startTwoServer();  
    super.startRoute();  
  
    String routeUrl = "http://localhost:8083";  
    String cj = "cj";  
    String zs = "zs";  
    Long cjId = super.registerAccount(cj);  
    Long zsId = super.registerAccount(zs);  
    var auth1 = ClientConfigurationData.Auth.builder()  
            .userName(cj)  
            .userId(cjId)  
            .build();  
    var auth2 = ClientConfigurationData.Auth.builder()  
            .userName(zs)  
            .userId(zsId)  
            .build();  
  
    @Cleanup  
    Client client1 = Client.builder()  
            .auth(auth1)  
            .routeUrl(routeUrl)  
            .build();  
    TimeUnit.SECONDS.sleep(3);  
    ClientState.State state = client1.getState();  
    Awaitility.await().atMost(10, TimeUnit.SECONDS)  
            .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));  
  
  
    AtomicReference<String> client2Receive = new AtomicReference<>();  
    @Cleanup  
    Client client2 = Client.builder()  
            .auth(auth2)  
            .routeUrl(routeUrl)  
            .messageListener((client, message) -> client2Receive.set(message))  
            .build();  
    TimeUnit.SECONDS.sleep(3);  
    ClientState.State state2 = client2.getState();  
    Awaitility.await().atMost(10, TimeUnit.SECONDS)  
            .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state2));  
  
    Optional<CIMServerResVO> serverInfo2 = client2.getServerInfo();  
    Assertions.assertTrue(serverInfo2.isPresent());  
    System.out.println("client2 serverInfo = " + serverInfo2.get());  
  
    // send msg  
    String msg = "hello";  
    client1.sendGroup(msg);  
    Awaitility.await()  
            .untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));  
    client2Receive.set("");  
  
  
    System.out.println("ready to restart server");  
    TimeUnit.SECONDS.sleep(3);  
    Optional<CIMServerResVO> serverInfo = client1.getServerInfo();  
    Assertions.assertTrue(serverInfo.isPresent());  
    System.out.println("server info = " + serverInfo.get());  
  
    super.stopServer(serverInfo.get().getCimServerPort());  
    System.out.println("stop server success! " + serverInfo.get());  
  
  
    // Waiting server stopped, and client reconnect.  
    TimeUnit.SECONDS.sleep(30);  
    System.out.println("reconnect state: " + client1.getState());  
    Awaitility.await().atMost(15, TimeUnit.SECONDS)  
            .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));  
    serverInfo = client1.getServerInfo();  
    Assertions.assertTrue(serverInfo.isPresent());  
    System.out.println("client1 reconnect server info = " + serverInfo.get());  
  
    // Send message again.  
    log.info("send message again, client2Receive = {}", client2Receive.get());  
    client1.sendGroup(msg);  
    Awaitility.await()  
            .untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));  
    super.stopTwoServer();  
}

比如在這里編寫了一個客戶端重連的單測,代碼有點長,但它的主要流程如下:

  • 啟動兩個 Server:Server1,Server2
  • 啟動 Route
  • 在啟動兩個 Client 發送消息
  • 校驗消息發送是否成功
  • 停止 Client1 連接的 Server
  • 等待 Client 自動重連到另一個 Server
  • 再次發送消息
  • 校驗消息發送是否成功

這樣就可以驗證在服務端 Server 宕機后整個服務是否可用,消息收發是否正常。

public void startTwoServer() {  
    if (!zooKeeperContainer.isRunning()){  
        zooKeeperContainer.start();  
    }    zookeeperAddr = String.format("%s:%d", zooKeeperContainer.getHost(), zooKeeperContainer.getMappedPort(ZooKeeperContainer.DEFAULT_CLIENT_PORT));  
    SpringApplication server = new SpringApplication(CIMServerApplication.class);  
    String[] args1 = new String[]{  
            "--cim.server.port=11211",  
            "--server.port=8081",  
            "--app.zk.addr=" + zookeeperAddr,  
    };    ConfigurableApplicationContext run1 = server.run(args1);  
    runMap.put(Integer.parseInt("11211"), run1);  
  
  
    SpringApplication server2 = new SpringApplication(CIMServerApplication.class);  
    String[] args2 = new String[]{  
            "--cim.server.port=11212",  
            "--server.port=8082",  
            "--app.zk.addr=" + zookeeperAddr,  
    };    ConfigurableApplicationContext run2 = server2.run(args2);  
    runMap.put(Integer.parseInt("11212"), run2);  
}

public void stopServer(Integer port) {  
    runMap.get(port).close();  
    runMap.remove(port);  
}

這里的啟動兩個 Server 就是創建了兩個 Server 應用,然后保存好端口和應用之間的映射關系。

這樣就可以根據客戶端連接的 Server 信息指定停止哪一個 Server,更方便做測試。

這次重啟 cim 的維護后會盡量維護下去,即便更新時間慢一點。

后續還會加上消息 ack、離線消息等之前呼聲很高的功能,感興趣的完全可以一起參與。

源碼地址:https://github.com/crossoverJie/cim

責任編輯:武曉燕 來源: crossoverJie
相關推薦

2023-09-11 08:50:03

Maven工具關系管理

2016-12-27 08:49:55

API設計策略

2013-06-13 09:21:31

RESTful APIRESTfulAPI

2024-10-14 14:28:19

支付系統設計

2010-12-28 10:12:39

PHP

2025-01-02 09:06:43

2017-10-20 08:25:10

數據收集工具數據源

2017-03-06 20:39:41

整潔代碼Clean Code

2020-06-08 18:41:07

Kafka微服務Web

2024-08-29 09:32:36

2009-06-22 14:48:21

DRY架構設計

2011-12-31 10:18:33

響應設計

2014-05-19 10:08:36

IM系統架構設計

2020-02-06 08:03:53

疫情設計IM系統

2020-08-07 09:41:00

微服務架構數據

2024-09-23 00:00:00

下拉菜單UI控件

2014-02-26 11:01:28

日志優化系統日志

2023-09-13 08:00:00

JavaScript循環語句

2015-09-15 16:01:40

混合IT私有云IT架構

2023-07-21 01:12:30

Reactfalse?變量
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 五月婷婷 六月丁香 | 国产精品一区在线观看你懂的 | 久久久女| 亚洲精品乱码久久久久久按摩观 | 精精精精xxxx免费视频 | 免费在线视频a | 中文字幕国产视频 | 欧美精品一区二区三区蜜桃视频 | 国产日韩欧美激情 | 美女天天操 | 日韩午夜精品 | 一级欧美黄色片 | 成人免费网站视频 | 99久久国产综合精品麻豆 | 精品国产99 | 伊人久久免费视频 | 久草在线影 | 欧美影院 | 久国产视频 | 亚洲欧美一区二区三区国产精品 | 国产乱码精品一区二区三区忘忧草 | 91porn在线观看 | 日韩视频精品在线 | 91福利网址 | 日韩看片| 久久国产精品-国产精品 | 久久精品国内 | 成人高清视频在线观看 | 日韩一区二区三区精品 | 国产精品s色 | 九九久久精品 | 天天色官网 | 亚洲三级视频 | av一二三区 | 日本福利一区 | 成人在线视频网 | 亚洲国产成人在线视频 | 精品人伦一区二区三区蜜桃网站 | 久久y| 色天天综合 | 97精品超碰一区二区三区 |