VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • RabbitMQ入门实战(1)--概念、安装及简单使用

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。本文主要介绍RabbitMQ的概念、安装及简单使用,文中使用到的软件版本:erlang 23.1、RabbitMQ 3.8.9、Centos 7.

1、简介

1.1、RabbitMQ组件

Broker:标识消息队列服务器实体。
Virtual Host:虚拟主机,标识一批交换机、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是"/"。
Exchange:交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于绑定键(bing-Key)将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,表示一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从一个消息队列中取得消息的客户端应用程序。
Message:消息,消息包括消息内容及一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、headers(消息头)等。

1.2、RabbitMQ中的Exchange类型

1.2.1、fanout

 fanout类型的exchange会忽略路由键的设置,直接将Message广播到所有绑定的Queue中。

1.2.2、direct

 交换机通过消息上的路由键匹配具有相同值的绑定键来路由消息到对应的队列上。

1.2.3、topic

与direct基本相同,唯一区别在于绑定键;topic的绑定键可以设置表达式,用来模糊匹配;表达式里的通配符可以是"*"或"#"。

* 表示一个单词。例如,绑定键是 *.orange.*,则路由键 lazy.orange.elephant、quick.orange.fox 匹配该绑定键。
# 表示0或多个单词。例如,绑定键是 lazy.#,则路由键 lazy.brown.fox、lazy.pink.rabbit 匹配该绑定键。

1.2.4、headers

headers与fanout、direct、topic不同,它时通过匹配AMQP消息的header而非路由键。headers与direct类似,性能方面比后者查很多,所以在实际项目中用的很少。

1.3、RabbitMQ集群

1.3.1、集群原理

上面图中采用三个节点组成了一个RabbitMQ的集群,Exchange A的元数据信息在所有节点上是一致的,而Queue只会存在于它所创建的那个节点上,其他节点只知道这个queue的metadata信息和一个指向该queue的owner node的指针。生产者或消费者通过tcp代理(HAProxy或nginx)来访问RabbitMQ。

1.3.2、Mirrored Queues(镜像队列)

默认集群中的Queue只存在于它所创建的那个节点上,如果该节点挂了将会造成数据的丢失;使用镜像队列将会在所有其他节点上创建同样的队列,发送数据时所有的队列都会有消息。可以通过设置策略来设置镜像队列。设置策略时有两个关键参数:ha-mode和ha-params

ha-mode ha-params Result
all   在集群中的每个节点都有镜像。当一个节点添加到集群中时,这个节点同样会有相应的镜像
exactly count

指定在集群中镜像的个数。如果集群中节点的个数小于count的值,那么所有的节点都会配置镜像。

如果其中一个镜像挂掉,那么会在另一个节点生成新的镜像。

nodes node names

在指定的节点列表中配置镜像。如果这些指定的节点都处于不可用状态(宕机或者关闭服务等),

那么客户端程序会在自己所连接的那么节点上创建queue。

设置策略例子:

a、ha.开头的队列在所有节点上设置为镜像队列

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'

b、two.开头的队列设置镜像数为2,数据自动同步(默认为手动)

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

c、nodes.开头的队列镜像设置在节点rabbit@pxc1,rabbit@pxc2上

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@pxc1", "rabbit@pxc2"]}'

 

在控制台可以看到镜像队列的详细信息:

1.3.3、Quorum Queues(仲裁队列)

仲裁队列是镜像队列(又称为HA队列)的替代方案,该队列把数据安全作为首要目标,在3.8.0版本可以使用。声明仲裁队列和声明普通队列方法一样,只需要把x-queue-type设置为quorum即可。
适用仲裁队列的情况:如销售系统中订单信息及选举系统的投票,消息的丢失将对系统的正确性和功能产生重大影响。
不适用仲裁队列的情况:临时性质的队列、低延迟队列、数据安全性不那么高的情况、队列积压很长。

仲裁队列有一个leader、多个成员;管理成员:

rabbitmq-queues add_member [-p <vhost>] <queue-name> <node> #增加成员
rabbitmq-queues delete_member [-p <vhost>] <queue-name> <node> #删除成员

如:

./rabbitmq-queues add_member quorum.1 rabbit@pxc2
./rabbitmq-queues delete_member quorum.1 rabbit@pxc2

 可以在控制台查看仲裁队列的详细信息:

2、安装

2.1、安装erlang

下载erlang安装包http://www.erlang.org/downloads,erlang和rabbitmq的版本的对应关系参见:https://www.rabbitmq.com/which-erlang.html

2.1.1、编译安装

shell> tar zxvf otp_src_23.1.tar.gz
shell> cd otp_src_23.1
shell> ./configure --prefix=/home/hadoop/app/otp_23.1
shell> make && make install

2.1.2、设置环境变量

PATH=$PATH: /home/hadoop/soft/otp_23.1/bin
export PATH

2.1.3、安装常见错误处理

a、configure: error: No curses library functions found

需安装ncurses-devel:

yum install ncurses-devel

a、wx-config: command not found

需安装wxWidgets-devel:

yum install epel-release
yum install wxWidgets-devel
cd /usr/bin
ln -s wx-config-3.0 wx-config

2.2、单机版安装RabbitMQ

2.2.1、下载并解压通用包

下载通用包https://www.rabbitmq.com/install-generic-unix.html并解压:

shell> xz -d rabbitmq-server-generic-unix-3.7.17.tar.xz
shell> tar -xvf rabbitmq-server-generic-unix-3.7.17.tar

2.2.2、配置

新建配置文件:${RABBITMQ_HOME}etc/rabbitmq/rabbitmq.conf

loopback_users = none #让guest用户可以从其他机器访问

2.2.3、管理

a、启动服务

sbin/rabbitmq-server –detached #后台运行

b、停止服务

sbin/rabbitmqctl stop

c、查看状态

sbin/rabbitmqctl status

d、查看交换机

sbin/rabbitmqctl list_exchanges

e、查看队列

sbin/rabbitmqctl list_queues

f、查看绑定

sbin/rabbitmqctl list_bindings

f、用户

sbin/rabbitmqctl add_user <username> <password>  #增加用户
sbin/rabbitmqctl set_user_tags <username> <tag> ...  #设置角色
sbin/rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read>  #设置权限

如:

sbin/rabbitmqctl add_user admin admin
sbin/rabbitmqctl set_user_tags admin administrator
sbin/rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

2.2.4、控制台

开启管理插件:

sbin/rabbitmq-plugins enable rabbitmq_management

访问控制台:

http://10.49.196.10:15672

控制台可以直观的查看RabbitMQ的运行情况,并可进行创建队列、创建交换机、设置绑定、增加用户等管理工作,很方便。

2.3、集群安装RabbitMQ

2.3.1、先按单机版安装多台RabbitMQ

假设在10.49.196.10、10.49.196.11、10.49.196.12上都安装了单机版的RabbitMQ。

2.3.2、设置hosts文件

使三台机器之间可以通过主机名相互访问

10.49.196.10 pxc1
10.49.196.11 pxc2
10.49.196.12 pxc3

2.3.3、修改.erlang.cookie

修改/home/$user.erlang.cookie,使三台机器上.erlang.cookie文件里的值一样

2.3.4、以一个节点为主节点,添加其他的节点

假设以10.49.196.10为主机节点,在10.49.196.11上执行:

./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@pxc1
./rabbitmqctl start_app

添加之后可以在控制台看到节点信息:

 10.49.196.12节点类似添加。

2.3.5、安装HAProxy或nginx

使用HAProxy或nginx来做代理,这里就不详细介绍这两个工具的安装了。程序访问代理地址,达到高可用的目的。

2.3.6、重置节点

重置节点就会删除该节点所有的资源和数据

./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl start_app

2.3.6、移除集群中一个节点

假设移除10.49.196.11节点

2.3.6.1、正常移除

在10.49.196.11上重置节点:

./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl start_app
2.3.6.1、远程移除(适用于节点没有响应的情况)

a、主节点(10.49.196.11)上执行:

./rabbitmqctl stop_app

b、10.49.196.10上执行:

./rabbitmqctl forget_cluster_node rabbit@pxc2

c、重置10.49.196.11节点:

rabbitmqctl reset
rabbitmqctl start_app

2.4、 镜像队列

在默认集群的基础上,可以设置镜像队列来保证数据的可靠性。如设置以ha.开头的队列在所有节点上为镜像队列

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'

2.5、 仲裁队列

如果还需进一步保证数据的安全性,可以使用仲裁队列。

编码方式声明仲裁队列:

Map<String, Object> arguments = new HashMap<>();
//队列类型设为quorum,默认为classic
arguments.put("x-queue-type", "quorum");
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);

 也可以在控制台创建仲裁队列:

 

来源:https://www.cnblogs.com/wuyongyin/p/13891450.html

相关教程