当前位置:
首页 > Python基础教程 >
-
C#教程之基于阻塞队列的生产者消费者C#并发设计
这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code:
调用code:
static void Main(string[] args) { ProcessQueue<int> processQueue = new ProcessQueue<int>(); processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent; processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent; for (int i = 0; i < 50; i++) { processQueue.Enqueue(i); } Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount()); processQueue.Flush(); Console.Read(); } /// <summary> /// 该方法对入队的每个元素进行处理 /// </summary> /// <param name="value"></param> private static void ProcessQueue_ProcessItemEvent(int value) { Console.WriteLine("输出: {0}", value); } /// <summary> /// 处理异常 /// </summary> /// <param name="obj">队列实例</param> /// <param name="ex">异常对象</param> /// <param name="value">出错的数据</param> private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value) { Console.WriteLine(ex.ToString()); }
封装的队列:
public class ProcessQueue<T> { private BlockingCollection<T> _queue; private CancellationTokenSource _cancellationTokenSource; private CancellationToken _cancellToken; //内部线程池 private List<Thread> _threadCollection; //队列是否正在处理数据 private int _isProcessing; //有线程正在处理数据 private const int Processing = 1; //没有线程处理数据 private const int UnProcessing = 0; //队列是否可用 单个线程下用while来判断,多个线程下用if判断,随后while循环队列的数量 private volatile bool _enabled = true; //内部处理线程数量 private int _internalThreadCount; // 消费者处理事件 public event Action<T> ProcessItemEvent; //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据 public event Action<dynamic, Exception, T> ProcessExceptionEvent; public ProcessQueue() { _queue = new BlockingCollection<T>(); _cancellationTokenSource = new CancellationTokenSource(); _internalThreadCount = 3; _cancellToken = _cancellationTokenSource.Token; _threadCollection = new List<Thread>(); } public ProcessQueue(int internalThreadCount) : this() { this._internalThreadCount = internalThreadCount; } /// <summary> /// 队列内部元素的数量 /// </summary> public int GetInternalItemCount() { //return _queue.Count; return _threadCollection.Count; } //生产者生产 public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } _queue.Add(items); DataAdded(); } public void Flush() { StopProcess(); while (_queue.Count != 0) { T item = default(T); if (_queue.TryTake(out item)) { try { ProcessItemEvent(item); } catch (Exception ex) { OnProcessException(ex, item); } } } } // 通知消费者消费队列元素 private void DataAdded() { if (_enabled) { if (!IsProcessingItem()) { Console.WriteLine("DataAdded"); ProcessRangeItem(); StartProcess(); } } } //判断是否队列有线程正在处理 private bool IsProcessingItem() { // 替换第一个参数, 如果相等 //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing); return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing); } // 多消费者消费 private void ProcessRangeItem() { for (int i = 0; i < this._internalThreadCount; i++) { ProcessItem(); } } // 开启消费处理 private void ProcessItem() { Thread currentThread = new Thread((state) => { T item = default(T); while (_enabled) { try { try { if (!_queue.TryTake(out item)) { //Console.WriteLine("阻塞队列为0时的item: {0}", item); //Console.WriteLine("ok!!!"); break; } // 处理事件 ProcessItemEvent(item); } catch (OperationCanceledException ex) { DebugHelper.DebugView(ex.ToString()); } } catch (Exception ex) { OnProcessException(ex, item); } } }); _threadCollection.Add(currentThread); } // 开启消费者 private void StartProcess() { //Console.WriteLine("线程的数量: {0}", _threadCollection.Count); foreach (var thread in _threadCollection) { thread.Start(); thread.IsBackground = true; } } // 终止运行 private void StopProcess() { this._enabled = false; foreach (var thread in _threadCollection) { if (thread.IsAlive) { thread.Join(); } } _threadCollection.Clear(); } private void OnProcessException(Exception ex, T item) { var tempException = ProcessExceptionEvent; Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null); if (tempException != null) { ProcessExceptionEvent(this, ex, item); } } }
栏目列表
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式