-
消息队列之activeMQ
1.activeMQ的主要功能
- 实现高可用、高伸缩、高性能、易用和安全的企业级面向消息服务的系统
- 异步消息的消费和处理
- 控制消息的消费顺序
- 可以和Spring/springBoot整合简化编码
- 配置集群容错的MQ集群
2.activeMQ安装
下载地址:http://activemq.apache.org/components/classic/download/
这里笔者是下载的linux版的:
因为activeMQ底层是使用java编写的,所以需要安装jdk,这个请移步我之前的博客:
https://www.cnblogs.com/pluto-charon/p/11746636.html
安装activeMq:
# 安装apache
[root@localhost ~]# yum install ttpd
# 下载的apache-activemq并上传到linux的home下,解压
[root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz
# 进入到bin目录下
[root@localhost home]# cd /apache-activemq-5.16.0/bin
# 启动
[root@localhost bin]# ./activemq start
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')
# activemq的默认端口是61616,查看是否启动的三种方式
# 第一种
[root@localhost bin]# ps -ef |grep activemq
# 第二种
[root@localhost bin]# netstat -ano|grep 61616
tcp6 0 0 :::61616 :::* LISTEN off (0.00/0/0)
# 第三种
[root@localhost bin]# lsof -i:61616
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 7517 root 132u IPv6 39926 0t0 TCP *:61616 (LISTEN)
# 带日志的启动方式
[root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
[root@localhost bin]# cd ..
# 可以看到,启动日志都已经记录到日志里了
[root@localhost apache-activemq-5.16.0]# cat myrunmq.log
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
# 关闭activemq
[root@localhost bin]# ./activemq stop
前台访问的端口是8161,在查看前台时,要关闭linux和windows的防火墙:
# 关闭linux防火墙
[root@localhost apache-activemq-5.16.0]# systemctl stop firewalld
在访问之前,需要修改conf目录下的jetty.xml,将下面的host修改成自己的ip,以及修改用户名和密码。
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>
<property name="port" value="8161"/>
</bean>
# 用户名和密码可修改可不修改,默认为admin/admin
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="user,admin" />
<!-- set authenticate=false to disable login -->
<property name="authenticate" value="true" />
</bean>
修改完成之后重启activemq
[root@localhost bin]# ./activemq restart
查看,地址为192.168.189.150:8161
到这里就说明activemq安装成功了。
3.JMS
JMS(java message service)是一个用于提供消息服务的技术规范,他制定了在整个消息服务提供过程中的所有数据结构和交互流程。当两个程序使用jms进行通信时,他们并不是直接相连的,而是通过一个共同的消息收发服务连接起来的,达到解耦的效果。jms为标准消息协议和消息服务提供了一组通用的接口,包括创建、发送、读取消息等。
1 JMS的优势:
异步:客户端不用发送请求,JMS自动将消息发送给客户端
可靠:JMS保证消息只传递一次
2.JMS的四大组件:
-
JMS provider:实现了jms接口和规范的消息中间件
-
JMS producer:消息生产者,创建和发送JMS消息的客户端应用
-
JMS consumer:消息消费者,接受和处理JMS消息的客户端应用
-
JMS message:由消息头、消息属性、消息体组成
消息头(在send方法之前,通过setXXX()设置):
JMSDestination:消息发送的目的地,主要是指Queue(点对点传送模型)和Topic(发布订阅模型)
JMSDeliverMode:消息是否持久
JMSExpiration:设置消息过期时间
JMSPriority:消息优先级,0-4被称为普通消息,5-9是加急消息,默认为4
JMSMessageID:唯一识别每个消息的标识,由MQ产者或者自己设定
消息属性:除消息头以外的值,如识别,去重,重点标注等方法,如textMessage.setStringProperty("c1","VIP");
消息体:
TextMessage:普通字符串
MapMessage:map类型,其中key为String类型,而值为java的基本类型
BytesMessage:二进制数组消息
StreamMessage:java数据流消息,用个标准流来顺序填充和读取
ObjectMessage:对象消息,包含一个可序列化的java对象
3.JMS的传送模型:
-
点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,处理消费掉的和已过期的消息
点对点消息传送的特性:
1.每个消息只有一个接收者
2.消息发送者和接收者没有时间依赖性
3.当消息发送者发送消息时,无论接收者程序在不在运行,都能发送消息
4.当接收者收到消息时,会发送确认收到通知
-
发布订阅消息传递模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和消息订阅。
发布订阅消息传递的特性:
1.一个消息可以传递给多个订阅者
2.发布者和订阅者有时间依赖性
3.为了缓和严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅
4.生产者代码实现
1.引入jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.0</version>
</dependency>
2.生产者代码
package activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* @className: Jmsproducer
* @description: activemq生产者
* @author: charon
* @create: 2020-12-27 22:36
*/
public class JmsProducer {
/** 声明activemq的地址 */
private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
/** 队列名 */
private static final String QUEUE_NAME = "queue01";
/**
* @param args 参数
*/
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 获得连接
Connection conn = activeMQConnectionFactory.createConnection();
conn.start();
// 创建会话
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// 创建消息
for (int i = 0; i < 5; i++) {
// 消息体
TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】条消息");
// 消息头
// textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
// 消息属性
// textMessage.setStringProperty("c1","VIP");
messageProducer.send(textMessage);
}
// 关闭资源
messageProducer.close();
session.close();
conn.close();
}
}
运行代码在浏览器上查看,可以看到queue01里面有5条消息:
- Number Of Pending Messages:等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
- Number Of Consumers:消费者的数量
- Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
- Messages Dequeued:出了队列的消息 可以理解为是消费这消费掉的数量
5.消费者代码实现
package activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;
/**
* @className: JmsConsumer
* @description: activeMq的消费者
* @author: charon
* @create: 2020-12-28 08:10
*/
public class JmsConsumer {
/** 声明activemq的地址 */
private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
/** 队列名 */
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 获得连接
Connection conn = activeMQConnectionFactory.createConnection();
conn.start();
// 创建会话
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 创建队列
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消息的生产者
MessageConsumer messageConsumer = session.createConsumer(queue);
// 同步方式,生产环境并不适用,这种方式将阻塞知道获得并返回第一条消息
// while (true){
// TextMessage textMessage =(TextMessage) messageConsumer.receive();
// if(null!=textMessage){
// System.out.println("---消费者收到消息:"+textMessage.getText());
// }else{
// break;
// }
// }
// 异步方式,创建监听,在又消息到达时,调用listener的onMessage方法,
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println("--消费者接受到消息:"+textMessage);
}
}
});
System.in.read();
// 关闭资源
messageConsumer.close();
session.close();
conn.close();
}
}
运行消费者的代码,应该我上面生产者的代码运行了两次,所以消息有10条。
6.activeMQ集群搭建
在这里,笔者使用的基于Zookeeper+levelDb搭建的activeMq集群,为了避免单点故障,使用一主两从的架构。使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它被视为master,也就是说如果master因为故障而不能提供服务,Zookeeper会从SLave中选举出一个Broker充当master。
我这边的zookeeper集群已经搭建好了,150和151是follower,152是leader。
# 每台服务器上安装activeMq,同时在集群环境下,activemq的jetty.xml文件重的host要改成0.0.0.0
# 修改activeMq.xml,注释掉kahadb这个配置,actviemq默认的是kahadb,并且添加leveldb
[root@localhost conf]# vi activemq.xml
<!-- <persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter> -->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
<!--实例间的通信地址-->
bind="tcp://0.0.0.0:62222"
<!--zookeeper的地址-->
zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
<!--修改为每个服务器的节点的ip-->
hostname="192.168.189.152"
sync="local_disk"
zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
# 启动三个节点的activemq
[root@localhost bin]# ./activemq restart
# 查看 连接zookeeper客户端
[root@localhost bin]# zkCli.sh
[zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
[00000000022, 00000000020, 00000000021]
# 访问
[zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
{"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
[zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
[zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
从上面可以看到,只有00000000020这个几点的elected里面有值,表明它被选举为master节点了。
在浏览器上依次访问:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161
只有192.168.189.150:8161可以访问成功,因为只有master节点可以对外提供访问,所以只有一个节点能访问到,那么它就是master节点。
第二种查看的方式:
查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即为master节点,SlaveLevelDBStore即为slave节点。
第三种查看的方式为使用zookeeper的可视化工具。
由于activeMq集群是基于zookeeper集群实现的,所以要注意一下三点:
- activeMQ的客户端只能访问master的Broker,其它处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议
- 当一个activeMQ节点挂掉或者一个Zookeeper节点挂掉,activeMQ服务正常运转,但是如果仅剩一个activeMQ节点,由于不能选举Master,所以activeMQ不能正常运行;(一个就不成集群了)
- 同理,如果Zookeeper仅剩一个节点是活动的,不管activeMQ是都存活或者说不管activeMQ个节点是否存活,activeMQ不能正常提供服务,必须依赖于Zookeeper集群服务。
7.集群代码实现
集群的代码和上面单机的代码大致是一直的,就只需要修改一个activemq的地址。
/** 声明集群中activemq的地址,使用failover协议,随机 */
private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";
8.activemq的高级特性
1.消息发送方式
默认情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升,所以在发送持久化消息时,请开启事务模式。
2.储存机制
在通常情况下,非持久化的消息时存储在内存中的,持久化消息时存储在文件中的,他们的最大限制在配置文件中的
所以尽量不要用非持久化文件,如果非要用的化,可以将临时文件的限制调大。同时,非持久化的消息要及时处理,不要堆积,或者启动事务。启动事务后,commit()会等待服务器的消息返回,也不会导致消息丢失了。
3.死信队列
一条消息在被重发多次后(默认是6次),将会被ActiveMQ移入死信队列;说白了就是异常消息的归并处理的集合,主要是处理失败的消息。可以在activeMQ.DLQ这个队列中查看。
4.重复消息,幂等性调用
在网络延迟的情况洗啊,可能会造成MQ重试,可能会造成重复消费。如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,因为唯一主键,会造成主键冲突,避免数据库出现脏数据。如果是第三方消费,可以在每条数据里面加一个全局唯一的id,如果消息消费了,就将消息存在redis中,在消费消息之前将id到redis中查询一下,判断是否消费过,如果没有消费过,就处理,如果消费过了,就不处理了。
原文:https://www.cnblogs.com/pluto-charon/p/14225896.html