当前位置:
首页 > Python基础教程 >
-
C#教程之 C#线程安全类型
1、IProducerConsumerCollection (线程安全接口)
此接口的所有实现必须都启用此接口的所有成员,若要从多个线程同时使用。
using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; namespace ConsoleApp1 { public class SafeStack<T> : IProducerConsumerCollection<T> { // Used for enforcing thread-safety private object m_lockObject = new object(); // We'll use a regular old Stack for our core operations private Stack<T> m_sequentialStack = null; // // Constructors // public SafeStack() { m_sequentialStack = new Stack<T>(); } public SafeStack(IEnumerable<T> collection) { m_sequentialStack = new Stack<T>(collection); } // // Safe Push/Pop support // public void Push(T item) { lock (m_lockObject) m_sequentialStack.Push(item); } public bool TryPop(out T item) { bool rval = true; lock (m_lockObject) { if (m_sequentialStack.Count == 0) { item = default(T); rval = false; } else item = m_sequentialStack.Pop(); } return rval; } // // IProducerConsumerCollection(T) support // public bool TryTake(out T item) { return TryPop(out item); } public bool TryAdd(T item) { Push(item); return true; // Push doesn't fail } public T[] ToArray() { T[] rval = null; lock (m_lockObject) rval = m_sequentialStack.ToArray(); return rval; } public void CopyTo(T[] array, int index) { lock (m_lockObject) m_sequentialStack.CopyTo(array, index); } // // Support for IEnumerable(T) // public IEnumerator<T> GetEnumerator() { // The performance here will be unfortunate for large stacks, // but thread-safety is effectively implemented. Stack<T> stackCopy = null; lock (m_lockObject) stackCopy = new Stack<T>(m_sequentialStack); return stackCopy.GetEnumerator(); } // // Support for IEnumerable // IEnumerator IEnumerable.GetEnumerator() { return ((IEnumerable<T>)this).GetEnumerator(); } // // Support for ICollection // public bool IsSynchronized { get { return true; } } public object SyncRoot { get { return m_lockObject; } } public int Count { get { return m_sequentialStack.Count; } } public void CopyTo(Array array, int index) { lock (m_lockObject) ((ICollection)m_sequentialStack).CopyTo(array, index); } } }
using System; using System.Collections.Concurrent; namespace ConsoleApp1 { class Program { static void Main() { TestSafeStack(); // Keep the console window open in debug mode. Console.WriteLine("Press any key to exit."); Console.ReadKey(); } // Test our implementation of IProducerConsumerCollection(T) // Demonstrates: // IPCC(T).TryAdd() // IPCC(T).TryTake() // IPCC(T).CopyTo() static void TestSafeStack() { SafeStack<int> stack = new SafeStack<int>(); IProducerConsumerCollection<int> ipcc = (IProducerConsumerCollection<int>)stack; // Test Push()/TryAdd() stack.Push(10); Console.WriteLine("Pushed 10"); ipcc.TryAdd(20); Console.WriteLine("IPCC.TryAdded 20"); stack.Push(15); Console.WriteLine("Pushed 15"); int[] testArray = new int[3]; // Try CopyTo() within boundaries try { ipcc.CopyTo(testArray, 0); Console.WriteLine("CopyTo() within boundaries worked, as expected"); } catch (Exception e) { Console.WriteLine("CopyTo() within boundaries unexpectedly threw an exception: {0}", e.Message); } // Try CopyTo() that overflows try { ipcc.CopyTo(testArray, 1); Console.WriteLine("CopyTo() with index overflow worked, and it SHOULD NOT HAVE"); } catch (Exception e) { Console.WriteLine("CopyTo() with index overflow threw an exception, as expected: {0}", e.Message); } // Test enumeration Console.Write("Enumeration (should be three items): "); foreach (int item in stack) Console.Write("{0} ", item); Console.WriteLine(""); // Test TryPop() int popped = 0; if (stack.TryPop(out popped)) { Console.WriteLine("Successfully popped {0}", popped); } else Console.WriteLine("FAILED to pop!!"); // Test Count Console.WriteLine("stack count is {0}, should be 2", stack.Count); // Test TryTake() if (ipcc.TryTake(out popped)) { Console.WriteLine("Successfully IPCC-TryTaked {0}", popped); } else Console.WriteLine("FAILED to IPCC.TryTake!!"); } } }
2、ConcurrentStack类:安全堆栈
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { var taskStack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskStack)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentStack<CustomTask> stack) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; stack.Push(workItem); Console.WriteLine("Task {0} has been posted", workItem.Id); } } static async Task TaskProcessor( ConcurrentStack<CustomTask> stack, string name, CancellationToken token) { await GetRandomDelay(); do { CustomTask workItem; bool popSuccesful = stack.TryPop(out workItem); if (popSuccesful) { Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } } }
3、ConcurrentQueue类:安全队列
using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { var taskQueue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSource = Task.Run(() => TaskProducer(taskQueue)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId = i.ToString(); processors[i - 1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token)); } await taskSource; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(processors); } static async Task TaskProducer(ConcurrentQueue<CustomTask> queue) { for (int i = 1; i <= 20; i++) { await Task.Delay(50); var workItem = new CustomTask { Id = i }; queue.Enqueue(workItem); Console.WriteLine("插入Task {0} has been posted ThreadID={1}", workItem.Id, Thread.CurrentThread.ManagedThreadId); } } static async Task TaskProcessor( ConcurrentQueue<CustomTask> queue, string name, CancellationToken token) { CustomTask workItem; bool dequeueSuccesful = false; await GetRandomDelay(); do { dequeueSuccesful = queue.TryDequeue(out workItem); if (dequeueSuccesful) { Console.WriteLine("读取Task {0} has been processed by {1} ThreadID={2}", workItem.Id, name, Thread.CurrentThread.ManagedThreadId); } await GetRandomDelay(); } while (!token.IsCancellationRequested); } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } class CustomTask { public int Id { get; set; } } } }
4、ConcurrentDictionary类
ConcurrentDictionary类写操作比使用锁的通常字典(Dictionary)要慢的多,而读操作则要快些。因此对字典要大量的线程安全的读操作,ConcurrentDictionary类是最好的选择
ConcurrentDictionary类的实现使用了细粒度锁(fine-grained locking)技术,这在多线程写入方面比使用锁的通常的字典(也被称为粗粒度锁)
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; namespace ConsoleApp1 { class Program { static void Main(string[] args) { var concurrentDictionary = new ConcurrentDictionary<int, string>(); var dictionary = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { dictionary[i] = Item; } } sw.Stop(); Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDictionary[i] = Item; } sw.Stop(); Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dictionary) { CurrentItem = dictionary[i]; } } sw.Stop(); Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed); sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem = concurrentDictionary[i]; } sw.Stop(); Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed); } const string Item = "Dictionary item"; public static string CurrentItem; } }
5、ConcurrentBag类
namespace ConsoleApp1 { class CrawlingTask { public string UrlToCrawl { get; set; } public string ProducerName { get; set; } } }
using System.Collections.Generic; namespace ConsoleApp1 { static class Module { public static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>(); public static void CreateLinks() { _contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" }; _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" }; _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" }; _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" }; _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" }; _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" }; _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" }; _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" }; _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" }; _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" }; _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" }; _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" }; _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" }; _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" }; _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" }; _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" }; } } }
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Module.CreateLinks(); Task t = RunProgram(); t.Wait(); } static async Task RunProgram() { var bag = new ConcurrentBag<CrawlingTask>(); string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" }; var crawlers = new Task[4]; for (int i = 1; i <= 4; i++) { string crawlerName = "Crawler " + i.ToString(); bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" }); crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName)); } await Task.WhenAll(crawlers); } static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName) { CrawlingTask task; //尝试从bag中取出对象 while (bag.TryTake(out task)) { IEnumerable<string> urls = await GetLinksFromContent(task); if (urls != null) { foreach (var url in urls) { var t = new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName }; //将子集插入到bag中 bag.Add(t); } } Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!", task.UrlToCrawl, task.ProducerName, crawlerName); } } static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task) { await GetRandomDelay(); if (Module._contentEmulation.ContainsKey(task.UrlToCrawl)) return Module._contentEmulation[task.UrlToCrawl]; return null; } static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(150, 200); return Task.Delay(delay); } } }
6、BlockingCollection类
BlockingCollection类: 我们能够改变任务存储在阻塞集合中的方式。默认情况下它使用的是ConcurrentQueue容器,但是我们能够使用任何实现了IProducerConsumerCollection泛型接口的集合。
namespace ConsoleApp1 { class CustomTask { public int Id { get; set; } } }
using System; using System.Threading.Tasks; namespace ConsoleApp1 { static class Module { public static Task GetRandomDelay() { int delay = new Random(DateTime.Now.Millisecond).Next(1, 500); return Task.Delay(delay); } } }
using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Console.WriteLine("Using a Queue inside of BlockingCollection"); Console.WriteLine(); Task t = RunProgram(); t.Wait(); //Console.WriteLine(); //Console.WriteLine("Using a Stack inside of BlockingCollection"); //Console.WriteLine(); //Task t = RunProgram(new ConcurrentStack<CustomTask>()); //t.Wait(); } static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null) { var taskCollection = new BlockingCollection<CustomTask>(); if (collection != null) taskCollection = new BlockingCollection<CustomTask>(collection); //初始化collection中的数据 var taskSource = Task.Run(() => TaskProducer(taskCollection)); Task[] processors = new Task[4]; for (int i = 1; i <= 4; i++) { string processorId =
栏目列表
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式