首页 > Python基础教程 >
-
C#教程之TaskCreationOptions.LongRunning 运行比可用线程
最近在学WebSocket,服务端需要监听多个WebSocket客户端发送的消息。
开始的解决方法是每个WebSocket客户端都添加一个线程进行监听,代码如下:
/// <summary> /// 监听端口 创建WebSocket /// </summary> /// <param name="httpListener"></param> private void CreateWebSocket(HttpListener httpListener) { if (!httpListener.IsListening) throw new Exception("HttpListener未启动"); HttpListenerContext listenerContext = httpListener.GetContextAsync().Result; if (!listenerContext.Request.IsWebSocketRequest) { CreateWebSocket(httpListener); return; } WebSocketContext webSocket = null; try { webSocket = new WebSocketContext(listenerContext, SubProtocol); } catch (Exception ex) { log.Error(ex); CreateWebSocket(HttpListener); return; } log.Info($"成功创建WebSocket:{webSocket.ID}"); int workerThreads = 0, completionPortThreads = 0; ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads); if (workerThreads <= ReservedThreadsCount + 1 || completionPortThreads <= ReservedThreadsCount + 1) { /** * 可用线程小于预留线程数量 * 通知客户端关闭连接 * */ webSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "可用线程不足,无法连接").Wait(); } else { if (OnReceiveMessage != null) webSocket.OnReceiveMessage += OnReceiveMessage; webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket; webSocketContexts.Add(webSocket); // 在线程中监听客户端发送的消息 ThreadPool.QueueUserWorkItem(new WaitCallback(p => { (p as WebSocketContext).ReceiveMessageAsync().Wait(); }), webSocket); } CreateWebSocket(HttpListener); }
但是可用线程数量是有限的,先连接的客户端一直递归接收消息,导致线程无限占用,后连接上的客户端就没有线程用于监听接受消息了。
接受消息方法如下:
/// <summary> /// 递归 同步接收消息 /// </summary> /// <returns></returns> public void ReceiveMessage() { WebSocket webSocket = HttpListenerWebSocketContext.WebSocket; if (webSocket.State != WebSocketState.Open) throw new Exception("Http未握手成功,不能接受消息!"); var byteBuffer = WebSocket.CreateServerBuffer(ReceiveBufferSize); WebSocketReceiveResult receiveResult = null; try { receiveResult = webSocket.ReceiveAsync(byteBuffer, cancellationToken).Result; } catch (WebSocketException ex) { if (ex.InnerException is HttpListenerException) { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "客户端断开连接" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } else { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "WebSocket 连接异常" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } } catch (Exception ex) { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "客户端断开连接" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } if (receiveResult.CloseStatus.HasValue) { log.Info("接受到关闭消息!"); CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription).Wait(TimeSpan.FromSeconds(20)); return; } byte[] bytes = new byte[receiveResult.Count]; Array.Copy(byteBuffer.Array, bytes, bytes.Length); string message = Encoding.GetString(bytes); log.Info($"{ID}接收到消息:{message}"); if (OnReceiveMessage != null) OnReceiveMessage.Invoke(this, message); if (!cancellationToken.IsCancellationRequested) ReceiveMessage(); }
这是不能接受的。
后来在Task中看到,在创建Task时可以设置TaskCreationOptions参数
该枚举有个字段LongRunning
LongRunning | 2 |
指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。 它会向 TaskScheduler 提示,过度订阅可能是合理的。 可以通过过度订阅创建比可用硬件线程数更多的线程。 它还将提示任务计划程序:该任务需要附加线程,以使任务不阻塞本地线程池队列中其他线程或工作项的向前推动。 |
经过测试,可同时运行的任务数量的确可以超出可用线程数量。
测试如下:
没有设置 TaskCreationOptions.LongRunning 代码如下:
/// <summary> /// 测试任务 /// 只运行了9个任务 /// </summary> [TestMethod] public void TestTask1() { var cts = new CancellationTokenSource(); int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0; ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}"); MaxWorkerThreads = 10; MaxCompletionPortThreads = 10; Console.WriteLine(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 I/O 线程的最大数目为{1} 同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", MaxWorkerThreads, MaxCompletionPortThreads); ThreadPool.SetMaxThreads(10, 10); ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}"); int count = 0; while (count++ < 30) { Task.Factory.StartNew(p => { int index = (int)p; int runCount = 0; LongRunningTask($"线程{index}", runCount, cts.Token); }, count, cts.Token, TaskCreationOptions.None, TaskScheduler.Default); } Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待超时,等待任务没有执行 cts.Cancel(); } /// <summary> /// 长时运行任务 /// 递归运行 /// </summary> /// <param name="taskName">任务名称</param> /// <param name="runCount">运行次数</param> /// <param name="token">传播有关取消操作的通知</param> private void LongRunningTask(string taskName, int runCount, CancellationToken token) { PrintTask($"任务【{taskName}】线程ID【{Environment.CurrentManagedThreadId}】第【{++runCount}】次运行").Wait(); if (!token.IsCancellationRequested) LongRunningTask(taskName, runCount, token); } /// <summary> /// 异步打印任务 等待1秒后打印消息 /// </summary> /// <param name="message">消息</param> /// <returns></returns> private Task PrintTask(string message) { return Task.Factory.StartNew(() => { Thread.Sleep(1000); Console.WriteLine(message); }); }
测试结果
测试用了20秒才完成
主线程创建了一个等待10秒后完成的任务,任务等待超时20秒
说明主程序创建的任务没有执行,而是等待超时了。
设置了 TaskCreationOptions.LongRunning 代码如下:
/// <summary> /// 测试长时运行任务 /// 30个任务全部都运行了 /// </summary> [TestMethod] public void TestTaskLongRunning() { var cts = new CancellationTokenSource(); int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0; ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); MaxWorkerThreads = 10; MaxCompletionPortThreads = 10; Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}"); Console.WriteLine(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 I/O 线程的最大数目为{1} 同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", MaxWorkerThreads, MaxCompletionPortThreads); ThreadPool.SetMaxThreads(10, 10); ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}"); int count = 0; while (count++ < 30) { Task.Factory.StartNew(p => { int index = (int)p; int runCount = 0; LongRunningTask($"线程{index}", runCount, cts.Token); }, count, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待没有超时,等待任务有执行 cts.Cancel(); }
测试结果:
测试用了10秒完成
主线程创建了一个等待10秒后完成的任务,任务等待超时20秒
说明主程序创建的任务立即执行了,程序等待了10秒完成。
使用TaskCreationOptions.LongRunning 需要注意的是Action必须是同步方法同时运行任务书才能超出可以用线程数量,否则不能。
例如:
/// <summary> /// 测试长时运行任务 /// 只运行了前9个任务 /// </summary> [TestMethod] public void TestTaskLongRunning2() { var cts = new CancellationTokenSource(); int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0; ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}"); MaxWorkerThreads = 10; MaxCompletionPortThreads = 10; Console.WriteLine(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 I/O 线程的最大数目为{1} 同时运行30个长时运行线程,每个线程中运行一个异步方法,看是否30个线程是否都能运行。", MaxWorkerThreads, MaxCompletionPortThreads); ThreadPool.SetMaxThreads(10, 10); ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用辅助线程数目为{MaxCompletionPortThreads},最大可用异步 I/O 线程数目为{MaxCompletionPortThreads}"); int count = 0; while (count++ < 30) { Task.Factory.StartNew(async p => { int index = (int)p; int runCount = 0; await LongRunningTaskAsync($"线程{index}", runCount, cts.Token); }, count, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待没有超时,等待任务有执行 cts.Cancel(); } /// <summary> /// 异步长时运行任务 /// </summary> /// <param name="taskName">任务名称</param> /// <param name="runCount">运行次数</param> /// <param name="token">传播有关取消操作的通知</param> /// <returns></returns> private async Task LongRunningTaskAsync(string taskName, int runCount, CancellationToken token) { await PrintTask($"任务【{taskName}】线程ID【{Environment.CurrentManagedThreadId}】第【{++runCount}】次运行"); if (!token.IsCancellationRequested) await LongRunningTaskAsync(taskName, runCount, token); }
测试结果
测试用了10秒完成
主线程创建了一个等待10秒后完成的任务,任务等待超时20秒
说明主程序创建的任务立即执行了,程序等待了10秒完成。
WebSocket修改后的监听方法:
/// <summary> /// 监听端口 创建WebSocket /// </summary> /// <param name="httpListener"></param> private void CreateWebSocket(HttpListener httpListener) { if (!httpListener.IsListening) throw new Exception("HttpListener未启动"); HttpListenerContext listenerContext = httpListener.GetContext(); if (!listenerContext.Request.IsWebSocketRequest) { CreateWebSocket(httpListener); return; } WebSocketContext webSocket = null; try { webSocket = new WebSocketContext(listenerContext, SubProtocol); } catch (Exception ex) { log.Error(ex); CreateWebSocket(HttpListener); return; } log.Info($"成功创建WebSocket:{webSocket.ID}"); int workerThreads = 0, completionPortThreads = 0; ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads); if (OnReceiveMessage != null) webSocket.OnReceiveMessage += OnReceiveMessage; webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket; Task.Factory.StartNew(() => { webSocket.ReceiveMessage(); }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); CreateWebSocket(HttpListener); }
修改后的WebSocket服务可以监听超过可用线程数量的客户端