当前位置:
首页 > temp > 简明python教程 >
-
C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据
网上有很多Socket框架,但是我想,C#既然有Socket类,难道不是给人用的吗?
写了一个SocketServerHelper和SocketClientHelper,分别只有5、6百行代码,比不上大神写的,和业务代码耦合也比较重,但对新手非常友好,容易看懂。
支持返回值或回调,支持不定长度的数据包。客户端和服务端均支持断线重连。
自己本机测试,5000个客户端并发发送消息正常,CPU压力有点大。由于局域网机子性能差,局域网只测试了500个客户端并发发送消息正常。
短短1000多行代码,花了好多天心血,改了无数BUG,越写代码,越觉得自己资质平平,逻辑思维不够用。写Socket代码不像写一般的代码,实在不行加个try catch完事,这个东西既要稳定,又要性能,真的是每一个逻辑分支,每一个异常分支,都要想清楚,都要处理好,代码里我还是Exception用习惯了,没细分。
有时候为了解决一个BUG,找了一整天,也找不出BUG在哪,现在终于测试通过了,达到了自己的预期。
通过这几天的踩坑,测试,得出结论:
1、Socket TCP 不会丢包,TCP是可靠的。(本机测试、局域网测试,可能没有遇到更恶劣的网络环境)
2、Socket TCP 能够保证顺序,接收到的顺序和发送的顺序一致
3、代码里有数据校验,但是错误的分支永远都不会走,校验是一定能通过的,不存在数据校验不通过,把错误的数据包简单丢弃的情况,否则说明代码写的还是有BUG
以下是主要代码:
SocketServerHelper代码:
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Socket server helper: listens on a TCP port, keeps track of the connected
    /// clients and exchanges "0XFF"-framed packets with them.
    /// </summary>
    public class SocketServerHelper
    {
        #region 变量
        private int _serverPort;
        private Socket serverSocket;

        // Every accepted client; the string value is never used (always null).
        private ConcurrentDictionary<ClientSocket, string> clientSocketList = new ConcurrentDictionary<ClientSocket, string>();
        // Registered clients, looked up by room number / device number.
        private ConcurrentDictionary<string, ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();
        private ConcurrentDictionary<string, ClientSocket> _dictDevNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();

        public int _CallbackTimeout = 20;
        /// <summary>
        /// Timeout, in seconds, when waiting for a callback.
        /// </summary>
        public int CallbackTimeout
        {
            get { return _CallbackTimeout; }
            // The original setter assigned the field to 'value', so writes were silently lost.
            set { _CallbackTimeout = value; }
        }

        public int _WaitResultTimeout = 20;
        /// <summary>
        /// Timeout, in seconds, when waiting for a returned result.
        /// </summary>
        public int WaitResultTimeout
        {
            get { return _WaitResultTimeout; }
            // Same inverted assignment as CallbackTimeout, fixed the same way.
            set { _WaitResultTimeout = value; }
        }

        private object _lockSend = new object();

        public event EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent;

        private System.Timers.Timer _checkClientTimer;
        #endregion

        #region SocketServerHelper 构造函数
        /// <summary>
        /// Creates a helper that will listen on <paramref name="serverPort"/> once
        /// <see cref="StartServer"/> is called.
        /// </summary>
        public SocketServerHelper(int serverPort)
        {
            _serverPort = serverPort;
        }
        #endregion

        #region 启动服务
        /// <summary>
        /// Starts the service: binds to every local address on the configured port,
        /// starts a background accept thread and starts the one-second client-check timer.
        /// </summary>
        /// <returns>true when startup succeeded; false when an exception was thrown (it is logged).</returns>
        public bool StartServer()
        {
            try
            {
                IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, _serverPort);
                serverSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                serverSocket.Bind(ipEndPoint);
                serverSocket.Listen(5000);

                Thread thread = new Thread(new ThreadStart(delegate ()
                {
                    while (true)
                    {
                        Socket client = null;
                        ClientSocket clientSocket = null;
                        try
                        {
                            client = serverSocket.Accept();
                            client.SendTimeout = 20000;
                            client.ReceiveTimeout = 20000;
                            client.SendBufferSize = 10240;
                            client.ReceiveBufferSize = 10240;
                            clientSocket = new ClientSocket(client);
                            clientSocketList.TryAdd(clientSocket, null);
                            LogUtil.Log("监听到新的客户端,当前客户端数:" + clientSocketList.Count);
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);

                            // Accept() throws ObjectDisposedException once the listening socket
                            // has been closed. Leave the loop instead of failing and logging
                            // again every millisecond for the rest of the process lifetime.
                            if (client == null && ex is ObjectDisposedException)
                            {
                                return;
                            }

                            // The connection was accepted but could not be set up and was never
                            // registered in clientSocketList: close it so the handle is not leaked.
                            if (client != null && clientSocket == null)
                            {
                                try
                                {
                                    client.Close();
                                }
                                catch (Exception closeEx)
                                {
                                    LogUtil.Error(closeEx);
                                }
                            }

                            Thread.Sleep(1);
                            continue;
                        }

                        if (client == null) continue;

                        // Arm the first receive off the accept thread so that accepting is
                        // never delayed by per-client work.
                        Task.Run(() =>
                        {
                            try
                            {
                                byte[] buffer = new byte[10240];
                                SocketAsyncEventArgs args = new SocketAsyncEventArgs();
                                clientSocket.SocketAsyncArgs = args;
                                clientSocket.SocketAsyncCompleted = (s, e) =>
                                {
                                    ReceiveData(clientSocket, e);
                                };
                                args.SetBuffer(buffer, 0, buffer.Length);
                                args.Completed += clientSocket.SocketAsyncCompleted;

                                // ReceiveAsync returns false when the operation completed
                                // synchronously; Completed is NOT raised in that case, so the
                                // data must be handled here or this client is never read.
                                if (!client.ReceiveAsync(args))
                                {
                                    ReceiveData(clientSocket, args);
                                }
                            }
                            catch (Exception ex)
                            {
                                LogUtil.Error(ex);
                            }
                        });
                    }
                }));
                thread.IsBackground = true;
                thread.Start();

                // Client check: one-shot timer, re-armed by CheckClient after each pass.
                _checkClientTimer = new System.Timers.Timer();
                _checkClientTimer.AutoReset = false;
                _checkClientTimer.Interval = 1000;
                _checkClientTimer.Elapsed += CheckClient;
                _checkClientTimer.Start();

                LogUtil.Log("服务已启动");
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "启动服务出错");
                return false;
            }
        }
        #endregion

        #region 检测客户端
        /// <summary>
        /// Checks the clients: any client whose last heartbeat is more than 60 seconds old
        /// is removed from the client list and its socket is closed.
        /// </summary>
        private void CheckClient(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task.Run(() =>
            {
                try
                {
                    foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray())
                    {
                        Socket skt = clientSkt.Socket;
                        ClientSocket temp;
                        string strTemp;
                        DateTime now = DateTime.Now;
                        if (now.Subtract(clientSkt.LastHeartbeat).TotalSeconds > 60)
                        {
                            // NOTE(review): the room/device lookup dictionaries are not cleaned up
                            // in this method, so a later Send may still resolve to this closed
                            // client - confirm whether that is handled elsewhere.
                            clientSocketList.TryRemove(clientSkt, out strTemp);
                            LogUtil.Log("客户端已失去连接,当前客户端数:" + clientSocketList.Count);
                            ActionUtil.TryDoAction(() => { if (skt.Connected) skt.Disconnect(false); });
                            ActionUtil.TryDoAction(() =>
                            {
                                skt.Close();
                                skt.Dispose();
                                if (clientSkt.SocketAsyncArgs != null)
                                {
                                    if (clientSkt.SocketAsyncCompleted != null)
                                    {
                                        clientSkt.SocketAsyncArgs.Completed -= clientSkt.SocketAsyncCompleted;
                                    }
                                    clientSkt.SocketAsyncArgs.Dispose();
                                }
                                clientSkt.SocketAsyncCompleted = 
null;
                                clientSkt.SocketAsyncArgs = null;
                            });
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "检测客户端出错");
                }
                finally
                {
                    _checkClientTimer.Start();
                }
            });
        }
        #endregion

        #region 接收数据
        /// <summary>
        /// Handles a completed receive: appends the received bytes to the client's buffer,
        /// dispatches every complete packet found there and arms the next receive.
        /// </summary>
        /// <param name="clientSkt">Client the data belongs to; the call is ignored when null.</param>
        /// <param name="e">Event args of the completed receive; reused for the next receive.</param>
        private void ReceiveData(ClientSocket clientSkt, SocketAsyncEventArgs e)
        {
            if (clientSkt == null) return;

            Socket skt = clientSkt.Socket;
            try
            {
                bool completedSynchronously;
                do
                {
                    completedSynchronously = false;

                    // A failed receive, or one that transferred zero bytes, means the connection
                    // is gone. Re-arming here would complete immediately again and again, so
                    // stop; CheckClient releases the socket once its heartbeat expires.
                    if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
                    {
                        LogUtil.Log("客户端连接已关闭或接收出错,停止接收,SocketError=" + e.SocketError);
                        return;
                    }

                    CopyTo(e.Buffer, clientSkt.Buffer, e.Offset, e.BytesTransferred);

                    if (IsFirstPacketComplete(clientSkt))
                    {
                        SocketData data = null;
                        do
                        {
                            data = ProcessSocketData(clientSkt);
                        } while (data != null);
                    }

                    if (skt.Connected)
                    {
                        // ReceiveAsync returns false when it completed synchronously; Completed
                        // is not raised then, so loop and handle the new data right here.
                        completedSynchronously = !skt.ReceiveAsync(e);
                    }
                } while (completedSynchronously);
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "处理接收的数据包 异常");
            }
        }

        /// <summary>
        /// Validates the packet at the front of the client's buffer.
        /// Corrupt data (wrong header, unknown type, non-positive length) is logged and the
        /// whole buffer is discarded.
        /// </summary>
        /// <returns>true when a complete packet is buffered; false when more data is needed
        /// or the buffer was discarded.</returns>
        private bool IsFirstPacketComplete(ClientSocket clientSkt)
        {
            if (clientSkt.Buffer.Count < 4) return false;

            byte[] bArrHeader = new byte[4];
            CopyTo(clientSkt.Buffer, bArrHeader, 0, 0, bArrHeader.Length);
            string strHeader = Encoding.ASCII.GetString(bArrHeader);
            if (!string.Equals(strHeader, "0XFF", StringComparison.OrdinalIgnoreCase))
            {
                LogUtil.Error("不是0XFF,丢掉错误数据,重新接收");
                clientSkt.Buffer.Clear(); // discard the corrupt data
                return false;
            }

            if (clientSkt.Buffer.Count < 5) return false;

            byte type = clientSkt.Buffer[4];
            if (type == 0) return true; // heartbeat packet: header + type only

            if (type != 2 && type != 4) // only registration and return-value packets carry a body
            {
                LogUtil.Error("type错误,丢掉错误数据,重新接收");
                clientSkt.Buffer.Clear(); // discard the corrupt data
                return false;
            }

            if (clientSkt.Buffer.Count < 9) return false;

            byte[] bArrLength = new byte[4];
            CopyTo(clientSkt.Buffer, bArrLength, 5, 0, bArrLength.Length);
            int dataLength = BitConverter.ToInt32(bArrLength, 0);
            if (dataLength <= 0)
            {
                // Such a packet can never be resolved; waiting for "more data" would block
                // this connection forever while the buffer keeps growing.
                LogUtil.Error("length错误,丢掉错误数据,重新接收");
                clientSkt.Buffer.Clear(); // discard the corrupt data
                return false;
            }

            // Written as a subtraction so that a huge length cannot overflow the comparison.
            return clientSkt.Buffer.Count - 9 >= dataLength;
        }
        #endregion

        #region 处理接收的数据包
        /// <summary>
        /// Processes the received data packet.
        /// </summary>
        private SocketData ProcessSocketData(ClientSocket clientSkt)
        {
            int readLength = 0;
            SocketData data = 
ResolveBuffer(clientSkt.Buffer, out readLength);
            if (data != null)
            {
                // Remove the bytes of the packet that was just resolved from the client's buffer.
                if (readLength > 0) clientSkt.RemoveBufferData(readLength);

                if (data.Type == 0) // heartbeat packet received
                {
                    clientSkt.LastHeartbeat = DateTime.Now;
                    // heartbeat reply (only sent to clients that have registered)
                    if (clientSkt.RoomNo != null || clientSkt.DevNo != null)
                    {
                        ThreadHelper.Run(() =>
                        {
                            lock (clientSkt.LockSend)
                            {
                                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                                SocketHelper.Send(clientSkt.Socket, bArrHeader);
                                SocketHelper.Send(clientSkt.Socket, new byte[] { 0x01 });
                            }
                        });
                    }
                    else
                    {
                        LogUtil.Log("没有注册信息");
                    }
                    LogUtil.Log("收到心跳包,客户端连接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                }

                if (data.Type == 2) // registration packet received
                {
                    if (data.SocketRegisterData != null && clientSkt != null)
                    {
                        // Replace whatever client was previously registered under the same
                        // room number / device number with this one.
                        ClientSocket temp;
                        if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.SocketRegisterData.RoomNo, out temp);
                        if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.SocketRegisterData.DevNo, out temp);
                        clientSkt.RoomNo = data.SocketRegisterData.RoomNo;
                        clientSkt.DevNo = data.SocketRegisterData.DevNo;
                        if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryAdd(data.SocketRegisterData.RoomNo, clientSkt);
                        if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryAdd(data.SocketRegisterData.DevNo, clientSkt);
                        LogUtil.Log("收到注册包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                        // registration acknowledgement
                        ThreadHelper.Run(() =>
                        {
                            lock (clientSkt.LockSend)
                            {
                                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                                SocketHelper.Send(clientSkt.Socket, bArrHeader);
                                SocketHelper.Send(clientSkt.Socket, new byte[] { 0x05 });
                            }
                        });
                    }
                }

                if (data.Type == 4) // return-value packet received
                {
                    ThreadHelper.Run(() =>
                    {
                        // Store the result under its callback id (the waiters in this class look
                        // it up in CallbackDict), then raise the event.
                        // NOTE(review): the event is raised even when SocketResult is null - confirm subscribers handle that.
                        if (data.SocketResult != null) clientSkt.CallbackDict.TryAdd(data.SocketResult.callbackId, data.SocketResult);
                        if (ReceivedSocketResultEvent != null)
                        {
                            ReceivedSocketResultEvent(null, new Models.ReceivedSocketResultEventArgs(data.SocketResult));
                        }
                    });
                    LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                }
            }
            return 
data;
        }
        #endregion

        #region ResolveBuffer
        /// <summary>
        /// Parses the packet at the front of <paramref name="buffer"/>.
        /// Layout: 4-byte ASCII header "0XFF", 1-byte type, and for types 2 (registration)
        /// and 4 (return value) a 4-byte length followed by a UTF-8 JSON body.
        /// </summary>
        /// <param name="buffer">Bytes received so far; not modified here.</param>
        /// <param name="readLength">Number of bytes the returned packet occupies;
        /// 0 whenever null is returned.</param>
        /// <returns>The parsed packet, or null when the buffer does not yet hold a complete,
        /// well-formed packet. A packet whose JSON body cannot be deserialized is still
        /// returned (with a null payload) so that the caller consumes it.</returns>
        private SocketData ResolveBuffer(List<byte> buffer, out int readLength)
        {
            readLength = 0;
            try
            {
                if (buffer.Count < 4) return null;

                byte[] bArrHeader = new byte[4];
                CopyTo(buffer, bArrHeader, 0, 0, bArrHeader.Length);
                string strHeader = Encoding.ASCII.GetString(bArrHeader);
                if (!string.Equals(strHeader, "0XFF", StringComparison.OrdinalIgnoreCase))
                {
                    LogUtil.Error("不是0XFF");
                    return null;
                }

                if (buffer.Count < 5) return null;

                byte bType = buffer[4];
                SocketData socketData = new SocketData();
                socketData.Type = bType;

                // Every type other than 2 and 4 is treated as header + type only.
                if (bType != 2 && bType != 4)
                {
                    readLength = 5;
                    return socketData;
                }

                if (buffer.Count < 9) return null;

                byte[] bArrLength = new byte[4];
                CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length);
                int dataLength = BitConverter.ToInt32(bArrLength, 0);

                // Reject non-positive lengths up front (a negative one used to throw from
                // 'new byte[dataLength]'), and compare by subtraction so that a huge length
                // cannot overflow 'dataLength + 9'.
                if (dataLength <= 0 || buffer.Count - 9 < dataLength) return null;

                byte[] dataBody = new byte[dataLength];
                CopyTo(buffer, dataBody, 9, 0, dataBody.Length);
                string jsonString = Encoding.UTF8.GetString(dataBody);

                // The packet is complete from here on: report its full size even if the body
                // turns out to be malformed, otherwise it would stay at the front of the
                // buffer and block every packet behind it.
                readLength = 9 + dataLength;
                try
                {
                    if (bType == 2)
                    {
                        socketData.SocketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString);
                    }
                    else
                    {
                        socketData.SocketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString);
                    }
                }
                catch (JsonException ex)
                {
                    LogUtil.Error(ex, "解析字节数组 出错");
                }

                return socketData;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "解析字节数组 出错");
                readLength = 0;
                return null;
            }
        }
        #endregion

        #region CopyTo
        /// 
/// <summary>
        /// Appends up to <paramref name="length"/> bytes of <paramref name="bArrSource"/>,
        /// starting at <paramref name="sourceIndex"/>, to <paramref name="listTarget"/>.
        /// Positions past the end of the source array are skipped.
        /// </summary>
        private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length)
        {
            int count = Math.Min(length, bArrSource.Length - sourceIndex);
            if (count <= 0) return;

            // One bulk append instead of one List.Add call per byte.
            listTarget.AddRange(new ArraySegment<byte>(bArrSource, sourceIndex, count));
        }

        /// <summary>
        /// Copies up to <paramref name="length"/> bytes from <paramref name="listSource"/>
        /// (starting at <paramref name="sourceIndex"/>) into <paramref name="bArrTarget"/>
        /// (starting at <paramref name="targetIndex"/>). Copying stops at the end of
        /// whichever of the two is reached first.
        /// </summary>
        private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length)
        {
            int available = Math.Min(bArrTarget.Length - targetIndex, listSource.Count - sourceIndex);
            int count = Math.Min(length, available);
            if (count <= 0) return;

            listSource.CopyTo(sourceIndex, bArrTarget, targetIndex, count);
        }
        #endregion

        #region 停止服务
        /// <summary>
        /// Stops the service: disconnects and closes every client socket, releases each
        /// client's receive event args, clears the client lookups and closes the listening
        /// socket. Exceptions are logged, not rethrown.
        /// </summary>
        public void StopServer()
        {
            try
            {
                foreach (ClientSocket clientSocket in clientSocketList.Keys.ToArray())
                {
                    ClientSocket current = clientSocket;
                    Socket socket = current.Socket;
                    ActionUtil.TryDoAction(() => { if (socket.Connected) socket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        socket.Close();
                        socket.Dispose();

                        // Detach the handler and dispose the receive args, exactly as CheckClient
                        // does for timed-out clients; otherwise they are never released.
                        if (current.SocketAsyncArgs != null)
                        {
                            if (current.SocketAsyncCompleted != null)
                            {
                                current.SocketAsyncArgs.Completed -= current.SocketAsyncCompleted;
                            }
                            current.SocketAsyncArgs.Dispose();
                        }
                        current.SocketAsyncCompleted = null;
                        current.SocketAsyncArgs = null;
                    });
                }
                clientSocketList.Clear();
                _dictDevNoClientSocket.Clear();
                _dictRoomNoClientSocket.Clear();

                if (serverSocket != null)
                {
                    ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.Disconnect(false); });
                    ActionUtil.TryDoAction(() => { serverSocket.Close(); serverSocket.Dispose(); });
                }
                LogUtil.Log("服务已停止");
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "停止服务出错");
            }
        }
        #endregion

        #region 释放资源
        /// <summary>
        /// Releases resources: stops and closes the client-check timer.
        /// It does not close any socket; use <see cref="StopServer"/> for that.
        /// </summary>
        public void Dispose()
        {
            if (_checkClientTimer != null)
            {
                _checkClientTimer.Stop();
                _checkClientTimer.Close();
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Sends a message to a single client and waits for the result.
        /// </summary>
        /// <returns>The result reported for the message, or a failed result when no
        /// matching client is registered.</returns>
        public SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            SocketData data = new SocketData();
            data.Type = 3;
            data.MsgContent = msgContent;
            ClientSocket clientSocket = null;
            // NOTE(review): a failed devNo lookup resets clientSocket to null even when the
            // roomNo lookup succeeded - confirm that devNo is meant to take precedence like this.
            if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo, out clientSocket);
            if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo, out clientSocket);
            if 
(clientSocket != null)
            {
                if (string.IsNullOrWhiteSpace(msgContent.callbackId))
                {
                    msgContent.callbackId = Guid.NewGuid().ToString("N");
                }
                Send(clientSocket, data);
                return WaitSocketResult(clientSocket, msgContent.callbackId);
            }
            else
            {
                SocketResult socketResult = new SocketResult();
                socketResult.success = false;
                socketResult.errorMsg = "客户端不存在";
                return socketResult;
            }
        }

        /// <summary>
        /// Sends a message to a single client without blocking. When a callback is supplied
        /// it receives the result later, or immediately a failed result when no matching
        /// client is registered.
        /// </summary>
        /// <param name="msgContent">Message to send; gets a new callback id when it has none.</param>
        /// <param name="roomNo">Room number to look the client up by; may be null.</param>
        /// <param name="devNo">Device number to look the client up by; may be null.</param>
        /// <param name="callback">Optional receiver of the result.</param>
        public void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
        {
            SocketData packet = new SocketData { Type = 3, MsgContent = msgContent };

            ClientSocket target = null;
            if (roomNo != null)
            {
                _dictRoomNoClientSocket.TryGetValue(roomNo, out target);
            }
            if (devNo != null)
            {
                _dictDevNoClientSocket.TryGetValue(devNo, out target);
            }

            // Guard clause: nothing to send to.
            if (target == null)
            {
                SocketResult notFound = new SocketResult { success = false, errorMsg = "客户端不存在" };
                if (callback != null)
                {
                    callback(notFound);
                }
                return;
            }

            if (string.IsNullOrWhiteSpace(msgContent.callbackId))
            {
                msgContent.callbackId = Guid.NewGuid().ToString("N");
            }

            // Start waiting before sending, so the wait is already running when the reply arrives.
            if (callback != null)
            {
                WaitCallback(target, msgContent.callbackId, callback);
            }

            Send(target, packet);
        }

        /// <summary>
        /// Waits for the callback result.
        /// </summary>
        private void WaitCallback(ClientSocket clientSocket, string callbackId, Action<SocketResult> callback = null)
        {
            DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout);
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.AutoReset = false;
            timer.Interval = 100;
            timer.Elapsed += (s, e) =>
            {
                try
                {
                    SocketResult socketResult;
                    if (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt)
                    {
                        timer.Start();
                        return;
                    }
                    SocketResult sktResult;
                    clientSocket.CallbackDict.TryRemove(callbackId, out sktResult);
                    if (socketResult == null)
                    {
                        socketResult = new SocketResult();
                        socketResult.success = false;
                        socketResult.errorMsg = "超时";
                    }
if (callback != null) callback(socketResult);
                    timer.Close();
                }
                catch (Exception ex)
                {
                    LogUtil.Error("WaitCallback error" + ex);
                }
            };
            timer.Start();
        }

        /// <summary>
        /// Blocks the calling thread until a result stored under <paramref name="callbackId"/>
        /// shows up in the client's CallbackDict or the wait-result timeout elapses, polling
        /// every 10 ms. The entry is removed from the dictionary before returning.
        /// </summary>
        /// <returns>The stored result, or a failed result with the timeout message.</returns>
        private SocketResult WaitSocketResult(ClientSocket clientSocket, string callbackId)
        {
            DateTime deadline = DateTime.Now.AddSeconds(_WaitResultTimeout);

            SocketResult received;
            while (true)
            {
                if (clientSocket.CallbackDict.TryGetValue(callbackId, out received))
                {
                    break;
                }
                if (!(DateTime.Now < deadline))
                {
                    break;
                }
                Thread.Sleep(10);
            }

            SocketResult discarded;
            clientSocket.CallbackDict.TryRemove(callbackId, out discarded);

            if (received != null)
            {
                return received;
            }

            return new SocketResult { success = false, errorMsg = "超时" };
        }

        /// <summary>
        /// Send
        /// </summary>
        private void Send(ClientSocket clientSocket, SocketData data)
        {
            bool bl = false;
            Socket socket = clientSocket.Socket;
            lock (clientSocket.LockSend)
            {
                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                // send header
                bl = SocketHelper.Send(socket, bArrHeader);
                if (data.Type == 1)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x01 }); // send type
                }
                if (data.Type == 3)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x03 }); // send type
                    if (data.MsgContent != null)
                    {
                        byte[] bArrData = null;
                        if (bl) bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.MsgContent));
                        if (bl) bl = SocketHelper.Send(socket, BitConverter.GetBytes(bArrData.Length)); // send length
                        if (bl) bl = SocketHelper.Send(socket, bArrData); //
栏目列表
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
SQL Server -- 解决存储过程传入参数作为s
关于JS定时器的整理
JS中使用Promise.all控制所有的异步请求都完
js中字符串的方法
import-local执行流程与node模块路径解析流程
检测数据类型的四种方法
js中数组的方法,32种方法
前端操作方法
数据类型
window.localStorage.setItem 和 localStorage.setIte
如何完美解决前端数字计算精度丢失与数