88°

MessageQueueSelector实现顺序消费

顺序消息的定义:

顺序消息是指消息的消费顺序和生产顺序相同,在某些场景下,必须保证顺序消息。比如订单的生成、付款、发货.顺序消息又分为全局顺序消息和部分顺序消息,全局顺序消息指某一个topic下的所有消息都要保证顺序;部分顺序消息只要保证某一组消息被顺序消费。对于订单消息来说,只要保证同一个订单ID的生成、付款、发货消息按照顺序消费即可。

 

部分顺序消费实现原理:

1. 发送端:保证相同订单ID的各种消息发往同一个MessageQueue(同一个Topic下的某一个queue)

2.消费端:保证同一个MessageQueue里面的消息不被并发处理 (同一个Topic的不同MessageQueue是可以同时消费的)

        DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");
		producer.setNamesrvAddr("10.76.0.38:9876");
		producer.start();
		for (int i = 0; i < 1000; i++) {
			Order order  = new Order();
			order.orderId = i;
			order.status = "生成";
		Message msg1 = new Message("local-test-producer",
				"TagA",
				JsonUtils.toJson(order).getBytes()
		);
		SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
			@Override
			public MessageQueue select(List&lt;MessageQueue&gt; mqs, Message msg, Object arg) {
				return null;
			}
		}, order.orderId);
		log.info("sendResult1={}",sendResult1);
		Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



		order.status="付款";

		Message msg2 = new Message("local-test-producer",
				"TagA",
				JsonUtils.toJson(order).getBytes()
		);
		SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
			@Override
			public MessageQueue select(List&lt;MessageQueue&gt; mqs, Message msg, Object arg) {
				return null;
			}
		}, order.orderId);
		log.info("sendResult2={}",sendResult2);
		Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



		order.status="发货";
		Message msg3 = new Message("local-test-producer",
				"TagA",
				JsonUtils.toJson(order).getBytes()
		);
		producer.send(msg2, new MessageQueueSelector() {
			@Override
			public MessageQueue select(List&lt;MessageQueue&gt; mqs, Message msg, Object arg) {
				return null;
			}
		}, order.orderId);
		Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);


		SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {
			@Override
			public MessageQueue select(List&lt;MessageQueue&gt; mqs, Message msg, Object arg) {
				Integer id = (Integer) arg;
				int index = id % mqs.size();
				return mqs.get(index);
			}
			//MessageQueueSelector保证同一个orderId的消息都存储在同一个MessageQueue。
		}, order.orderId);
		log.info("sendResult3={}",sendResult1);
	}

 

 

 

 

 

       //同一个MessageQueue里面的消息要顺序消费,不能并发消费。
		//但是同一个Topic的不同MessageQueue是可以同时消费的
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");
		consumer.setNamesrvAddr("10.76.0.38:9876");
		consumer.subscribe("test", "");
		consumer.setPullBatchSize(1);
		consumer.setConsumeThreadMin(1);
		consumer.setConsumeThreadMax(1);
	//	consumer.registerMessageListener(new MessageListenerConcurrently() {
		consumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				List<String> messages = new ArrayList<>();
			for (MessageExt msg : msgs) {
				messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());
			}
			System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);
			return ConsumeOrderlyStatus.SUCCESS;
		}
	});
	consumer.start();
	Thread.currentThread().join();</code></pre> 

 

源码分析:

我们知道在RocketMQ中是可以给一个消费者实例设置多个线程并发消费的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,

那MessageListenerOrderly是如何保证某一个时刻,只有一个消费者的某一个线程在消费某一个MessageQueue的呢?

就在Client模块的 ConsumeMessageOrderlyService里面,消费者端并不是简单的禁止并发处理,而是给每一个Consumer Queue加锁,

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

在消费每个消息之前,需要先获取这个消息对应的Consumer Queue所对应的锁,保证同一个Consumer Queue的消息不会被并发消费,但是不同的Consumer Queue的消息是可以并发处理的。

 

本文由【铁骨铮铮】发布于开源中国,原文链接:https://my.oschina.net/u/4129361/blog/3073211

全部评论: 0

    我有话说: