成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

真香,聊聊 RocketMQ 5.0 的 POP 消費(fèi)模式!

開(kāi)發(fā) 前端
可能還存在限制 Reef 實(shí)現(xiàn)更高性能的因素,我們后續(xù)將研究 Reef 凍結(jié)期間的潛在回歸,并繼續(xù)努力使 Reef 成為迄今為止最好的 Ceph 版本!

大家好,我是君哥。

大家都知道,RocketMQ 消費(fèi)模式有 PULL 模式和 PUSH 模式,不過(guò)本質(zhì)上都是 PULL 模式,而在實(shí)際使用時(shí),一般使用 PUSH 模式。

不過(guò),RocketMQ 的 PUSH 模式有明顯的不足,主要體現(xiàn)在以下幾個(gè)方面:

  1. 消息積壓了,增加消費(fèi)者不一定能解決。PUSH 模式如下圖:

圖片

上面的圖中,消費(fèi)組中的消費(fèi)者每個(gè)消費(fèi)者消費(fèi)兩個(gè) MessageQueue,這種情況下,增加消費(fèi)者是可以提高消費(fèi)能力的。

但是下面這張圖,每個(gè)消費(fèi)者消費(fèi)一個(gè) MessageQueue,因?yàn)橥粋€(gè) MessageQueue 只能被同一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi),所以增加消費(fèi)者并不能提高消費(fèi)能力。

圖片

  1. 客戶(hù)端的處理邏輯比較多,比如負(fù)載均衡、offset 管理、消費(fèi)失敗后的處理(比如失敗消息發(fā)送回 Broker),這些邏輯都在客戶(hù)端。
  2. 如果再支持其他語(yǔ)言,客戶(hù)端會(huì)變得越來(lái)越重。
  3. 消費(fèi)者機(jī)器 hang 住,可能會(huì)導(dǎo)致消息積壓,如下圖:

圖片

通過(guò)客戶(hù)端負(fù)責(zé)均衡,MessageQueue0 這個(gè)隊(duì)列分配給了 Consumer0 進(jìn)行獨(dú)占消費(fèi),如果 Consumer0 這個(gè)消費(fèi)者 hang 住了,但是服務(wù)沒(méi)有掛,不能從 Name Server 中下線(xiàn),因?yàn)?Consumer0 拉取到的消息不能消費(fèi),也就不能給 Broker 發(fā)送更新 Offset 的請(qǐng)求,最終導(dǎo)致消息積壓。這種情況只能手動(dòng)讓 Consumer0 下線(xiàn)或者讓 Consumer0 重啟。

RocketMQ 5.0 為了解決 PUSH Consumer 上面的問(wèn)題,引入了 POP Consumer。

1 POP 客戶(hù)端

POP 模式的客戶(hù)端引入的背景是 RocketMQ 5.0 為了更好地?fù)肀г圃蛻?hù)端要改造成無(wú)狀態(tài)的輕量級(jí)客戶(hù)端,RocketMQ 4.x 中客戶(hù)端具有的負(fù)載均衡、權(quán)限管理、消費(fèi)管理等功能都從客戶(hù)端移動(dòng)到了 Proxy。

POP 消費(fèi)模式如下圖:

圖片

四個(gè)消費(fèi)者都可以消費(fèi) Broker1 和 Broker2 上面的所有隊(duì)列,這樣即使某一個(gè)消費(fèi)者 hang 住了,其他消費(fèi)者也可以消費(fèi),并不會(huì)造成消息積壓。

同時(shí),從上圖中可以看到,POP 客戶(hù)端還有一個(gè)優(yōu)勢(shì),增加消費(fèi)者數(shù)量是可以提高消費(fèi)能力的,不受 MessageQueue 數(shù)量和消費(fèi)者數(shù)量的限制。

跟 PUSH 模式相比,POP 模式拉取到消息后,會(huì)設(shè)置一個(gè) POP_CK 屬性,代碼如下:

//MQClientAPIImpl.java
if (requestHeader instanceof PopMessageRequestHeader) {
 if (startOffsetInfo == null) {
  // we should set the check point info to extraInfo field , if the command is popMsg
  // find pop ck offset
  String key = messageExt.getTopic() + messageExt.getQueueId();
  if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
   map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
    messageExt.getTopic(), brokerName, messageExt.getQueueId()));

  }
  messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
 } else {
  String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
  String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
  int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
  Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);

  messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
   ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
    responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
  );
  //...
 }
}

可以看到,POP_CK 屬性包含了 brokerName、Topic、QueueId、offset 等參數(shù),通過(guò)這個(gè)屬性可以唯一標(biāo)識(shí)一條消息了。

從上面的代碼還可以看到,responseHeader 中有一個(gè) invisibleTime 屬性,這個(gè)屬性的作用是消費(fèi)者通過(guò) POP 模式拉取到一條消息后,這段時(shí)間(invisibleTime)內(nèi)這條消息在 Broker 端是不可見(jiàn)的,消費(fèi)者再次拉取就不會(huì)重復(fù)拉取到。但是如果過(guò)了這段時(shí)間,消費(fèi)者還沒(méi)有給 Broker 返回 ACK,這條消息會(huì)變?yōu)榭梢?jiàn),再次被消費(fèi)者拉取到。

消費(fèi)完成后,向 Broker 發(fā)送 ACK 消息,見(jiàn)下面代碼:

public void ackMessageAsync(
 final String addr,
 final long timeOut,
 final AckCallback ackCallback,
 final AckMessageRequestHeader requestHeader //
) throws RemotingException, MQBrokerException, InterruptedException {
 final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
 this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {

  @Override
  public void onComplete(ResponseFuture responseFuture) {
   RemotingCommand response = responseFuture.getResponseCommand();
   if (response != null) {
    try {
     AckResult ackResult = new AckResult();
     if (ResponseCode.SUCCESS == response.getCode()) {
      ackResult.setStatus(AckStatus.OK);
     } //...
     assert ackResult != null;
     ackCallback.onSuccess(ackResult);
    } //...
   } else {
    //...
   }

  }
 });
}

2. Broker

從上面的介紹可以看到,每個(gè)消費(fèi)者都可以從 Broker 的所有 MessageQueue 上拉取消息,那如果多個(gè)消費(fèi)者都從一個(gè) MessageQueue 上面拉取,有沒(méi)有可能會(huì)重復(fù)消費(fèi)呢?

Broker 收到消息拉取請(qǐng)求,從 MessageStore 拉取消息時(shí),首先會(huì)給 MessageQueue 進(jìn)行加鎖,加鎖成功后,才會(huì)拉取消息,這是其他客戶(hù)端來(lái)拉取時(shí)就會(huì)加鎖失敗。

//PopMessageProcessor.java
String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
if (!queueLockManager.tryLock(lockKey)) {
 restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
 return restNum;
}

Broker 從 MessageStore 拉取到消息后,會(huì)定義一個(gè) CheckPoint 放入緩存,代碼如下:

//PopMessageProcessor.java
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
 PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
 Channel channel, long popTime,
 ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
 StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
 String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
  requestHeader.getConsumerGroup()) : requestHeader.getTopic();
 String lockKey =
  topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
 //...
 offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
 GetMessageResult getMessageTmpResult = null;
 try {
  //...

  restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
  if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {

   if (isOrder) {
    //...
   } else {
    appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
   }
  } //...
 } //...
 return restNum;
}

Broker 收到消費(fèi)者發(fā)來(lái)的 ACK 后,會(huì)把 CheckPoint 從緩存中移除。

如果 Broker 一直沒(méi)有收到 ACK,則會(huì)把 CheckPoint 從緩存中移除,同時(shí)把 CheckPoint 發(fā)送給 MessageStore,由 MessageStore 發(fā)送到重試隊(duì)列。代碼如下:

boolean removeCk = !this.serving;
 // ck will be timeout
 if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
  removeCk = true;
 }

 // the time stayed is too long
 if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
  removeCk = true;
 }

 // double check
 if (removeCk) {
  // put buffer ak to store
  if (pointWrapper.getReviveQueueOffset() < 0) {
   putCkToStore(pointWrapper, false);
  }
 }
}

3 總結(jié)

POP 客戶(hù)端有很多的優(yōu)勢(shì),總結(jié)如下:

  1. 無(wú)狀態(tài),更好地?fù)肀г圃?/li>
  2. 計(jì)算相關(guān)的功能下移到 Proxy,更加輕量級(jí);
  3. 消費(fèi)能力擴(kuò)展不受 MessageQueue 數(shù)量的限制;
  4. 消費(fèi)者 hang 住,并不會(huì)導(dǎo)致消息積壓。
責(zé)任編輯:武曉燕 來(lái)源: 君哥聊技術(shù)
相關(guān)推薦

2023-08-07 08:32:05

RocketMQ名字服務(wù)

2023-07-03 08:57:45

Master服務(wù)TCP

2022-05-23 09:18:55

RocketMQ存儲(chǔ)中間件

2022-07-07 09:00:49

RocketMQ消費(fèi)者消息消費(fèi)

2021-12-27 08:22:18

Kafka消費(fèi)模型

2024-08-19 04:00:00

2023-09-26 08:01:46

消費(fèi)者TopicRocketMQ

2023-12-25 19:28:59

RocketMQ大數(shù)據(jù)

2022-08-09 08:18:19

RocketMQpush消費(fèi)

2025-05-09 09:05:00

Spring框架設(shè)計(jì)模式

2023-04-11 08:35:22

RocketMQ云原生

2024-10-06 12:56:36

Golang策略設(shè)計(jì)模式

2021-08-09 10:31:33

自定義授權(quán)響應(yīng)

2024-01-24 09:00:31

SSD訂閱關(guān)系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點(diǎn)

2021-05-17 14:57:23

策略模式代碼

2023-06-12 08:49:12

RocketMQ消費(fèi)邏輯

2022-11-08 07:36:17

RocketMQ消費(fèi)者消息堆積

2021-02-07 23:58:10

單例模式對(duì)象

2023-03-21 07:57:37

Go語(yǔ)言設(shè)計(jì)模式
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 午夜久久久| 91精品国产欧美一区二区 | 精品久久国产 | 国产日韩欧美 | 一区二区三区不卡视频 | 成人av一区二区在线观看 | 黄色在线免费观看 | 日韩欧美网 | 国产精品一区一区 | 日韩精品久久久久久 | 国产免费麻豆视频 | 免费在线观看成人av | 久久免费高清 | 日韩在线免费 | 狠狠久久综合 | 91麻豆精品国产91久久久更新资源速度超快 | 国际精品鲁一鲁一区二区小说 | 中日韩av | 日韩一二区在线 | 色婷婷影院 | а天堂中文最新一区二区三区 | 亚洲一区二区免费视频 | 狠狠操在线 | 久久久精品久久 | 99精品99| 日韩精品一区二区三区中文在线 | 日韩中文在线 | 麻豆天堂 | 99re国产精品 | 欧美极品在线视频 | www.色综合| 伊人伊人网| 国产丝袜一区二区三区免费视频 | 久久久久久蜜桃一区二区 | 91视频网 | 99精品久久| 日韩欧美国产精品一区二区三区 | 日韩久草 | 欧美一区日韩一区 | 日韩三级一区 | 成人国产精品久久久 |