-
Spring框架之jms源码完全解析
本文先对jms进行简单的介绍,其次对Spring中jms模块源码文件清单进行梳理,然后对jms的单独使用和Spring整合jms使用进行演示,最后对Spring中jms模块有两个核心JmsTemplate和消息监听器源码进行分析。
一、jms简介
分布式系统消息通信技术主要包括:(1) RPC(Remote Procedure Call Protocol),一般是C/S方式,同步的,跨语言跨平台,面向过程。(2)CORBA(Common Object Request Broker Architecture),CORBA从概念上扩展了RPC,面向对象的,企业级的(面向对象中间件还有DCOM)。(3) RMI(Remote Method Invocation),面向对象方式的 Java RPC。(4)WebService基于Web,C/S或B/S,跨系统跨平台跨网络。多为同步调用,实时性要求较高。(5)MOM(Message oriented Middleware) 面向消息中间件。
面向消息中间件,主要适用于消息通道、消息总线、消息路由和发布/订阅的场景。目前主流标准有JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)和STOMP(Streaming Text Oriented Messaging Protocol)。AMQP是一个面向协议的,跟语言平台无关的消息传递应用层协议规范。STOMP是流文本定向消息协议,是一种为MOM设计的简单文本协议。AMQP和STOMP都是跟http处于同一层的协议。JMS是Java平台上的面向接口的消息规范,是一套API标准,并没有考虑异构系统。
JMS即Java消息服务应用程序接口,是一个Java平台关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数面向消息中间件提供商都对JMS提供支持,JMS类似于JDBC(Java Database Connectivity),JDBC 是可以用来访问许多不同关系数据库的API,而JMS则提供同样与厂商无关的访问方法,用来访问消息收发服务。所以两个应用程序之间要进行通信,我们使用了一个JMS服务,进行中间的转发,通过使用JMS,可以解除两个程序之间的耦合,提高消息灵活性,支持异步性。
JMS编程模型包含的几个要素:
(1)连接工厂。 连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI(Java命名和目录接口)树中。针对两种不同的jms消息模型(点对点和发布/订阅),分别有QueueConnectionFactory和TopicConnectionFactory两种。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
(2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
(3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
通俗的讲,Connection(连接)是一个物理概念,是指一个通过网络建立的客户端和专有服务器或调度器之间的一个网络连接。Session(会话)是一个逻辑概念,它存在于实例中,一个Connection可以拥有多个Session,也可以没有Session,同一个Connection上的多个Session之间不会相互影响。connection相当于修路,而session相当于通过这条道路的一次运输。
(4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以Destination实际上就是两种类型的对象:Queue、Topic。
(5)JMS消息。消息由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
通常有两种类型:① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
(6)消息的生产者。生产者(Message Producer)对象由Session对象创建,用于发送消息,将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
(7)消息消费者。 消费者(Message Consumer)对象由Session对象创建,用于接收消息,接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
(8)消息监听器。消息监听器MessageListener,类似于钩子函数,hook到消息相关的事件中,换句话说,当消息被创建、开始传输、转发、传输中止、删除时,会调用相应的钩子函数。如注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。
Spring中集成JMS:
JMS是一个 Java 标准,定义了使用消息代理的通用API 。Spring 通过基于模板的抽象为 JMS 功能提供了支持,这个模板就是 JmsTemplate 。使用 JmsTemplate能够非常容易地在消息生产方发送队列和主题消息,在消费消息的一方也能够非常容易地接收这些消息。 (模板方法模式是一种设计模式。通俗的讲就是完成一件事情,有固定的数个步骤,但是每个步骤根据对象的不同,而实现细节不同。这样可以在父类中定义一个完成该事情的总方法,按照完成事件需要的步骤去调用其每个步骤的实现方法。每个步骤的具体实现,由子类完成)。后面我们的代码分析也以JmsTemplate为核心进行分析。对于类似Java EE的消息驱动Bean形式的异步接收,Spring提供了大量用于创建消息驱动POJOs的消息监听器。Spring还提供了一种创建消息监听器的声明式方法。
根据《Spring 5 官方文档》:
(1)org.springframework.jms:定义了各种不同的JmsException异常类。在org.springframework.jms.support.JmsUtils的convertJmsAccessException方法中将javax.jms.JMSException异常类转成成等价的org.springframework.jms.JmsException。
(2)org.springframework.jms.annotation包提供了支持注解驱动监听端点的必要基础架构,通过使用@JmsListener实现。
(3)org.springframework.jms.config包提供了 JMS 命名空间的解析实现,以及配置监听容器和创建监听端点的 java 配置支持。
(4)org.springframework.jms.connection包提供了适用于独立应用程序的ConnectionFactory实现。 它还包含 Spring 对 JMS 的PlatformTransactionManager实现(即JmsTransactionManager)。这将允许 JMS 作为事务性资源无缝集成到 Spring 的事务管理机制中。
(5)org.springframework.jms.core包提供了使用 JMS 的核心功能。它包含了JMS 模板类,用来处理资源的创建与释放,从而简化 JMS 的使用,就像JdbcTemplate对 JDBC 做的一样。
(6)org.springframework.jms.listener:提供了消息监听器及相关支持类。
(7)org.springframework.jms.remoting:提供基于JMS的RPC方案。
(8)org.springframework.jms.support包:提供了一些支持的功能函数。converter子包提供了 MessageConverter 抽象,进行 Java 对象和 JMS 消息的互相转换。destination子包提供了管理 JMS 目的地的不同策略,比如针对 JNDI 中保存的目标的服务定位器。
二、jms模块源码文件清单
1 jms/
1.1 JmsException:继承自NestedRuntimeException,NestedRuntimeException又继承自RuntimeException。
Java中所有异常的父类是Throwable类,在Throwable类下有两大子类:一个是Error类,指系统错误异常,例如:VirtualMachineError 虚拟机错误,ThreadDeath 线程死锁。一般如果是Error类的异常的话,就是程序的硬伤,就好比是工厂里断水断电,机器损坏了。另一个是Exception类,指编码、环境、用户操作输入等异常,这个是比较常见的异常类,Exception类下面又有两个子类,非检查异常(又称运行时异常RuntimeException)和检查异常。
在RuntimeException异常中有几个常见的子类,例如:InputMismatchException 输入不匹配异常;ArithmeticException 算术运算异常;NullPointerException 空指针异常;ArrayIndexOutOfBoundsException 数组下标越界异常;ClassCastException 类型转换异常。
检查异常中的子类有:IOException 文件异常;SQLException SQL数据库错误异常。
1.2 IllegalStateException
1.3 InvalidClientIDException
1.4 InvalidDestinationException
1.5 InvalidSelectorException
1.6 JmsSecurityException
1.7 MessageEOFException
1.8 MessageFormatException
1.9 MessageNotReadableException
1.10 MessageNotWriteableException
1.11 ResourceAllocationException
1.12 TransactionInProgressException
1.13 TransactionRolledBackException
1.14 UncategorizedJmsException
1.2-1.14的异常处理类都继承自JmsException,适用场景如类名所示,其中UncategorizedJmsException表示当其他JmsException都匹配不到时抛出该异常。org.springframework.jms.support.JmsUtils的convertJmsAccessException方法负责将javax.jms.JMSException异常类转成成等价的org.springframework.jms.JmsException。
2 jms/annotation
2.1 JmsListener:该类是一个注解接口。java用@interface Annotation{ } 定义一个注解 @Annotation,一个注解是一个类。注解相当于一种标记,在程序中加上了注解就等于为程序加上了某种标记,以后javac编译器,开发工具和其他程序可以用反射来了解你的类以及各种元素上有无任何标记,看你有什么标记,就去干相应的事。
@JmsListener注解用来声明这是个监听器方法,也就是标记这个方法被JMS消息监听器监听。该类中属性destination表示监听的队列名字,containerFactory表示用来创建JMS监听器容器。处理@JmsListener注解主要靠JmsListenerAnnotationBeanPostProcessor。注册JmsListenerAnnotationBeanPostProcessor可以手动进行,更便捷的是通过Spring的config文件<jms:annotation-driven/>配置,或者使用@EnableJms注解两种方式将注解的监听器类自动放到监听器容器中。
2.2 EnableJms:用@JmsListener这个注解的时候。需要在配置类(@Configuration类)上加上@EnableJms注解,并且要配置一个DefaultJmsListenerContainerFactory监听容器工厂的Bean实例。
Spring根据注解@EnableJms自动扫描带有@JmsListener的方法,并为其创建一个MessageListener把它包装起来。而JmsListenerContainerFactory的Bean的作用就是为每个MessageListener创建MessageConsumer并启动消息接收循环。
Spring接收消息的步骤:通过JmsListenerContainerFactory配合@EnableJms扫描所有@JmsListener方法,自动创建MessageConsumer、MessageListener以及线程池,启动消息循环接收处理消息,最终由我们自己编写的@JmsListener方法处理消息,可能会由多线程同时并发处理。
2.3 JmsListenerAnnotationBeanPostProcessor:该后置处理器用来实现@JmsListener注解,将带有@JmsListener方法注册到指定的JMS消息监听器容器中。该类中afterSingletonsInstantiated方法的最关键的一句 registrar.afterPropertiesSet()即可完成所有监听的注册。这个后置处理器可以通过 <jms:annotation-driven> XML配置或者@EnableJms注解两种方式自动注册。
2.4 JmsBootstrapConfiguration:配置类,注册一个用于处理@JmsListener注解的JmsListenerAnnotationBeanPostProcessor后置处理器。同时也注册一个默认的JmsListenerEndpointRegistry。当使用@EnableJms注解时,这个配置类会被自动载入。
2.5 JmsListenerConfigurer:Spring管理bean实现的可选接口,这些管理bean用来自定义JMS监听器端点的配置方式。
2.6 JmsListeners:注解容器,多个@JmsListener注解的组成的集合。
3 jms/config
3.1 JmsListenerContainerFactory:消息监听容器工厂接口,基于JmsListenerEndpoint。
3.2 AbstractJmsListenerContainerFactory:消息监听容器工厂的抽象基类。
3.3 DefaultJmsListenerContainerFactory:JmsListenerContainerFactory接口的默认实现,该工厂用来创建DefaultMessageListenerContainer。
3.4 DefaultJcaListenerContainerFactory:JmsListenerContainerFactory接口的一个实现,用来创建一个基于 JCA的MessageListener容器JmsMessageEndpointManager。
JCA (J2EE 连接器架构,Java Connector Architecture)是对J2EE标准集的重要补充。因为它注重的是将Java程序连接到非Java程序和软件包中间件的开发。
3.5 SimpleJmsListenerContainerFactory:JmsListenerContainerFactory接口的一个简单实现,用来创建一个标准的SimpleMessageListenerContainer。
3.6 JmsNamespaceHandler:JMS命名空间处理器。注册了三种标签元素对应的处理函数:"listener-container"、"jca-listener-container"、"annotation-driven"。
3.7 AbstractListenerContainerParser:用来解析JMS监听器容器元素。
3.8 JmsListenerContainerParser:解析JMS的<listener-container>元素。
3.9 JcaListenerContainerParser:解析JMS的<jca-listener-container>元素。
3.10 AnnotationDrivenJmsBeanDefinitionParser:解析jms名字空间中的 'annotation-driven' 元素。
3.11 AbstractJmsListenerEndpoint:Jms监听端点的基础模型。
3.12 JmsListenerEndpoint:JMS listener endpoint的模型。借助JmsListenerConfigurer可以用来注册端点。
3.13 MethodJmsListenerEndpoint:JmsListenerEndpoint的一个实现,提供了一些方法用来为该endpoint处理到来的消息。包括get/set 所属bean、所属方法、jms相关参数、spring上下文、消息处理工厂等。
3.14 SimpleJmsListenerEndpoint:JmsListenerEndpoint接口的一个实现,提供了MessageListener,用来为给endpoint处理到来的消息。
3.15 JmsListenerEndpointRegistrar:将JmsListenerEndpoint对象注册到JmsListenerEndpointRegistry对象中。
3.16 JmsListenerEndpointRegistry:创建MessageListenerContainer实例,用来保存注册过的JmsListenerEndpoint,同时对这些消息监听容器的生命周期进行管理。不同于手动创建的MessageListenerContainer,通过注册生成的监听容器不属于ApplicationContext管理的bean,不会被自动装配。
如果需要管理注册的消息监听容器则调用getListenerContainers()函数。如果要使用一个指定的消息监听容器,使用函数getListenerContainer(String),参数就是endpoint的id值。
3.17 JmsListenerConfigUtils:配置常量值,用于子包间的内部共享。
4 jms/connection
4.1 SingleConnectionFactory:connectionFactory是Spring用于创建到JMS服务器链接的。Spring提供了多种connectionFactory,主要有SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。
4.2 CachingConnectionFactory:继承自SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还提供缓存JMS资源功能,包括缓存Session、MessageProducer和MessageConsumer。
Spring中发送消息的核心是JmsTemplate,然而Jmstemplate的问题是在每次调用时都要打开/关闭session和producter,效率很低,所以引申出了PooledConnectionFactory连接池,用于缓存session和producter。然而这还不是最好的。从spring2.5.3版本后,Spring又提供了CachingConnectionFactory,这才是首选的方案。默认情况下, CachingConnectionFactory只缓存一个session。
4.3 SmartConnectionFactory:继承自ConnectionFactory接口,指示从该connectionFactory得到的Connection怎样释放掉。
4.4 DelegatingConnectionFactory:ConnectionFactory接口的实现类,对所有调用给定的目标ConnectionFactory进行代理。
4.5 ConnectionFactoryUtils:ConnectionFactory类的功能函数,特别是用于从一个指定的ConnectionFactory获得transactional JMS resources。主要在框架内部使用,比如JmsTemplate、DefaultMessageListenerContainer会使用到该类。
4.6 CachedMessageConsumer:MessageConsumer的装饰器,使一个共享的MessageConsumer实例能适应所有的调用。
4.7 CachedMessageProducer:MessageProducer装饰器,使得一个共享的MessageProducer实例能适应多数调用。
4.8 ChainedExceptionListener:ExceptionListener接口的实现类,支持异常链。在java代码中常常会再捕获一个异常后抛出另外一个异常,并且希望把异常原始信息保存下来,这被称为异常链。
4.9 JmsResourceHolder:JmsResourceHolder继承了ResourceHolderSupport,作为Jms资源句柄,封装了JMS的connection、session等资源。
4.10 JmsTransactionManager:JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个Connection/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作。
4.11 SessionProxy:继承自Session的接口,被Session代理实现,用来获得该代理的目标Session。
4.12 SynchedLocalTransactionFailedException:同步本地事务未完成时抛出的异常。
4.13 TransactionAwareConnectionFactoryProxy:ConnectionFactory的代理,添加了Spring的事务功能,同事务JNDI ConnectionFactory类似。
4.14 UserCredentialsConnectionFactoryAdapter:ConnectionFactory的一个适配器,授予用户对于每个标准的 createConnection()方法调用的权限。
5 jms/core
5.1 JmsMessageOperations:继承了MessageSendingOperations、MessageReceivingOperations、MessageRequestReplyOperations几个接口,包含关于JMS消息的操作方法,包括send、receive、convertAndSend、receiveAndConvert等。
5.2 JmsMessagingTemplate:JmsMessageOperations接口的一个实现。
5.3 JmsOperations:详细列出JMS一系列操作,该接口会被JmsTemplate实现。
5.4 JmsTemplate:核心类。在JDBC中,Spring提供了一个JdbcTemplate来简化JDBC代码开发,同样,Spring也提供了JmsTemplate来简化JMS消息处理的开发。
JmsTemplate其实是Spring对JMS更高一层的抽象,它封装了大部分创建连接、获取session及发送接收消息相关的代码,使得我们可以把精力集中在消息的发送和接收上。
5.5 MessageCreator:利用给定的Session创建一个JMS消息。
5.6 MessagePostProcessor:和JmsTemplate的send方法一起用,将一个对象转换成message。在一个消息被转换器处理后可以进行进一步修改。在设置JMS头部和属性的时候有用。
5.7 BrowserCallback:浏览JMS queue中的信息的回调函数。在JmsTemplate类中的browse、browseSelected函数中BrowserCallback作为一个参数传入。(有些函数要求应用先传给它一个函数,好在合适的时候调用,以完成目标任务。这个被传入的、后又被调用的函数就称为回调函数callback function)。
5.8 ProducerCallback:send一个消息到JMS destination的回调函数。作为JmsTemplate类的execute函数一个参数。
5.9 SessionCallback:在一个给定的Session执行一系列操作的回调函数。作为JmsTemplate的execute函数的参数使用。
jms/core/support
5.10 JmsGatewaySupport:方便应用类访问JMS。使用该类时需要设置一个ConnectionFactory 或者JmsTemplate实例。如果存在ConnectionFactory ,那么它通过createJmsTemplate方法会创建自己的JmsTemplate。
6 jms/listener
6.1 AbstractJmsListeningContainer:继承自JmsDestinationAccessor,作为所有Message Listener Container的公共基类。它主要提供了JMS connection的生命周期管理的功能,但是没有对消息接收的方式(主动接收方式或者异步接收方式)等做任何假定。
6.2 MessageListenerContainer:框架内部使用的一个抽象类,用来表示一个消息监听器容器,不会被用来支持JMS和JCA模式的外部容器实现。
6.3 AbstractMessageListenerContainer: Spring消息监听器容器(如SimpleMessageListenerContainer、SimpleMessageListenerContainer)的父类。
6.4 AbstractPollingMessageListenerContainer:继承自AbstractMessageListenerContainer,它提供了对于主动接收消息(polling)的支持,以及支持外部的事务管理。
6.5 SimpleMessageListenerContainer:最简单的消息监听器容器,用来从jms 消息队列中接收消息,然后推送注册到它内部的消息监听器(MessageListener)中,只能处理固定数量的JMS会话,且不支持事务。
在Spring框架中使用JMS传递消息有两种方式:JMS template和message listener container,前者用于同步收发消息,后者主要用于异步收消息。
6.6 DefaultMessageListenerContainer:用于异步消息监听的消息监听器容器。跟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer也支持创建多个Session和MessageConsumer来接收消息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer创建了concurrentConsumers所指定个数的AsyncMessageListenerInvoker(实现了SchedulingAwareRunnable接口),并交给taskExecutor运行。
6.7 LocallyExposedJmsResourceHolder:JMS资源句柄JmsResourceHolder的子类,指示本地的资源。
6.8 SessionAwareMessageListener:SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法。
6.9 SubscriptionNameProvider:消息监听器会实现该接口,表示一个持久的订阅,否则消息监听器被用作一个默认的订阅。
jms/listener/adapter
6.10 AbstractAdaptableMessageListener:JMS消息监听器适配器的抽象类,提供系列方法用来提取JMS消息的有效信息。
6.11 JmsResponse:在运行状态时,destination需要计算时使用该类,返回JMS监听器的方法用来指示destination。如果在运行中不需要计算destination,推荐使用org.springframework.messaging.handler.annotation.SendTo @SendTo。
6.12 MessageListenerAdapter:MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它委托给目标监听器进行处理。MessageListenerAdapter会把接收到的消息做如下转换:
TextMessage转换为String对象;
BytesMessage转换为byte数组;
MapMessage转换为Map对象;
ObjectMessage转换为对应的Serializable对象。
6.13 MessagingMessageListenerAdapter:MessageListener适配器,援引一个可配置的InvocableHandlerMethod(用于在某个请求被控制器方法处理时,包装处理所需的各种参数和执行处理逻辑)。
6.14 ListenerExecutionFailedException:监听器方法执行失败时抛出的异常。
6.15 ReplyFailureException:需要回复的消息发送失败抛出的异常。
jms/listener/endpoint
6.16 JmsMessageEndpointFactory:JCA 1.7 MessageEndpointFactory工厂类的一个实现,为JMS监听器提供了事务管理能力。
6.17 JmsMessageEndpointManager:GenericMessageEndpointManager的一个拓展,ActivationSpec配置中加入了对JMS的支持。
6.18 JmsActivationSpecFactory:基于JmsActivationSpecConfig用来创建JCA 1.5 ActivationSpec对象的工厂。
6.19 StandardJmsActivationSpecFactory:JmsActivationSpecFactory接口的标准实现,支持JMS 1.5中定义的标准JMS属性,忽视Spring的"maxConcurrency" 、 "prefetchSize" 设置。
6.20 DefaultJmsActivationSpecFactory: JmsActivationSpecFactory接口的默认实现。支持JCA 1.5中所定义的标准额JMS属性,也包括Spring拓展的一些设置,如"maxConcurrency" 、 "prefetchSize" 。
6.21 JmsActivationSpecConfig:激活JMS message endpoint的一些通用的配置对象。
7 jms/remoting
7.1 JmsInvokerClientInterceptor:方法拦截器,序列化远程触发对象和反序列化远程触发结果对象,使用Java序列化方法,例如RMI。
7.2 JmsInvokerProxyFactoryBean:jms触发代理的工厂bean,暴露bean引用的代理服务,使用特定的服务接口。
7.3 JmsInvokerServiceExporter:为了支持基于消息的RPC,Spring提供了JmsInvokerServiceExporter,它可以把bean导出为基于消息的服务;同时为客户端提供了JmsInvokerProxyFactoryBean来使用这些服务。
8 jms/support
8.1 JmsAccessor:定义了几个用于访问JMS服务的共通属性,提供了创建Connection和Session的方法。是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer的父类。
8.2 JmsHeaderMapper:将消息头整合到要向外发送的JMS消息中的接口,或者从接收到的JMS消息提取出消息头的信息。
8.3 SimpleJmsHeaderMapper :JmsHeaderMapper接口的简单实现。
8.4 JmsHeaders:将JMS属性设置到通用消息头部或者从其提取出JMS属性用到的预定义的名字或者前缀。
8.5 JmsMessageHeaderAccessor:MessageHeaderAccessor接口的一个实现,能够访问JMS规范的头header。
8.6 JmsUtils:JMS工具包,主要是框架内部使用。
8.7 QosSettings:收集Quality-of-Service设置,在发送消息时使用。
jms/support/converter
8.8 MessageConverter:在收发消息时,将Java objects和JMS messages相互转换。
8.9 SimpleMessageConverter: 实现String与TextMessage之间的相互转换,字节数组与BytesMessage之间的相互转换,Map与MapMessage之间的相互转换以及Serializable对象与ObjectMessage之间的相互转换。
8.10 MarshallingMessageConverter:使用JAXB库实现消息与XML格式之间的相互转换。
8.11 MessagingMessageConverter:利用MessageConverter将messaging abstraction的Message和javax.jms.Message相互转换。
8.12 SmartMessageConverter:MessageConverter的拓展,增加了转换提示功能。
8.13 MappingJackson2MessageConverter:使用Jackson 2 JSON库实现消息与JSON格式之间相互转换。
8.14 MessageType:定义几个常量表示要转换成得目标消息的类型,有text、bytes、map、object。
8.15 MessageConversionException:MessageConverter出错时抛出的异常。
jms/support/destination
8.16 DestinationResolver:将指定的目的地名解析为目的地实例。 参数pubSubDomain用于指定是使用“发布/订阅”模式(解析后的目的地是Topic),还是使用“点对点”模式(解析后的目的地是Queue)。
8.17 BeanFactoryDestinationResolver:实现了DestinationResolver接口和BeanFactoryAware接口。它会根据指定的目的地名从BeanFactory中查找目的地实例。
8.18 DynamicDestinationResolver:实现了DestinationResolver接口。根据指定的目的地名动态创建目的地实例。
8.19 CachingDestinationResolver:继承了DestinationResolver,增加了缓存的功能,在目的地失效的时候,removeFromCache方法会被调用;在JMS provider失效的时候,clearCache方法会被调用。
8.20 JndiDestinationResolver:继承自JndiLocatorSupport, 同时实现了CachingDestinationResolver接口。如果在JMS provider中配置了静态目的地,那么JndiDestinationResolver通过JNDI查找的方式获得目的地实例。
8.21 JmsDestinationAccessor:提供了用于解析目的地的方法。destinationResolver属性的默认值是DynamicDestinationResolver的实例,也就是说默认采用动态目的地解析的方式;pubSubDomain属性用于指定是使用“发布/订阅”模式还是使用“点对点”模式,默认值是false(点对点模式)。
8.22 DestinationResolutionException:将指定的目的地名解析为目的地实例出错抛出的异常。
三、jms的使用演示
(一)jms的单独使用
为了更好的理解Spring整合jms,先来看下单独使用Java消息服务的方式。以Java消息服务的开源实现产品ActiveMQ为例。使用消息服务,需要做三件事:1、开启消息服务器。2、创建消息生产者。3、创建消息消费者。
1、开启消息服务器
如果是Windows系统下可以直接双击ActiveMQ安装目录下的bin目录下的activemq.bat文件来启动消息服务器。如果是Linux系统,进入activeMq安装包下的bin目录,使用./activemq start命令就可以启动activemq服务。
2、创建消息生产者
消息的生产者主要用来将包含业务逻辑的消息发送到消息服务器。以下为发送消息测试,尝试发送三条消息到消息服务器,消息内容为“大家好,这是个测试”。
1 public class Sender{ 2 public static void main(String[] args) throws Exception{ 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start() 6 7 Session session = connection.creatSession(Boolean.True, Session.AUTO_ACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 10 MessageProducer producer = session.createProducer(destionation); 11 for (int i=0; i<3; i++){ 12 TextMessage message = session.createTextMessage("大家好,这是个测试"); 13 Tread.sleep(1000); 14 //通过消息生产者发出消息 15 producer.send(message); 16 } 17 session.commit(); 18 session.close(); 19 connection.close(); 20 } 21 }
3、创建消息消费者
消息的消费者用于连接消息服务器将服务器中的消息提取出来进行相应的处理。
1 public class Receiver{ 2 public static void main(String[] args) throws Exception { 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactroy(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start(); 6 7 final Session session = connection.createSession(Boolean.TRUE, Session.AUTOACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 MessageConsumer consumer = session.createConsumer(destination); 10 11 int i = 0; 12 while(i<3){ 13 i++; 14 TextMessage message = (TextMessage) consumer.receive(); 15 session.commit(); 16 //TODO something... 17 System.out.println("收到消息:" + message.getText()); 18 } 19 20 session.close(); 21 connection.close(); 22 } 23 }
运行时,先开启消息的生产者,向服务器发送消息,然后开启消息的消费者。上述代码可以看出,和数据库实现很相似,一系列的冗余但是必不可少的代码用于创建connectionFactory、connection、session,利用session来 createQueue、createProducer、createConsumer,真正用于发送和接收消息的代码并不多。
Spring 通过基于模板方法的设计模式来解决这个问题,这个模板就是 JmsTemplate 。JmsTemplate封装了大部分创建连接、获取session及发送接收消息相关的代码,使得我们可以把精力集中在消息的发送和接收上。所以使用 JmsTemplate能够非常容易地在消息生产方发送队列和主题消息,在消费消息的一方也能够非常容易地接收这些消息。
(二)Spring整合jms
在Spring中使用jms同样需要做三件事。1、配置文件的配置。2、发送消息。3、接收消息。
1、配置文件的配置
上面我们提到了Spring将Connection的创建和关闭,Session的创建和关闭等操作都封装到了JmsTemplate中,所以在Spring的核心配置文件中首先要注册JmsTemplate类型的bean。ActiveMQConnectionFactory用于连接消息服务器,ActiveMQQueue消息队列是实际的消息源,也要注册。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 </beans>
2、发送消息
Spring中使用jmsTemplate发送消息到消息服务器中去,省去了冗余的Connection以及Session等的创建和销毁过程。
1 public class HelloWorldSender{ 2 public static void main(String[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXmlApplicationContext(new string[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 5 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 6 Destination destination = (Destination) context.getBean("destination"); 7 8 jmsTemplate.send(destination, new MessageCreator(){ 9 public Message createMessage(Session session) throws JMSException{ 10 return session.createTextMessage("大家好,这是个测试"); 11 } 12 }); 13 } 14 }
3、接收消息。
Spring中连接服务器接收消息示例如下:
1 public class HelloWorldReceiver{ 2 public static void main(string[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXMLApplicationContext(new String[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 5 Destination destination = (Destination) context.getBean("destination"); 6 7 TextMessage msg = (TextMessage) jmsTemplate.receive(destination); 8 System.out.println("received msg is:" + msg.getText()); 9 } 10 }
经过上面3步就完成了Spring消息的发送和接收。在HelloWorldSender发送消息类中使用jmsTemplate.send方法来发送消息,没有问题。在HelloWorldReceiver接收消息类中,使用jmsTemplate.receive方法来接收消息会存在一个问题,该方法只能接收一次消息,如果未收到消息则一直等待。利用消息监听器来解决这个问题,消息监听器可以循环监听消息服务器上的消息。消息监听器并非Spring独有, Spring整合JMS的应用提供三种类型的消息监听器,分别是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。
MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。
SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。
MessageListenerAdapter类实现了SessionAwareMessageListener接口和MessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。TextMessage转换为String对象;BytesMessage转换为byte数组;MapMessage转换为Map对象;ObjectMessage转换为对应的Serializable对象。
4、利用消息监听器接收消息(第3步改进版)。
我们需要做两步,第一创建一个消息监听器,第二为了使用消息监听器,修改配置文件。
第一, 我们先来创建一个消息监听器:
1 public class MyMessageListener implements MessageListener{ 2 3 @Override 4 public void onMessage(Message arg0){ 5 TextMessage msg = (TextMessage) arg0; 6 try{ 7 System.out.println(msg.getText()); 8 } catch (JMSException e){ 9 e.printStackTrace(); 10 } 11 } 12 }
一旦有新消息Spring会将消息引导至消息监听器以方便用户进行相应的逻辑处理。
第二,修改配置文件。
为了使用消息监听器,需要在配置文件中注册消息监听器容器,并将消息监听器注入到消息监听器容器中。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 <bean id="myTextListener" class="test.activeMQ.Spring.MyMessageListener" /> 21 22 <bean id="javaConsumer" class="org.Springframework.jms.listener.DefaultMessageListenerContainer"> 23 <property name="ConnectionFactory" ref="connectionFactory" /> 24 <Property name="destination" ref="destination" /> 25 <Property name="messageListener" ref="myTextListener" /> 26 </bean> 27 28 </beans>
通过以上的修改配置就可以进行消息的监听功能了,一旦有消息传入消息服务器,则会被消息监听器监听到,并由Spring将消息内容引导至消息监听器的处理函数中等待用户的进一步逻辑处理。
四、Spring中jms模块核心源码分析
从第三节可以看出,Spring中使用JmsTemplate模板类来进行发送消息和接收消息操作,接收消息可以使用消息监听器的方法来替代模板方法。所以Spring中jms模块核心主要有两个:JmsTemplate和消息监听器。
(一)JmsTemplate发送消息
我们先来看JmsTemplate,在上面使用示例中,使用JmsTemplate发送消息的函数为:
1 jmsTemplate.send(destination, new MessageCreator(){ 2 public Message createMessage(Session session) throws JMSException{ 3 return session.createTextMessage("大家好,这是个测试"); 4 } 5 });
进入JmsTemplate类的send函数:
1 public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { 2 execute(session -> { 3 doSend(session, destination, messageCreator); 4 return null; 5 }, false); 6 }
(1)通用代码的抽取
调用了该类中的execute函数,继续进入execute函数查看源码逻辑:
1 public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { 2 Assert.notNull(action, "Callback object must not be null"); 3 Connection conToClose = null; 4 Session sessionToClose = null; 5 try { 6 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 7 obtainConnectionFactory(), this.transactionalResourceFactory, startConnection); 8 if (sessionToUse == null) { 9 //创建connection 10 conToClose = createConnection(); 11 //根据connection创建session 12 sessionToClose = createSession(conToClose); 13 //是否开启向服务推送连接信息,只有接收信息时需要,发送时不需要 14 if (startConnection) { 15 conToClose.start(); 16 } 17 sessionToUse = sessionToClose; 18 } 19 if (logger.isDebugEnabled()) { 20 logger.debug("Executing callback on JMS Session: " + sessionToUse); 21 } 22 //调用回调函数 23 return action.doInJms(sessionToUse); 24 } 25 catch (JMSException ex) { 26 throw convertJmsAccessException(ex); 27 } 28 finally { 29 //关闭session 30 JmsUtils.closeSession(sessionToClose); 31 //释放连接 32 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); 33 } 34 }
execute函数封装创建Connection、创建Session、关闭Session和关闭Connection等操作,这些代码都是发送消息都要做的工作,没有差异,execute方法帮助我们抽离了这些冗余代码是我们更加专注业务逻辑的实现。做完这些通用的操作后,通过调用回调函数将程序引入用户自定义实现的个性化处理。Spring使用execute方法封装了冗余代码,而将个性化的代码实现放在了回调函数action.doInJms(sessionToUse)中。
(2)发送消息的实现
我们继续看回调函数action.doInJms(sessionToUse)。在发送消息的功能中回调函数通过局部类实现。
1 new SessionCallback<Object>(){ 2 public Object doInJms(Session session) throws JMSException { 3 doSend(session, destination, messageCreator); 4 return null; 5 } 6 }
此时的发送逻辑转向了doSend方法,我们只需要关注该方法:
1 protected void doSend(Session session, Destination destination, MessageCreator messageCreator) 2 throws JMSException { 3 4 Assert.notNull(messageCreator, "MessageCreator must not be null"); 5 MessageProducer producer = createProducer(session, destination); 6 try { 7 Message message = messageCreator.createMessage(session); 8 if (logger.isDebugEnabled()) { 9 logger.debug("Sending created message: " + message); 10 } 11 doSend(producer, message); 12 // Check commit - avoid commit call within a JTA transaction. 13 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 14 // Transacted session created by this template -> commit. 15 JmsUtils.commitIfNecessary(session); 16 } 17 } 18 finally { 19 JmsUtils.closeMessageProducer(producer); 20 } 21 }
1 protected void doSend(MessageProducer producer, Message message) throws JMSException { 2 if (this.deliveryDelay >= 0) { 3 producer.setDeliveryDelay(this.deliveryDelay); 4 } 5 if (isExplicitQosEnabled()) { 6 producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); 7 } 8 else { 9 producer.send(message); 10 } 11 }
最终的目标还是通过MessageProducer的send来发送消息。
(二)JmsTemplate接收消息
在上面使用示例中,使用JmsTemplate接收消息的函数为:TextMessage msg = (TextMessage) jmsTemplate.receive(destination);我们进入jmsTemplate类的receive函数:
1 public Message receive(Destination destination) throws JmsException { 2 return receiveSelected(destination, null); 3 }
继续进入JmsTemplate类的receiveSelected函数:
1 public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException { 2 return execute(session -> doReceive(session, destination, messageSelector), true); 3 }
继续进入JmsTemplate类的doReceive函数:
1 protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector) 2 throws JMSException { 3 4 return doReceive(session, createConsumer(session, destination, messageSelector)); 5 }
1 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 2 try { 3 // Use transaction timeout (if available). 4 long timeout = getReceiveTimeout(); 5 ConnectionFactory connectionFactory = getConnectionFactory(); 6 JmsResourceHolder resourceHolder = null; 7 if (connectionFactory != null) { 8 resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 9 } 10 if (resourceHolder != null && resourceHolder.hasTimeout()) { 11 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); 12 } 13 Message message = receiveFromConsumer(consumer, timeout); 14 if (session.getTransacted()) { 15 // Commit necessary - but avoid commit call within a JTA transaction. 16 if (isSessionLocallyTransacted(session)) { 17 // Transacted session created by this template -> commit. 18 JmsUtils.commitIfNecessary(session); 19 } 20 } 21 else if (isClientAcknowledge(session)) { 22 // Manually acknowledge message, if any. 23 if (message != null) { 24 message.acknowledge(); 25 } 26 } 27 return message; 28 } 29 finally { 30 JmsUtils.closeMessageConsumer(consumer); 31 } 32 }
其中代码Message message = receiveFromConsumer(consumer, timeout);中的receiveFromConsumer函数在JmsDestinationAccessor类(package org.springframework.jms.support.destination,JmsTemplate的父类)中定义,我们进入查看源码:
1 protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException { 2 if (timeout > 0) { 3 return consumer.receive(timeout); 4 } 5 else if (timeout < 0) { 6 return consumer.receiveNoWait(); 7 } 8 else { 9 return consumer.receive(); 10 } 11 }
实现的方式和发送相似,使用execute函数来封装冗余的公共操作,包括创建MessageConsumer,而最终的目标还是通过MessageConsumer(javax.jms.MessageConsumer包中)的receive来接收消息。
(三)消息监听器
消息监听器容器是一个特殊的bean,一旦有消息到达就可以获取消息,并通过调用onMessage()方法将消息传递给一个消息监听器MessageListener。Spring提供了两种消息监听器容器:
SimpleMessageListenerContainer(package org.springframework.jms.listener):最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务。
DefaultMessageListenerContainer(package org.springframework.jms.listener):这个消息监听器容器建立在SimpleMessageListenerContainer容器之上,添加了对事务的支持。
下面以DefaultMessageListenerContainer为例进行分析。在上面消息监听器的使用示例中,需要在配置文件中注册消息监听器容器,并将消息监听器注入到消息监听器容器中。我们只有把自定义的消息监听器注入到消息监听器容器中,容器才会把消息转给消息监听器进行处理。
DefaultMessageListenerContainer类的继承关系如下:
DefaultMessageListenerContainer
-- AbstractPollingMessageListenerContainer
-- AbstractMessageListenerContainer
-- AbstractJmsListeningContainer
-- JmsDestinationAccessor
-- JmsAccessor
-- InitializingBean
-- BeanNameAware
-- DisposableBean
-- SmartLifecycle
-- MessageListenerContainer
-- SmartLifecycle
我们看到DefaultMessageListenerContainer类实现了InitializingBean接口,InitializingBean接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。也就是说spring初始化bean的时候,如果该bean实现了InitializingBean接口,会自动调用afterPropertiesSet方法。DefaultMessageListenerContainer在其父类AbstractJmsListeningContainer中实现了该方法:
1 public void afterPropertiesSet() { 2 //验证connectionFactory 3 super.afterPropertiesSet(); 4 //验证配置文件 5 validateConfiguration(); 6 初始化 7 initialize(); 8 }
DefaultMessageListenerContainer监听器容器的初始化中包含了三句代码,前两句用于属性验证,比如connectionFactory或者destination等属性是否为空等,而真正用于初始化的操作委托在initialize函数中执行:
1 public void initialize() throws JmsException { 2 try { 3 //lifecycleMonitor用于控制生命周期的同步处理 4 synchronized (this.lifecycleMonitor) { 5 this.active = true; 6 this.lifecycleMonitor.notifyAll(); 7 } 8 doInitialize(); 9 } 10 catch (JMSException ex) { 11 synchronized (this.sharedConnectionMonitor) { 12 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 13 this.sharedConnection = null; 14 } 15 throw convertJmsAccessException(ex); 16 } 17 }
函数中调用了该类的抽象方法doInitialize,该函数实际在其子类DefaultMessageListenerContainer中实现(父类调用抽象方法,该抽象方法由子类实现):
1 protected void doInitialize() throws JMSException { 2 synchronized (this.lifecycleMonitor) { 3 for (int i = 0; i < this.concurrentConsumers; i++) { 4 scheduleNewInvoker(); 5 } 6 } 7 }
concurrentConsumers设置的是对每个listener在初始化的时候设置的并发消费者的个数,因为在spring中messageListener实例是单例的,spring-jms不能自作主张的创建多个messageListener实例来并发消费。所以spring在内部,创建了多个MessageConsumer实例,并使用consumer.receive()方法以阻塞的方式来获取消息,当获取消息后,再执行messageListener.onMessage()方法。concurrentConsumers属性就是为了指定spring内部可以创建MessageConsumer的最大个数,当messageConsumer实例被创建后,将会封装在一个Runner接口并交给taskExecutor来调度;如果consumer在一直没有收到消息,则会被置为“idle”并从consumer列表中移除;如果所有的consumer都处于active状态,则会创建新的consumer实例直到达到maxConcurrentConsumers个数上限。通常taskExecutor的线程池容量稍大于concurrentConsumer。
我们继续上述源码,doInitialize函数中调用了本类DefaultMessageListenerContainer中的scheduleNewInvoker方法:
1 private void scheduleNewInvoker() { 2 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); 3 if (rescheduleTaskIfNecessary(invoker)) { 4 // This should always be true, since we're only calling this when active. 5 this.scheduledInvokers.add(invoker); 6 } 7 }
其中调用了父类AbstractJmsListeningContainer(package org.springframework.jms.listener)的rescheduleTaskIfNecessary方法:
1 protected final boolean rescheduleTaskIfNecessary(Object task) { 2 if (this.running) { 3 try { 4 doRescheduleTask(task); 5 } 6 catch (RuntimeException ex) { 7 logRejectedTask(task, ex); 8 this.pausedTasks.add(task); 9 } 10 return true; 11 } 12 else if (this.active) { 13 this.pausedTasks.add(task); 14 return true; 15 } 16 else { 17 return false; 18 } 19 }
这里需要注意的是,子类DefaultMessageListenerContainer调用了父类AbstractJmsListeningContainer的rescheduleTaskIfNecessary方法,rescheduleTaskIfNecessary方法又调用回子类DefaultMessageListenerContainer的方法doRescheduleTask,即钩子方法。所以doRescheduleTask方法是在DefaultMessageListenerContainer中定义的。
1 protected void doRescheduleTask(Object task) { 2 Assert.state(this.taskExecutor != null, "No TaskExecutor available"); 3 this.taskExecutor.execute((Runnable) task); 4 }
doRescheduleTask函数其实是在开启一个线程执行Runnable。Spring根据concurrentConsumer数量建立了对应数量的线程,而每一个线程都作为一个独立的接收者在循环接收消息。
现在回到DefaultMessageListenerContainer的scheduleNewInvoker方法。我们上面介绍过DefaultMessageListenerContainer创建了concurrentConsumers所指定个数的AsyncMessageListenerInvoker(实现了SchedulingAwareRunnable接口),并交给taskExecutor运行。我们重点关注AsyncMessageListenerInvoker类(该类是DefaultMessageListenerContainer的一个内部类)。它是作为一个Runnable去执行,我们看下其run方法:
1 public void run() { 2 //并发控制 3 synchronized (lifecycleMonitor) { 4 activeInvokerCount++; 5 lifecycleMonitor.notifyAll(); 6 } 7 boolean messageReceived = false; 8 try { 9 //根据每个任务设置的最大处理消息数量而做不同的处理 10 //小于0默认为无限制,一直能接収消息 11 if (maxMessagesPerTask < 0) { 12 messageReceived = executeOngoingLoop(); 13 } 14 else { 15 int messageCount = 0; 16 //消息数量控制,一旦超出数量则停止循环 17 while (isRunning() && messageCount < maxMessagesPerTask) { 18 messageReceived = (invokeListener() || messageReceived); 19 messageCount++; 20 } 21 } 22 } 23 catch (Throwable ex) { 24 //清理操作,包括关闭session等 25 clearResources(); 26 if (!this.lastMessageSucceeded) { 27 // We failed more than once in a row or on startup - 28 // wait before first recovery attempt. 29 waitBeforeRecoveryAttempt(); 30 } 31 this.lastMessageSucceeded = false; 32 boolean alreadyRecovered = false; 33 synchronized (recoveryMonitor) { 34 if (this.lastRecoveryMarker == currentRecoveryMarker) { 35 handleListenerSetupFailure(ex, false); 36 recoverAfterListenerSetupFailure(); 37 currentRecoveryMarker = new Object(); 38 } 39 else { 40 alreadyRecovered = true; 41 } 42 } 43 if (alreadyRecovered) { 44 handleListenerSetupFailure(ex, true); 45 } 46 } 47 finally { 48 synchronized (lifecycleMonitor) { 49 decreaseActiveInvokerCount(); 50 lifecycleMonitor.notifyAll(); 51 } 52 if (!messageReceived) { 53 this.idleTaskExecutionCount++; 54 } 55 else { 56 this.idleTaskExecutionCount = 0; 57 } 58 synchronized (lifecycleMonitor) { 59 if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { 60 // We're shutting down completely. 61 scheduledInvokers.remove(this); 62 if (logger.isDebugEnabled()) { 63 logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); 64 } 65 lifecycleMonitor.notifyAll(); 66 clearResources(); 67 } 68 else if (isRunning()) { 69 int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); 70 if (nonPausedConsumers < 1) { 71 logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + 72 "Check your thread pool configuration! Manual recovery necessary through a start() call."); 73 } 74 else if (nonPausedConsumers < getConcurrentConsumers()) { 75 logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + 76 "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + 77 "to be triggered by remaining consumers."); 78 } 79 } 80 } 81 } 82 }
上面函数根据maxMessagesPerTask(每个任务设置的最大处理消息数量)值的不同,分开进行了处理。如果是无限值,执行函数executeOngoingLoop;如果不是,控制接收消息数量,一旦超出数量则停止循环,同时可以通过设置标志位running来控制消息接收的暂停与恢复,核心代码就是invokeListener()。我们也看一下executeOngoingLoop代码:
1 private boolean executeOngoingLoop() throws JMSException { 2 boolean messageReceived = false; 3 boolean active = true; 4 while (active) { 5 synchronized (lifecycleMonitor) { 6 boolean interrupted = false; 7 boolean wasWaiting = false; 8 //如果当前任务已经处于激活状态但是却给了暂时终止的命令 9 while ((active = isActive()) && !isRunning()) { 10 if (interrupted) { 11 throw new IllegalStateException("Thread was interrupted while waiting for " + 12 "a restart of the listener container, but container is still stopped"); 13 } 14 if (!wasWaiting) { 15 //如果并非处于等待状态则说明是第一次执行,需要将激活任务数量减少 16 decreaseActiveInvokerCount(); 17 } 18 //开始进入等待状态,等待任务的恢复命令 19 wasWaiting = true; 20 try { 21 //通过wait等待,也就是等待notify或者notifyAll 22 lifecycleMonitor.wait(); 23 } 24 catch (InterruptedException ex) { 25 // Re-interrupt current thread, to allow other threads to react. 26 Thread.currentThread().interrupt(); 27 interrupted = true; 28 } 29 } 30 if (wasWaiting) { 31 activeInvokerCount++; 32 } 33 if (scheduledInvokers.size() > maxConcurrentConsumers) { 34 active = false; 35 } 36 } 37 //正常处理流程 38 if (active) { 39 messageReceived = (invokeListener() || messageReceived); 40 } 41 } 42 return messageReceived; 43 }
上面函数中线程等待不是单纯采用while循环来控制,因为如果单纯采用while循环会浪费CPU的始终周期,给资源造成巨大的浪费。这里采用的使用全局控制变量lifecycleMonitor的wait()方法来暂停线程。所以,如果终止线程需要再次恢复的话,除了更改this.running标志位外,还需要调用lifecycleMonitor.notify或者lifecycleMonitor.notifyAll来使线程恢复。
从上述代码中可以看出其核心执行流程也是invokeListener()。所以内部类AsyncMessageListenerInvoker的run方法中核心的处理就是调用invokeListener来接收消息并激活消息监听器。
1 private boolean invokeListener() throws JMSException { 2 this.currentReceiveThread = Thread.currentThread(); 3 try { 4 //初始化资源包括首次创建的时候创建session和consumer 5 initResourcesIfNecessary(); 6 boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); 7 //改变标志位,信息成功处理 8 this.lastMessageSucceeded = true; 9 return messageReceived; 10 } 11 finally { 12 this.currentReceiveThread = null; 13 } 14 }
上述函数调用了receiveAndExecute函数,该函数在DefaultMessageListenerContainer的父类AbstractPollingMessageListenerContainer(package org.springframework.jms.listener)给出了:
1 protected boolean receiveAndExecute( 2 Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) 3 throws JMSException { 4 5 if (this.transactionManager != null) { 6 // Execute receive within transaction. 7 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); 8 boolean messageReceived; 9 try { 10 messageReceived = doReceiveAndExecute(invoker, session, consumer, status); 11 } 12 catch (JMSException | RuntimeException | Error ex) { 13 rollbackOnException(this.transactionManager, status, ex); 14 throw ex; 15 } 16 this.transactionManager.commit(status); 17 return messageReceived; 18 } 19 20 else { 21 // Execute receive outside of transaction. 22 return doReceiveAndExecute(invoker, session, consumer, null); 23 } 24 }
在介绍消息监听器容器的分类时,已经介绍DefaultMessageListenerContainer消息监听器容器建立在SimpleMessageListenerContainer容器之上,添加了对事务的支持。如果用户配置了this.transactionManage也就是配置了事务,那么,消息的接收会被控制在事务之内,一旦出现任何异常都会被回滚,而回滚操作也会交由事务管理器同一处理。
上面函数调用了doReceiveAndExecute(),doReceiveAndExecute包含了整个消息的接收处理过程,我们看下其代码:
1 protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session, 2 @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException { 3 4 Connection conToClose = null; 5 Session sessionToClose = null; 6 MessageConsumer consumerToClose = null; 7 try { 8 Session sessionToUse = session; 9 boolean transactional = false; 10 if (sessionToUse == null) { 11 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 12 obtainConnectionFactory(), this.transactionalResourceFactory, true); 13 transactional = (sessionToUse != null); 14 } 15 if (sessionToUse == null) { 16 Connection conToUse; 17 if (sharedConnectionEnabled()) { 18 conToUse = getSharedConnection(); 19 } 20 else { 21 conToUse = createConnection(); 22 conToClose = conToUse; 23 conToUse.start(); 24 } 25 sessionToUse = createSession(conToUse); 26 sessionToClose = sessionToUse; 27 } 28 MessageConsumer consumerToUse = consumer; 29 if (consumerToUse == null) { 30 consumerToUse = createListenerConsumer(sessionToUse); 31 consumerToClose = consumerToUse; 32 } 33 //接收消息 34 Message message = receiveMessage(consumerToUse); 35 if (message != null) { 36 if (logger.isDebugEnabled()) { 37 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 38 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + 39 sessionToUse + "]"); 40 } 41 //模板方法,当消息接收且在未处理前给子类机会做相应的处理, 42 messageReceived(invoker, sessionToUse); 43 boolean exposeResource = (!transactional && isExposeListenerSession() && 44 !TransactionSynchronizationManager.hasResource(obtainConnectionFactory())); 45 if (exposeResource) { 46 TransactionSynchronizationManager.bindResource( 47 obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); 48 } 49 try { 50 //激活监听器 51 doExecuteListener(sessionToUse, message); 52 } 53 catch (Throwable ex) { 54 if (status != null) { 55 if (logger.isDebugEnabled()) { 56 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 57 } 58 status.setRollbackOnly(); 59 } 60 handleListenerException(ex); 61 // Rethrow JMSException to indicate an infrastructure problem 62 // that may have to trigger recovery... 63 if (ex instanceof JMSException) { 64 throw (JMSException) ex; 65 } 66 } 67 finally { 68 if (exposeResource) { 69 TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); 70 } 71 } 72 // Indicate that a message has been received. 73 return true; 74 } 75 else { 76 if (logger.isTraceEnabled()) { 77 logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + 78 "session [" + sessionToUse + "] did not receive a message"); 79 } 80 //接收到空消息的处理 81 noMessageReceived(invoker, sessionToUse); 82 // Nevertheless call commit, in order to reset the transaction timeout (if any). 83 if (shouldCommitAfterNoMessageReceived(sessionToUse)) { 84 commitIfNecessary(sessionToUse, null); 85 } 86 // Indicate that no message has been received. 87 return false; 88 } 89 } 90 finally { 91 JmsUtils.closeMessageConsumer(consumerToClose); 92 JmsUtils.closeSession(sessionToClose); 93 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); 94 } 95 }
上述代码中我们重点关注下激活监听器 doExecuteListener(sessionToUse, message)方法,doExecuteListener方法在AbstractPollingMessageListenerContainer类的父类AbstractMessageListenerContainer(package org.springframework.jms.listener)给出。
1 protected void doExecuteListener(Session session, Message message) throws JMSException { 2 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 3 if (logger.isWarnEnabled()) { 4 logger.warn("Rejecting received message because of the listener container " + 5 "having been stopped in the meantime: " + message); 6 } 7 rollbackIfNecessary(session); 8 throw new MessageRejectedWhileStoppingException(); 9 } 10 11 try { 12 invokeListener(session, message); 13 } 14 catch (JMSException | RuntimeException | Error ex) { 15 rollbackOnExceptionIfNecessary(session, ex); 16 throw ex; 17 } 18 commitIfNecessary(session, message); 19 }
该函数又调用了该类中的invokeListener函数和commitIfNecessary函数:
(1)invokeListener函数
1 protected void invokeListener(Session session, Message message) throws JMSException { 2 Object listener = getMessageListener(); 3 4 if (listener instanceof SessionAwareMessageListener) { 5 doInvokeListener((SessionAwareMessageListener) listener, session, message); 6 } 7 else if (listener instanceof MessageListener) { 8 doInvokeListener((MessageListener) listener, message); 9 } 10 else if (listener != null) { 11 throw new IllegalArgumentException( 12 "Only MessageListener and SessionAwareMessageListener supported: " + listener); 13 } 14 else { 15 throw new IllegalStateException("No message listener specified - see property 'messageListener'"); 16 } 17 }
上述方法又调用了该类中的doInvokeListener方法,继续查看其代码:
1 protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message) 2 throws JMSException { 3 4 Connection conToClose = null; 5 Session sessionToClose = null; 6 try { 7 Session sessionToUse = session; 8 if (!isExposeListenerSession()) { 9 // We need to expose a separate Session. 10 conToClose = createConnection(); 11 sessionToClose = createSession(conToClose); 12 sessionToUse = sessionToClose; 13 } 14 // Actually invoke the message listener... 15 listener.onMessage(message, sessionToUse); 16 // Clean up specially exposed Session, if any. 17 if (sessionToUse != session) { 18 if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) { 19 // Transacted session created by this container -> commit. 20 JmsUtils.commitIfNecessary(sessionToUse); 21 } 22 } 23 } 24 finally { 25 JmsUtils.closeSession(sessionToClose); 26 JmsUtils.closeConnection(conToClose); 27 } 28 }
1 protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { 2 listener.onMessage(message); 3 }
通过层层调用,最终提取监听器并使用listener.onMessage(message);激活了监听器,也就是激活了用户自定义的监听器逻辑。doExecuteListener函数中还有一句重要的代码commitIfNecessary。
(2)commitIfNecessary函数
AbstractMessageListenerContainer类中的doExecuteListener方法中调用的commitIfNecessary函数。
1 protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException { 2 // Commit session or acknowledge message. 3 if (session.getTransacted()) { 4 // Commit necessary - but avoid commit call within a JTA transaction. 5 if (isSessionLocallyTransacted(session)) { 6 // Transacted session created by this container -> commit. 7 JmsUtils.commitIfNecessary(session); 8 } 9 } 10 else if (message != null && isClientAcknowledge(session)) { 11 message.acknowledge(); 12 } 13 }
其中又调用了JmsUtils类(package org.springframework.jms.support)的commitIfNecessary(session)函数,我们进入该函数:
1 public static void commitIfNecessary(Session session) throws JMSException { 2 Assert.notNull(session, "Session must not be null"); 3 try { 4 session.commit(); 5 } 6 catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) { 7 // Ignore -> can only happen in case of a JTA transaction. 8 } 9 }
DefaultMessageListenerContainer增加了事务的支持,session.commit()在此完成消息事务的事务提交。告诉消息服务器本地已经正常接收消息,消息服务器接收到本地的事务提交后便可以将此消息删除。否则,当前消息会被其他接收者重新接收。
Spring中使用JmsTemplate模板类来进行发送消息和接收消息操作,接收消息可以使用消息监听器的方法来替代模板方法。至此我们完成了Spring中jms模块两个核心JmsTemplate和消息监听器的分析。
本文参考了 郝佳《Spring源码深度解析》、《Spring 5 官方文档》及博客园、CSDN部分文献。
来源:https://www.cnblogs.com/xxkj/p/14137694.html