当前位置:
首页 > Python基础教程 >
-
C#队列学习笔记:RabbitMQ安装及使用(2)
void Main(string[] args)
{
#region Hello World
//1.实例化连接工厂
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "hello",
Password = "world"
};
//2.建立连接
using (var connection = factory.CreateConnection())
{
//3.建立信道
using (var channel = connection.CreateModel())
{
//4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5.构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello World";
var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的
//6.发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
Console.WriteLine($"Send {message}");
Console.Read();
}
}
#endregion
}
}
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region Hello World //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.声明队列(指定durable:true,告知rabbitmq对消息进行持久化。) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //5.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //6.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //7.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); //8.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
四、Exchange
上面的示例,生产者和消费者直接是通过相同队列名称进行匹配衔接的。消费者订阅某个队列,生产者创建消息发布到队列中,队列再将消息转发到订阅的消费者。这样就会有一个局限性,即消费者一次只能发送消息到某一个队列。
那消费者如何才能发送消息到多个消息队列呢?
RabbitMQ提供了Exchange,它类似于路由器的功能,对消息进行路由,将消息发送到多个队列上。Exchange一方面从生产者接收消息,另一方面将消息推送到队列。但是Exchange是如何知道将消息附加到哪个队列或者直接忽略的呢?这些其实是由Exchange Type来定义的。关于Exchange的图文介绍,请看上一篇《C#队列学习笔记:RabbitMQ基础知识》,此处仅提供示例代码。
4.1、fanout
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region fanout exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;fanout类型无需指定routingKey;指定basicProperties) channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region fanout exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.使用fanout exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); //5.声明队列(随机生成队列名称) var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定fanout类型exchange,fanout类型无需指定routingKey。 channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: ""); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
4.2、direct
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region direct exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;direct类型必须指定routingKey;指定basicProperties) channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region direct exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.使用direct exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "directEC", type: "direct"); //5.声明队列(随机生成队列名称) var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定direct类型exchange,direct类型必须指定routingKey。 channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
4.3、topic
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region topic exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.建立信道 using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5.构建byte消息数据包 for (int i = 1; i <= 10; i++) { string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i); var body = Encoding.UTF8.GetBytes(message);//消息是以二进制数组的形式传输的 //6.发送数据包(指定exchange;topic类型必须指定routingKey;指定basicProperties) channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body); Console.WriteLine($"Send {message}"); } Console.Read(); } } #endregion } }
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
class Program { static void Main(string[] args) { #region topic exchange type //1.实例化连接工厂 var factory = new ConnectionFactory { HostName = "localhost", UserName = "hello", Password = "world" }; //2.建立连接 using (var connection = factory.CreateConnection()) { //3.创建信道 using (var channel = connection.CreateModel()) { //4.使用topic exchange type,指定exchange名称。 channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); //5.声明队列(随机生成队列名称) var queueName = channel.QueueDeclare().QueueName; //绑定队列到指定topic类型exchange,topic类型必须指定routingKey。 channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast"); //将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息。 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //6.构造消费者实例 var consumer = new EventingBasicConsumer(channel); //7.绑定消息接收后的事件委托 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine($"Received {message}"); //8.发送消息确认信号(手动消息确认) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //9.启动消费者(noAck: false 启用消息响应) channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); Console.Read(); } } #endregion } }
五、RPC
RPC--Remote Procedure Call,远程过程调用。RabbitMQ是如何进行远程调用的呢?示意图如下:
第一步:主要是进行远程调用的客户端需要指定接收远程回调的队列,并声明消费者监听此队列。
第二步:远程调用的服务端除了要声明消费端接收远程调用请求外,还要将结果发送到客户端用来监听结果的队列中去。
data:image/s3,"s3://crabby-images/8bfd5/8bfd5c2165fb2f1a38f21bb3321241f215187be9" alt=""
栏目列表
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式