Java 從零開始手寫 RPC基于 Websocket 實現(xiàn)
RPC
解決的問題
RPC 主要是為了解決的兩個問題:
(1)解決分布式系統(tǒng)中,服務之間的調用問題。
(2)遠程調用時,要能夠像本地調用一樣方便,讓調用者感知不到遠程調用的邏輯。
這一節(jié)我們來學習下如何基于 websocket 實現(xiàn)最簡單的 rpc 調用,后續(xù)會實現(xiàn)基于 netty4 的版本。
開源地址: https://github.com/houbb/rpc
完整流程

其中左邊的Client,對應的就是前面的Service A,而右邊的Server,對應的則是Service B。
下面一步一步詳細解釋一下。
(1)Service A的應用層代碼中,調用了Calculator的一個實現(xiàn)類的add方法,希望執(zhí)行一個加法運算;
(2)這個Calculator實現(xiàn)類,內部并不是直接實現(xiàn)計算器的加減乘除邏輯,而是通過遠程調用Service B的RPC接口,來獲取運算結果,因此稱之為Stub;
(3)Stub怎么和Service B建立遠程通訊呢?這時候就要用到遠程通訊工具了,也就是圖中的Run-time Library,這個工具將幫你實現(xiàn)遠程通訊的功能,比如Java的Socket,就是這樣一個庫,當然,你也可以用基于Http協(xié)議的HttpClient,或者其他通訊工具類,都可以,RPC并沒有規(guī)定說你要用何種協(xié)議進行通訊;
(4)Stub通過調用通訊工具提供的方法,和Service B建立起了通訊,然后將請求數(shù)據(jù)發(fā)給Service B。需要注意的是,由于底層的網(wǎng)絡通訊是基于二進制格式的,因此這里Stub傳給通訊工具類的數(shù)據(jù)也必須是二進制,比如calculator.add(1,2),你必須把參數(shù)值1和2放到一個Request對象里頭(這個Request對象當然不只這些信息,還包括要調用哪個服務的哪個RPC接口等其他信息),然后序列化為二進制,再傳給通訊工具類,這一點也將在下面的代碼實現(xiàn)中體現(xiàn);
(5)二進制的數(shù)據(jù)傳到Service B這一邊了,Service B當然也有自己的通訊工具,通過這個通訊工具接收二進制的請求;
(6)既然數(shù)據(jù)是二進制的,那么自然要進行反序列化了,將二進制的數(shù)據(jù)反序列化為請求對象,然后將這個請求對象交給Service B的Stub處理;
(7)和之前的Service A的Stub一樣,這里的Stub也同樣是個“假玩意”,它所負責的,只是去解析請求對象,知道調用方要調的是哪個RPC接口,傳進來的參數(shù)又是什么,然后再把這些參數(shù)傳給對應的RPC接口,也就是Calculator的實際實現(xiàn)類去執(zhí)行。很明顯,如果是Java,那這里肯定用到了反射。
(8)RPC接口執(zhí)行完畢,返回執(zhí)行結果,現(xiàn)在輪到Service B要把數(shù)據(jù)發(fā)給Service A了,怎么發(fā)?一樣的道理,一樣的流程,只是現(xiàn)在Service B變成了Client,Service A變成了Server而已:Service B反序列化執(zhí)行結果->傳輸給Service A->Service A反序列化執(zhí)行結果 -> 將結果返回給Application,完畢。
簡單實現(xiàn)
假設服務 A,想調用服務 B 的一個方法。
因為不在同一個內存中,無法直接使用。如何可以實現(xiàn)類似 Dubbo 的功能呢?
這里不需要使用 HTTP 級別的通信,使用 TCP 協(xié)議即可。
common
公用模塊,定義通用對象。
- Rpc 常量
- public interface RpcConstant {
- /**
- * 地址
- */
- String ADDRESS = "127.0.0.1";
- /**
- * 端口號
- */
- int PORT = 12345;
- }
- 請求入?yún)?/li>
- public class RpcCalculateRequest implements Serializable {
- private static final long serialVersionUID = 6420751004355300996L;
- /**
- * 參數(shù)一
- */
- private int one;
- /**
- * 參數(shù)二
- */
- private int two;
- //getter & setter & toString()
- }
- 服務接口
- public interface Calculator {
- /**
- * 計算加法
- * @param one 參數(shù)一
- * @param two 參數(shù)二
- * @return 返回結果
- */
- int add(int one, int two);
- }
server
- 服務接口的實現(xiàn)
- public class CalculatorImpl implements Calculator {
- @Override
- public int add(int one, int two) {
- return one + two;
- }
- }
- 啟動服務
- public static void main(String[] args) throws IOException {
- Calculator calculator = new CalculatorImpl();
- try (ServerSocket listener = new ServerSocket(RpcConstant.PORT)) {
- System.out.println("Server 端啟動:" + RpcConstant.ADDRESS + ":" + RpcConstant.PORT);
- while (true) {
- try (Socket socket = listener.accept()) {
- // 將請求反序列化
- ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
- Object object = objectInputStream.readObject();
- System.out.println("Request is: " + object);
- // 調用服務
- int result = 0;
- if (object instanceof RpcCalculateRequest) {
- RpcCalculateRequest calculateRpcRequest = (RpcCalculateRequest) object;
- result = calculator.add(calculateRpcRequest.getOne(), calculateRpcRequest.getTwo());
- }
- // 返回結果
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
- objectOutputStream.writeObject(result);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
啟動日志:
- Server 端啟動:127.0.0.1:12345
client
•客戶端調用
- public static void main(String[] args) {
- Calculator calculator = new CalculatorProxy();
- int result = calculator.add(1, 2);
- System.out.println(result);
- }
- 計算的代理類
- public class CalculatorProxy implements Calculator {
- @Override
- public int add(int one, int two) {
- try {
- Socket socket = new Socket(RpcConstant.ADDRESS, RpcConstant.PORT);
- // 將請求序列化
- RpcCalculateRequest calculateRpcRequest = new RpcCalculateRequest(one, two);
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
- // 將請求發(fā)給服務提供方
- objectOutputStream.writeObject(calculateRpcRequest);
- // 將響應體反序列化
- ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
- Object response = objectInputStream.readObject();
- if (response instanceof Integer) {
- return (Integer) response;
- } else {
- throw new RuntimeException();
- }
- } catch (IOException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
- }
- 調用日志
client 端
- 3
server 端
- Server 端啟動:127.0.0.1:12345
- Request is: RpcCalculateRequest{one=1, two=2}