掃二維碼與項(xiàng)目經(jīng)理溝通
我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢(xún)/運(yùn)營(yíng)咨詢(xún)/技術(shù)建議/互聯(lián)網(wǎng)交流
大家好,我是君哥。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專(zhuān)注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、微信小程序、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶(hù)創(chuàng)新互聯(lián)還提供了播州免費(fèi)建站歡迎大家使用!
大家都知道,RocketMQ 消費(fèi)模式有 PULL 模式和 PUSH 模式,不過(guò)本質(zhì)上都是 PULL 模式,而在實(shí)際使用時(shí),一般使用 PUSH 模式。
不過(guò),RocketMQ 的 PUSH 模式有明顯的不足,主要體現(xiàn)在以下幾個(gè)方面:
上面的圖中,消費(fèi)組中的消費(fèi)者每個(gè)消費(fèi)者消費(fèi)兩個(gè) MessageQueue,這種情況下,增加消費(fèi)者是可以提高消費(fèi)能力的。
但是下面這張圖,每個(gè)消費(fèi)者消費(fèi)一個(gè) MessageQueue,因?yàn)橥粋€(gè) MessageQueue 只能被同一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi),所以增加消費(fèi)者并不能提高消費(fèi)能力。
通過(guò)客戶(hù)端負(fù)責(zé)均衡,MessageQueue0 這個(gè)隊(duì)列分配給了 Consumer0 進(jìn)行獨(dú)占消費(fèi),如果 Consumer0 這個(gè)消費(fèi)者 hang 住了,但是服務(wù)沒(méi)有掛,不能從 Name Server 中下線(xiàn),因?yàn)?Consumer0 拉取到的消息不能消費(fèi),也就不能給 Broker 發(fā)送更新 Offset 的請(qǐng)求,最終導(dǎo)致消息積壓。這種情況只能手動(dòng)讓 Consumer0 下線(xiàn)或者讓 Consumer0 重啟。
RocketMQ 5.0 為了解決 PUSH Consumer 上面的問(wèn)題,引入了 POP Consumer。
POP 模式的客戶(hù)端引入的背景是 RocketMQ 5.0 為了更好地?fù)肀г圃?,客?hù)端要改造成無(wú)狀態(tài)的輕量級(jí)客戶(hù)端,RocketMQ 4.x 中客戶(hù)端具有的負(fù)載均衡、權(quán)限管理、消費(fèi)管理等功能都從客戶(hù)端移動(dòng)到了 Proxy。
POP 消費(fèi)模式如下圖:
四個(gè)消費(fèi)者都可以消費(fèi) Broker1 和 Broker2 上面的所有隊(duì)列,這樣即使某一個(gè)消費(fèi)者 hang 住了,其他消費(fèi)者也可以消費(fèi),并不會(huì)造成消息積壓。
同時(shí),從上圖中可以看到,POP 客戶(hù)端還有一個(gè)優(yōu)勢(shì),增加消費(fèi)者數(shù)量是可以提高消費(fèi)能力的,不受 MessageQueue 數(shù)量和消費(fèi)者數(shù)量的限制。
跟 PUSH 模式相比,POP 模式拉取到消息后,會(huì)設(shè)置一個(gè) POP_CK 屬性,代碼如下:
//MQClientAPIImpl.java
if (requestHeader instanceof PopMessageRequestHeader) {
if (startOffsetInfo == null) {
// we should set the check point info to extraInfo field , if the command is popMsg
// find pop ck offset
String key = messageExt.getTopic() + messageExt.getQueueId();
if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
messageExt.getTopic(), brokerName, messageExt.getQueueId()));
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
} else {
String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
);
//...
}
}可以看到,POP_CK 屬性包含了 brokerName、Topic、QueueId、offset 等參數(shù),通過(guò)這個(gè)屬性可以唯一標(biāo)識(shí)一條消息了。
從上面的代碼還可以看到,responseHeader 中有一個(gè) invisibleTime 屬性,這個(gè)屬性的作用是消費(fèi)者通過(guò) POP 模式拉取到一條消息后,這段時(shí)間(invisibleTime)內(nèi)這條消息在 Broker 端是不可見(jiàn)的,消費(fèi)者再次拉取就不會(huì)重復(fù)拉取到。但是如果過(guò)了這段時(shí)間,消費(fèi)者還沒(méi)有給 Broker 返回 ACK,這條消息會(huì)變?yōu)榭梢?jiàn),再次被消費(fèi)者拉取到。
消費(fèi)完成后,向 Broker 發(fā)送 ACK 消息,見(jiàn)下面代碼:
public void ackMessageAsync(
final String addr,
final long timeOut,
final AckCallback ackCallback,
final AckMessageRequestHeader requestHeader //
) throws RemotingException, MQBrokerException, InterruptedException {
final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
this.remotingClient.invokeAsync(addr, request, timeOut, new BaseInvokeCallback(MQClientAPIImpl.this) {
@Override
public void onComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
AckResult ackResult = new AckResult();
if (ResponseCode.SUCCESS == response.getCode()) {
ackResult.setStatus(AckStatus.OK);
} //...
assert ackResult != null;
ackCallback.onSuccess(ackResult);
} //...
} else {
//...
}
}
});
}
從上面的介紹可以看到,每個(gè)消費(fèi)者都可以從 Broker 的所有 MessageQueue 上拉取消息,那如果多個(gè)消費(fèi)者都從一個(gè) MessageQueue 上面拉取,有沒(méi)有可能會(huì)重復(fù)消費(fèi)呢?
Broker 收到消息拉取請(qǐng)求,從 MessageStore 拉取消息時(shí),首先會(huì)給 MessageQueue 進(jìn)行加鎖,加鎖成功后,才會(huì)拉取消息,這是其他客戶(hù)端來(lái)拉取時(shí)就會(huì)加鎖失敗。
//PopMessageProcessor.java
String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
long offset = getPopOffset(topic, requestHeader, queueId, false, lockKey);
if (!queueLockManager.tryLock(lockKey)) {
restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
return restNum;
}Broker 從 MessageStore 拉取到消息后,會(huì)定義一個(gè) CheckPoint 放入緩存,代碼如下:
//PopMessageProcessor.java
private long popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult,
PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid,
Channel channel, long popTime,
ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo,
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) {
String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(),
requestHeader.getConsumerGroup()) : requestHeader.getTopic();
String lockKey =
topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId;
//...
offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
GetMessageResult getMessageTmpResult = null;
try {
//...
restNum = getMessageTmpResult.getMaxOffset() - getMessageTmpResult.getNextBeginOffset() + restNum;
if (!getMessageTmpResult.getMessageMapedList().isEmpty()) {
if (isOrder) {
//...
} else {
appendCheckPoint(requestHeader, topic, reviveQid, queueId, offset, getMessageTmpResult, popTime, this.brokerController.getBrokerConfig().getBrokerName());
}
} //...
} //...
return restNum;
}Broker 收到消費(fèi)者發(fā)來(lái)的 ACK 后,會(huì)把 CheckPoint 從緩存中移除。
如果 Broker 一直沒(méi)有收到 ACK,則會(huì)把 CheckPoint 從緩存中移除,同時(shí)把 CheckPoint 發(fā)送給 MessageStore,由 MessageStore 發(fā)送到重試隊(duì)列。代碼如下:
boolean removeCk = !this.serving;
// ck will be timeout
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
removeCk = true;
}
// the time stayed is too long
if (now - point.getPopTime() > brokerController.getBrokerConfig().getPopCkStayBufferTime()) {
removeCk = true;
}
// double check
if (removeCk) {
// put buffer ak to store
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, false);
}
}
}
POP 客戶(hù)端有很多的優(yōu)勢(shì),總結(jié)如下:

我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢(xún)/運(yùn)營(yíng)咨詢(xún)/技術(shù)建議/互聯(lián)網(wǎng)交流