因业务需要,需要进行阿里云RocketMQ的性能测试。

环境,一台windows系统CPU:I7,内存:8G,64位操作系统。

测试两种场景,为了保证订阅关系一致性(可以去阿里云官网了解订阅关系一致性),消费分为两种模式。

1、按Tag订阅,订阅所有Tag,测试使用3个消费者,3个生产者,每个生产者发送一万条数据,放到同一个Tag里,对应同一个ShardingKey,保证顺序消费。

测试结果:

阿里云RocketMQ的性能测试(一、本地测试)

2、按Tag订阅,每个消费者订阅一个Tag,每个Tag在一个分组里面。

测试结果:

阿里云RocketMQ的性能测试(一、本地测试)

其中要特别注意第二条,订阅关系一致性问题,如果3个消费者在一个组内,订阅的tag不一致,消费是有问题的,也可能就不消费。

结论:

TPS不到100,这个基于本地到阿里云,走的公网,效果一般,后面会有基于阿里云内网的测试,敬请期待。

发送消息代码:

public static void sendMqMessage( String topic, String tag, String message, String sharding ) {
    String key = UUID.randomUUID().toString();
    Message msg = new Message(
            // Message 所属的 Topic
            topic,
            // Message Tag, 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
            tag,
            // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
            message.getBytes()
    );
    // 设置代表消息的业务关键属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发
    msg.setKey(key);
    // 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
    // 全局顺序消息,该字段可以设置为任意非空字符串。
    String shardingKey = sharding;
    try {
        SendResult sendResult = producer.send(msg, shardingKey);
        // 发送消息,只要不抛异常就是成功
        if (sendResult != null) {
            //System.out.println(message + tag + sharding);
            if(message.equals("10000")){
                System.out.println(tag + ":发送完毕10000!");
            }
            //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            //System.out.println(dateFormat.format(new Date()) + "-发送消息成功-sharding:" + shardingKey + ",tag:" + tag + ",key:"+ key + ",msgId:" + sendResult.getMessageId());
        }
    }
    catch (Exception e) {
        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
        throw e;
    }
}</code></pre> 
<h1>消费代码:</h1> 
<pre><code>public void consumerMqMessage() {
    String topic = "topic-test";
    String tags = "W3";//第一种模式使用*
    consumer = RocketMqConsumerSingleton.getInstance();
    // 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
    consumer.subscribe(
        // Message 所属的 Topic
        topic,
        // 订阅指定 Topic 下的 Tags:
        // 1. * 表示订阅所有消息
        // 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
        tags,
        new MessageOrderListener() {
            /**
             * 1. 消息消费处理失败或者处理出现异常,返回 OrderAction.Suspend&lt;br&gt;
             * 2. 消息处理成功,返回 OrderAction.Success
             */
            @Override
            public OrderAction consume(Message message, ConsumeOrderContext context) {
                String msg = new String(message.getBody());
                //System.out.println(msg);

                if(msg.equals("1")) {
                    System.out.println(message.getTag() + "开始:" + System.currentTimeMillis());
                }
                if(msg.equals("10000")) {
                    System.out.println(message.getTag() + "结束:" + System.currentTimeMillis());
                }
                //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                //System.out.println(dateFormat.format(new Date()) + "-消费消息---sharding:" + message.getShardingKey() + ",tag:" + message.getTag() + ",key: " + message.getKey() + ",MsgId:" + message.getMsgID());
                try {
                    //Thread.sleep(2000);
                    //System.out.println("-------------消费者睡2秒后----------");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return OrderAction.Success;
            }
        });
    consumer.start();
}</code></pre>