当前位置:
首页 > temp > 简明python教程 >
-
C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据(3)
using Models;
using Newtonsoft.Json;
using PrisonWebApi.Controllers.Common;
using PrisonWebApi.DAL;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Configuration;
using System.Linq;
using System.Threading;
using System.Timers;
using System.Web;
namespace Utils
{
/// <summary>
/// Web API 消息工具类
/// </summary>
public static class MsgUtil
{
#region 变量
private static WebApiMsgDal m_WebApiMsgDal = null;
private static System.Timers.Timer _timer;
private static SocketServerHelper _socketServerHelper;
#endregion
#region Init 初始化
/// <summary>
/// 初始化
/// </summary>
public static void Init()
{
ThreadHelper.Run(() =>
{
m_WebApiMsgDal = ServiceHelper.Get<WebApiMsgDal>();
int port = int.Parse(ConfigurationManager.AppSettings["SocketServerPort"]);
_socketServerHelper = new SocketServerHelper(port);
_socketServerHelper.ReceivedSocketResultEvent += _socketServerHelper_ReceivedSocketResultEvent;
_socketServerHelper.StartServer();
_timer = new System.Timers.Timer();
_timer.AutoReset = false;
_timer.Interval = 40000; //注意,这个参数必须比Socket等待回调超时时间CallbackTimeout大
_timer.Elapsed += MsgTask;
_timer.Start();
LogUtil.Log("Web API 消息工具类 初始化成功");
}, (ex) =>
{
LogUtil.Error("Web API 消息工具类 初始化失败");
});
}
#endregion
#region 定时任务
/// <summary>
/// 定时任务
/// </summary>
private static void MsgTask(object sender, ElapsedEventArgs e)
{
ThreadHelper.Run(() =>
{
try
{
m_WebApiMsgDal.DeleteTimeoutMsg(); //删除超时的消息
List<WEBAPI_MSG> list = m_WebApiMsgDal.GetMsgList();
foreach (WEBAPI_MSG msg in list)
{
WebApiMsgContent msgContent = JsonConvert.DeserializeObject<WebApiMsgContent>(msg.MSGCONTENT);
msgContent.callbackId = msg.ID;
Send(msgContent, msg.RECEIVER, msg.RECEIVER, null);
}
if (list.Count > 0)
{
LogUtil.Log("已重发" + list.Count.ToString() + "条消息");
}
}
catch (Exception ex)
{
LogUtil.Error(ex);
}
finally
{
_timer.Start();
}
});
}
#endregion
#region 接收数据
/// <summary>
/// 接收数据
/// </summary>
private static void _socketServerHelper_ReceivedSocketResultEvent(object sender, ReceivedSocketResultEventArgs e)
{
Func<string, bool> func = (callbackId) =>
{
try
{
if (m_WebApiMsgDal.Exists((string)callbackId))
{
m_WebApiMsgDal.DeleteById((string)callbackId);
}
}
catch (Exception ex)
{
LogUtil.Error(ex, "删除消息出错");
return false;
}
return true;
};
int tryCount = 0;
if (e.SocketResult != null)
{
while (!func(e.SocketResult.callbackId) && tryCount++ < 10)
{
Thread.Sleep(1000);
}
}
}
#endregion
#region Send 发送消息
/// <summary>
/// Send 发送消息
/// </summary>
public static void Send(WebApiMsgContent msgContent, string roomNo, string devNo, Action<SocketResult> callback = null)
{
_socketServerHelper.Send(msgContent, roomNo, devNo, callback);
}
/// <summary>
/// Send 发送消息
/// </summary>
public static SocketResult Send(WebApiMsgContent msgContent, string roomNo, string devNo)
{
try
{
return _socketServerHelper.Send(msgContent, roomNo, devNo);
}
catch (Exception ex)
{
LogUtil.Error(ex, "发送消息失败");
return null;
}
}
#endregion
#region 释放资源
/// <summary>
/// 释放资源
/// </summary>
public static void Dispose()
{
ThreadHelper.Run(() =>
{
_timer.Stop();
_timer.Elapsed -= MsgTask;
_timer.Close();
_timer.Dispose();
_timer = null;
_socketServerHelper.StopServer();
_socketServerHelper.ReceivedSocketResultEvent -= _socketServerHelper_ReceivedSocketResultEvent;
LogUtil.Log("Web API 消息工具类 释放资源成功");
}, (ex) =>
{
LogUtil.Error("Web API 消息工具类 释放资源失败");
});
}
#endregion
}
}
Web API 接口 MsgController 代码:
using DBUtil; using Models; using Newtonsoft.Json; using PrisonWebApi.DAL; using Swashbuckle.Swagger.Annotations; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.ComponentModel.DataAnnotations; using System.Globalization; using System.Linq; using System.Net; using System.Net.Http; using System.Web; using System.Web.Http; using Utils; namespace PrisonWebApi.Controllers.Common { /// <summary> /// Web API 消息 /// </summary> [RoutePrefix("api/msg")] public class MsgController : ApiController { #region 变量属性 private WebApiMsgDal m_WebApiMsgDal = ServiceHelper.Get<WebApiMsgDal>(); private TwoCJsDal m_TwoCJsDal = ServiceHelper.Get<TwoCJsDal>(); private BackstageAppInstallDal m_BackstageAppInstallDal = ServiceHelper.Get<BackstageAppInstallDal>(); #endregion #region 发送消息 /// <summary> /// 发送消息 /// </summary> /// <param name="data">POST数据</param> [HttpPost] [Route("SendMsg")] [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<SendMsgData>))] public HttpResponseMessage SendMsg([FromBody] SendMsgData data) { JsonResult jsonResult = null; if (data == null || data.msgContent == null) { jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确); return ApiHelper.ToJson(jsonResult); } if (data.roomNo != null && data.devNos != null) { jsonResult = new JsonResult("监室号和设备编码(指仓内屏或仓外屏的设备编码)不能都有值,应填写其中一个,或者都不填写", ResultCode.参数不正确); return ApiHelper.ToJson(jsonResult); } if (string.IsNullOrWhiteSpace(data.msgContent.msgTime)) data.msgContent.msgTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"); if (!string.IsNullOrWhiteSpace(data.devNos)) { try { foreach (string devNo in data.devNos.Split(',')) { data.msgContent.callbackId = Guid.NewGuid().ToString("N"); MsgUtil.Send(data.msgContent, null, devNo, (socketResult) => { if (socketResult == null || !socketResult.success) { WEBAPI_MSG info = new WEBAPI_MSG(); info.ID = Guid.NewGuid().ToString("N"); info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture); info.RECEIVER = devNo; info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent); m_WebApiMsgDal.Insert(info); } }); } } catch (Exception ex) { LogUtil.Error(ex, "消息发送失败"); jsonResult = new JsonResult("消息发送失败", ResultCode.操作失败); return ApiHelper.ToJson(jsonResult); } } else { if (!string.IsNullOrWhiteSpace(data.roomNo)) { try { data.msgContent.callbackId = Guid.NewGuid().ToString("N"); MsgUtil.Send(data.msgContent, data.roomNo, null, (socketResult) => { if (socketResult == null || !socketResult.success) { WEBAPI_MSG info = new WEBAPI_MSG(); info.ID = Guid.NewGuid().ToString("N"); info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture); info.RECEIVER = data.roomNo; info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent); m_WebApiMsgDal.Insert(info); } }); } catch (Exception ex) { LogUtil.Error(ex, "消息发送失败"); jsonResult = new JsonResult("消息发送失败", ResultCode.操作失败); return ApiHelper.ToJson(jsonResult); } } else { try { List<string> roomNoList = m_TwoCJsDal.GetRoomNoListAll(); foreach (string roomNo in roomNoList) { data.msgContent.callbackId = Guid.NewGuid().ToString("N"); MsgUtil.Send(data.msgContent, roomNo, null, (socketResult) => { if (socketResult == null || !socketResult.success) { WEBAPI_MSG info = new WEBAPI_MSG(); info.ID = Guid.NewGuid().ToString("N"); info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime, "yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture); info.RECEIVER = roomNo; info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent); m_WebApiMsgDal.Insert(info); } }); } } catch (Exception ex) { LogUtil.Error(ex, "消息发送失败"); jsonResult = new JsonResult("消息发送失败", ResultCode.操作失败); return ApiHelper.ToJson(jsonResult); } } } jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult() { msg = "消息发送成功" }); return ApiHelper.ToJson(jsonResult); } #endregion #region APP安装消息反馈 /// <summary> /// APP安装消息反馈 /// </summary> /// <param name="data">POST数据</param> [HttpPost] [Route("InstallMsgFeedback")] [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))] public HttpResponseMessage InstallMsgFeedback([FromBody] InstallMsgFeedbackData data) { JsonResult jsonResult = null; if (data == null) { jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确); return ApiHelper.ToJson(jsonResult); } BACKSTAGE_APP_INSTALL info = m_BackstageAppInstallDal.Get(data.id); if (info != null) { if (data.success) { info.STATUS = "1"; m_BackstageAppInstallDal.Update(info); } jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult() { msg = "反馈成功", id = info.ID }); } else { jsonResult = new JsonResult("反馈失败:安装记录不存在", ResultCode.操作失败); return ApiHelper.ToJson(jsonResult); } return ApiHelper.ToJson(jsonResult); } #endregion #region 发起点名成功反馈 /// <summary> /// 发起点名成功反馈 /// </summary> /// <param name="data">POST数据</param> [HttpPost] [Route("RollCallMsgFeedback")] [SwaggerResponse(HttpStatusCode.OK, "返回JSON", typeof(JsonResult<CommonSubmitResult>))] public HttpResponseMessage RollCallMsgFeedback([FromBody] RollCallMsgFeedbackData data) { JsonResult jsonResult = null; if (data == null) { jsonResult = new JsonResult("请检查参数格式", ResultCode.参数不正确); return ApiHelper.ToJson(jsonResult); } //TODO:此处尚未完成 jsonResult = new JsonResult<CommonSubmitResult>(new CommonSubmitResult() { msg = "反馈成功", id = null }); return ApiHelper.ToJson(jsonResult); } #endregion } #region SendMsgData 发送消息数据 /// <summary> /// 发送消息数据 /// </summary> [MyValidate] public class SendMsgData { /// <summary> /// 消息内容 /// </summary> [Required] public WebApiMsgContent msgContent { get; set; } /// <summary> /// 监室号(如果为空,并且devNos也为空,则发送到所有监室;如果为空,并且devNos不为空,则按devNos发送) /// </summary> public string roomNo { get; set; } /// <summary> /// 设备编码(逗号隔开)(仓内屏或仓外屏的设备编码) /// </summary> public string devNos { get; set; } } /// <summary> /// APP安装消息反馈 /// </summary> [MyValidate] public class InstallMsgFeedbackData { /// <summary> /// 安装记录ID /// </summary> [Required] public string id { get; set; } /// <summary> /// 安装是否成功 /// </summary> [Required] public bool success { get; set; } /// <summary> /// 安装失败原因 /// </summary> public string errorMsg { get; set; } } /// <summary> /// 发起点名成功反馈 /// </summary> [MyValidate] public class RollCallMsgFeedbackData { /// <summary> /// 点名ID /// </summary> [Required] public string id { get; set; } /// <summary> /// 发起点名是否成功 /// </summary> [Required] public bool success { get; set; } /// <summary> /// 发起点名失败原因 /// </summary> public string errorMsg { get; set; } } #endregion }
C# Socket,没有人比我的代码更简单明了了