-
C#队列学习笔记:RabbitMQ延迟队列
一、引言
日常生活中,很多的APP都有延迟队列的影子。比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用RabbitMQ延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一条延时消息到队列中以供消费。在规定时间内,则可正常消费,否则依TTL自动失效。
在RabbitMQ中,有两种方式来实现延时队列:一种是基于队列方式,另外一种是基于消息方式。
二、示例
2.1、发送端(生产端)
新建一个控制台项目Send,并添加一个类RabbitMQConfig。
class RabbitMQConfig { public static string Host { get; set; } public static string VirtualHost { get; set; } public static string UserName { get; set; } public static string Password { get; set; } public static int Port { get; set; } static RabbitMQConfig() { Host = "192.168.2.242"; VirtualHost = "/"; UserName = "hello"; Password = "world"; Port = 5672; } }
class Program { static void Main(string[] args) { Console.WriteLine("C# RabbitMQ实现延迟队列有以下两种方式:"); Console.WriteLine("1、基于队列方式实现延迟队列,请按1开始生产。"); Console.WriteLine("2、基于消息方式实现延迟队列,请按2开始生产。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessagePublishByQueueExpires(); } else if (chooseChar == "2") { DelayMessagePublishByMessageTTL(); } Console.ReadLine(); } /// <summary> /// 基于队列方式实现延迟队列 /// 将队列中所有消息的TTL(Time To Live,即过期时间)设置为一样 /// </summary> private static void DelayMessagePublishByQueueExpires() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; const int QuequeExpirySeconds = 1000 * 30; const int MessageExpirySeconds = 1000 * 10; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //当同时指定了queue和message的TTL值,则两者中较小的那个才会起作用。 Dictionary<string, object> dict = new Dictionary<string, object> { { "x-expires", QuequeExpirySeconds },//队列过期时间 { "x-message-ttl", MessageExpirySeconds },//消息过期时间 { "x-dead-letter-exchange", "dead exchange 1" },//过期消息转向路由 { "x-dead-letter-routing-key", "dead routing key 1" }//过期消息转向路由的routing key }; //声明队列 channel.QueueDeclare(queue: "delay1", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向该消息队列发送消息message for (int i = 0; i < PublishMessageCount; i++) { var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "delay1", basicProperties: null, body: body); Thread.Sleep(1000 * 2); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } /// <summary> /// 基于消息方式实现延迟队列 /// 对队列中消息进行单独设置,每条消息的TTL可以不同。 /// </summary> private static void DelayMessagePublishByMessageTTL() { const string MessagePrefix = "message_"; const int PublishMessageCount = 6; int MessageExpirySeconds = 0; var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { Dictionary<string, object> dict = new Dictionary<string, object> { { "x-dead-letter-exchange", "dead exchange 2" },//过期消息转向路由 { "x-dead-letter-routing-key", "dead routing key 2" }//过期消息转向路由的routing key }; //声明队列 channel.QueueDeclare(queue: "delay2", durable: true, exclusive: false, autoDelete: false, arguments: dict); //向该消息队列发送消息message Random random = new Random(); for (int i = 0; i < PublishMessageCount; i++) { MessageExpirySeconds = i * 1000; var properties = channel.CreateBasicProperties(); properties.Expiration = MessageExpirySeconds.ToString(); var message = MessagePrefix + i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "delay2", basicProperties: properties, body: body); Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}"); } } } } }
2.2、接收端(消费端)
新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。
class Program { static void Main(string[] args) { Console.WriteLine("C# RabbitMQ实现延迟队列有以下两种方式:"); Console.WriteLine("1、基于队列方式实现延迟队列,请按1开始消费。"); Console.WriteLine("2、基于消息方式实现延迟队列,请按2开始消费。"); string chooseChar = Console.ReadLine(); if (chooseChar == "1") { DelayMessageConsumeByQueueExpires(); } else if (chooseChar == "2") { DelayMessageConsumeByMessageTTL(); } Console.ReadLine(); } public static void DelayMessageConsumeByQueueExpires() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "dead exchange 1", type: "direct"); string name = channel.QueueDeclare().QueueName; channel.QueueBind(queue: name, exchange: "dead exchange 1", routingKey: "dead routing key 1"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }; channel.BasicConsume(queue: name, noAck: true, consumer: consumer); Console.ReadKey(); } } } public static void DelayMessageConsumeByMessageTTL() { var factory = new ConnectionFactory() { HostName = RabbitMQConfig.Host, Port = RabbitMQConfig.Port, VirtualHost = RabbitMQConfig.VirtualHost, UserName = RabbitMQConfig.UserName, Password = RabbitMQConfig.Password, Protocol = Protocols.DefaultProtocol }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "dead exchange 2", type: "direct"); string name = channel.QueueDeclare().QueueName; channel.QueueBind(queue: name, exchange: "dead exchange 2", routingKey: "dead routing key 2"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{DateTime.Now.ToString()} Received {message}"); }; channel.BasicConsume(queue: name, noAck: true, consumer: consumer); Console.ReadKey(); } } } }
2.3、运行结果
-----------------------------------------------------------------------------------------------------------
栏目列表
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式