当前位置:首页 > IT科技

阿里二面:RocketMQ 消费失败了,怎么处理?

大家好,阿里我是费失君哥。今天来聊一聊 RocketMQ 客户端消息消费失败,败处怎么办?阿里

下面是 RocketMQ 推模式的一段代码:

public static void main(String[] args) throws InterruptedException, MQClientException {

Tracer tracer = initTracer();

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));

consumer.subscribe("TopicTest", "*");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.setConsumeTimestamp("20181109221800");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) {

try{

System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

}catch (Exception e){

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

}

从这段代码可以看出,消费者消费消息后会返回一个消费状态,费失那消费状态有哪些呢?败处参见类 ConsumeConcurrentlyStatus 中定义:

消费成功,返回 CONSUME_SUCCESS。阿里消费失败,费失返回 RECONSUME_LATER。败处

下面代码就是阿里返回上面两个状态的逻辑,对于消费状态,费失如果返回 null,败处会给它赋值 RECONSUME_LATER,阿里处理逻辑如下:

//ConsumeRequest 类

public void run() {

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;

//省略部分逻辑

long beginTimestamp = System.currentTimeMillis();

ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

try {

//省略部分逻辑

status = listener.consumeMessage(Collections.unmodifiableList(msgs),费失 context);

} catch (Throwable e) { }

//省略部分逻辑

if (null == status) {

//省略日志

status = ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

//省略部分逻辑

if (!processQueue.isDropped()) {

ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

} else { }

}

这部分代码的 UML 类图如下:

上面代码中的 processConsumeResult 方法就是消费失败后客户端的处理逻辑:

public void processConsumeResult(

final ConsumeConcurrentlyStatus status,

final ConsumeConcurrentlyContext context,

final ConsumeRequest consumeRequest

) {

//ackIndex 初始值是 Integer.MAX_VALUE;

int ackIndex = context.getAckIndex();

switch (status) {

case CONSUME_SUCCESS:

if (ackIndex >= consumeRequest.getMsgs().size()) {

ackIndex = consumeRequest.getMsgs().size() - 1;

}

//省略部分逻辑

break;

case RECONSUME_LATER:

ackIndex = -1;

//省略部分逻辑

break;

default:

break;

}

switch (this.defaultMQPushConsumer.getMessageModel()) {

case BROADCASTING:

//广播模式下这里只打印日志

break;

case CLUSTERING:

ListmsgBackFailed = new ArrayList(consumeRequest.getMsgs().size());

for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {

MessageExt msg = consumeRequest.getMsgs().get(i);

boolean result = this.sendMessageBack(msg, context);

if (!result) {

msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);

msgBackFailed.add(msg);

}

}

if (!msgBackFailed.isEmpty()) {

consumeRequest.getMsgs().removeAll(msgBackFailed);

//发送回 Broker 失败的消息,5s 后再次消费

this.submitConsumeRequestLater(msgBackFailed,败处 consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());

}

break;

default:

break;

}

//更新本地保存的偏移量

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());

if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {

this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);

}

}

一 、消费成功

上面的代码逻辑中,如果消费成功,ackIndex 变量的值就是消息数量减 1,所以上面的云服务器 switch 逻辑是不会执行的,因为广播模式下,只是打印一段日志(没有其他逻辑),而集群模式下,for 循环的起始 i 变量已经等于消息数量,循环里面的代码不会执行

因此,如果消息消费成功,只会走最下面的逻辑,更新本地保存的消息偏移量。

二 、消费失败

ackIndex 变量值等于 -1。

1、广播模式

在消费失败的情况下,广播模式的代码只是打印了一段日志,之后更新了本地保存的消息偏移量,因此我们知道广播模式消息消费失败后就不会重新消费了,相当于丢弃了消息

2、集群模式

从上面代码的 for 循环中,会把所有的网站模板消息都发送回 Broker,这样这批消息还能再次被拉取到进行消费。

对于发送给 Broker 失败的消息,会延迟 5s 后再次消费。代码如下:

private void submitConsumeRequestLater(

final Listmsgs,

final ProcessQueue processQueue,

final MessageQueue messageQueue

) {

this.scheduledExecutorService.schedule(new Runnable() {

@Override

public void run() {

ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);

}

}, 5000, TimeUnit.MILLISECONDS);

}

更新本地保存的消息偏移量时,会从消息列表中把发送回 Broker 失败的消息先删除掉。

注意:从上面逻辑可以看到,在拉取到一批消息进行消费时,只要有一条消息消费失败,这批消息都会进行重试,因此消费端做好幂等是必要的。

下面再看一下发送失败消息给 Broker 的代码,发送消息是,请求的 code 码是 CONSUMER_SEND_MSG_BACK。根据这个请求码就能找 Broker 端的处理逻辑。

如果发送回 Broker 时抛出异常,需要重新发送一个新的消息,这里有四点需要注意:

新消息的 Topic 变成【 %RETRY% + consumerGroup】。新消息的 RETRY_TOPIC 这个属性赋值为之前的 Topic。香港云服务器新消息的重试次数属性加 1;新消息的 DELAY 属性等于重试次数 + 3。public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)

throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

try {

this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,

this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());

} catch (Exception e) {

//Topic 变成 %RETRY% + consumerGroup

Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);

MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

//RETRY_TOPIC 赋值

MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

//重试次数+1

MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));

//最大重试次数

MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));

//DELAY = 重试次数 + 3

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);

} finally {

msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));

}

}3、Broker 处理

上面已经讲过,对于处理失败的消息,消费端会发送回 Broker,不过这里有一点需要注意,发送回 Broker 时,消息的 Topic 变成【"%RETRY%" + namespace + "%" + 原始 topic】,封装逻辑在源码 ClientConfig.withNamespace。

根据请求码 CONSUMER_SEND_MSG_BACK 可以定位到 Broker 的处理逻辑在类 SendMessageProcessor,方法 asyncConsumerSendMsgBack。

(1)进死信队列

如果重试次数超过了最大重试次数(默认 16 次),或者 delayLevel 值小于0,则消息进死信队列,死信队列的 Topic 为【"%DLQ%" + 消费组】,代码如下:

//asyncConsumerSendMsgBack 方法

if (msgExt.getReconsumeTimes() >= maxReconsumeTimes

|| delayLevel < 0) {

newTopic = MixAll.getDLQTopic(requestHeader.getGroup());

queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,

DLQ_NUMS_PER_GROUP,

PermName.PERM_WRITE | PermName.PERM_READ, 0);

msgExt.setDelayTimeLevel(0);

}(2)发送 CommitLog

如果延迟级别(DELAY)等于 0,则延迟级别就等于重试次数加 3。

有个地方需要注意,发送到延迟队列的消息重新进行了封装,封装这个消息用的并不是客户端发来的那个消息,而是从 CommitLog 中根据偏移量查找的,代码如下:

MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

if (null == msgExt) {

response.setCode(ResponseCode.SYSTEM_ERROR);

response.setRemark("look message by offset failed, " + requestHeader.getOffset());

return CompletableFuture.completedFuture(response);

}

如果查询失败,就会给客户端返回系统错误。

这里有个重要的细节,这个消息写入 CommitLog 时,会判断 DELAY 是否大于 0,如果大于 0,就会修改 Topic。代码如下:

//CommitLog 类 asyncPutMessage 方法

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE

|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {

// Delay Delivery

if (msg.getDelayTimeLevel() > 0) {

if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {

//从源码看,这里最大值是18

msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());

}

topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;

//queueId = delayLevel - 1

int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));

msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);

msg.setQueueId(queueId);

}

}

这里把 Topic 修改为 SCHEDULE_TOPIC_XXXX,供延时队列来调度。进入延时队列后,延时队列会按照下面的时间进行调度:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

上面代码可以看到,延时消息的调度有 18 个等级,最小的 1s,最大的 2h。而从下面的代码我们可以看到,调度使用第三个等级开始的:

if (0 == delayLevel) {

delayLevel = 3 + msgExt.getReconsumeTimes();

}

msgExt.setDelayTimeLevel(delayLevel);(3)延时队列

延时队列的代码逻辑在类 ScheduleMessageService,这里的 start 方法触发延时队列的调度,而 start 方法的业务入口在 BrokerStartup 的初始化。

首先,会计算出每个延时等级对应的延时时间(处理到 ms 级别),放到 delayLevelTable,它是一个 ConcurrentHashMap,然后创建一个核心线程数等于 18 的定时线程池,依次对每个级别的延时进行调度。这个任务启动后,会每 100ms 执行一次。代码如下:

public void start() {

if (started.compareAndSet(false, true)) {

this.load();

this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));

//省略异步

for (Map.Entryentry : this.delayLevelTable.entrySet()) {

Integer level = entry.getKey();

Long timeDelay = entry.getValue();

Long offset = this.offsetTable.get(level);

if (null == offset) {

offset = 0L;

}

if (timeDelay != null) {

//省略异步

this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);

}

}

//省略其他逻辑

}

}

调度逻辑中,首先根据 Topic 和 queueId 找到对应的消费队列,然后从里面连续读取消息:

public void executeOnTimeup() {

ConsumeQueue cq =

ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,

delayLevel2QueueId(delayLevel));

//省略空处理

SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

//省略空处理

long nextOffset = this.offset;

try {

int i = 0;

ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();

//CQ_STORE_UNIT_SIZE = 20,因为 ConsumeQueue 中一个元素占 20 字节

for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {

//offset占8个字节

long offsetPy = bufferCQ.getByteBuffer().getLong();

//消息大小占4个字节

int sizePy = bufferCQ.getByteBuffer().getInt();

//ConsumeQueue中tagsCode是一个投递时间点

long tagsCode = bufferCQ.getByteBuffer().getLong();

if (cq.isExtAddr(tagsCode)) {

if (cq.getExt(tagsCode, cqExtUnit)) {

tagsCode = cqExtUnit.getTagsCode();

} else {

//cant find ext content.So re compute tags code.

long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);

tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);

}

}

long now = System.currentTimeMillis();

long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long countdown = deliverTimestamp - now;

if (countdown > 0) {

//时间未到,等待下次调度

this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);

return;

}

MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);

//省略事务消息

boolean deliverSuc;

//同步异步都有,只保留同步代码

deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);

}

nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

} catch (Exception e) {

} finally {

bufferCQ.release();

}

//DELAY_FOR_A_WHILE是 100ms

this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);

}

因为 messageTimeup 方法使用了原始的 Topic 和 QueueId 新建了消息,所以上面的 syncDeliver 方式是将消息重新投递到原始的队列中,这样消费者可以再次拉取到这条消息进行消费。注意:上面 ConsumeQueue 的 tagsCode 是一个时间点,很容易误解为是 tag 的 hashCode,MessageQueue 的存储元素中最后 8 字节确实是 tag 的 hashCode。

三、总结

消费者消费失败后,会把消费发回给 Broker 进行处理。下图是客户端处理流程:

Broker 收到消息后,会把消息重新发送到 CommitLog,发送到 CommitLog 之前,首先会修改 Topic 为 SCHEDULE_TOPIC_XXXX,这样就发送到了延时队列,延时队列再根据延时级别把消息投递到原始的队列,这样消费者就能再次拉取到。流程如下图:

从流程来看,消费者批量拉取消息,如果部分消息消费失败,那就会整批全部重试。所以做好幂等是必要的。

分享到:

滇ICP备2023006006号-16