聊聊 IM 系統重構到 SDK 設計的最佳實踐
SDK 設計
圖片
在之前提到了 cim 在做集成測試的時候遇到的問題,需要提供一個 SDK 來解決,于是我花了一些時間編寫了 SDK,同時也將 cim-client 重構了。
重構后的代碼長這個樣子:
@Bean
public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor callbackThreadPool,
Event event) {
OkHttpClient okHttpClient = new OkHttpClient.Builder().connectTimeout(3, TimeUnit.SECONDS)
.readTimeout(3, TimeUnit.SECONDS)
.writeTimeout(3, TimeUnit.SECONDS)
.retryOnConnectionFailure(true).build();
return Client.builder()
.auth(ClientConfigurationData.Auth.builder()
.userName(appConfiguration.getUserName())
.userId(appConfiguration.getUserId())
.build())
.routeUrl(appConfiguration.getRouteUrl())
.loginRetryCount(appConfiguration.getReconnectCount())
.event(event)
.reconnectCheck(client -> !shutDownSign.checkStatus())
.okHttpClient(okHttpClient)
.messageListener(new MsgCallBackListener(msgLogger))
.callbackThreadPool(callbackThreadPool)
.build();
}
配合 springboot 使用時只需要創建一個 Client 即可,這個 Client 里維護了核心的:
- 長鏈接創建、狀態維護
- 心跳檢測
- 超時、網絡異常重連等
同時也提供了簡易的 API 可以直接收發消息:
圖片
這樣在集成到業務代碼中時會更方便。
以前的代碼耦合度非常高,同時因為基礎代碼是 18 年寫的,現在真的沒有眼看了;
重構的過程中使用一些 Java8+ 的一些語法糖精簡了許多代碼,各個模塊間的組織關系也重新梳理,現在會更易維護了。
比如由于創建客戶端需要許多可選參數,于是就提供了 Builder 模式的創建選項:
public interface ClientBuilder {
Client build();
ClientBuilder auth(ClientConfigurationData.Auth auth);
ClientBuilder routeUrl(String routeUrl);
ClientBuilder loginRetryCount(int loginRetryCount);
ClientBuilder event(Event event);
ClientBuilder reconnectCheck(ReconnectCheck reconnectCheck);
ClientBuilder okHttpClient(OkHttpClient okHttpClient);
ClientBuilder messageListener(MessageListener messageListener);
ClientBuilder callbackThreadPool(ThreadPoolExecutor callbackThreadPool);
}
以上部分 API 的設計借鑒了 Pulsar。
Proxy 優化
除此之外還優化了請求代理,這個 Proxy 主要是用于方便在各個服務中發起 rest 調用,我這里為了輕量也沒有使用 Dubbo、SpringCloud 這類服務框架。
但如果都硬編碼 http client 去請求時會有許多重復冗余的代碼,比如創建連接、請求參數、響應解析、異常處理等。
于是在之前的版本中就提供了一個 ProxyManager 的基本實現:
@Override
public List<OnlineUsersResVO.DataBodyBean> onlineUsers() throws Exception{
RouteApi routeApi = new ProxyManager<>(RouteApi.class, routeUrl, okHttpClient).getInstance();
Response response = null;
OnlineUsersResVO onlineUsersResVO = null;
try {
response = (Response) routeApi.onlineUser();
String json = response.body().string() ;
onlineUsersResVO = JSON.parseObject(json, OnlineUsersResVO.class);
}catch (Exception e){
log.error("exception",e);
}finally {
response.body().close();
}
return onlineUsersResVO.getDataBody();
}
雖然提供了一些連接管理和參數封裝等基礎功能,但只實現了一半。
從上面的代碼也可以看出序列化都得自己實現,這些代碼完全是冗余的。
經過重構后以上的代碼可以精簡到如下:
// 聲明接口
@Request(method = Request.GET)
BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception;
// 初始化
routeApi = RpcProxyManager.create(RouteApi.class, routeUrl, okHttpClient);
public Set<CIMUserInfo> onlineUser() throws Exception {
BaseResponse<Set<CIMUserInfo>> onlineUsersResVO = routeApi.onlineUser();
return onlineUsersResVO.getDataBody();
}
這個調整之后就非常類似于 Dubbo gRPC 這類 RPC 框架的使用,只需要把接口定義好,就和調用本地函數一樣的簡單。
為了方便后續可能調用一些外部系統,在此基礎上還支持了指定多種請求 method、指定 URL 、返回結果嵌套泛型等。
@Request(url = "sample-request?author=beeceptor")
EchoGeneric<EchoResponse.HeadersDTO> echoGeneric(EchoRequest message);
@Test
public void testGeneric() {
OkHttpClient client = new OkHttpClient();
String url = "http://echo.free.beeceptor.com";
Echo echo = RpcProxyManager.create(Echo.class, url, client);
EchoRequest request = new EchoRequest();
request.setName("crossoverJie");
request.setAge(18);
request.setCity("shenzhen");
// 支持泛型解析
EchoGeneric<EchoResponse.HeadersDTO> response = echo.echoGeneric(request);
Assertions.assertEquals(response.getHeaders().getHost(), "echo.free.beeceptor.com");
}
支持動態 URL 調用
圖片
還有一個 todo:希望可以將 ProxyManager 交給 Spring 去管理,之前是在每次調用的地方都會創建一個 Proxy 對象,完全沒有必要,代碼也很冗余。
但有網友在實現過程中發現,有個場景的請求地址是動態的,如果是交給 Spring 管理為單例后是沒法修改 URL 地址的,因為這個地址是在創建對象的時候初始化的。
所以我就在這里新增了一個動態 URL 的特性:
EchoResponse echoTarget(EchoRequest message, @DynamicUrl(useMethodEndpoint = false) String url);
Echo echo = RpcProxyManager.create(Echo.class, client);
String url = "http://echo.free.beeceptor.com/sample-request?author=beeceptor";
EchoResponse response = echo.echoTarget(request, url);
在聲明接口的時候使用 @DynamicUrl 的方法參數注解,告訴代理這個參數是 URL。這樣就可以允許在創建 Proxy 對象的時候不指定 URL,而是在實際調用時候再傳入具體的 URL,更方便創建單例了。
集成測試優化
同時還優化了集成測試,支持了 server 的集群版測試。
@Test
public void testReconnect() throws Exception {
super.startTwoServer();
super.startRoute();
String routeUrl = "http://localhost:8083";
String cj = "cj";
String zs = "zs";
Long cjId = super.registerAccount(cj);
Long zsId = super.registerAccount(zs);
var auth1 = ClientConfigurationData.Auth.builder()
.userName(cj)
.userId(cjId)
.build();
var auth2 = ClientConfigurationData.Auth.builder()
.userName(zs)
.userId(zsId)
.build();
@Cleanup
Client client1 = Client.builder()
.auth(auth1)
.routeUrl(routeUrl)
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state = client1.getState();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));
AtomicReference<String> client2Receive = new AtomicReference<>();
@Cleanup
Client client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state2 = client2.getState();
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state2));
Optional<CIMServerResVO> serverInfo2 = client2.getServerInfo();
Assertions.assertTrue(serverInfo2.isPresent());
System.out.println("client2 serverInfo = " + serverInfo2.get());
// send msg
String msg = "hello";
client1.sendGroup(msg);
Awaitility.await()
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
client2Receive.set("");
System.out.println("ready to restart server");
TimeUnit.SECONDS.sleep(3);
Optional<CIMServerResVO> serverInfo = client1.getServerInfo();
Assertions.assertTrue(serverInfo.isPresent());
System.out.println("server info = " + serverInfo.get());
super.stopServer(serverInfo.get().getCimServerPort());
System.out.println("stop server success! " + serverInfo.get());
// Waiting server stopped, and client reconnect.
TimeUnit.SECONDS.sleep(30);
System.out.println("reconnect state: " + client1.getState());
Awaitility.await().atMost(15, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state));
serverInfo = client1.getServerInfo();
Assertions.assertTrue(serverInfo.isPresent());
System.out.println("client1 reconnect server info = " + serverInfo.get());
// Send message again.
log.info("send message again, client2Receive = {}", client2Receive.get());
client1.sendGroup(msg);
Awaitility.await()
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
super.stopTwoServer();
}
比如在這里編寫了一個客戶端重連的單測,代碼有點長,但它的主要流程如下:
- 啟動兩個 Server:Server1,Server2
- 啟動 Route
- 在啟動兩個 Client 發送消息
- 校驗消息發送是否成功
- 停止 Client1 連接的 Server
- 等待 Client 自動重連到另一個 Server
- 再次發送消息
- 校驗消息發送是否成功
這樣就可以驗證在服務端 Server 宕機后整個服務是否可用,消息收發是否正常。
public void startTwoServer() {
if (!zooKeeperContainer.isRunning()){
zooKeeperContainer.start();
} zookeeperAddr = String.format("%s:%d", zooKeeperContainer.getHost(), zooKeeperContainer.getMappedPort(ZooKeeperContainer.DEFAULT_CLIENT_PORT));
SpringApplication server = new SpringApplication(CIMServerApplication.class);
String[] args1 = new String[]{
"--cim.server.port=11211",
"--server.port=8081",
"--app.zk.addr=" + zookeeperAddr,
}; ConfigurableApplicationContext run1 = server.run(args1);
runMap.put(Integer.parseInt("11211"), run1);
SpringApplication server2 = new SpringApplication(CIMServerApplication.class);
String[] args2 = new String[]{
"--cim.server.port=11212",
"--server.port=8082",
"--app.zk.addr=" + zookeeperAddr,
}; ConfigurableApplicationContext run2 = server2.run(args2);
runMap.put(Integer.parseInt("11212"), run2);
}
public void stopServer(Integer port) {
runMap.get(port).close();
runMap.remove(port);
}
這里的啟動兩個 Server 就是創建了兩個 Server 應用,然后保存好端口和應用之間的映射關系。
這樣就可以根據客戶端連接的 Server 信息指定停止哪一個 Server,更方便做測試。
這次重啟 cim 的維護后會盡量維護下去,即便更新時間慢一點。
后續還會加上消息 ack、離線消息等之前呼聲很高的功能,感興趣的完全可以一起參與。
源碼地址:https://github.com/crossoverJie/cim