Spring JMS 是基于 Spring 框架的 JMS 消息解决方案,提供模板化发送和接收消息的抽象层;本文主要介绍在 SpringBoot 中用 Spring JMS 操作 ActiveMQ,文中使用到的软件版本:ActiveMQ 5.16.2、SpringBoot 2.4.9、Java 1.8.0_191。
1、SpringBoot 整合 ActiveMQ "Classic"
1.1、引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--jms 连接池--> <dependency> <groupId>org.messaginghub</groupId> <artifactId>pooled-jms</artifactId> </dependency>
1.2、配置(application.yml)
spring: activemq: broker-url: tcp://10.40.100.69:61616 pool: enabled: true max-connections: 5
1.3、配置类
package com.abc.demo.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration public class ActiveMQConfig { @Value("${spring.activemq.broker-url}") private String brokerUrl; @Autowired private ConnectionFactory connectionFactory; @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); //开启事务,需配合@Transactional使用 jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean("jmsMessagingTemplateTransaction") public JmsMessagingTemplate jmsMessagingTemplate() { JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(); jmsMessagingTemplate.setJmsTemplate(jmsTemplate()); return jmsMessagingTemplate; } @Bean("jmsListenerContainerFactoryTopic") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } @Bean("jmsListenerContainerFactoryTopicDurable") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicDurable() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(new ActiveMQConnectionFactory(brokerUrl)); factory.setPubSubDomain(true); //设置持久订阅 factory.setSubscriptionDurable(true); factory.setClientId("1234"); return factory; } }
1.4、生产者
package com.abc.demo.activemq; import com.abc.demo.util.DateUtil; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.jms.Destination; /** * 生产者,这里使用定时任务来发送消息 */ @Component public class Producer { private static Logger logger = LoggerFactory.getLogger(Producer.class); private Destination queue = new ActiveMQQueue("testQueue"); private Destination topic = new ActiveMQTopic("testTopic"); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Scheduled(cron = "0 0/1 * * * ?") public void sendToQueue() { String message = "queue-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } @Scheduled(cron = "30 0/1 * * * ?") public void sendToTopic() { String message = "topic-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } /** * 事务 */ @Transactional @Scheduled(cron = "5 0/1 * * * ?") public void sendToQueueTransaction() { for (int i = 0; i < 5; i++) { String message = "queue-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } } /** * 事务 */ @Transactional @Scheduled(cron = "35 0/1 * * * ?") public void sendToTopicTransaction() { for (int i = 0; i < 5; i++) { String message = "topic-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } } }
1.5、消费者
package com.abc.demo.activemq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; @Component public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); @JmsListener(destination = "testQueue") public void recevieFromQueue(String msg, Session session) throws JMSException { logger.info(session.getTransacted() + "-" + msg); } @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopic") public void recevieFromTopic(String msg) { logger.info(msg); } /** * 持久化订阅 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicDurable") public void recevieFromTopicDurable(String msg) { logger.info(msg); } }
2、SpringBoot 整合 ActiveMQ Artemis
2.1、引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency> <!--jms 连接池--> <dependency> <groupId>org.messaginghub</groupId> <artifactId>pooled-jms</artifactId> </dependency>
2.2、配置(application.yml)
spring: artemis: host: 10.40.96.140 port: 61616 pool: enabled: true max-connections: 5
2.3、配置类
package com.abc.demo.activemq.artemis; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration public class ActiveMQConfig { @Value("${spring.artemis.host}") private String host; @Value("${spring.artemis.port}") private String port; @Autowired private ConnectionFactory connectionFactory; @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); //开启事务,需配合@Transactional使用 jmsTemplate.setSessionTransacted(true); return jmsTemplate; } @Bean("jmsMessagingTemplateTransaction") public JmsMessagingTemplate jmsMessagingTemplate() { JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(); jmsMessagingTemplate.setJmsTemplate(jmsTemplate()); return jmsMessagingTemplate; } @Bean("jmsListenerContainerFactoryTopic") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopic() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); return factory; } @Bean("jmsListenerContainerFactoryTopicDurable") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicDurable() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); // factory.setConnectionFactory(new ActiveMQConnectionFactory("tcp://" + host + ":" + port)); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); //设置持久订阅 factory.setSubscriptionDurable(true); factory.setClientId("1234"); return factory; } @Bean("jmsListenerContainerFactoryTopicShare") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicShare() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); //设置共享订阅 factory.setSubscriptionShared(true); return factory; } @Bean("jmsListenerContainerFactoryTopicShareDurable") public JmsListenerContainerFactory<?> jmsListenerContainerFactoryTopicShareDurable() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(true); //设置共享订阅 factory.setSubscriptionShared(true); //设置持久订阅 factory.setSubscriptionDurable(true); return factory; } }
2.4、生产者
package com.abc.demo.activemq.artemis; import com.abc.demo.util.DateUtil; import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.apache.activemq.artemis.jms.client.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import javax.jms.Destination; /** * 生产者,这里使用定时任务来发送消息 */ @Component public class Producer { private static Logger logger = LoggerFactory.getLogger(Producer.class); private Destination queue = new ActiveMQQueue("testQueue"); private Destination topic = new ActiveMQTopic("testTopic"); @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Scheduled(cron = "0 0/1 * * * ?") public void sendToQueue() { String message = "queue-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } @Scheduled(cron = "30 0/1 * * * ?") public void sendToTopic() { String message = "topic-msg-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss"); logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } /** * 事务 */ @Transactional @Scheduled(cron = "5 0/1 * * * ?") public void sendToQueueTransaction() { for (int i = 0; i < 5; i++) { String message = "queue-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(queue, message); } } /** * 事务 */ @Transactional @Scheduled(cron = "35 0/1 * * * ?") public void sendToTopicTransaction() { for (int i = 0; i < 5; i++) { String message = "topic-msg-transaction-" + DateUtil.getCurrentDateString("yyyyMMddHHmmss") + "-" + i; logger.info(message); jmsMessagingTemplate.convertAndSend(topic, message); } } }
2.5、消费者
package com.abc.demo.activemq.artemis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Session; @Component public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); @JmsListener(destination = "testQueue") public void recevieFromQueue(String msg, Session session) throws JMSException { logger.info(session.getTransacted() + "-" + msg); } @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopic") public void recevieFromTopic(String msg) { logger.info(msg); } /** * 持久化订阅 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicDurable") public void recevieFromTopicDurable(String msg) { logger.info(msg); } /** * 共享订阅 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShare", subscription = "test") public void recevieFromTopicShare(String msg) { logger.info(msg); } @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShare", subscription = "test") public void recevieFromTopicShare2(String msg) { logger.info(msg); } /** * 共享持久订阅 * @param msg */ @JmsListener(destination = "testTopic", containerFactory = "jmsListenerContainerFactoryTopicShareDurable", subscription = "test2") public void recevieFromTopicShareDurable(String msg) { logger.info(msg); } }