559°

第十七章:springboot 整合 activeMQ

首先介绍 JMS

  JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS 的使用,我们可以解除两个程序之间的耦合。JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。

概念    
        JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
        JMS生产者(Message Producer)
        JMS消费者(Message Consumer)
        JMS消息
        JMS队列
        JMS主题

 

JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)

 

接着介绍 activeMQ

ActiveMQ 是Apache出品,最流行的. 功能强大的即时通讯和集成模式的开源服务器

特点:
            1)支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
            2)支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
            3) 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
            4) Spring支持,ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
            5) 支持在流行的J2EE服务器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中进行测试
            6) 使用JDBC和高性能日志支持非常快速的持久化
            ...
 

下载地址:http://activemq.apache.org/activemq-5153-release.html

 

下载解压,进入 bin 文件夹 ,如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目录下的activemq.bat

如果没有异常,代表启动成功,常见启动失败原因是端口占用的问题

我们可以关闭占用的端口,或者修改 activeMQ 的端口。打开 conf/activemq.xml,修改被占用的端口。

  <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5671?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

浏览器打开 http://localhost:8161 进入 控制台,输入 默认的用户名和密码 admin/admin 

打开 queue 页面

Name:队列名称。
        Number Of Pending Messages:等待消费的消息个数。
        Number Of Consumers:当前连接的消费者数目
        Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减。
        Messages Dequeued:已经消费的消息数量

创建 队列,输入队列名称,点击 create 按钮

 

-----------------------------------------分割线-------------------------------------------------------------------------

现在我们整合 springboot 和 activeMQ

第一步、引入 pom 依赖

		<!-- 整合消息队列ActiveMQ -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>

		<!-- 如果配置线程池则加入 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
		</dependency>

 

第二步、配置文件 application.properties 修改

#整合jms测试,安装在别的机器,防火墙和端口号记得开放
spring.activemq.broker-url=tcp://127.0.0.1:61616
#集群配置
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依赖
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

第三步、启动类

加入注解

@EnableJms

加入 bean

@Bean
ConnectionFactory connectionFactory() {
   return new ActiveMQConnectionFactory();
}

@Bean
JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
   JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
   jmsTemplate.setPriority(999);
   return jmsTemplate;
}

@Bean(value="jmsMessagingTemplate")
JmsMessagingTemplate jmsMessagingTemplate(JmsTemplate jmsTemplate) {
   JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(jmsTemplate);
   return messagingTemplate;
}

第四步、消息发布者

public interface ProducerService {

   /**
    * 功能描述:指定消息队列,还有消息
    * @param destination
    * @param message
    */
   public void sendMessage(Destination destination, final String message);
    
}
@Service
public class ProducerServiceImpl implements ProducerService{

   @Autowired
   private JmsMessagingTemplate jmsTemplate; //用来发送消息到broker的对象
   
   //发送消息,destination是发送到的队列,message是待发送的消息
   @Override
   public void sendMessage(Destination destination, String message) {
      
      jmsTemplate.convertAndSend(destination, message);
      
   }
     
}

第五步、消息消费者

@Component
public class OrderConsumer {
// 接收 order.queue 队列的消息
   @JmsListener(destination="order.queue")
   public void receiveQueue(String text){
       System.out.println("OrderConsumer收到的报文为:"+text);
   }
}

第六步、测试类进行测试

@RestController
@RequestMapping("/queue")
public class QueueController {
   
   @Autowired
   private ProducerService producerService;


   /**
    * 点对点消息发送 ,指定 队列
    * @param msg
    * @return
    */
   @RequestMapping("/order")
   public Object order(String msg){
      Destination destination = new ActiveMQQueue("order.queue");
      producerService.sendMessage(destination, msg);
        return "ok";
   }

}

打开  activeMQ 控制台,可以看到 队列中的消息数量 发生了 变化

 

以上是 activeMQ 的点对点消息队列。点对点并不是只A发送的消息只能指定B接收,而是只A发送的任意一条消息只能由一个人接收处理,也就是每条消息只能被消费一次。

JMS 的另一种模式 发布/订阅模式。A发送的消息可以被所有监听A的对象的接收,就好比学校的广播,所有的学生都可以收听校园广播信息。

正常情况下,activeMQ 只支持一种消息模式,这里做出配置修改,让其能够同时支持 两种 模式

 

启动类中 加入 bean

@Bean
            public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
                DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
                bean.setPubSubDomain(true);
                bean.setConnectionFactory(activeMQConnectionFactory);
                return bean;
            }
@Bean
public Topic topic(){
   return new ActiveMQTopic("topic.queue");
}

 

Topic  发布/订阅 使用 topic.queue 队列

新建订阅者,@JmsListener如果不指定独立的containerFactory的话是只能消费queue消息

@Component
public class TopicSub {

   
   @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic")
   public void receive1(String text){
      System.out.println("topic.queue 消费者:receive1="+text);
   }
   
   
   @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic")
   public void receive2(String text){
      System.out.println("topic.queue 消费者:receive2="+text);
   }


   @JmsListener(destination="topic.queue",containerFactory="jmsListenerContainerTopic")
   public void receive3(String text){
      System.out.println("topic.queue 消费者:receive3="+text);
   }
   
   
}

 

修改 信息发送者 ProducerServiceImpl.java,增加代码

@Autowired
private Topic topic;


@Override
public void publish(String msg) {
   this.jmsTemplate.convertAndSend(this.topic, msg);
}

 

修改测试类 QueueController.java ,增加代码

/**
 * 发布、订阅消息
 * @param msg
 * @return
 */
@RequestMapping("/publish")
public Object publish(String msg){
   producerService.publish(msg);
   return "ok";
}

 

启动项目,访问 /queue/order ,打印出一条数据 

OrderConsumer收到的报文为:订单信息

访问  /queue/publish ,打印出三条数据

topic.queue 消费者:receive3=发布、订阅
topic.queue 消费者:receive1=发布、订阅
topic.queue 消费者:receive2=发布、订阅

 

本文由【嘴角轻扬30】发布于开源中国,原文链接:https://my.oschina.net/u/3387320/blog/3009301

全部评论: 0

    我有话说: