14°

聊聊rocketmq的registerProducer与unregisterProducer

本文主要研究一下rocketmq的registerProducer与unregisterProducer

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    // Maximum time, in milliseconds, to wait when acquiring a lock with tryLock (used for the heartbeat lock below).
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    // Logger obtained from the client logging facility.
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    // Identifier of this client; it is passed to the broker when unregistering.
    private final String clientId;
    // Wall-clock time (epoch millis) captured when this instance is constructed.
    private final long bootTimestamp = System.currentTimeMillis();
    // Producers registered on this instance, keyed by producer group; at most one producer per group (see registerProducer).
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // Consumers registered on this instance, keyed by group.
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    // Admin extensions registered on this instance, keyed by group.
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
//......

/**
 * Registers a producer under the given group on this client instance.
 *
 * @param group    producer group name used as the key in {@code producerTable}
 * @param producer producer implementation to register
 * @return {@code true} if the producer was stored; {@code false} if either argument is
 *         {@code null} or a producer is already registered for {@code group}
 */
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (group == null || producer == null) {
        return false;
    }

    // putIfAbsent returns null only when no mapping existed, i.e. this call won the slot.
    final boolean registered = this.producerTable.putIfAbsent(group, producer) == null;
    if (!registered) {
        log.warn("the producer group[{}] exist already.", group);
    }
    return registered;
}

/**
 * Removes the producer registered under {@code group} from {@code producerTable} and then
 * unregisters that producer group from the brokers.
 *
 * @param group producer group name to unregister
 */
public void unregisterProducer(final String group) {
    // Drop the local registration first, then notify the brokers.
    this.producerTable.remove(group);

    // Only the producer side is being unregistered, so no consumer group is supplied.
    final String noConsumerGroup = null;
    this.unregisterClientWithLock(group, noConsumerGroup);
}

/**
 * Unregisters the given producer/consumer group from the brokers while holding
 * {@code lockHeartbeat}.
 *
 * <p>The lock is acquired with a timeout of {@code LOCK_TIMEOUT_MILLIS}; if it cannot be
 * obtained in time, a warning is logged and nothing is unregistered. Any exception thrown by
 * {@code unregisterClient} is logged and not propagated.
 *
 * @param producerGroup producer group to unregister, or {@code null}
 * @param consumerGroup consumer group to unregister, or {@code null}
 */
private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
    try {
        if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                this.unregisterClient(producerGroup, consumerGroup);
            } catch (Exception e) {
                log.error("unregisterClient exception", e);
            } finally {
                // Always release the lock, even if unregisterClient failed.
                this.lockHeartbeat.unlock();
            }
        } else {
            log.warn("lock heartBeat, but failed.");
        }
    } catch (InterruptedException e) {
        log.warn("unregisterClientWithLock exception", e);
        // tryLock cleared the interrupt flag when it threw; restore it so callers further up
        // the stack can still observe that this thread was interrupted.
        Thread.currentThread().interrupt();
    }
}

private void unregisterClient(final String producerGroup, final String consumerGroup) {
    Iterator&lt;Entry&lt;String, HashMap&lt;Long, String&gt;&gt;&gt; it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry&lt;String, HashMap&lt;Long, String&gt;&gt; entry = it.next();
        String brokerName = entry.getKey();
        HashMap&lt;Long, String&gt; oneTable = entry.getValue();

        if (oneTable != null) {
            for (Map.Entry&lt;Long, String&gt; entry1 : oneTable.entrySet()) {
                String addr = entry1.getValue();
                if (addr != null) {
                    try {
                        this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
                        log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                    } catch (RemotingException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (InterruptedException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    } catch (MQBrokerException e) {
                        log.error("unregister client exception from broker: " + addr, e);
                    }
                }
            }
        }
    }
}

//......

}

  • MQClientInstance定义了producerTable,其registerProducer方法会执行producerTable.putIfAbsent(group, producer),如果返回值不为null(说明该group已经注册过producer)则打印warn日志并返回false;其unregisterProducer方法会执行producerTable.remove(group)以及unregisterClientWithLock,而后者在获取lockHeartbeat锁之后执行unregisterClient,它会遍历brokerAddrTable中的每个broker地址,逐个执行mQClientAPIImpl.unregisterClient

DefaultMQProducerImpl.start

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
//......

/**
 * Starts this producer and also starts the underlying client factory.
 *
 * @throws MQClientException if {@link #start(boolean)} fails
 */
public void start() throws MQClientException {
    final boolean startFactory = true;
    this.start(startFactory);
}

/**
 * Starts this producer according to its current {@code serviceState}.
 *
 * <p>Only the {@code CREATE_JUST} state runs the startup sequence: validate the configuration,
 * obtain the client factory, register this producer under its group, optionally start the
 * factory, and move to {@code RUNNING}. In the {@code RUNNING}, {@code START_FAILED} and
 * {@code SHUTDOWN_ALREADY} states an {@link MQClientException} is thrown. After a successful
 * start, a heartbeat is sent to all brokers.
 *
 * @param startFactory whether to also call {@code mQClientFactory.start()}
 * @throws MQClientException if the producer group is already registered on the client
 *         factory, or if the producer is not in the {@code CREATE_JUST} state
 */
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            // Mark as failed up front; the state only becomes RUNNING at the end of this branch,
            // so an exception thrown by any step in between leaves the state as START_FAILED.
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            // Skipped for the client's internal producer group.
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Registration was refused: reset the state to CREATE_JUST before throwing.
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // Seed the publish-info table with an empty entry under the create-topic key.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            // Any state other than CREATE_JUST means start() was already attempted.
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    // Reached only when no exception was thrown above.
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

//......

}

  • DefaultMQProducerImpl的start方法在serviceState为CREATE_JUST时会执行mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this),如果返回false则抛出MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null)

DefaultMQProducerImpl.shutdown

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
//......

/**
 * Shuts down this producer and also shuts down the underlying client factory.
 */
public void shutdown() {
    final boolean shutdownFactory = true;
    this.shutdown(shutdownFactory);
}

/**
 * Shuts down this producer if it is currently {@code RUNNING}; in every other state this
 * method does nothing.
 *
 * <p>When running, the producer group is unregistered from the client factory, the async
 * sender executor is shut down, the factory is optionally shut down, and the state becomes
 * {@code SHUTDOWN_ALREADY}.
 *
 * @param shutdownFactory whether to also call {@code mQClientFactory.shutdown()}
 */
public void shutdown(final boolean shutdownFactory) {
    switch (this.serviceState) {
        case RUNNING:
            this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
            this.defaultAsyncSenderExecutor.shutdown();
            if (shutdownFactory) {
                this.mQClientFactory.shutdown();
            }

            log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case CREATE_JUST:
        case SHUTDOWN_ALREADY:
        default:
            // Nothing to tear down in these states.
            break;
    }
}

//......

}

  • DefaultMQProducerImpl的shutdown方法在serviceState为RUNNING的时候会执行mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup())

小结

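MQClientInstance定义了producerTable,其registerProducer方法会执行producerTable.putIfAbsent(group, producer),如果返回值不为null则返回false;其unregisterProducer方法会执行producerTable.remove(group)以及unregisterClientWithLock,而后者主要执行的是unregisterClient,它最后执行的是mQClientAPIImpl.unregisterClient。DefaultMQProducerImpl的start方法在serviceState为CREATE_JUST时调用registerProducer,注册失败则抛出MQClientException;其shutdown方法在serviceState为RUNNING时调用unregisterProducer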

doc

本文由【go4it】发布于开源中国,原文链接:https://my.oschina.net/go4it/blog/3135985

全部评论: 0

    我有话说: