VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > temp > 简明python教程 >
  • C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据

  网上有很多Socket框架,但是我想,C#既然有Socket类,难道不是给人用的吗?

    写了一个SocketServerHelper和SocketClientHelper,分别只有5、6百行代码,比不上大神写的,和业务代码耦合也比较重,但对新手非常友好,容易看懂。

    支持返回值或回调,支持不定长度的数据包。客户端和服务端均支持断线重连。

    自己本机测试,5000个客户端并发发送消息正常,CPU压力有点大。由于局域网机子性能差,局域网只测试了500个客户端并发发送消息正常。

    短短1000多行代码,花了好多天心血,改了无数BUG,越写代码,越觉得自己资质平平,逻辑思维不够用。写Socket代码不像写一般的代码,实在不行加个try catch完事,这个东西既要稳定,又要性能,真的是每一个逻辑分支,每一个异常分支,都要想清楚,都要处理好,代码里我还是Exception用习惯了,没细分。

    有时候为了解决一个BUG,找了一整天,也找不出BUG在哪,现在终于测试难过了,达到了自己的预想。

    通过这几天的踩坑,测试,得出结论:

    1、Socket TCP 不会丢包,TCP是可靠的。(本机测试、局域网测试,可能没有遇到更恶劣的网络环境)

    2、Socket TCP 能够保证顺序,接收到的顺序和发送的顺序一致

    3、代码里有数据校验,但是错误的分支永远都不会走,校验是一定能通过的,不存在数据校验不通过,把错误的数据包简单丢弃的情况,否则说明代码写的还是有BUG

    以下是主要代码:

    SocketServerHelper代码:

复制代码
using Models;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Socket服务端帮助类
    /// </summary>
    public class SocketServerHelper
    {
        #region 变量
        private int _serverPort;
        private Socket serverSocket;
        private ConcurrentDictionary<ClientSocket, string> clientSocketList = new ConcurrentDictionary<ClientSocket, string>();
        private ConcurrentDictionary<string, ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();
        private ConcurrentDictionary<string, ClientSocket> _dictDevNoClientSocket = new ConcurrentDictionary<string, ClientSocket>();

        public int _CallbackTimeout = 20;
        /// <summary>
        /// 等待回调超时时间(单位:秒)
        /// </summary>
        public int CallbackTimeout
        {
            get { return _CallbackTimeout; }
            set { value = _CallbackTimeout; }
        }

        public int _WaitResultTimeout = 20;
        /// <summary>
        /// 等待返回结果超时时间(单位:秒)
        /// </summary>
        public int WaitResultTimeout
        {
            get { return _WaitResultTimeout; }
            set { value = _WaitResultTimeout; }
        }

        private object _lockSend = new object();

        public event EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent;

        private System.Timers.Timer _checkClientTimer;
        #endregion

        #region SocketServerHelper 构造函数
        public SocketServerHelper(int serverPort)
        {
            _serverPort = serverPort;
        }
        #endregion

        #region 启动服务
        /// <summary>
        /// 启动服务
        /// </summary>
        public bool StartServer()
        {
            try
            {
                IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, _serverPort);
                serverSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                serverSocket.Bind(ipEndPoint);
                serverSocket.Listen(5000);
                Thread thread = new Thread(new ThreadStart(delegate ()
                {
                    while (true)
                    {
                        Socket client = null;
                        ClientSocket clientSocket = null;

                        try
                        {
                            client = serverSocket.Accept();
                            client.SendTimeout = 20000;
                            client.ReceiveTimeout = 20000;
                            client.SendBufferSize = 10240;
                            client.ReceiveBufferSize = 10240;
                            clientSocket = new ClientSocket(client);
                            clientSocketList.TryAdd(clientSocket, null);
                            LogUtil.Log("监听到新的客户端,当前客户端数:" + clientSocketList.Count);
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                            Thread.Sleep(1);
                            continue;
                        }

                        if (client == null) continue;

                        Task.Run(() =>
                        {
                            try
                            {
                                byte[] buffer = new byte[10240];
                                SocketAsyncEventArgs args = new SocketAsyncEventArgs();
                                clientSocket.SocketAsyncArgs = args;
                                clientSocket.SocketAsyncCompleted = (s, e) =>
                                {
                                    ReceiveData(clientSocket, e);
                                };
                                args.SetBuffer(buffer, 0, buffer.Length);
                                args.Completed += clientSocket.SocketAsyncCompleted;
                                client.ReceiveAsync(args);
                            }
                            catch (Exception ex)
                            {
                                LogUtil.Error(ex);
                            }
                        });
                    }
                }));
                thread.IsBackground = true;
                thread.Start();

                //检测客户端
                _checkClientTimer = new System.Timers.Timer();
                _checkClientTimer.AutoReset = false;
                _checkClientTimer.Interval = 1000;
                _checkClientTimer.Elapsed += CheckClient;
                _checkClientTimer.Start();

                LogUtil.Log("服务已启动");
                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "启动服务出错");
                return false;
            }
        }
        #endregion

        #region 检测客户端
        /// <summary>
        /// 检测客户端
        /// </summary>
        private void CheckClient(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task.Run(() =>
            {
                try
                {
                    foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray())
                    {
                        Socket skt = clientSkt.Socket;
                        ClientSocket temp;
                        string strTemp;

                        DateTime now = DateTime.Now;
                        if (now.Subtract(clientSkt.LastHeartbeat).TotalSeconds > 60)
                        {
                            clientSocketList.TryRemove(clientSkt, out strTemp);
                            LogUtil.Log("客户端已失去连接,当前客户端数:" + clientSocketList.Count);
                            ActionUtil.TryDoAction(() => { if (skt.Connected) skt.Disconnect(false); });
                            ActionUtil.TryDoAction(() =>
                            {
                                skt.Close();
                                skt.Dispose();
                                if (clientSkt.SocketAsyncArgs != null)
                                {
                                    if (clientSkt.SocketAsyncCompleted != null)
                                    {
                                        clientSkt.SocketAsyncArgs.Completed -= clientSkt.SocketAsyncCompleted;
                                    }
                                    clientSkt.SocketAsyncArgs.Dispose();
                                }
                                clientSkt.SocketAsyncCompleted = null;
                                clientSkt.SocketAsyncArgs = null;
                            });
                        }
                    }

                }
                catch (Exception ex)
                {
                    LogUtil.Error(ex, "检测客户端出错");
                }
                finally
                {
                    _checkClientTimer.Start();
                }
            });
        }
        #endregion

        #region 接收数据
        /// <summary>
        /// 处理接收的数据包
        /// </summary>
        private void ReceiveData(ClientSocket clientSkt, SocketAsyncEventArgs e)
        {
            if (clientSkt == null) return;
            Socket skt = clientSkt.Socket;

            try
            {
                CopyTo(e.Buffer, clientSkt.Buffer, 0, e.BytesTransferred);

                #region 校验数据
                if (clientSkt.Buffer.Count < 4)
                {
                    if (skt.Connected) skt.ReceiveAsync(e);
                    return;
                }
                else
                {
                    byte[] bArrHeader = new byte[4];
                    CopyTo(clientSkt.Buffer, bArrHeader, 0, 0, bArrHeader.Length);
                    string strHeader = Encoding.ASCII.GetString(bArrHeader);
                    if (strHeader.ToUpper() == "0XFF")
                    {
                        if (clientSkt.Buffer.Count < 5)
                        {
                            if (skt.Connected) skt.ReceiveAsync(e);
                            return;
                        }
                        else
                        {
                            byte[] bArrType = new byte[1];
                            CopyTo(clientSkt.Buffer, bArrType, 4, 0, bArrType.Length);
                            if (bArrType[0] == 0) { } //心跳包
                            else if (bArrType[0] == 2 || bArrType[0] == 4) //注册包、返回值包
                            {
                                if (clientSkt.Buffer.Count < 9)
                                {
                                    if (skt.Connected) skt.ReceiveAsync(e);
                                    return;
                                }
                                else
                                {
                                    byte[] bArrLength = new byte[4];
                                    CopyTo(clientSkt.Buffer, bArrLength, 5, 0, bArrLength.Length);
                                    int dataLength = BitConverter.ToInt32(bArrLength, 0);
                                    if (dataLength == 0 || clientSkt.Buffer.Count < dataLength + 9)
                                    {
                                        if (skt.Connected) skt.ReceiveAsync(e);
                                        return;
                                    }
                                }
                            }
                            else
                            {
                                LogUtil.Error("type错误,丢掉错误数据,重新接收");
                                clientSkt.Buffer.Clear(); //把错误的数据丢掉
                                if (skt.Connected) skt.ReceiveAsync(e);
                                return;
                            }
                        }
                    }
                    else
                    {
                        LogUtil.Error("不是0XFF,丢掉错误数据,重新接收");
                        clientSkt.Buffer.Clear(); //把错误的数据丢掉
                        if (skt.Connected) skt.ReceiveAsync(e);
                        return;
                    }
                }
                #endregion

                SocketData data = null;
                do
                {
                    data = ProcessSocketData(clientSkt);
                } while (data != null);

                if (skt.Connected) skt.ReceiveAsync(e);
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "处理接收的数据包 异常");
            }
        }
        #endregion

        #region 处理接收的数据包
        /// <summary>
        /// 处理接收的数据包
        /// </summary>
        private SocketData ProcessSocketData(ClientSocket clientSkt)
        {
            int readLength = 0;
            SocketData data = ResolveBuffer(clientSkt.Buffer, out readLength);
            if (data != null)
            {
                if (readLength > 0) clientSkt.RemoveBufferData(readLength);
                if (data.Type == 0) //收到心跳包
                {
                    clientSkt.LastHeartbeat = DateTime.Now;

                    //心跳应答
                    if (clientSkt.RoomNo != null || clientSkt.DevNo != null)
                    {
                        ThreadHelper.Run(() =>
                        {
                            lock (clientSkt.LockSend)
                            {
                                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                                SocketHelper.Send(clientSkt.Socket, bArrHeader);
                                SocketHelper.Send(clientSkt.Socket, new byte[] { 0x01 });
                            }
                        });
                    }
                    else
                    {
                        LogUtil.Log("没有注册信息");
                    }

                    LogUtil.Log("收到心跳包,客户端连接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                }

                if (data.Type == 2) //收到注册包
                {
                    if (data.SocketRegisterData != null && clientSkt != null)
                    {
                        ClientSocket temp;
                        if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.SocketRegisterData.RoomNo, out temp);
                        if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.SocketRegisterData.DevNo, out temp);
                        clientSkt.RoomNo = data.SocketRegisterData.RoomNo;
                        clientSkt.DevNo = data.SocketRegisterData.DevNo;
                        if (data.SocketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryAdd(data.SocketRegisterData.RoomNo, clientSkt);
                        if (data.SocketRegisterData.DevNo != null) _dictDevNoClientSocket.TryAdd(data.SocketRegisterData.DevNo, clientSkt);
                        LogUtil.Log("收到注册包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);

                        //注册反馈
                        ThreadHelper.Run(() =>
                        {
                            lock (clientSkt.LockSend)
                            {
                                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF");
                                SocketHelper.Send(clientSkt.Socket, bArrHeader);
                                SocketHelper.Send(clientSkt.Socket, new byte[] { 0x05 });
                            }
                        });
                    }
                }

                if (data.Type == 4) //收到返回值包
                {
                    ThreadHelper.Run(() =>
                    {
                        if (data.SocketResult != null) clientSkt.CallbackDict.TryAdd(data.SocketResult.callbackId, data.SocketResult);

                        if (ReceivedSocketResultEvent != null)
                        {
                            ReceivedSocketResultEvent(null, new Models.ReceivedSocketResultEventArgs(data.SocketResult));
                        }
                    });
                    LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                }
            }
            return data;
        }
        #endregion

        #region ResolveBuffer
        /// <summary>
        /// 解析字节数组
        /// </summary>
        private SocketData ResolveBuffer(List<byte> buffer, out int readLength)
        {
            SocketData socketData = null;
            readLength = 0;

            try
            {
                if (buffer.Count < 4) return null;
                byte[] bArrHeader = new byte[4];
                CopyTo(buffer, bArrHeader, 0, 0, bArrHeader.Length);
                readLength += bArrHeader.Length;
                string strHeader = Encoding.ASCII.GetString(bArrHeader);
                if (strHeader.ToUpper() == "0XFF")
                {
                    if (buffer.Count < 5) return null;
                    byte[] bArrType = new byte[1];
                    CopyTo(buffer, bArrType, 4, 0, bArrType.Length);
                    readLength += bArrType.Length;
                    byte bType = bArrType[0];
                    socketData = new SocketData();
                    socketData.Type = bType;

                    if (socketData.Type == 2)
                    {
                        if (buffer.Count < 9) return null;
                        byte[] bArrLength = new byte[4];
                        CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length);
                        readLength += bArrLength.Length;
                        int dataLength = BitConverter.ToInt32(bArrLength, 0);

                        if (dataLength == 0 || buffer.Count < dataLength + 9) return null;
                        byte[] dataBody = new byte[dataLength];
                        CopyTo(buffer, dataBody, 9, 0, dataBody.Length);
                        readLength += dataBody.Length;
                        string jsonString = Encoding.UTF8.GetString(dataBody);
                        socketData.SocketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString);
                    }

                    if (socketData.Type == 4)
                    {
                        if (buffer.Count < 9) return null;
                        byte[] bArrLength = new byte[4];
                        CopyTo(buffer, bArrLength, 5, 0, bArrLength.Length);
                        readLength += bArrLength.Length;
                        int dataLength = BitConverter.ToInt32(bArrLength, 0);

                        if (dataLength == 0 || buffer.Count < dataLength + 9) return null;
                        byte[] dataBody = new byte[dataLength];
                        CopyTo(buffer, dataBody, 9, 0, dataBody.Length);
                        readLength += dataBody.Length;
                        string jsonString = Encoding.UTF8.GetString(dataBody);
                        socketData.SocketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString);
                    }
                }
                else
                {
                    LogUtil.Error("不是0XFF");
                    return null;
                }

            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "解析字节数组 出错");
                return null;
            }

            return socketData;
        }
        #endregion

        #region CopyTo
        /// <summary>
        /// 数组复制
        /// </summary>
        private void CopyTo(byte[] bArrSource, List<byte> listTarget, int sourceIndex, int length)
        {
            for (int i = 0; i < length; i++)
            {
                if (sourceIndex + i < bArrSource.Length)
                {
                    listTarget.Add(bArrSource[sourceIndex + i]);
                }
            }
        }

        /// <summary>
        /// 数组复制
        /// </summary>
        private void CopyTo(List<byte> listSource, byte[] bArrTarget, int sourceIndex, int targetIndex, int length)
        {
            for (int i = 0; i < length; i++)
            {
                if (targetIndex + i < bArrTarget.Length && sourceIndex + i < listSource.Count)
                {
                    bArrTarget[targetIndex + i] = listSource[sourceIndex + i];
                }
            }
        }
        #endregion

        #region 停止服务
        /// <summary>
        /// 停止服务
        /// </summary>
        public void StopServer()
        {
            try
            {
                foreach (ClientSocket clientSocket in clientSocketList.Keys.ToArray())
                {
                    Socket socket = clientSocket.Socket;
                    ActionUtil.TryDoAction(() => { if (socket.Connected) socket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        socket.Close();
                        socket.Dispose();
                    });
                }
                clientSocketList.Clear();
                _dictDevNoClientSocket.Clear();
                _dictRoomNoClientSocket.Clear();
                if (serverSocket != null)
                {
                    ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.Disconnect(false); });
                    ActionUtil.TryDoAction(() =>
                    {
                        serverSocket.Close();
                        serverSocket.Dispose();
                    });
                }
                LogUtil.Log("服务已停止");
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex, "停止服务出错");
            }
        }
        #endregion

        #region 释放资源
        /// <summary>
        /// 释放资源
        /// </summary>
        public void Dispose()
        {
            if (_checkClientTimer != null)
            {
                _checkClientTimer.Stop();
                _checkClientTimer.Close();
            }
        }
        #endregion

        #region Send
        /// <summary>
        /// Send 单个发送 并等待结果
        /// </summary>
        /// <returns>false:发送失败 true:发送成功,但接收端是否处理成功要等待返回结果</returns>
        public SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
        {
            SocketData data = new SocketData();
            data.Type = 3;
            data.MsgContent = msgContent;

            ClientSocket clientSocket = null;
            if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo, out clientSocket);
            if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo, out clientSocket);

            if (clientSocket != null)
            {
                if (string.IsNullOrWhiteSpace(msgContent.callbackId))
                {
                    msgContent.callbackId = Guid.NewGuid().ToString("N");
                }

                Send(clientSocket, data);
                return WaitSocketResult(clientSocket, msgContent.callbackId);
            }
            else
            {
                SocketResult socketResult = new SocketResult();
                socketResult.success = false;
                socketResult.errorMsg = "客户端不存在";
                return socketResult;
            }
        }

        /// <summary>
        /// Send 单个发送
        /// </summary>
        /// <returns>false:发送失败 true:发送成功,但接收端是否处理成功要等待返回结果</returns>
        public void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
        {
            SocketData data = new SocketData();
            data.Type = 3;
            data.MsgContent = msgContent;

            ClientSocket clientSocket = null;
            if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo, out clientSocket);
            if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo, out clientSocket);

            if (clientSocket != null)
            {
                if (string.IsNullOrWhiteSpace(msgContent.callbackId))
                {
                    msgContent.callbackId = Guid.NewGuid().ToString("N");
                }

                if (callback != null)
                {
                    WaitCallback(clientSocket, msgContent.callbackId, callback);
                }

                Send(clientSocket, data);
            }
            else
            {
                SocketResult socketResult = new SocketResult();
                socketResult.success = false;
                socketResult.errorMsg = "客户端不存在";
                if (callback != null) callback(socketResult);
            }
        }

        /// <summary>
        /// 等待回调
        /// </summary>
        private void WaitCallback(ClientSocket clientSocket, string callbackId, Action<SocketResult> callback = null)
        {
            DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout);
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.AutoReset = false;
            timer.Interval = 100;
            timer.Elapsed += (s, e) =>
            {
                try
                {
                    SocketResult socketResult;
                    if (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt)
                    {
                        timer.Start();
                        return;
                    }
                    SocketResult sktResult;
                    clientSocket.CallbackDict.TryRemove(callbackId, out sktResult);
                    if (socketResult == null)
                    {
                        socketResult = new SocketResult();
                        socketResult.success = false;
                        socketResult.errorMsg = "超时";
                    }

                    if (callback != null) callback(socketResult);

                    timer.Close();
                }
                catch (Exception ex)
                {
                    LogUtil.Error("WaitCallback error" + ex);
                }
            };
            timer.Start();
        }

        /// <summary>
        /// 等待SocketResult
        /// </summary>
        private SocketResult WaitSocketResult(ClientSocket clientSocket, string callbackId)
        {
            SocketResult socketResult;
            DateTime dt = DateTime.Now.AddSeconds(_WaitResultTimeout);
            while (!clientSocket.CallbackDict.TryGetValue(callbackId, out socketResult) && DateTime.Now < dt)
            {
                Thread.Sleep(10);
            }
            SocketResult sktResult;
            clientSocket.CallbackDict.TryRemove(callbackId, out sktResult);
            if (socketResult == null)
            {
                socketResult = new SocketResult();
                socketResult.success = false;
                socketResult.errorMsg = "超时";
            }
            return socketResult;
        }

        /// <summary>
        /// Send
        /// </summary>
        /// <returns>false:发送失败 true:发送成功,但不表示对方已收到</returns>
        private void Send(ClientSocket clientSocket, SocketData data)
        {
            bool bl = false;
            Socket socket = clientSocket.Socket;

            lock (clientSocket.LockSend)
            {
                byte[] bArrHeader = Encoding.ASCII.GetBytes("0XFF"); //发送header
                bl = SocketHelper.Send(socket, bArrHeader);

                if (data.Type == 1)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x01 }); //发送type
                }

                if (data.Type == 3)
                {
                    if (bl) bl = SocketHelper.Send(socket, new byte[] { 0x03 }); //发送type

                    if (data.MsgContent != null)
                    {
                        byte[] bArrData = null;
                        if (bl) bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.MsgContent));
                        if (bl) bl = SocketHelper.Send(socket, BitConverter.GetBytes(bArrData.Length)); //发送length
                        if (bl) bl = SocketHelper.Send(socket, bArrData); //
      



  

相关教程