Netty 如何駕馭 TCP 流式傳輸?粘包拆包問題全解與編解碼器優秀實踐
當Netty涉及網絡IO數據傳輸時,可能會涉及到下面這些面試題:
- 什么是TCP粘包和拆包?為什么UDP不會出現這個問題?
- 發生粘包和拆包的原因是什么?
- Netty是如何解決TCP粘包和拆包的?
一、詳解TCP粘包拆包問題
1. 問題復現
在正式講解問題之前,我們先來看一段示例,查看TCP粘包和拆包問題是如何發生的,下面這兩段代碼分別是服務端配置和業務處理器,它會在與客戶端建立連接之后,不斷輸出客戶端發送的數據:
public class NettyServer {
public static void main(String[] args) {
// 啟動一個netty服務端需要指定 線程模型 IO模型 業務處理邏輯
// 引導類負責引導服務端啟動工作
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 以下兩個對象可以看做是兩個線程組
// 負責監聽端口,接受新的連接
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 負責處理每一個連接讀寫的線程組
NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
// 配置線程組并指定NIO模型
serverBootstrap.group(bossGroup, workerGroup)
//設置IO模型,這里為NioServerSocketChannel,建議Linux服務器使用 EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
// 定義后續每個連接的數據讀寫,對于業務處理邏輯
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast(new FirstServerHandler());
}
});
bind(serverBootstrap, 8888);
}
/**
* 以端口號遞增的形式嘗試綁定端口號
*/
private static void bind(ServerBootstrap serverBootstrap, int port) {
serverBootstrap.bind(port);
}
}
服務端業務處理器核心代碼,邏輯也非常簡單,收到消息后直接打印輸出:
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
/**
* 收到客戶端數據后會回調該方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(DateUtil.now() + ": 服務端讀到數據 -> " + byteBuf.toString(StandardCharsets.UTF_8));
}
}
我們再來看看客戶端的業務處理器和配置類,業務處理器的代碼非常簡單,在建立連接后連續發送1000條數據,數據內容為:hello Netty Server!:
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
/**
* 客戶端連接服務端成功后會回調該方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1000; i++) {
// 獲取數據
ByteBuf byteBuf = getByteBuf(ctx);
// 把數據寫到服務端
ctx.channel().writeAndFlush(byteBuf);
}
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "hello Netty Server!".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
而配置類也是固定模板:
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 整體即完成netty客戶端需要指定線程模型、IO模型、業務處理邏輯
// 負責客戶端的啟動
Bootstrap bootstrap = new Bootstrap();
// 客戶端的線程模型
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 指定線程組
bootstrap.group(workerGroup)
//指定NIO模型
.channel(NioSocketChannel.class)
// IO處理邏輯
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new FirstClientHandler());
}
});
// 建立連接
connect(bootstrap, "127.0.0.1", 8888);
}
/**
* 建立連接的方法,使用監聽器來進行重試
*/
private static Channel connect(Bootstrap bootstrap, String host, int port) {
return bootstrap.connect(host, port).channel();
}
}
將服務端和客戶端啟動后,我們可以看到下面這段輸出,可以看到大量的hello Netty Server!數據粘在一起構成一個個粘包。
2023-08-29 09:09:24: 服務端讀到數據 -> hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Server!hello Netty Serve
2023-08-29 09:09:24: 服務端讀到數據 -> r!hello Netty Server!hello Netty Server!hello Netty Ser
2. 原因剖析
在TCP編程中,在服務端與客戶端通信時消息都會有固定的消息格式,這種格式我們通常稱之為protocol即協議,例如我們常見的應用層協議:HTTP、FTP等。
而上述例子出現粘包的原因本質就是我們服務端與客戶端進行通信時,沒有確認協議的規范,因為TCP是面向連接、面向流的協議,它會因為各種原因導致完整的數據包被拆封無數個小的數據包進行發送,進而導致接收方收到數據后無法正確的處理數據,出現粘包和拆包:
而出現TCP數據包被拆分的原因大致有3個:
- socket緩沖區與滑動窗口
- nagle算法
- mss
先來說說socket緩沖區和滑動窗口的共同作用,我們都知道TCP是全雙工、面向流的協議。這意味發送時必須要保證收發正常,所以TCP就提出了一個滑動窗口機制,即以滑動窗口的大小為單位,讓雙方基于這個窗口的大小進行數據收發,發送方只有在滑動窗口以內的數據才能被發送,接收方也只有在窗口以內的數據被接收和處理,只有接收方的滑動窗口收到發送方的數據,且處理完成并發送確認信號ACK之后,發送方的窗口才能繼續向后移動:
由于TCP是面向流的協議,在此期間雙方收發的數據也都會會存放到socket緩沖區中。這意味這連個緩沖區是無法知曉這些數據是否屬于同一個數據包的。 同理socket緩沖區也分為發送緩沖區(SO_SNDBUF )和接收緩沖區(SO_RCVBUF),所有socket需要發送的數據也都是存放到socket的緩沖區中然后通過內核函數傳到內核協議棧進行數據發送,socket接收緩沖區也是通過操作系統的內核函數將數據拷貝至socket緩沖區。
所以。socket緩沖區和滑動窗口機制共同作用下就會出現以下兩種異常情況:
(1) 發送方發送的數據達到了滑動窗口的限制,停止發送,接收方的socket緩沖區拿到這些數據后,直接向應用層傳輸,因為包不是完整的,從接收方的角度來看,出現了拆包。
(2) 發送方發送多個數據包到接收方緩沖區,因為接收方socket緩沖區無法及時處理,導致真正開始處理時無法知曉數據包的邊界,只能一次性將數據包向上傳遞,導致粘包。
再來說說Nagle算法,考慮到每次發送數據包時都需要為數據加上TCP Header20字節和IP header 20字節,以及還得等待發送方的ACK確認包,這就很可能出現下面這種非常浪費網絡資源的情況:
為了1個字節的有用信息去組裝10字節的頭部信息!
對此,操作系統為了盡可能的利用網絡帶寬,就提出了Nagle算法,該算法要求所有已發送出去的小數據包(長度小于MSS)必須等到接收方的都回復ack信號之后,然后再將這些小數據段一并打包成一個打包發送,從而盡可能利用帶寬及盡可能避免因為大量小的網絡包的傳輸造成網絡擁塞。
很明顯如果將多個小的數據包合并發送,接收方也很可能因為無法確認數據包的邊界而出現粘包或拆包問題:
最后就是mss,也就是Maximum Segement Size的縮寫,代表傳輸一次性可以發送的數據最大長度,如果數據超過MSS的最大值,那么網絡數據包就會被拆成多個小包發送,這種情況下也很可能因為零零散散的數據包發送而會出現粘包和拆包問題。
對此我們不妨通過WireShark進行抓包分析,基于服務端端口鍵入如下指令進行過濾:
ip.src==127.0.0.1 and ip.dst==127.0.0.1 and tcp.port==8888
啟動客戶端和服務端之后,發現雙方交換得出的MSS遠大于每次發送的數據大小,所以首先排除分包問題:
查看每次服務端發送的數據,無論大小還是內容都沒有缺失,內核緩沖區空間也是充足的,所以原因很明顯,因為TCP協議是面向流傳輸,接收方從內核緩沖區讀取時,拿到了過多或者過少的數據導致粘包或拆包。
二、半包粘包的解決對策
1. 幾種解決對策簡介
其實上述的問題的原因都是因為TCP是面向流的協議,導致了數據包無法被正常切割成一個個正常數據包的流。就以上面的數據包為例,發送的數據為hello Netty Server!,其實我們做到下面這幾種分割方式:
- 如果發送的數據都是以"!"結尾,那我們的分割時就判斷收到的流是否包含"!",只有包含時再將數據裝成數據包發送。
- 上述發送的數據長度為19,我們也可以規定發送的數據長度為19字節,一旦收到的數據達到19個字節之后,就組裝成一個數據包。
- 自定義一個協議,要求發送方根據協議要求組裝數據包發送,例如要求數據包包含長度length和data兩個字段,其中length記錄數據包長度,以上述數據為例,這個字段的值為19,而data包含的就是數據內容。
2. 基于分隔符的解碼器DelimiterBasedFrameDecoder
先來看看基于分隔符的,可以看到每一個數據末尾都有一個感嘆號,所以我們可以通過判斷特殊符號完成數據拆包。
代碼如下,我們基于DelimiterBasedFrameDecoder完成基于特殊分隔符進行拆包,每個參數對應含義為:
- 數據包最大長度。
- 解碼時是否去掉分隔符。
- 分隔符。
ByteBuf delimiter = Unpooled.copiedBuffer("!".getBytes());
nioSocketChannel.pipeline()
.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,false,delimiter))
.addLast(new FirstServerHandler());
啟動之后可以看到問題也得以解決:
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
2023-08-29 09:19:44: 服務端讀到數據 -> hello Netty Server!
3. 基于數據長度的解碼器FixedLengthFrameDecoder
同理,我們也可以基于數據長度,對數據包進行分割:
由上文可知,我們發送的數據長度都是19,所以第一種方案是在服務端的pipeline配置一個基于長度拆包的解碼器,確保在每19個字節截取一次以確保數據包可以正確讀取和解析。 所以我們在pipeline添加一個FixedLengthFrameDecoder,長度設置為19。
nioSocketChannel.pipeline()
.addLast(new FixedLengthFrameDecoder(19))
.addLast(new FirstServerHandler());
4. 基于協議長度字段的解碼器LengthFieldBasedFrameDecoder
最后一種,也是筆者比較推薦的一種長度,即自定義協議,我們在傳輸過程中,可能數據的長度或者分隔符都無法保證,所以我們可以和客戶端協商一下,在傳輸的數據頭部添加一個數據包長度,例如用4字節表示數據包長度。
所以客戶端建立連接后寫數據的代碼就改為:
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "hello Netty Server!".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
//4個字節說明數據的長度
buffer.writeInt(bytes.length);
//寫入數據內容
buffer.writeBytes(bytes);
return buffer;
}
最終的數據包結構如下圖所示:
圖片
而服務端的處理器則改為使用LengthFieldBasedFrameDecoder,構造方法如下:
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
//.......
}
按照對應參數含義為:
- maxFrameLength:數據包最大長度,這里我們設置為Integer.MAX_VALUE,等同于不限制。
- lengthFieldOffset:該數值代表獲取描述數據包長度的字段的位置偏移量,以我們的數據包為例,就是0,即從最初始的位置讀取長度。
- lengthFieldLength:描述數據包長度的字段的字節數,以我們的數據包為例就是4字節。
- lengthAdjustment:要添加到長度字段值的補償值,這個字段比較有意思,我們還是舉個例子說明,以下面這個數據包為例,假如我們需要得到data的數據,而長度記錄的值為12字節(head+length+data),為了達到我們的預期即只取10字節的數據,我們就可以基于將這個字段的值設置為-2,將12減去10以得到實際的data數據長度。
對應的我們本次數據包長度記錄的值沒有錯,這里直接直接設置為0,無需調整。
- initialBytesToStrip:讀取時需要跳過數據包幾個字節,以我們的數據包為例就說4,代表我們要跳過4字節的length字段,只要data的數據,對應的我們也給出下面這個構造方法:
于是我們就有了下面這樣一個構造的解碼器,再次進行壓測后數據都是可以正常解析處理的:
nioSocketChannel.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4))
.addLast(new FirstServerHandler());
5. 更多關于Netty內置解碼器
設計者也在注釋上為我們提供更多的使用案例,先來看看第一個示例,該數據包長度字段2字節,偏移量為0。假如我們希望讀整個數據包,那么參數設置方式為:
- lengthFieldOffset即偏移量設置為0,即長度字段無需偏移就在數據包高位。
- lengthFieldLength為2,即讀取2字節的數據,即可獲得數據包長度。
- lengthAdjustment 為0,代表長度字段描述的數據就是后續數據的長度,無需調整。
- initialBytesToStrip 為0,即讀取時從數據包最開始位置讀取并加上長度字段里描述的長度的數據,無需跳過。
* <b>lengthFieldOffset</b> = <b>0</b>
* <b>lengthFieldLength</b> = <b>2</b>
* lengthAdjustment = 0
* initialBytesToStrip = 0 (= do not strip header)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
* </pre>
再來看看示例2,數據包和上文相同,只不過希望讀取的數據不包含length字段,所以參數設置為:
- lengthFieldOffset即偏移量設置為0,即長度字段無需偏移就在數據包高位。
- lengthFieldLength為2,即讀取2字節的數據,即可獲得數據包長度。
- lengthAdjustment 為0,代表長度字段描述的數據就是后續數據的長度,無需調整。
- initialBytesToStrip 為2,即讀取時從數據包起始位置開始,跳過2字節數據,即跳過length字段。
* lengthFieldOffset = 0
* lengthFieldLength = 2
* lengthAdjustment = 0
* <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field)
*
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
* </pre>
再來看看情況3,2字節長度描述長度,只不過該長度包含了描述長度的字段長度,即length的值為length字段長度2+后續HELLO, WORLD字符串長度為14。如果我們希望獲取一個完整的數據包,那么參數就需要設置為:
- lengthFieldOffset即偏移量設置為0,即長度字段無需偏移就在數據包高位。
- lengthFieldLength為2,即讀取2字節的數據,即可獲得數據包長度。
- lengthAdjustment 為-2,代表長度字段描述的是整個包的長度,需要減去length字段的長度。
- initialBytesToStrip 為0,即讀取時從數據包最開始位置讀取并加上長度字段里描述的長度的數據,無需跳過。
* lengthFieldOffset = 0
* lengthFieldLength = 2
* <b>lengthAdjustment</b> = <b>-2</b> (= the length of the Length field)
* initialBytesToStrip = 0
*
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
* </pre>
示例4需要跳過header字段讀取到長度字段,最后需要得到一個包含所有部分的數據包,所以參數如下:
- lengthFieldOffset即偏移量設置為2,即跳過Header 。
- lengthFieldLength為3,即讀取3字節的數據,即可獲得數據包長度。
- lengthAdjustment 為0,代表長度字段描述的是就是后續數據的長度,無需調整。
- initialBytesToStrip 為0,即讀取時從數據包最開始位置讀取并加上長度字段里描述的長度的數據,無需跳過。
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
* </pre>
示例5情況比較特殊,length描述后文數據的長度,卻不包含后文header的長度,若我們希望獲取到所有部分的數據包,則參數需要設置為:
- lengthFieldOffset即偏移量設置為0,即無需偏移,長度就在數據包高位。
- lengthFieldLength為3,即讀取3字節的數據,即可獲得數據包長度。
- lengthAdjustment 為2,即代表length字段僅僅記錄的Actual Content的長度,length字段后面還有一個header的長度需要計算,故設置為2,意味實際長度要+2。
- initialBytesToStrip 為0,即讀取時從數據包最開始位置讀取并加上長度字段里描述的長度的數據,無需跳過。
* <pre>
* lengthFieldOffset = 0
* lengthFieldLength = 3
* <b>lengthAdjustment</b> = <b>2</b> (= the length of Header 1)
* initialBytesToStrip = 0
*
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
* | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
* </pre>
示例6,長度在hdr1后面,并且最終讀取的數據是hdr2和Actual Content。參數設置為:
- lengthFieldOffset即偏移量設置為1,即跳過HDR1。
- lengthFieldLength為2,即讀取2字節的數據,即可獲得數據包長度。
- lengthAdjustment 為1,即代表length字段僅僅記錄的Actual Content的長度,length字段后面還有一個HDR2 的長度需要計算,故設置為1,意味實際長度要+1。
- initialBytesToStrip 為3,即跳過HDR1和length開始讀取。
* lengthFieldOffset = 1 (= the length of HDR1)
* lengthFieldLength = 2
* <b>lengthAdjustment</b> = <b>1</b> (= the length of HDR2)
* <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN)
*
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
* </pre>
示例7即可Length記錄的是整個包的長度,為了拿到HDR2和Actual Content的數據,對應參數設置如下:
- lengthFieldOffset即偏移量設置為1,即跳過HDR1。
- lengthFieldLength為2,即讀取2字節的數據,即可獲得數據包長度。
- lengthAdjustment 為-3,即代表減去HDR1和 LEN的字段長度。
- initialBytesToStrip 為3,即跳過HDR1和length開始讀取。
* lengthFieldOffset = 1
* lengthFieldLength = 2
* <b>lengthAdjustment</b> = <b>-3</b> (= the length of HDR1 + LEN, negative)
* <b>initialBytesToStrip</b> = <b> 3</b>
*
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
* </pre>
三、小結
以上便是筆者對于Netty如何解決半包與粘包問題的源碼解析與實踐的全部內容,希望對你有幫助。