我向《RocketMQ技術內幕》一書的創始人請教了一個問題
是這樣的,我在學習rocketmq的時候遇到了一個奇怪的問題,就是同一個消費者組內的消費者訂閱同一個主題topic,不同的tag的時候看到一個消息丟失的問題
這個問題我也是向《RocketMQ技術內幕》一書的作者丁威大哥,然后他給我解釋了我對于這個問題的困惑,我來給大家解釋一下
先給大家描述一下這個具體的內容
兩個一樣的Consumer Group的Consumer訂閱同一個Topic,但是是不同的tag,Consumer1訂閱Topic的tag1,Consumer2訂閱Topic的tag2,然后分別啟動。
這時候往Topic的tag1里發送10條數據,Topic的tag2里發送10條。目測應該是Consumer1和Consumer2分別收到對應的10條消息。結果卻是只有Consumer2收到了消息,而且只收到了4-6條消息,不固定。
MQ底層數據結構之精妙
RocketMQ專門按照Topic為每一個topic建立索引,方便消費端按照topic進行消費,其具體實現為消息隊列。
在RocketMQ中,ConsumeQueue的引入并不是為了提高消息寫入的性能,而是為消費服務的。
消息消費隊列中的每一個條目是一個定長的,設計極具技巧性,其每個條目使用固定長度(8字節commitlog物理偏移量、4字節消息長度、8字節tag hashcode),這里不是存儲tag的原始字符串,而是存儲hashcode。
目的就是確保每個條目的長度固定,可以使用訪問類似數組下標的方式來快速定位條目,極大的提高了ConsumeQueue文件的讀取性能,這樣根據消費進度去訪問消息的方法為使用邏輯偏移量logicOffset * 20即可找到該條目的起始偏移量(consumequeue文件中的偏移量),然后讀取該偏移量后20個字節即得到了一個條目,無需遍歷consumequeue文件。
關于RocketMQ中的三個文件,來幫助RocketMQ完成如此高效率的偉業,我也寫了一個文章來介紹這三個文件,大家可以看一下通過這三個文件徹底搞懂rocketmq的存儲原理
消息過濾實現機制
消費端隊列存儲的是 tag 的 hashcode,眾所周知,不同的字符串得到的hashcode值可能一樣,故在服務端是無法精確對消息進行過濾的,所以在RocketMQ中會進行兩次消息過濾。
當客戶端向服務端拉取消息時,服務端在返回消息之前,會先根據hashcode進行過濾,然后客戶端收到服務端的消息后,再根據消息的tag字符串進行精確過濾。
上面的原理很好理解呀,那為什么會丟失消息呢?這其實和消息隊列負載機制有關。
在RocketMQ中使用集群模式消費時,同一個消費組中的多個消費者共同完成主題中的隊列的消費,即一個消費者只會分配到其中某幾個隊列,并且同一時間,一個隊列只會分配給一個消費者,這樣結合上面的的過濾機制,就會明顯有問題,請看示例圖:
其問題的核心關鍵是,同一個tag會分布在不同的隊列中,但消費者C1分配到的隊列為q0,q1,q0,q1中有taga和tagb的消息,但tagb的消息會被消費者C1過濾,但這部分消息卻不會被C2消費,造成了消息丟失。
所以在RocketMQ中,一個消費組內的所有消費這,其訂閱關系必須保持一致。
我們再來回過頭看這個問題
首先這是Broker決定的,而不是Consumer端決定的
Consumer端發心跳給Broker,Broker收到后存到consumerTable里(就是個Map),key是GroupName,value是ConsumerGroupInfo。
ConsumerGroupInfo里面是包含topic等信息的,但是問題就出在上一步驟,key是groupName,你同GroupName的話Broker心跳最后收到的Consumer會覆蓋前者的。相當于如下代碼:
map.put(groupName, ConsumerGroupInfo);
這樣同key,肯定產生了覆蓋。所以Consumer1不會收到任何消息,但是Consumer2為什么只收到了一半(不固定)消息呢?
那是因為:你是集群模式消費,它會負載均衡分配到各個節點去消費,所以一半消息(不固定個數)跑到了Consumer1上,結果Consumer1訂閱的是tag1,所以不會任何輸出。
如果換成BROADCASTING,那絕逼后者會收到全部消息,而不是一半,因為廣播是廣播全部Consumer。
/**
* Consumer信息
*/
public class ConsumerGroupInfo {
// 組名
private final String groupName;
// topic信息,比如topic、tag等
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
// 客戶端信息,比如clientId等
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
// PULL/PUSH
private volatile ConsumeType consumeType;
// 消費模式:BROADCASTING/CLUSTERING
private volatile MessageModel messageModel;
// 消費到哪了
private volatile ConsumeFromWhere consumeFromWhere;
}
/**
* 通過心跳將Consumer信息注冊到Broker端。
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
// consumerTable:維護所有的Consumer
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
// 如果沒有Consumer,則put到map里
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
// put到map里
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 更新Consumer信息,客戶端信息
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 更新訂閱Topic信息
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
}
從這一步可以看出消費者信息是以groupName為key,ConsumerGroupInfo為value存到map(consumerTable)里的,那很明顯了,后者肯定會覆蓋前者的,因為key是一樣的。
而后者的tag是tag2,那肯定覆蓋了前者的tag1,這部分是存到ConsumerGroupInfo的subscriptionTable里面的。
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData包含了topic等信息
public class SubscriptionData implements Comparable<SubscriptionData> {
// topic
private String topic;
private String subString;
// tags
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
}
其實到這里,這個問題已經算是解決了七八成了,等同于是后來的消費者的注冊信息會把之前的消費者的注冊信息覆蓋掉,這也就導致了上述出現的現象。
先啟動訂閱了tag1的消費者,然后啟動了訂閱了tag2的消費者,這時最新的心跳信息是來源于tag2的這個消費者,這就導致了這個消費者的訂閱信息會覆蓋掉之前的訂閱信息,這是因為在RocketMQ中會認為同一個消費者組的消費者的訂閱信息是需要保持一致的,如果不保持一致是不被允許的做法。
如果真有那種,你去新建一個topic不就好了,或者新建一個消費者組不就好了,在使用的過程中一定要保持消費者組的訂閱信息保持一致。
這也就導致了發送者發送的tag1的消息壓根不會被這個消費者接收到,而兩個消費者自然不會消費這個的消息。
而為什么只收到tag2的部分消息
這是因為rocketMQ默認采用的是集群消費的模式,也就是生產者的消息會通過負載均衡將消息均勻的發送到多個consumerqueue隊列中,默認是4個,也就是我們啟動的兩個消費者會分別監聽兩個consumerqueue隊列
這也就意味著有大約一半的tag2的消息會被運送到消費者1的機器上消費,而消費者1監聽的是tag1,不滿足消息的條件,所以監聽不到消息
topic和tag信息是如何覆蓋的
/**
* 其實很簡單,就是以topic為key,SubscriptionData為value。而SubscriptionData里包含了tags信息,所以直接覆蓋掉
*/
public boolean updateSubscription(final Set<SubscriptionData> subList) {
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
} else if (sub.getSubVersion() > old.getSubVersion()) {
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
}
本文參考文章:
https://codingw.blog.csdn.net/article/details/116299837。
https://dalin.blog.csdn.net/article/details/107241375。