13°

聊聊rocketmq的updateTopicRouteInfoFromNameServer

本文主要研究一下rocketmq的updateTopicRouteInfoFromNameServer

updateTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    // Upper bound, in milliseconds, passed to tryLock when acquiring the name-server lock.
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    // Client-side logger obtained from ClientLogger.
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    // Wall-clock time (epoch millis) captured when this instance's fields are initialised.
    private final long bootTimestamp = System.currentTimeMillis();
    // Registered producers keyed by group; scanned to collect published topics and to push publish info.
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // Registered consumers keyed by group; scanned to collect subscribed topics and to push subscribe info.
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
//......

public void updateTopicRouteInfoFromNameServer() {
    Set&lt;String&gt; topicList = new HashSet&lt;String&gt;();

    // Consumer
    {
        Iterator&lt;Entry&lt;String, MQConsumerInner&gt;&gt; it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry&lt;String, MQConsumerInner&gt; entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set&lt;SubscriptionData&gt; subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    {
        Iterator&lt;Entry&lt;String, MQProducerInner&gt;&gt; it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry&lt;String, MQProducerInner&gt; entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set&lt;String&gt; lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }

    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

/**
 * Refreshes the route information of a single topic from the name server.
 *
 * <p>Convenience overload: delegates to the three-argument variant with {@code isDefault}
 * set to {@code false} and no default producer.
 *
 * @param topic topic whose route information should be refreshed
 * @return whatever the three-argument overload returns for this topic
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    final boolean updated = this.updateTopicRouteInfoFromNameServer(topic, false, null);
    return updated;
}

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault &amp;&amp; defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator&lt;Entry&lt;String, MQProducerInner&gt;&gt; it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry&lt;String, MQProducerInner&gt; entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        {
                            Set&lt;MessageQueue&gt; subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator&lt;Entry&lt;String, MQConsumerInner&gt;&gt; it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry&lt;String, MQConsumerInner&gt; entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                }
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) &amp;&amp; !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

//......

}

  • updateTopicRouteInfoFromNameServer首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
  • 这里会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
  • 若有变化则更新brokerAddrTable,遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo),最后将cloneTopicRouteData更新到topicRouteTable

getTopicRouteInfoFromNameServer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {
//......

/**
 * Queries the name server for the route information of {@code topic}.
 *
 * <p>Convenience overload: delegates to the three-argument variant with
 * {@code allowTopicNotExist} set to {@code true}.
 *
 * @param topic         topic to look up
 * @param timeoutMillis timeout, in milliseconds, handed to the remote call
 * @return the route data returned by the three-argument overload
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
    throws RemotingException, MQClientException, InterruptedException {
    final boolean allowTopicNotExist = true;
    return this.getTopicRouteInfoFromNameServer(topic, timeoutMillis, allowTopicNotExist);
}

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist &amp;&amp; !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}    

//......

}

  • getTopicRouteInfoFromNameServer方法以RequestCode.GET_ROUTEINTO_BY_TOPIC构造请求;若response.getCode为ResponseCode.SUCCESS且body不为null,则使用TopicRouteData.decode(body, TopicRouteData.class)解析为TopicRouteData并返回;其余情况(包括TOPIC_NOT_EXIST以及SUCCESS但body为null)最终都会抛出MQClientException;这里remotingClient.invokeSync的addr参数为null

invokeSync

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
//......

private final AtomicReference&lt;List&lt;String&gt;&gt; namesrvAddrList = new AtomicReference&lt;List&lt;String&gt;&gt;();
private final AtomicReference&lt;String&gt; namesrvAddrChoosed = new AtomicReference&lt;String&gt;();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());

//......

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null &amp;&amp; channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis &lt; costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }

    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null &amp;&amp; cw.isOK()) {
        return cw.getChannel();
    }

    return this.createChannel(addr);
}

private Channel getAndCreateNameserverChannel() throws InterruptedException {
    String addr = this.namesrvAddrChoosed.get();
    if (addr != null) {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null &amp;&amp; cw.isOK()) {
            return cw.getChannel();
        }
    }

    final List&lt;String&gt; addrList = this.namesrvAddrList.get();
    if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
        try {
            addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null &amp;&amp; cw.isOK()) {
                    return cw.getChannel();
                }
            }

            if (addrList != null &amp;&amp; !addrList.isEmpty()) {
                for (int i = 0; i &lt; addrList.size(); i++) {
                    int index = this.namesrvIndex.incrementAndGet();
                    index = Math.abs(index);
                    index = index % addrList.size();
                    String newAddr = addrList.get(index);

                    this.namesrvAddrChoosed.set(newAddr);
                    log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                    Channel channelNew = this.createChannel(newAddr);
                    if (channelNew != null) {
                        return channelNew;
                    }
                }
            }
        } catch (Exception e) {
            log.error("getAndCreateNameserverChannel: create name server channel exception", e);
        } finally {
            this.lockNamesrvChannel.unlock();
        }
    } else {
        log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
    }

    return null;
}

/**
 * Produces a random starting value in the range {@code [0, 999)}.
 *
 * @return a non-negative pseudo-random int smaller than 999
 */
private static int initValueIndex() {
    final int bound = 999;
    final int raw = new Random().nextInt();

    return Math.abs(raw % bound) % bound;
}

//......

}

  • invokeSync首先通过getAndCreateChannel获取channel,而getAndCreateChannel方法在addr为null时执行的是getAndCreateNameserverChannel;该方法先取namesrvAddrChoosed.get(),若不为null且channelTables中对应的ChannelWrapper可用(isOK)则直接返回其channel;否则在获取lockNamesrvChannel锁之后再检查一次,仍不可用时通过namesrvIndex.incrementAndGet()获取index,并对addrList.size()取模得到非负下标,以此选出新的namesrv地址,更新到namesrvAddrChoosed后调用createChannel创建channel;namesrvIndex的初始值为initValueIndex,它通过Math.abs(r.nextInt() % 999) % 999算出一个随机初始值

小结

  • MQClientInstance的updateTopicRouteInfoFromNameServer首先从consumerTable及producerTable获取topicList,然后遍历topicList执行updateTopicRouteInfoFromNameServer,最后执行的是updateTopicRouteInfoFromNameServer(topic, false, null)
  • 这里会执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3)获取topicRouteData然后与topicRouteTable中的TopicRouteData进行对比,先通过topicRouteDataIsChange判断是否有变化,没有的话再通过isNeedUpdateTopicRouteInfo进一步判断
  • 若有变化则更新brokerAddrTable,遍历producerTable执行impl.updateTopicPublishInfo(topic, publishInfo);遍历consumerTable执行impl.updateTopicSubscribeInfo(topic, subscribeInfo),最后将cloneTopicRouteData更新到topicRouteTable

doc

本文由【go4it】发布于开源中国,原文链接:https://my.oschina.net/go4it/blog/3136636

全部评论: 0

    我有话说: