Java 從零開始手寫 RPC -序列化
前面幾節我們實現了最基礎的客戶端調用服務端,這一節來學習一下通訊中的對象序列化。
為什么需要序列化
netty 底層都是基于 ByteBuf 進行通訊的。
前面我們通過編碼器/解碼器專門為計算的入參/出參進行處理,這樣方便我們直接使用 pojo。
但是有一個問題,如果想把我們的項目抽象為框架,那就需要為所有的對象編寫編碼器/解碼器。
顯然,直接通過每一個對象寫一對的方式是不現實的,而且用戶如何使用,也是未知的。
序列化的方式
基于字節的實現,性能好,可讀性不高。
基于字符串的實現,比如 json 序列化,可讀性好,性能相對較差。
ps: 可以根據個人還好選擇,相關序列化可參考下文,此處不做展開。
json 序列化框架簡介[1]
實現思路
可以將我們的 Pojo 全部轉化為 byte,然后 Byte 轉換為 ByteBuf 即可。
反之亦然。
代碼實現
maven
引入序列化包:
- <dependency>
- <groupId>com.github.houbb</groupId>
- <artifactId>json</artifactId>
- <version>0.1.1</version>
- </dependency>
服務端
核心
服務端的代碼可以大大簡化:
- serverBootstrap.group(workerGroup, bossGroup)
- .channel(NioServerSocketChannel.class)
- // 打印日志
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel ch) throws Exception {
- ch.pipeline()
- .addLast(new RpcServerHandler());
- }
- })
- // 這個參數影響的是還沒有被accept 取出的連接
- .option(ChannelOption.SO_BACKLOG, 128)
- // 這個參數只是過一段時間內客戶端沒有響應,服務端會發送一個 ack 包,以判斷客戶端是否還活著。
- .childOption(ChannelOption.SO_KEEPALIVE, true);
這里只需要一個實現類即可。
RpcServerHandler
服務端的序列化/反序列化調整為直接使用 JsonBs 實現。
- package com.github.houbb.rpc.server.handler;
- import com.github.houbb.json.bs.JsonBs;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.common.model.CalculateRequest;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import com.github.houbb.rpc.common.service.Calculator;
- import com.github.houbb.rpc.server.service.CalculatorService;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * @author binbin.hou
- * @since 0.0.1
- */
- public class RpcServerHandler extends SimpleChannelInboundHandler {
- private static final Log log = LogFactory.getLog(RpcServerHandler.class);
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- final String id = ctx.channel().id().asLongText();
- log.info("[Server] channel {} connected " + id);
- }
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- final String id = ctx.channel().id().asLongText();
- ByteBuf byteBuf = (ByteBuf)msg;
- byte[] bytes = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(bytes);
- CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);
- log.info("[Server] receive channel {} request: {} from ", id, request);
- Calculator calculator = new CalculatorService();
- CalculateResponse response = calculator.sum(request);
- // 回寫到 client 端
- byte[] responseBytes = JsonBs.serializeBytes(response);
- ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
- ctx.writeAndFlush(responseBuffer);
- log.info("[Server] channel {} response {}", id, response);
- }
- }
客戶端
核心
客戶端可以簡化如下:
- channelFuture = bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<Channel>(){
- @Override
- protected void initChannel(Channel ch) throws Exception {
- channelHandler = new RpcClientHandler();
- ch.pipeline()
- .addLast(new LoggingHandler(LogLevel.INFO))
- .addLast(channelHandler);
- }
- })
- .connect(RpcConstant.ADDRESS, port)
- .syncUninterruptibly();
RpcClientHandler
客戶端的序列化/反序列化調整為直接使用 JsonBs 實現。
- package com.github.houbb.rpc.client.handler;
- import com.github.houbb.json.bs.JsonBs;
- import com.github.houbb.log.integration.core.Log;
- import com.github.houbb.log.integration.core.LogFactory;
- import com.github.houbb.rpc.client.core.RpcClient;
- import com.github.houbb.rpc.common.model.CalculateResponse;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- /**
- * <p> 客戶端處理類 </p>
- *
- * <pre> Created: 2019/10/16 11:30 下午 </pre>
- * <pre> Project: rpc </pre>
- *
- * @author houbinbin
- * @since 0.0.2
- */
- public class RpcClientHandler extends SimpleChannelInboundHandler {
- private static final Log log = LogFactory.getLog(RpcClient.class);
- /**
- * 響應信息
- * @since 0.0.4
- */
- private CalculateResponse response;
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf byteBuf = (ByteBuf)msg;
- byte[] bytes = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(bytes);
- this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);
- log.info("[Client] response is :{}", response);
- }
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- // 每次用完要關閉,不然拿不到response,我也不知道為啥(目測得了解netty才行)
- // 個人理解:如果不關閉,則永遠會被阻塞。
- ctx.flush();
- ctx.close();
- }
- public CalculateResponse getResponse() {
- return response;
- }
- }