成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

RocketMQ Tag在實際業務中有什么作用?

開發
本文分析了 RocketMQ 的 Tag 功能,它在消息過濾和分類處理方面提供了極大的便利,適用于各種需要高效、低延遲消息傳遞的場景。

Tag 是 RocketMQ 提供的一種消息過濾機制,允許生產者在發送消息時指定一個或多個標簽,消費者則可以根據這些標簽來選擇性地消費消息。這篇文章,我們將詳細介紹 RocketMQ 中 Tag 的原理、源碼分析以及示例。

Tag 的原理

在 RocketMQ 中,Tag 主要用于消息過濾。每個消息可以攜帶一個 Tag,消費者可以根據 Tag 來訂閱特定的消息,從而實現消息的過濾和分類處理。

(1) 消息發送階段

生產者在發送消息時,可以指定一個 Tag。這個 Tag 會被附加到消息的元數據中,并存儲在 RocketMQ 的消息存儲系統中。

(2) 消息存儲階段

消息被存儲在 RocketMQ 的 Broker 中,消息的元數據(包括 Tag)也會被存儲。

(3) 消息消費階段

消費者在訂閱消息時,可以指定要消費的 Tag。Broker 會根據消費者訂閱的 Tag,將符合條件的消息投遞給消費者。

(4) 源碼分析

為了更好的理解 Tag的原理,我們通過 RocketMQ 中Tag 相關的幾個主要代碼片段進行演示。

生產者發送消息時的代碼:

// 創建消息實例,并指定Topic和Tag
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());

// 發送消息
SendResult sendResult = producer.send(msg);

在 Message 類中,Tag 是通過構造函數傳遞的,并存儲在 Message 對象的 tags 字段中。

消費者訂閱消息時的代碼:

// 創建消費者實例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");

// 訂閱Topic,并指定Tag
consumer.subscribe("TopicTest", "TagA");

// 注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 啟動消費者
consumer.start();

在 DefaultMQPushConsumer 類中,通過 subscribe 方法指定要訂閱的 Topic 和 Tag,RocketMQ 內部會根據訂閱的 Tag 進行消息過濾。

示例

下面是一個完整的示例,演示如何使用 RocketMQ 的 Tag 功能。

(1) 生產者代碼

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        // 創建生產者實例
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.setNamesrvAddr("localhost:9876");

        // 啟動生產者
        producer.start();

        // 發送消息
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        // 關閉生產者
        producer.shutdown();
    }
}

(2) 消費者代碼

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // 創建消費者實例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");

        // 訂閱Topic,并指定Tag
        consumer.subscribe("TopicTest", "TagA");

        // 注冊消息監聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 啟動消費者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

盡管 RocketMQ 的 Tag 功能在消息過濾和分類處理方面提供了極大的便利,但也有其優缺點。下面詳細分析一下:

優點

  • 簡單易用:Tag 的使用非常簡單,生產者只需在發送消息時指定 Tag,消費者在訂閱消息時指定相應的 Tag 即可。
  • 高效過濾:通過 Tag 進行消息過濾,減少了消費者處理不相關消息的開銷,從而提高了系統的性能。
  • 靈活性高:支持一個 Topic 下多個 Tag,使得消息的分類和過濾更加靈活。
  • 低延遲:Tag 過濾是在 Broker 端進行的,不會顯著增加消息傳遞的延遲。
  • 減少網絡帶寬:消費者只會接收到自己感興趣的消息,減少了不必要的網絡傳輸,從而節省了帶寬。

缺點

  • 單一維度過濾:Tag 只能提供單一維度的消息過濾,無法進行更復雜的多維度過濾。如果需要多維度過濾,需要結合其他機制(如消息屬性)來實現。
  • 有限的靈活性:Tag 的數量和種類在設計階段需要規劃好,靈活性有限。如果后期需要添加新的 Tag,可能需要重新設計和部署。
  • 不支持復雜邏輯:Tag 過濾支持的邏輯較為簡單,只能進行基于字符串匹配的過濾,無法支持復雜的過濾邏輯。
  • 管理復雜性:隨著系統規模的增大,Tag 的管理和維護可能變得復雜,尤其是在多個應用共享同一個 Topic 的情況下。
  • 潛在的性能瓶頸:雖然 Tag 過濾在大多數場景下性能良好,但在極端情況下(如大量不同 Tag 的消息和高并發消費),可能會帶來性能瓶頸。

適用場景

  • 日志和監控:不同類型的日志和監控數據可以通過 Tag 進行分類和過濾。
  • 電商系統:不同類型的訂單、商品信息等可以通過 Tag 進行分類和過濾,消費者只處理自己感興趣的消息。
  • 金融系統:不同類型的交易、通知等可以通過 Tag 進行分類和過濾,提高系統的處理效率。
  • 社交平臺:不同類型的消息(如評論、點贊、私信等)可以通過 Tag 進行分類和過濾,提供更精準的消息推送。

總結

本文分析了 RocketMQ 的 Tag 功能,它在消息過濾和分類處理方面提供了極大的便利,適用于各種需要高效、低延遲消息傳遞的場景。然而,它也有一些局限性,如單一維度過濾、管理復雜性等。

在實際應用中,需要根據具體需求和系統設計,合理使用 Tag 功能,結合其他機制來實現更復雜的消息過濾和處理。

責任編輯:趙寧寧 來源: 猿java
相關推薦

2023-06-12 07:02:53

物聯網數據決策

2019-04-28 17:39:06

大數據區塊鏈數據隱私安全

2018-11-06 10:51:07

Redis開發存儲系統

2022-03-02 14:08:35

區塊鏈供應鏈技術

2024-11-28 08:15:44

LLM大型語言模型人工智能

2010-02-25 17:22:39

WCF服務行為

2009-12-03 18:21:15

軟路由技術

2010-01-14 10:35:34

VB.NET指針

2010-01-08 18:02:33

VB.NET事件

2010-01-15 13:30:53

VB.NET Tool

2023-06-25 11:38:31

2010-01-07 16:16:03

VB.NET變量作用域

2009-11-19 15:14:43

路由器系統

2010-01-19 15:21:55

VB.NET區域性

2009-11-23 17:56:45

業務路由器

2022-03-21 08:55:53

RocketMQ客戶端過濾機制

2010-01-20 18:34:46

VB.NET Syst

2016-02-17 09:15:37

蘋果

2021-04-11 18:09:57

機器學習業務價值人工智能
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品欧美一区二区精品久久久 | 亚洲精品9999 | 在线国产99 | 一区免费观看 | 亚洲成人av | 亚洲在线一区二区 | 人妖av| 成人教育av | 欧美一区二区三区 | 亚洲视频免费一区 | 永久免费视频 | 在线中文字幕日韩 | 91文字幕巨乱亚洲香蕉 | 狠狠草视频 | 2022精品国偷自产免费观看 | 正在播放国产精品 | 久久精品中文 | 一级a性色生活片久久毛片 午夜精品在线观看 | 亚洲自拍偷拍欧美 | 日韩在线精品视频 | 久草视频在线播放 | 欧美日韩福利视频 | 激情欧美日韩一区二区 | 日韩欧美国产精品一区 | 精品videossex高潮汇编 | 成人在线观看免费爱爱 | 国产精品网址 | 视频在线日韩 | 人人鲁人人莫人人爱精品 | 久久网国产 | 国产精品免费在线 | 成人在线一区二区 | 免费av一区二区三区 | 国产剧情一区 | 中文字幕91 | 91国产在线视频在线 | 欧美精品一区二区三区在线 | 综合久久av | 精品久久久一区 | 国产高清无av久久 | 日本高清不卡视频 |