-
RabbitMQ工作模式详解
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,在分布式系统开发中应用的非常广泛
1
2
3
4
5
6
7
|
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version> 5.6 . 0 </version> </dependency> </dependencies> |
2.创建生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost( "localhost" ); connectionFactory.setPort( 5672 ); // 设置虚拟主机名字和登录,默认/ connectionFactory.setVirtualHost( "/xxx" ); connectionFactory.setUsername( "root" ); connectionFactory.setPassword( "123456" ); Connection connection = connectionFactory.newConnection(); // 创建消息通道 Channel channel = connection.createChannel(); // arg0:队列名称 arg1:是否持久化 arg2:是否排外 arg3:关闭连接时队列是否自动删除 arg4:队列其他参数 channel.queueDeclare( "simple_queue" , true , false , false , null ); String message = "你好,世界。" ; // 消息发送 // arg0:交换机名称,没有指定使用默认的Default Exchange // arg1:路由key,点对点模式可以使用队列名称 arg2:指定消息其他属性 arg3:消息的字节码 channel.basicPublish( "" , "simple_queue" , null , message.getBytes()); channel.close(); connection.close(); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { // 创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost( "localhost" ); connectionFactory.setPort( 5672 ); // 设置虚拟主机名字,默认/ connectionFactory.setVirtualHost( "/xxx" ); connectionFactory.setUsername( "root" ); connectionFactory.setPassword( "123456" ); // 创建一个新链接 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare( "simple_queue" , true , false , false , null ); // 创建消费者,并消费消息 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * @param consumerTag 消费者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routing key,交换机,消息和重发标志(收到消息失败后是否需要重新发送) * @param properties 消息属性信息 * @param body 消息体 **/ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String routingKey = envelope.getRoutingKey(); String exchange = envelope.getExchange(); long deliveryTag = envelope.getDeliveryTag(); String message = new String(body, "UTF-8" ); System.out.println( "路由:" + routingKey + ",交换机:" + exchange + ",消息id:" + deliveryTag + ",消息体:" + message); } }; // 消息监听 arg0:监听的队列名称 // arg1:是否自动应答,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 // arg2:消费者接收消息到后回调(消费消息) channel.basicConsume( "simple_queue" , true , consumer); // 关闭资源(不建议关闭,建议一直监听消息) } } |
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
SQL Server -- 解决存储过程传入参数作为s
JavaScript判断两个数组相等的四类方法
js如何操作video标签
React实战--利用甘特图和看板,强化Paas平
【记录】正则替换的偏方
前端下载 Blob 类型整理
抽象语法树AST必知必会
关于JS定时器的整理
JS中使用Promise.all控制所有的异步请求都完
js中字符串的方法
import-local执行流程与node模块路径解析流程