-
C#队列学习笔记:RabbitMQ使用多线程提高消费吞吐率
一、引言
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。本例使用多线程来创建多信道并绑定队列,达到多workers的目的。
二、示例
2.1、环境准备
在NuGet上安装RabbitMQ.Client。
2.2、工厂类
添加一个工厂类RabbitMQFactory:
/// <summary> /// 多路复用技术(Multiplexing)目的:为了避免创建多个TCP而造成系统资源的浪费和超载,从而有效地利用TCP连接。 /// </summary> public static class RabbitMQFactory { private static IConnection sharedConnection; private static int ChannelCount { get; set; } private static readonly object _locker = new object(); public static IConnection SharedConnection { get { if (ChannelCount >= 1000) { if (sharedConnection != null && sharedConnection.IsOpen) { sharedConnection.Close(); } sharedConnection = null; ChannelCount = 0; } if (sharedConnection == null) { lock (_locker) { if (sharedConnection == null) { sharedConnection = GetConnection(); ChannelCount++; } } } return sharedConnection; } } private static IConnection GetConnection() { var factory = new ConnectionFactory { HostName = "192.168.2.242", UserName = "hello", Password = "world", Port = AmqpTcpEndpoint.UseDefaultPort,//5672 VirtualHost = ConnectionFactory.DefaultVHost,//使用默认值:"/" Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true }; return factory.CreateConnection(); } }
2.3、主窗体
代码如下:
public partial class RabbitMQMultithreading : Form { public delegate void ListViewDelegate<T>(T obj); public RabbitMQMultithreading() { InitializeComponent(); } /// <summary> /// ShowMessage重载 /// </summary> /// <param name="msg"></param> private void ShowMessage(string msg) { if (InvokeRequired) { BeginInvoke(new ListViewDelegate<string>(ShowMessage), msg); } else { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), msg }); lvwMsg.Items.Insert(0, item); } } /// <summary> /// ShowMessage重载 /// </summary> /// <param name="format"></param> /// <param name="args"></param> private void ShowMessage(string format, params object[] args) { if (InvokeRequired) { BeginInvoke(new MethodInvoker(delegate () { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item); })); } else { ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss ffffff"), string.Format(format, args) }); lvwMsg.Items.Insert(0, item); } } /// <summary> /// 生产者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void btnSend_Click(object sender, EventArgs e) { int messageCount = 100; var factory = new ConnectionFactory { HostName = "192.168.2.242", UserName = "hello", Password = "world", Port = AmqpTcpEndpoint.UseDefaultPort,//5672 VirtualHost = ConnectionFactory.DefaultVHost,//使用默认值:"/" Protocol = Protocols.DefaultProtocol, AutomaticRecoveryEnabled = true }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World"; var body = Encoding.UTF8.GetBytes(message); for (int i = 1; i <= messageCount; i++) { channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); ShowMessage($"Send {message}"); } } } } /// <summary> /// 消费者 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void btnReceive_Click(object sender, EventArgs e) { Random random = new Random(); int rallyNumber = random.Next(1, 1000); int channelCount = 0; await Task.Run(() => { try { int asyncCount = 10; List<Task<bool>> tasks = new List<Task<bool>>(); var connection = RabbitMQFactory.SharedConnection; for (int i = 1; i <= asyncCount; i++) { tasks.Add(Task.Factory.StartNew(() => MessageWorkItemCallback(connection, rallyNumber))); } Task.WaitAll(tasks.ToArray()); string syncResultMsg = $"集结号 {rallyNumber} 已吹起号角--" + $"本次开启信道成功数:{tasks.Count(s => s.Result == true)}," + $"本次开启信道失败数:{tasks.Count() - tasks.Count(s => s.Result == true)}" + $"累计开启信道成功数:{channelCount + tasks.Count(s => s.Result == true)}"; ShowMessage(syncResultMsg); } catch (Exception ex) { ShowMessage($"集结号 {rallyNumber} 消费异常:{ex.Message}"); } }); } /// <summary> /// 异步方法 /// </summary> /// <param name="state"></param> /// <param name="rallyNumber"></param> /// <returns></returns> private bool MessageWorkItemCallback(object state, int rallyNumber) { bool syncResult = false; IModel channel = null; try { IConnection connection = state as IConnection; //不能使用using (channel = connection.CreateModel())来创建信道,让RabbitMQ自动回收channel。 channel = connection.CreateModel(); channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Thread.Sleep(1000); ShowMessage($"集结号 {rallyNumber} Received {message}"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer); syncResult = true; } catch (Exception ex) { syncResult = false; ShowMessage(ex.Message); } return syncResult; } }
2.4、运行结果
多点几次消费者即可增加信道,提升消费能力。
栏目列表
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式