当前位置:首页 > 系统运维

警惕!这八个场景下 RocketMQ 会发生流量控制

大家好,警惕景下我是个场君哥。

在使用 RocketMQ 的生流过程中,有时候我们会看到下面的量控日志:

[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 206ms, size of queue: 5

这是因为 RocketMQ 触发了流量控制。今天我们来聊一聊哪些场景下 RocketMQ 会触发流量控制。警惕景下

如上图,个场生产者把消息写入 Broker,生流Consumer 从 Broker 拉取消息。量控Broker 是警惕景下 RocketMQ 的核心 ,触发流量控制主要就是个场为了防止 Broker 压力过大而宕机。

一、生流 Broker 流控

1、量控 broker busy

RockerMQ 默认采用异步刷盘策略,警惕景下Producer 把消息发送到 Broker 后,个场Broker 会先把消息写入 Page Cache,生流刷盘线程定时地把数据从 Page Cache 刷到磁盘上,如下图:

那 broker busy 是怎么导致的呢?

Broker 默认是开启快速失败的,处理逻辑类是 BrokerFastFailure,这个类中有一个定时任务用来清理过期的请求,每 10 ms 执行一次,代码如下:

public void start() {

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {

cleanExpiredRequest();

}

}

}, 1000, 10, TimeUnit.MILLISECONDS);

}(1)Page Cache 繁忙

清理过期请求之前首先会判断 Page Cache 是香港云服务器否繁忙,如果繁忙,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d"),也就是本文开头的异常日志。那怎么判断 Page Cache 繁忙呢?Broker 收到一条消息后会追加到 Page Cache 或者内存映射文件,这个过程首先获取一个 CommitLog 写入锁,如果持有锁的时间大于 osPageCacheBusyTimeOutMills(默认 1s,可以配置),就认为 Page Cache 繁忙。具体代码见 DefaultMessageStore 类 isOSPageCacheBusy 方法。

(2)清理过期请求

清理过期请求时,如果请求线程的创建时间到当前系统时间间隔大于 waitTimeMillsInSendQueue(默认 200ms,可以配置)就会清理这个请求,然后给 Producer 返回一个系统繁忙的状态码(code=2,remark="[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d")。

system busy

这个异常在 NettyRemotingAbstract#processRequestCommand 方法。

拒绝请求

如果 NettyRequestProcessor 拒绝了请求,就会给 Producer 返回一个系统繁忙的状态码(code=2,remark="[REJECTREQUEST]system busy, start flow control for a while")。那什么情况下请求会被拒绝呢?看下面这段代码:

//SendMessageProcessor类

public boolean rejectRequest() {

return this.brokerController.getMessageStore().isOSPageCacheBusy() ||

this.brokerController.getMessageStore().isTransientStorePoolDeficient();

}

从代码中可以看到,请求被拒绝的情况有两种可能,一个是 Page Cache 繁忙,另一个是云南idc服务商 TransientStorePoolDeficient。

跟踪 isTransientStorePoolDeficient 方法,发现判断依据是在开启 transientStorePoolEnable 配置的情况下,是否还有可用的 ByteBuffer。

注意:在开启 transientStorePoolEnable 的情况下,写入消息时会先写入堆外内存(DirectByteBuffer),然后刷入 Page Cache,最后刷入磁盘。而读取消息是从 Page Cache,这样可以实现读写分离,避免读写都在 Page Cache 带来的问题。如下图:

线程池拒绝

Broker 收到请求后,会把处理逻辑封装成到 Runnable 中,由线程池来提交执行,如果线程池满了就会拒绝请求(这里线程池中队列的大小默认是 10000,可以通过参数 sendThreadPoolQueueCapacity 进行配置),线程池拒绝后会抛出异常 RejectedExecutionException,程序捕获到异常后,会判断是不是单向请求(OnewayRPC),如果不是,就会给 Producer 返回一个系统繁忙的源码库状态码(code=2,remark="[OVERLOAD]system busy, start flow control for a while")。

判断 OnewayRPC 的代码如下,flag = 2 或者 3 时是单向请求:

public boolean isOnewayRPC() {

int bits = 1 << RPC_ONEWAY;

return (this.flag & bits) == bits;

}(3) 消息重试

Broker 发生流量控制的情况下,返回给 Producer 系统繁忙的状态码(code=2),Producer 收到这个状态码是不会进行重试的。下面是会进行重试的响应码:

//DefaultMQProducer类

private final SetretryResponseCodes = new CopyOnWriteArraySet(Arrays.asList(

ResponseCode.TOPIC_NOT_EXIST,

ResponseCode.SERVICE_NOT_AVAILABLE,

ResponseCode.SYSTEM_ERROR,

ResponseCode.NO_PERMISSION,

ResponseCode.NO_BUYER_ID,

ResponseCode.NOT_IN_CURRENT_UNIT

));

二、 Consumer 流控

DefaultMQPushConsumerImpl 类中有 Consumer 流控的逻辑 。

1、 缓存消息数量超过阈值

ProcessQueue 保存的消息数量超过阈值(默认 1000,可以配置),源码如下:

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

if ((queueFlowControlTimes++ % 1000) == 0) {

log.warn(

"the cached message count exceeds the threshold { }, so do flow control, minOffset={ }, maxOffset={ }, count={ }, size={ } MiB, pullRequest={ }, flowControlTimes={ }",

this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);

}

return;

}

2、缓存消息大小超过阈值

ProcessQueue 保存的消息大小超过阈值(默认 100M,可以配置),源码如下:

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

if ((queueFlowControlTimes++ % 1000) == 0) {

log.warn(

"the cached message size exceeds the threshold { } MiB, so do flow control, minOffset={ }, maxOffset={ }, count={ }, size={ } MiB, pullRequest={ }, flowControlTimes={ }",

this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);

}

return;

}3、 缓存消息跨度超过阈值

对于非顺序消费的场景,ProcessQueue 中保存的最后一条和第一条消息偏移量之差超过阈值(默认 2000,可以配置)。源代码如下:

if (!this.consumeOrderly) {

if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {

this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);

if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {

log.warn(

"the queues messages, span too long, so do flow control, minOffset={ }, maxOffset={ }, maxSpan={ }, pullRequest={ }, flowControlTimes={ }",

processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),

pullRequest, queueMaxSpanFlowControlTimes);

}

return;

}

}4、获取锁失败

对于顺序消费的情况,ProcessQueue 加锁失败,也会延迟拉取,这个延迟时间默认是 3s,可以配置。

三、总结

本文介绍了 RocketMQ 发生流量控制的 8 个场景,其中 Broker 4 个场景,Consumer 4 个场景。Broker 的流量控制,本质是对 Producer 的流量控制,最好的解决方法就是给 Broker 扩容,增加 Broker 写入能力。而对于 Consumer 端的流量控制,需要解决 Consumer 端消费慢的问题,比如有第三方接口响应慢或者有慢 SQL。

在使用的时候,根据打印的日志可以分析具体是哪种情况的流量控制,并采用相应的措施。

分享到:

滇ICP备2023006006号-16