209°

rabbitmq 之 ack

场景1:对于消息处理失败,有可能有由于网络波动导致的数据处理异常,待网络稳定时消息就会正常处理,对于这种处理失败,应该继续尝试去处理消息

场景2:消息重复处理,例如我们通过消息队列向数据库中添加数据,由于数据库网络波动,导致数据库连接超时,而我们的系统认为消息处理失败,就会把消息回滚到消息队列,继续尝试处理,这时就会造成消息重复处理的现象,对于重要的消息,我们可以每处理一条消息,就记录一下,处理新的消息时,进行判断消息是否已经处理,如果已经处理,就丢弃消息 设置firstQueue队列为手动ack处理

   @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer_one(){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.addQueues(queueConfig.firstQueue());
        simpleMessageListenerContainer.setExposeListenerChannel(true);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
        simpleMessageListenerContainer.setConcurrentConsumers(1);
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
        simpleMessageListenerContainer.setMessageListener(firstConsumer);
        return simpleMessageListenerContainer;
    }


 生产者

 public void send(String uuid,Object message) {
        CorrelationData correlationId = new CorrelationData(uuid);
        for (int i = 0; i < 5; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1,
                    (Object) (String.valueOf(message)+i), correlationId);
        }
    }


未确认ack的消费者 

@Component
public class FirstConsumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        String msg = new String(message.getBody());
       
        // 处理消息
        System.out.println("FirstConsumer {} handleMessage :"+msg);
    }
}

执行结果,发现只消费了一条(但是未ack)

由于未确认ack,故在rabbitmq的界面上看到的firstQueue队列的信息见下图

从图中看到队列中有5个消息,unacked(表示未ack确认的有一个),还有四个准备中,消息就算程序收到了 但是未确认ACK导致消息服务器以为他是未成功消费的 后续还会再发。

此时发现程序重启,有接受到了ss0的这条数据。

手动确认ack消费者

@Component
public class FirstConsumer implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        // 每次只接收一个信息
        channel.basicQos(1);
        String msg = null;
        try {
             msg = new String(message.getBody());
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            if (msg.equals("ss1")){
                channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
                        MessageProperties.PERSISTENT_TEXT_PLAIN, "重新放入队列中的数据".getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("FirstConsumer  consumer fail");
            // 丢弃信息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
        // 处理消息
        System.out.println("FirstConsumer {} handleMessage :"+msg);
    }
}

打断点查看下

 

此时有断点,再次去rabbitmq的界面查看如图

此时会发现,队列中只有2个消息了,还有一个未ack(因为打断点了),故需要手动调用channel.basicAck告知服务器,消费者已经消费了可以从队列中删除了。

本文由【zhu_kai1】发布于开源中国,原文链接:https://my.oschina.net/u/3370769/blog/3001446

全部评论: 0

    我有话说: