成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

如何基于Netty實現(xiàn)即時消息下發(fā)

開發(fā) 前端
采用輪詢拉取消息就會出現(xiàn)上面的場景,而如果采用長連接的方式,服務器可以與客戶端建立一條實時的連接,服務器有新消息可以直接推送給客戶端,不需要等待客戶端請求,這樣既保證了實時性,整個系統(tǒng)的抗壓能力也優(yōu)于大量輪詢的方式。

想象一個場景,你與女友在網(wǎng)上聊天,她問了一句:你愛我嗎?然后很忐忑地等你回答。可等了好一段時間,你才收到她的消息,趕緊回復了一句:愛,愛你一萬年。又過了好久,你女友才收到你的回復。這時,你說的是什么已經(jīng)不重要了,準備回去跪鍵盤吧。可以說這樣的用戶體驗非常的糟糕。

采用輪詢拉取消息就會出現(xiàn)上面的場景,而如果采用長連接的方式,服務器可以與客戶端建立一條實時的連接,服務器有新消息可以直接推送給客戶端,不需要等待客戶端請求,這樣既保證了實時性,整個系統(tǒng)的抗壓能力也優(yōu)于大量輪詢的方式。

什么是長連接通信

那么,什么是長連接呢?我們都知道短連接是什么,比如我們熟悉的 HTTP 協(xié)議,就是使用短連接的方式來請求數(shù)據(jù)的,它先是建立連接,然后進行數(shù)據(jù)傳輸,最后關閉連接。而且只能由客戶端主動發(fā)起請求,數(shù)據(jù)傳輸之后,連接就關閉了,服務端無法主動給客戶端發(fā)送數(shù)據(jù)。

而長連接是和短連接相對的,它的過程是:建立連接—>數(shù)據(jù)傳輸...(保持連接)……數(shù)據(jù)傳輸—>關閉連接。客戶端與服務端建立連接之后,客戶端和服務端保持住連接不斷開,就可以一直在這個連接上傳輸數(shù)據(jù),直到一方主動關閉連接。

圖片圖片

如何建立長連接通信

那怎么建立長連接通信呢?我們常見的網(wǎng)絡服務例如 Tomcat、Apache 等主要都是面向短連接的,對長連接支持不是很好。而且長連接需要服務端長期保持連接,如果有大量的連接同時在線,服務端的壓力會非常大,所以,就需要一套高性能的網(wǎng)絡框架來支撐。幸運的是,有 Netty 這樣的異步網(wǎng)絡框架來幫助我們管理連接。

你可能多少了解過 Netty,它是基于事件驅動的,易開發(fā)、易維護、高性能,完全滿足長連接通信的需求。我們使用 Netty 實現(xiàn)我們的服務端,當然也可以實現(xiàn)客戶端,但是我們的客戶端一般會根據(jù)不同的平臺采用不同的實現(xiàn)方案。

有服務端,客戶端之后,我們還不可以進行通信,因為缺少一個通信協(xié)議。我們知道,進行短連接通信的時候采用的是 HTTP 協(xié)議,而這次我們要采用 MQTT,一個物聯(lián)網(wǎng)的標準信息傳輸協(xié)議。它是一個十分輕量級的發(fā)布/訂閱模型協(xié)議,占用網(wǎng)絡帶寬極小,因為它的固定消息頭只占 2 字節(jié),已經(jīng)被廣泛應用于電信、汽車、工業(yè)制造等領域。

服務端、協(xié)議、客戶端,我們都已經(jīng)知道采用的方案了,來看下整體系統(tǒng)結構:

圖片圖片

現(xiàn)在,我們一起動手實現(xiàn)這樣一個消息下發(fā)服務端吧。我們只需要引入 Netty 的 jar 包即可,代碼如下:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.54.Final</version>
</dependency>

啟動一個 Netty 服務,如同我們正常啟動 Java 程序一樣:

public static void main(String[] args) {
    int port = 1883;
    if (args.length >= 1) {
        port = Integer.parseInt(args[1]);
    }
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(MqttEncoder.INSTANCE);
                        pipeline.addLast(new MqttDecoder());
                        //處理MQTT消息
                        pipeline.addLast(MyMqttHandlers.INSTANCE);
                    }
                });
        //啟動服務
        ChannelFuture future = serverBootstrap.bind(port).sync();
        System.out.println("MQTT server start success,port=" + port);
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

通過這段代碼我們啟動了 Netty 長連接服務,服務端口是 1883,客戶端可以通過這個端口連接到服務端,Netty 本身已經(jīng)實現(xiàn)了 TCP 連接的建立、管理以及 MQTT 協(xié)議的編碼和解碼等,我們只需要按照需求實現(xiàn)自己的業(yè)務邏輯即可。上述代碼中,我們只需要實現(xiàn) MyMqttHandlers.INSTANCE,來完成我們對客戶端連接的認證、Topic 訂閱、消息發(fā)布、心跳檢測等等。

上面的 MyMqttHandlers 類,是我們自定義的 Netty Handler,用來處理 MQTT 業(yè)務數(shù)據(jù),它繼承自 Netty 的適配器 SimpleChannelInboundHandler,僅需覆寫我們關心的方法 channelRead0,根據(jù)不同的 MQTT 報文類型做處理。

.....
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
    switch (msg.fixedHeader().messageType()) {
        case CONNECT:
            connect(ctx, (MqttConnectMessage) msg);
            break;
        case SUBSCRIBE:
            subscribe(ctx, (MqttSubscribeMessage) msg);
            break;
        case PINGREQ:
            pingReq(ctx);
            break;
        //...處理其他報文
        default:
    }
}
....

通信報文處理

怎么處理這些報文呢?MQTT 采用的是發(fā)布訂閱模式的消息通信協(xié)議,通過交換預定義的 MQTT 控制報文來通信。這里簡單介紹下 MQTT 協(xié)議的內容,因為在我們進行編碼的時候需要解析消息內容、回復 ACK 消息、發(fā)布消息,了解消息結構,可以更好地編碼。

MQTT 控制報文由固定報頭、可變報頭、有效載荷三部分組成,具體格式如下表:

圖片

根據(jù) MQTT 3.1.1 規(guī)定,固定報頭的控制報文類型共有 14 種,我們這次主要使用 CONNECT(連接服務端)、SUBSCRIBE(訂閱主題)、PUBLISH(發(fā)布消息)、PINGRESP(心跳響應)這四種報文以及對應的 ACK 報文。

CONNECT 報文如何處理?客戶端到服務端的網(wǎng)絡連接建立后,客戶端發(fā)送給服務端的第一個報文必須是 CONNECT 報文,這個報文傳輸設備標識、用戶標識、密碼等信息,通過這個報文,服務端需判斷要不要和客戶端連接,常用的方法就是鑒權。如果校驗失敗,就可以在 ACK 報文中設置狀態(tài)碼 CONNECTION_REFUSED_xxx;檢驗成功,則設置為 CONNECTION_ACCEPTED。鑒權成功之后,我們就可以把該設備的信息入庫保存,實際場景中,我們把設備的實時狀態(tài)維護在 Redis 中,保證高的吞吐量。

代碼如下:

private void connect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String clientIdentifier = msg.payload().clientIdentifier();
    String userName = msg.payload().userName();
    String password = new String(msg.payload().passwordInBytes());
    //此處可以鑒權
    System.out.println(clientIdentifier + " " + userName + " " + password);
    //此處保存用戶和連接之間的關系
    ChannelManager.saveChannelMapping(clientIdentifier, ctx.channel());
    MqttFixedHeader connAckFixedHeaderRes = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
    //連接成功設置為MqttConnectReturnCode.CONNECTION_ACCEPTED,失敗可以返回其他狀態(tài)碼
    MqttConnAckVariableHeader connAckVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
    MqttConnAckMessage ackMessage = new MqttConnAckMessage(connAckFixedHeaderRes, connAckVariableHeader);
    ctx.channel().writeAndFlush(ackMessage);
}

SUBSCRIBE 報文一般作為 CONNECT 之后的下一個報文,客戶端上報它需要的 Topic,服務端可以根據(jù)客戶端的訂閱情況,針對性地推送消息,這個可以是廣播的 Topic(所有用戶都可以收到同一個消息的副本),也可以是點對點的(只有一個用戶收到此消息)。同時,服務端需要存儲 Topic 到 Channel 的關系。代碼如下:

private void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
    List<MqttTopicSubscription> topics = msg.payload().topicSubscriptions();
    //存儲客戶端訂閱的主題
    ChannelManager.saveTopics(ctx.channel(),
            topics.stream().map(MqttTopicSubscription::topicName).collect(Collectors.toList()));
    System.out.println("訂閱成功:" + topics);
    MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(msg.variableHeader().messageId(), null);
    MqttSubAckPayload payload = new MqttSubAckPayload();
    MqttSubAckMessage ackMessage = new MqttSubAckMessage(header, variableHeader, payload);
    ctx.writeAndFlush(ackMessage);
}

PUBLISH 報文是我們最終的目標報文,服務端需要根據(jù)客戶端訂閱的 Topic 發(fā)送這個報文,由于在處理訂閱消息時,已經(jīng)保存了 Topic 和 Channel 的映射,所以推送消息就簡單了,只需要找到 Topic 下所有的 Channel,就可以直接寫消息到 Channel 中即可,代碼如下:

List<Channel> channels = ChannelManager.listChannels(topic);
channels.forEach(channel -> {
    MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, 0);
    ByteBuf payload = Unpooled.copiedBuffer(messageData, StandardCharsets.UTF_8);
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, false, 0);
    MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(fixedHeader, variableHeader, payload);
    channel.writeAndFlush(mqttPublishMessage);
});

為何要處理 PINGREQ 報文?鏈路上如果長時間沒有數(shù)據(jù)傳輸,可能會被運營商把鏈路回收了,所以設備需要在保活期間內至少發(fā)送一個報文,如果沒有實際的數(shù)據(jù)需要傳輸,那么較小的 PINGREQ 就是最佳選擇。連接保活時間的取值范圍一般為 30 秒~1200 秒。這個可以根據(jù)實際情況,不斷調整這個值,我的選擇是是 60 秒,如果網(wǎng)絡環(huán)境好,可以設置 500 秒以上。

實際在線上運行的時候,我發(fā)現(xiàn)有些客戶端就是一直無法連接上,這時可以再結合短輪詢做個備用方案,當多次嘗試之后,無法連接上 MQTT 服務,可以暫時啟動短輪詢,保證用戶可以收到消息。

好了,到目前我們已經(jīng)處理完核心功能了,其它類型的控制報文和處理流程也類似,這里就不再贅述了,總體報文交換流程如下圖:

圖片圖片

長連接通信測試

我們來試一下效果吧,首先我們需要模擬一個客戶端,同樣的,也可以使用 Netty 實現(xiàn)一個客戶端,主要流程和服務端差不多,有一點需要注意的是,客戶端需要定時發(fā)送心跳到服務端,以保證鏈路不會因為長時間空閑被系統(tǒng)斷開。

測試流程如下:

  1. 啟動服務端,端口在 1883
  2. 啟動客戶端,連接到服務端 127.0.0.1:1883
  3. 客戶端訂閱 Topic,名稱為 demo
  4. 服務端每隔 1 秒向 demo 發(fā)送一個當前時間的消息

看下運行效果:

圖片圖片

圖片

總結

以上就是我今天的分享,通過 Netty 和 MQTT,我們可以實現(xiàn)一個高性能的消息下發(fā)系統(tǒng),當然,我今天講的是最基本的功能實現(xiàn),當連接數(shù)超過一臺機器的上限時,就需要設計一個可擴展的架構。我把整體的知識點匯總成一張思維導圖,供你參考。

圖片圖片

責任編輯:武曉燕 來源: 程序員技術充電站
相關推薦

2021-03-25 08:29:33

SpringBootWebSocket即時消息

2023-08-14 08:01:12

websocket8g用戶

2020-10-09 12:45:19

創(chuàng)建消息即時消息編程語言

2020-10-09 15:00:56

實時消息編程語言

2019-09-29 15:25:13

CockroachDBGoJavaScript

2019-10-28 20:12:40

OAuthGuard中間件編程語言

2020-03-31 12:21:20

JSON即時消息編程語言

2020-10-12 09:20:13

即時消息Access頁面編程語言

2020-10-19 16:20:38

即時消息Conversatio編程語言

2020-10-16 14:40:20

即時消息Home頁面編程語言

2015-03-18 15:37:19

社交APP場景

2020-10-10 20:51:10

即時消息編程語言

2009-06-29 09:06:42

微軟Web版MSN

2010-05-24 09:51:37

System Cent

2021-02-05 07:28:11

SpringbootNettyWebsocke

2010-05-20 17:45:46

OCS 2007 R2

2021-12-03 00:02:01

通訊工具即時

2022-08-30 11:41:53

網(wǎng)絡攻擊木馬

2021-11-24 08:55:38

代理網(wǎng)關Netty

2009-05-20 14:49:16

ibmdwAjaxWeb開發(fā)
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕亚洲精品 | 日日做夜夜爽毛片麻豆 | 古装人性做爰av网站 | 中文字幕一区二区三区精彩视频 | 国产精品成人在线观看 | 日日干夜夜操天天操 | 伊人久久大香线 | 亚洲国产成人av好男人在线观看 | 国产九九九| 精品久久久久久久久久久下田 | 亚洲乱码一区二区三区在线观看 | 欧美成年网站 | 91在线精品视频 | 久久国产欧美日韩精品 | 在线一区视频 | 亚洲免费一区二区 | 免费能直接在线观看黄的视频 | 国产精品福利网 | 久久人人爽人人爽人人片av免费 | 久久久国产精品 | 久久精品亚洲 | 精品国产亚洲一区二区三区大结局 | 日韩不卡在线观看 | 国产成都精品91一区二区三 | 久久久一区二区三区四区 | 久久久久久a | 91久久精品一区二区二区 | 97热在线 | 亚洲高清视频一区二区 | 欧美精品久久久 | 国产成人自拍一区 | 欧美精品久久久 | 久久久精品国产 | 久久99精品久久久久久国产越南 | 欧美一区不卡 | 蜜桃视频在线观看免费视频网站www | 国产精品久久久久久一区二区三区 | 精品国产一区二区国模嫣然 | 欧美a视频 | 精品一区二区三区不卡 | 精品国产网 |