
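A look at RocketMQ's suspendCurrentQueueTimeMillis

This post looks at the suspendCurrentQueueTimeMillis setting in RocketMQ: where it is defined and how the orderly consume service uses it.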

suspendCurrentQueueTimeMillis

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
//......

/**
 * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
 */
private long suspendCurrentQueueTimeMillis = 1000;

public long getSuspendCurrentQueueTimeMillis() {
    return suspendCurrentQueueTimeMillis;
}

public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
    this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}

//......

}

  • DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property with a default value of 1000 (milliseconds) and exposes it through a getter and a setter
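As a small illustration of the property above, the following sketch reads the default and then overrides it through the setter. It assumes rocketmq-client 4.5.2 is on the classpath; the consumer group name and the value 3000 are placeholders chosen for this example, not values from the article. The consumer is never started, so no name server or broker is needed.

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class SuspendTimeConfigExample {
    public static void main(String[] args) {
        // "example_consumer_group" is a hypothetical group name used only for illustration.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_consumer_group");

        // Prints the default defined in DefaultMQPushConsumer (1000 in the 4.5.2 source shown above).
        System.out.println(consumer.getSuspendCurrentQueueTimeMillis());

        // Override the default; 3000 ms is an arbitrary example value.
        consumer.setSuspendCurrentQueueTimeMillis(3000);
        System.out.println(consumer.getSuspendCurrentQueueTimeMillis());
    }
}
```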

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
//......

private void submitConsumeRequestLater(
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final long suspendTimeMillis
) {
    long timeMillis = suspendTimeMillis;
    if (timeMillis == -1) {
        timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
    }

    if (timeMillis < 10) {
        timeMillis = 10;
    } else if (timeMillis > 30000) {
        timeMillis = 30000;
    }

    this.scheduledExecutorService.schedule(new Runnable() {

        @Override
        public void run() {
            ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
        }
    }, timeMillis, TimeUnit.MILLISECONDS);
}

//......

}

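  • When suspendTimeMillis is -1, submitConsumeRequestLater falls back to defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(). The resulting timeMillis, whether passed in or read from the consumer, is then clamped: values below 10 become 10 and values above 30000 become 30000. Finally, scheduledExecutorService runs submitConsumeRequest after a delay of timeMillis milliseconds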
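The fallback-and-clamp step can be isolated into a few lines. The sketch below is not RocketMQ code: the class name, the method name resolveDelay, and the two constants are made up for this illustration, and only the numbers (-1, 10, 30000) come from the source shown above.

```java
class SuspendDelaySketch {
    // Bounds taken from submitConsumeRequestLater; the constant names are hypothetical.
    static final long MIN_DELAY_MILLIS = 10;
    static final long MAX_DELAY_MILLIS = 30000;

    /**
     * Mirrors the delay calculation: -1 means "use the configured default",
     * and the result is always clamped to [10, 30000] milliseconds.
     */
    public static long resolveDelay(long suspendTimeMillis, long configuredDefault) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = configuredDefault;
        }
        if (timeMillis < MIN_DELAY_MILLIS) {
            timeMillis = MIN_DELAY_MILLIS;
        } else if (timeMillis > MAX_DELAY_MILLIS) {
            timeMillis = MAX_DELAY_MILLIS;
        }
        return timeMillis;
    }

    public static void main(String[] args) {
        System.out.println(resolveDelay(-1, 1000));    // falls back to the default: 1000
        System.out.println(resolveDelay(5, 1000));     // below the lower bound: 10
        System.out.println(resolveDelay(60000, 1000)); // above the upper bound: 30000
        System.out.println(resolveDelay(-1, 5));       // the default is clamped too: 10
    }
}
```

One consequence worth noting is that a configured default outside the range is silently adjusted, so setting suspendCurrentQueueTimeMillis to a value under 10 or over 30000 has no further effect.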

Summary

DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property with a default of 1000 milliseconds. In ConsumeMessageOrderlyService, submitConsumeRequestLater uses that value whenever it is called with a suspend time of -1, clamps the resulting delay to the range 10 to 30000 milliseconds, and then uses scheduledExecutorService to call submitConsumeRequest after that delay.

doc

Published by go4it on OSCHINA (开源中国). Original link: https://my.oschina.net/go4it/blog/3133103
