成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Netty是如何解析Redis的RESP協議—響應篇

數據庫 其他數據庫
經過上面的層層處理,foobar 這個 FullBulkStringRedisMessage 消息是怎么存到 EmbeddedChannel 中呢?可以看到這里繼承了 DefaultChannelPipeline,并重寫了 onUnhandledInboundMessage 方法。

這篇是響應篇,一起來看看  RedisDecoderTest 中,是怎么模擬 client-cli 接受處理 server 響應的??

RedisDecoderTest

public class RedisDecoderTest {
    public static void main(String[] args) {
         EmbeddedChannel channel = newChannel(false);


        System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
        System.out.println(channel.writeInbound(byteBufOf("\n")));


        RedisMessage msg = channel.readInbound();
        System.out.println(msg instanceof FullBulkStringRedisMessage);

        String bytes = stringOf(((FullBulkStringRedisMessage) msg).content());
        System.out.println(bytes);


        ReferenceCountUtil.release(msg);

        channel.finish();
    }
    private static EmbeddedChannel newChannel(boolean decodeInlineCommands) {
        return new EmbeddedChannel(
                new RedisDecoder(decodeInlineCommands),
                new RedisBulkStringAggregator(),
                new RedisArrayAggregator());
    }
}

圖解

這里的重點就是這 3 個 ChannelInboundHandler 了。

圖片圖片

具備 decode 能力 ??

圖片圖片

下面進入源碼解讀:

何時調用到 decode 方法

當進行 channelRead  時進行 decode,比如  MessageToMessageDecoder  ??

圖片圖片

RedisDecoder

里面定義了 5 種 State

圖片圖片

比如上面例子中,傳輸的  $6\r\nfoobar\r\n  ,就屬于 RESP 協議中的 Bulk strings  大字符串,需要解析出 length 和 content,格式如下 :

$<length>\r\n<data>\r\n
比如
$5\r\nhello\r\n
$0\r\n\r\n

關鍵步驟

圖片圖片

decode 時,由于默認的 state 都是  DECODE_TYPE ,所以會先調用 decodeType 方法。

圖片圖片

decodeType

看看是不是 inline 的,默認是 false,我們也是設置了 false。

圖片圖片

decodeLength

圖片圖片

這里可以看到官網 Fast to parse 的影子。

圖片圖片

圖片圖片

decodeBulkString

創建 BulkStringHeaderRedisMessage,再把 state 切換到 DECODE_BULK_STRING_CONTENT ,最后調用 decodeBulkStringContent 。

圖片圖片

decodeBulkStringContent

創建 DefaultBulkStringRedisContent,并添加到 out 這個 list 中(2個)

圖片圖片

接著,就來到第二個 handler 了 ,RedisBulkStringAggregator

RedisBulkStringAggregator

起到一個聚合的作用,將消息包裝成 FullBulkStringRedisMessage。

圖片圖片

這個 decode 方法超過 100 行了,就粗略講一下。

在上面的方法中,我們往 out 中添加了 BulkStringHeaderRedisMessage 和 DefaultBulkStringRedisContent 這兩個。

圖片圖片

消息頭處理

先處理 BulkStringHeaderRedisMessage ,

圖片圖片

包裝成 FullBulkStringRedisMessage 。

圖片圖片

消息體處理

圖片圖片

appendPartialContent,把這個 ByteBuf 整合到 CompositeByteBuf 中。

圖片圖片

aggregate,擴展方法,目前是空實現。

最后,判斷是不是消息尾

圖片圖片

到了這里,handler 就處理完了,因為這個消息不是數組類型的,用不到 RedisArrayAggregator 。

第二次 writeInbound

上面代碼中共調用了兩次 writeInbound

System.out.println(channel.writeInbound(byteBufOf("$6\r\nfoobar\r")));
 System.out.println(channel.writeInbound(byteBufOf("\n")));

第二次時,會把之前的 bytebuf 拿出來計算下。

圖片圖片

可以看到,oldBytes 是 \r ,newBytes 則是 \n ,重新組合成新的 ByteBuf。

圖片圖片

這樣才能去創建這個 DefaultLastBulkStringRedisContent

圖片圖片

進而完成  RedisBulkStringAggregator 中的 last 條件分支。

圖片圖片

最后消息被包裝成 FullBulkStringRedisMessage。

尾節點  TailContext

經過上面的層層處理,foobar 這個 FullBulkStringRedisMessage 消息是怎么存到 EmbeddedChannel 中呢?

可以看到這里繼承了 DefaultChannelPipeline,并重寫了 onUnhandledInboundMessage 方法。

圖片圖片

DefaultChannelPipeline 中有尾節點 TailContext,它會去調用這個 onUnhandledInboundMessage 。

圖片圖片

進而將消息存到隊列中。

圖片圖片

最后,readInbound 就是從里面 poll 出來這個消息,再進行打印等操作即可。

圖片圖片

官方例子

我從 Netty 的 example 里 CV 了一份,大家可以快速上手。

使用時,主要還是注意這個 inbound ,outbound 的順序問題(如圖)。

圖片圖片

/**
 * Simple Redis client that demonstrates Redis commands against a Redis server.
 */
public class RedisClient {
    private static final String HOST = System.getProperty("host", "192.168.200.128");
    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new RedisDecoder());
                     p.addLast(new RedisBulkStringAggregator());
                     p.addLast(new RedisArrayAggregator());
                     p.addLast(new RedisEncoder());
                     p.addLast(new RedisClientHandler());
                 }
             });

            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();

            // Read commands from the stdin.
            System.out.println("Enter Redis commands (quit to end)");
            ChannelFuture lastWriteFuture = null;
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                final String input = in.readLine();
                final String line = input != null ? input.trim() : null;
                if (line == null || "quit".equalsIgnoreCase(line)) { // EOF or "quit"
                    ch.close().sync();
                    break;
                } else if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                    continue;
                }
                // Sends the received line to the server.
                lastWriteFuture = ch.writeAndFlush(line);
                lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            System.err.print("write failed: ");
                            future.cause().printStackTrace(System.err);
                        }
                    }
                });
            }

            // Wait until all messages are flushed before closing the channel.
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
        } finally {
            group.shutdownGracefully();
        }
    }
}


/**
 * An example Redis client handler. This handler read input from STDIN and write output to STDOUT.
 */
public class RedisClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        String[] commands = ((String) msg).split("\\s+");
        List<RedisMessage> children = new ArrayList<RedisMessage>(commands.length);
        for (String cmdString : commands) {
            children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
        }
        RedisMessage request = new ArrayRedisMessage(children);
        ctx.write(request, promise);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RedisMessage redisMessage = (RedisMessage) msg;
        printAggregatedRedisResponse(redisMessage);
        ReferenceCountUtil.release(redisMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }

    private static void printAggregatedRedisResponse(RedisMessage msg) {
        if (msg instanceof SimpleStringRedisMessage) {
            System.out.println(((SimpleStringRedisMessage) msg).content());
        } else if (msg instanceof ErrorRedisMessage) {
            System.out.println(((ErrorRedisMessage) msg).content());
        } else if (msg instanceof IntegerRedisMessage) {
            System.out.println(((IntegerRedisMessage) msg).value());
        } else if (msg instanceof FullBulkStringRedisMessage) {
            System.out.println(getString((FullBulkStringRedisMessage) msg));
        } else if (msg instanceof ArrayRedisMessage) {
            for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
                printAggregatedRedisResponse(child);
            }
        } else {
            throw new CodecException("unknown message type: " + msg);
        }
    }

    private static String getString(FullBulkStringRedisMessage msg) {
        if (msg.isNull()) {
            return "(null)";
        }
        return msg.content().toString(CharsetUtil.UTF_8);
    }
}

結尾

這篇比請求篇稍微復雜些,還有 TailContext 這個隱藏的細節。

責任編輯:武曉燕 來源: Java4ye
相關推薦

2024-05-20 08:45:46

2021-07-15 10:35:16

NettyTCPJava

2024-08-16 21:47:18

2018-07-06 15:58:34

SpringSchemaJava

2011-08-24 10:41:04

網絡協議DNSARP協議

2009-04-03 08:26:02

2022-05-16 08:22:37

零拷貝Netty

2020-12-15 08:03:57

Mybatis配置文件

2013-03-11 10:17:13

路由協議路由器設置網絡連接設置

2013-02-26 17:31:50

思科路由協議路由器設置

2024-06-12 13:36:24

2010-08-02 16:56:03

ICMP協議

2021-08-03 14:29:30

ARPANET互聯網協議TCP

2021-07-08 21:19:04

BashLinux

2023-11-09 23:31:02

C++函數調用

2017-11-09 10:42:11

Nginx負載均衡策略

2021-01-30 19:35:44

HDFS單點Hadoop

2020-12-29 08:34:08

spring循環依賴開發

2024-06-03 08:09:46

2018-11-05 08:10:30

Netty架構模型
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲bt 欧美bt 日本bt | 日本在线一区二区三区 | 99精品欧美一区二区三区综合在线 | 国产免费高清 | 成人欧美一区二区三区在线观看 | 免费性视频 | 成人二区| 亚洲小视频在线播放 | 91极品视频 | 亚洲福利一区二区 | 久久1区 | 精品一二区 | 亚洲欧美在线免费观看 | 伊人伊成久久人综合网站 | 国产精品欧美一区二区三区不卡 | jlzzjlzz欧美大全 | 亚洲a在线观看 | 九九九久久国产免费 | 羞羞视频在线观看 | 91精品国产综合久久久久久漫画 | 国产成人高清在线观看 | 中文字幕一区二区三区在线观看 | 亚洲成人精品国产 | 成人午夜高清 | 日批日韩在线观看 | 久久毛片 | 精品国产乱码久久久久久88av | 玖玖爱365 | 国产农村妇女精品一二区 | 国产精品久久久久久久久久久久午夜片 | 日本免费一区二区三区 | 热99在线 | 一级黄色片网址 | 亚洲网址在线观看 | 二区三区视频 | 综合五月婷 | 一区二区在线免费播放 | 国产欧美一区二区三区在线看蜜臀 | 国产片侵犯亲女视频播放 | 91视频进入 | 国内激情av片 |