成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Java 從零開始手寫 RPC -序列化

開發 后端
序列化 (Serialization)是將對象的狀態信息轉換為可以存儲或傳輸的形式的過程。在序列化期間,對象將其當前狀態寫入到臨時或持久性存儲區。

[[429947]]

前面幾節我們實現了最基礎的客戶端調用服務端,這一節來學習一下通訊中的對象序列化。

為什么需要序列化

netty 底層都是基于 ByteBuf 進行通訊的。

前面我們通過編碼器/解碼器專門為計算的入參/出參進行處理,這樣方便我們直接使用 pojo。

但是有一個問題,如果想把我們的項目抽象為框架,那就需要為所有的對象編寫編碼器/解碼器。

顯然,直接通過每一個對象寫一對的方式是不現實的,而且用戶如何使用,也是未知的。

序列化的方式

基于字節的實現,性能好,可讀性不高。

基于字符串的實現,比如 json 序列化,可讀性好,性能相對較差。

ps: 可以根據個人還好選擇,相關序列化可參考下文,此處不做展開。

json 序列化框架簡介[1]

實現思路

可以將我們的 Pojo 全部轉化為 byte,然后 Byte 轉換為 ByteBuf 即可。

反之亦然。

代碼實現

maven

引入序列化包:

  1. <dependency> 
  2.     <groupId>com.github.houbb</groupId> 
  3.     <artifactId>json</artifactId> 
  4.     <version>0.1.1</version> 
  5. </dependency> 

服務端

核心

服務端的代碼可以大大簡化:

  1. serverBootstrap.group(workerGroup, bossGroup) 
  2.     .channel(NioServerSocketChannel.class) 
  3.     // 打印日志 
  4.     .handler(new LoggingHandler(LogLevel.INFO)) 
  5.     .childHandler(new ChannelInitializer<Channel>() { 
  6.         @Override 
  7.         protected void initChannel(Channel ch) throws Exception { 
  8.             ch.pipeline() 
  9.                     .addLast(new RpcServerHandler()); 
  10.         } 
  11.     }) 
  12.     // 這個參數影響的是還沒有被accept 取出的連接 
  13.     .option(ChannelOption.SO_BACKLOG, 128) 
  14.     // 這個參數只是過一段時間內客戶端沒有響應,服務端會發送一個 ack 包,以判斷客戶端是否還活著。 
  15.     .childOption(ChannelOption.SO_KEEPALIVE, true); 

這里只需要一個實現類即可。

RpcServerHandler

服務端的序列化/反序列化調整為直接使用 JsonBs 實現。

  1. package com.github.houbb.rpc.server.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.common.model.CalculateRequest; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9. import com.github.houbb.rpc.common.service.Calculator; 
  10. import com.github.houbb.rpc.server.service.CalculatorService; 
  11.  
  12.  
  13. import io.netty.buffer.ByteBuf; 
  14. import io.netty.buffer.Unpooled; 
  15. import io.netty.channel.ChannelHandlerContext; 
  16. import io.netty.channel.SimpleChannelInboundHandler; 
  17.  
  18.  
  19. /** 
  20.  * @author binbin.hou 
  21.  * @since 0.0.1 
  22.  */ 
  23. public class RpcServerHandler extends SimpleChannelInboundHandler { 
  24.  
  25.  
  26.     private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
  27.  
  28.  
  29.     @Override 
  30.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  31.         final String id = ctx.channel().id().asLongText(); 
  32.         log.info("[Server] channel {} connected " + id); 
  33.     } 
  34.  
  35.  
  36.     @Override 
  37.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  38.         final String id = ctx.channel().id().asLongText(); 
  39.  
  40.  
  41.         ByteBuf byteBuf = (ByteBuf)msg; 
  42.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  43.         byteBuf.readBytes(bytes); 
  44.         CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class); 
  45.         log.info("[Server] receive channel {} request: {} from ", id, request); 
  46.  
  47.  
  48.         Calculator calculator = new CalculatorService(); 
  49.         CalculateResponse response = calculator.sum(request); 
  50.  
  51.  
  52.         // 回寫到 client 端 
  53.         byte[] responseBytes = JsonBs.serializeBytes(response); 
  54.         ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes); 
  55.         ctx.writeAndFlush(responseBuffer); 
  56.         log.info("[Server] channel {} response {}", id, response); 
  57.     } 
  58.  
  59.  

客戶端

核心

客戶端可以簡化如下:

  1. channelFuture = bootstrap.group(workerGroup) 
  2.     .channel(NioSocketChannel.class) 
  3.     .option(ChannelOption.SO_KEEPALIVE, true
  4.     .handler(new ChannelInitializer<Channel>(){ 
  5.         @Override 
  6.         protected void initChannel(Channel ch) throws Exception { 
  7.             channelHandler = new RpcClientHandler(); 
  8.             ch.pipeline() 
  9.                     .addLast(new LoggingHandler(LogLevel.INFO)) 
  10.                     .addLast(channelHandler); 
  11.         } 
  12.     }) 
  13.     .connect(RpcConstant.ADDRESS, port) 
  14.     .syncUninterruptibly(); 

RpcClientHandler

客戶端的序列化/反序列化調整為直接使用 JsonBs 實現。

  1. package com.github.houbb.rpc.client.handler; 
  2.  
  3.  
  4. import com.github.houbb.json.bs.JsonBs; 
  5. import com.github.houbb.log.integration.core.Log; 
  6. import com.github.houbb.log.integration.core.LogFactory; 
  7. import com.github.houbb.rpc.client.core.RpcClient; 
  8. import com.github.houbb.rpc.common.model.CalculateResponse; 
  9.  
  10.  
  11. import io.netty.buffer.ByteBuf; 
  12. import io.netty.channel.ChannelHandlerContext; 
  13. import io.netty.channel.SimpleChannelInboundHandler; 
  14.  
  15.  
  16. /** 
  17.  * <p> 客戶端處理類 </p> 
  18.  * 
  19.  * <pre> Created: 2019/10/16 11:30 下午  </pre> 
  20.  * <pre> Project: rpc  </pre> 
  21.  * 
  22.  * @author houbinbin 
  23.  * @since 0.0.2 
  24.  */ 
  25. public class RpcClientHandler extends SimpleChannelInboundHandler { 
  26.  
  27.  
  28.     private static final Log log = LogFactory.getLog(RpcClient.class); 
  29.  
  30.  
  31.     /** 
  32.      * 響應信息 
  33.      * @since 0.0.4 
  34.      */ 
  35.     private CalculateResponse response; 
  36.  
  37.  
  38.     @Override 
  39.     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
  40.         ByteBuf byteBuf = (ByteBuf)msg; 
  41.         byte[] bytes = new byte[byteBuf.readableBytes()]; 
  42.         byteBuf.readBytes(bytes); 
  43.  
  44.  
  45.         this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class); 
  46.         log.info("[Client] response is :{}", response); 
  47.     } 
  48.  
  49.  
  50.     @Override 
  51.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  52.         // 每次用完要關閉,不然拿不到response,我也不知道為啥(目測得了解netty才行) 
  53.         // 個人理解:如果不關閉,則永遠會被阻塞。 
  54.         ctx.flush(); 
  55.         ctx.close(); 
  56.     } 
  57.  
  58.  
  59.     public CalculateResponse getResponse() { 
  60.         return response; 
  61.     } 
  62.  
  63.  

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2021-10-13 08:21:52

Java websocket Java 基礎

2021-10-29 08:07:30

Java timeout Java 基礎

2021-10-19 08:58:48

Java 語言 Java 基礎

2021-10-21 08:21:10

Java Reflect Java 基礎

2019-09-23 19:30:27

reduxreact.js前端

2021-10-14 08:39:17

Java Netty Java 基礎

2017-02-10 09:30:33

數據化運營流量

2015-11-17 16:11:07

Code Review

2019-01-18 12:39:45

云計算PaaS公有云

2018-04-18 07:01:59

Docker容器虛擬機

2024-12-06 17:02:26

2020-07-02 15:32:23

Kubernetes容器架構

2022-08-06 08:41:18

序列化反序列化Hessian

2018-03-19 10:20:23

Java序列化反序列化

2010-05-26 17:35:08

配置Xcode SVN

2018-09-14 17:16:22

云計算軟件計算機網絡

2024-05-15 14:29:45

2021-10-27 08:10:15

Java 客戶端 Java 基礎

2023-03-06 07:28:57

RPC框架序列化

2013-03-11 13:55:03

JavaJSON
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 四虎影院新地址 | 国产精品一二三区 | 日韩成人在线播放 | 中文字幕日韩专区 | 91精品国产91久久综合桃花 | 日韩欧美精品一区 | 欧美一区二区成人 | 午夜精品影院 | 伊人精品在线 | 国产精品日韩一区二区 | 日本三级做a全过程在线观看 | 成人欧美一区二区三区在线观看 | 久久精品1 | 羞羞色视频 | 伊人青青久久 | 午夜影院在线观看 | 精品在线一区 | 亚洲成人一区二区 | 久久在线| 久久精品亚洲精品 | 国产一级一级毛片 | 91精品综合久久久久久五月天 | 一区二区三区在线 | 欧 | av福利网站 | 欧美精品一区二区在线观看 | 亚洲男人网| 成人福利视频网站 | 日韩成人免费 | 中文字幕国产精品 | aa级毛片毛片免费观看久 | 亚洲视频在线免费 | 日韩二区三区 | 久久精品色欧美aⅴ一区二区 | 欧洲一级毛片 | 第一色在线 | jlzzjlzz国产精品久久 | 日韩视频在线免费观看 | 亚洲视频免费在线播放 | 欧美韩一区二区 | 欧美二区在线 | 国产精品久久久久久吹潮 |