网上有很多Socket框架,但是我想,C#既然有Socket类,难道不是给人用的吗?
写了一个SocketServerHelper和SocketClientHelper,分别只有5、6百行代码,比不上大神写的,和业务代码耦合也比较重,但对新手非常友好,容易看懂。
支持返回值或回调,支持不定长度的数据包。客户端和服务端均支持断线重连。
自己本机测试,5000个客户端并发发送消息正常,cpu压力有点大。由于局域网机子性能差,局域网只测试了500个客户端并发发送消息正常。
短短1000多行代码,花了好多天心血,改了无数BUG,越写代码,越觉得自己资质平平,逻辑思维不够用。写Socket代码不像写一般的代码,实在不行加个try catch完事,这个东西既要稳定,又要性能,真的是每一个逻辑分支,每一个异常分支,都要想清楚,都要处理好,代码里我还是Exception用习惯了,没细分。
有时候为了解决一个BUG,找了一整天,也找不出BUG在哪,现在终于测试通过了,达到了自己的预期。
通过这几天的踩坑,测试,得出结论:
1、Socket TCP 不会丢包,TCP是可靠的。(本机测试、局域网测试,可能没有遇到更恶劣的网络环境)
2、Socket TCP 能够保证顺序,接收到的顺序和发送的顺序一致
3、代码里有数据校验,但正常情况下校验一定能通过,错误分支永远不会走到;如果出现数据校验不通过、需要把错误数据包简单丢弃的情况,说明代码写得还是有BUG
以下是主要代码:
SocketServerHelper代码:
using Models; Newtonsoft.Json; System; System.Collections.Concurrent; System.Collections.Generic; System.Configuration; System.Linq; System.Net; System.Net.sockets; System.Runtime.InteropServices; System.Text; System.Threading; System.Threading.Tasks; namespace Utils { /// <summary> /// Socket服务端帮助类 </summary> public class SocketServerHelper { #region 变量 private int _serverPort; private Socket serverSocket; private ConcurrentDictionary<ClientSocket,string> clientSocketList = new ConcurrentDictionary<ClientSocket,1)">string>(); private ConcurrentDictionary<string,ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<(); int _CallbackTimeout = 20; <summary> 等待回调超时时间(单位:秒) </summary> CallbackTimeout { get { return _CallbackTimeout; } set { value = _CallbackTimeout; } } int _WaitResultTimeout = 等待返回结果超时时间(单位:秒) WaitResultTimeout { _WaitResultTimeout; } _WaitResultTimeout; } } object _lockSend = new objectevent EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent; System.Timers.Timer _checkClientTimer; #endregion #region SocketServerHelper 构造函数 public SocketServerHelper( serverPort) { _serverPort = serverPort; } #region 启动服务 启动服务 bool StartServer() { try { IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any,_serverPort); serverSocket = Socket(ipEndPoint.AddressFamily,SocketType.Stream,ProtocolType.Tcp); serverSocket.Bind(ipEndPoint); serverSocket.Listen(5000); Thread thread = new Thread(new ThreadStart(delegate () { while (true) { Socket client = null; ClientSocket clientSocket = ; { client = serverSocket.Accept(); client.SendTimeout = 20000; client.ReceiveTimeout = ; client.SendBufferSize = 10240; client.ReceiveBufferSize = ; clientSocket = ClientSocket(client); clientSocketList.TryAdd(clientSocket,); LogUtil.Log("监听到新的客户端,当前客户端数:" + clientSocketList.Count); } catch (Exception ex) { LogUtil.Error(ex); Thread.Sleep(1); continue; } if (client == null) { byte[] buffer = byte[]; socketasynceventargs args = socketasynceventargs(); 
clientSocket.socketAsyncArgs = args; clientSocket.socketAsyncCompleted = (s,e) => { ReceiveData(clientSocket,e); }; args.SetBuffer(buffer,0,buffer.Length); args.Completed += clientSocket.socketAsyncCompleted; client.ReceiveAsync(args); } (Exception ex) { LogUtil.Error(ex); } } })); thread.IsBackground = ; thread.Start(); //检测客户端 _checkClientTimer = System.Timers.Timer(); _checkClientTimer.AutoReset = false; _checkClientTimer.Interval = 1000; _checkClientTimer.Elapsed += CheckClient; _checkClientTimer.Start(); LogUtil.Log(服务已启动"); return ; } (Exception ex) { LogUtil.Error(ex,1)">启动服务出错; } } #region 检测客户端 检测客户端 void CheckClient( sender,System.Timers.ElapsedEventArgs e) { { foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray()) { Socket skt = clientSkt.socket; ClientSocket temp; string strTemp; DateTime Now = DateTime.Now; if (Now.Subtract(clientSkt.LastHeartbeat).TotalSeconds > 60) { clientSocketList.TryRemove(clientSkt,1)">out strTemp); LogUtil.Log(客户端已失去连接,当前客户端数: clientSocketList.Count); ActionUtil.TryDoAction(() => { if (skt.Connected) skt.disconnect(); }); ActionUtil.TryDoAction(() => { skt.Close(); skt.dispose(); if (clientSkt.socketAsyncArgs != ) { if (clientSkt.socketAsyncCompleted != ) { clientSkt.socketAsyncArgs.Completed -= clientSkt.socketAsyncCompleted; } clientSkt.socketAsyncArgs.dispose(); } clientSkt.socketAsyncCompleted = ; clientSkt.socketAsyncArgs = ; }); } } } 检测客户端出错); } finally { _checkClientTimer.Start(); } } #region 接收数据 处理接收的数据包 void ReceiveData(ClientSocket clientSkt,socketasynceventargs e) { if (clientSkt == ; Socket skt = clientSkt.socket; { copyTo(e.Buffer,clientSkt.Buffer,e.BytesTransferred); #region 校验数据 if (clientSkt.Buffer.Count < 4) { if (skt.Connected) { if (!skt.ReceiveAsync(e)) ReceiveData(clientSkt,e); } ; } else { byte[] bArrHeader = ]; copyTo(clientSkt.Buffer,bArrHeader,1)">0,string strHeader = Encoding.ASCII.GetString(bArrHeader); if (strHeader.toupper() == 0XFF) { 5) { (skt.Connected) { ; } byte[] bArrType = ]; 
copyTo(clientSkt.Buffer,bArrType,1)">4,bArrType.Length); if (bArrType[0] == 0) { } 心跳包 else 2 || bArrType[4) 注册包、返回值包 { 9) { (skt.Connected) { ; } { byte[] bArrLength = ]; copyTo(clientSkt.Buffer,bArrLength,1)">5,bArrLength.Length); int dataLength = BitConverter.ToInt32(bArrLength,1)">); if (dataLength == 0 || clientSkt.Buffer.Count < dataLength + ) { (skt.Connected) { ; } } } { LogUtil.Error(string.Format(type错误,丢掉错误数据,重新接收,roomNo={0},devNo={1}把错误的数据丢掉 (skt.Connected) { ; } } } { LogUtil.Error(不是0XFF,丢掉错误数据,重新接收,roomNo={0},devNo={1}把错误的数据丢掉 (skt.Connected) { ; } } #endregion SocketData data = ; do { data = ProcessSocketData(clientSkt); } while (data != ); (skt.Connected) { 处理接收的数据包 异常); } } 字节数组转字符串 string ByteArrToString(List<byte> byteList) { List<string> list = new List<(); foreach (byte b byteList) { list.Add(b.ToString(X2)); } string.Join(" #region 处理接收的数据包 SocketData ProcessSocketData(ClientSocket clientSkt) { int readLength = ; SocketData data = ResolveBuffer(clientSkt.Buffer,1)"> readLength); if (data != ) { if (readLength > ) clientSkt.RemoveBufferData(readLength); if (data.Type == 0) 收到心跳包 { clientSkt.LastHeartbeat = DateTime.Now; 心跳应答 if (clientSkt.RoomNo != null || clientSkt.DevNo != lock (clientSkt.LockSend) { byte[] bArrHeader = Encoding.ASCII.GetBytes(); SocketHelper.Send(clientSkt.socket,bArrHeader); SocketHelper.Send(clientSkt.socket,1)">byte[] { 0x01 }); } } { LogUtil.Log(没有注册信息); } LogUtil.Log("收到心跳包,客户端连接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo); } 2) 收到注册包 { if (data.socketRegisterData != null && clientSkt != ) { ClientSocket temp; if (data.socketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.socketRegisterData.RoomNo,1)"> temp); if (data.socketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.socketRegisterData.DevNo,1)"> temp); clientSkt.RoomNo = data.socketRegisterData.RoomNo; clientSkt.DevNo = data.socketRegisterData.DevNo; ) 
_dictRoomNoClientSocket.TryAdd(data.socketRegisterData.RoomNo,clientSkt); ) _dictDevNoClientSocket.TryAdd(data.socketRegisterData.DevNo,clientSkt); LogUtil.Log(收到注册包,roomNo=" + clientSkt.RoomNo + ,devNo= clientSkt.DevNo); 注册反馈 0x05 }); } } } 收到返回值包 { ThreadHelper.Run(() => { if (data.socketResult != ) clientSkt.CallbackDict.TryAdd(data.socketResult.callbackId,data.socketResult); if (ReceivedSocketResultEvent != ) { ReceivedSocketResultEvent(null,1)"> Models.ReceivedSocketResultEventArgs(data.socketResult)); } }); LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo); } } data; } #region ResolveBuffer 解析字节数组 private SocketData ResolveBuffer(List<byte> buffer,1)">out readLength) { SocketData socketData = ; readLength = ; if (buffer.Count < ]; copyTo(buffer,bArrHeader.Length); readLength += bArrHeader.Length; Encoding.ASCII.GetString(bArrHeader); 5) ; ]; copyTo(buffer,bArrType.Length); readLength += bArrType.Length; byte bType = bArrType[]; socketData = SocketData(); socketData.Type = bType; if (socketData.Type == 29) ; ]; copyTo(buffer,bArrLength.Length); readLength += bArrLength.Length; ); 0 || buffer.Count < dataLength + byte[] dataBody = byte[dataLength]; copyTo(buffer,dataBody,1)">9,dataBody.Length); readLength += dataBody.Length; string jsonString = Encoding.UTF8.GetString(dataBody); socketData.socketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString); } Encoding.UTF8.GetString(dataBody); socketData.socketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString); } } { LogUtil.Error(不是0XFF); ; } } 解析字节数组 出错; } socketData; } #region copyTo 数组复制 void copyTo(byte[] bArrSource,List<byte> listTarget,1)">int sourceIndex,1)"> length) { for (int i = 0; i < length; i++if (sourceIndex + i < bArrSource.Length) { listTarget.Add(bArrSource[sourceIndex + i]); } } } void copyTo(List<byte> listSource,1)">byte[] bArrTarget,1)">int targetIndex,1)">if (targetIndex + i < bArrTarget.Length && sourceIndex + i < 
listSource.Count) { bArrTarget[targetIndex + i] = listSource[sourceIndex + i]; } } } #region 停止服务 停止服务 StopServer() { foreach (ClientSocket clientSocket clientSocketList.Keys.ToArray()) { Socket socket = clientSocket.socket; ActionUtil.TryDoAction(() => { if (socket.Connected) socket.disconnect(); }); ActionUtil.TryDoAction(() => { socket.Close(); socket.dispose(); }); } clientSocketList.Clear(); _dictDevNoClientSocket.Clear(); _dictRoomNoClientSocket.Clear(); if (serverSocket != ) { ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.disconnect( { serverSocket.Close(); serverSocket.dispose(); }); } LogUtil.Log(服务已停止停止服务出错); } } #region 释放资源 释放资源 dispose() { if (_checkClientTimer != ) { _checkClientTimer.Stop(); _checkClientTimer.Close(); } } #region Send Send 单个发送 并等待结果 </summary> <returns>false:发送失败 true:发送成功,但接收端是否处理成功要等待返回结果</returns> public SocketResult Send(WebApimsgContent msgContent,1)">string roomNo,1)"> devNo) { SocketData data = SocketData(); data.Type = 3; data.MsgContent = msgContent; ClientSocket clientSocket = ; if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo,1)"> clientSocket); if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo,1)"> clientSocket); if (clientSocket != if (.IsNullOrWhiteSpace(msgContent.callbackId)) { msgContent.callbackId = Guid.NewGuid().ToString(N); } Send(clientSocket,data); WaitSocketResult(clientSocket,msgContent.callbackId); } { SocketResult socketResult = SocketResult(); socketResult.success = ; socketResult.errorMsg = 客户端不存在 socketResult; } } Send 单个发送 void Send(WebApimsgContent msgContent,1)">string devNo,Action<SocketResult> callback = ) { SocketData data = ); } if (callback != ) { WaitCallback(clientSocket,msgContent.callbackId,callback); } Send(clientSocket,data); } ) callback(socketResult); } } 等待回调 void WaitCallback(ClientSocket clientSocket,1)">string callbackId,1)">) { DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout); System.Timers.Timer timer = System.Timers.Timer(); 
timer.AutoReset = ; timer.Interval = 100; timer.Elapsed += (s,1)"> { SocketResult socketResult; if (!clientSocket.CallbackDict.TryGetValue(callbackId,1)">out socketResult) && DateTime.Now < dt) { timer.Start(); ; } SocketResult sktResult; clientSocket.CallbackDict.TryRemove(callbackId,1)"> sktResult); if (socketResult == ) { socketResult = SocketResult(); socketResult.success = ; socketResult.errorMsg = 超时; } ) callback(socketResult); timer.Close(); } (Exception ex) { LogUtil.Error(WaitCallback error ex); } }; timer.Start(); } 等待SocketResult private SocketResult WaitSocketResult(ClientSocket clientSocket,1)"> callbackId) { SocketResult socketResult; DateTime dt = DateTime.Now.AddSeconds(_WaitResultTimeout); while (!clientSocket.CallbackDict.TryGetValue(callbackId,1)"> dt) { Thread.Sleep(10); } SocketResult sktResult; clientSocket.CallbackDict.TryRemove(callbackId,1)"> sktResult); ) { socketResult = socketResult; } Send false:发送失败 true:发送成功,但不表示对方已收到 Send(ClientSocket clientSocket,SocketData data) { bool bl = ; Socket socket = clientSocket.socket; (clientSocket.LockSend) { "); 发送header bl = SocketHelper.Send(socket,bArrHeader); if (bl) bl = SocketHelper.Send(socket,1)">0x01 }); 发送type 0x03 }); 发送type if (data.MsgContent != byte[] bArrData = if (bl) bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.MsgContent)); 发送length 发送body } } } } } }
SocketClientHelper代码:
Socket客户端帮助类 SocketClientHelper { _serverIP; Socket clientSocket; socketasynceventargs _socketAsyncArgs; public EventHandler<socketasynceventargs> _socketAsyncCompleted { get; set; } System.Timers.Timer heartbeatTimer; event EventHandler<SocketReceivedEventArgs> SocketReceivedEvent; System.Timers.Timer _checkServerTimer; DateTime _lastHeartbeat; private List<byte> _buffer = _roomNo; _devNo; bool _registerSuccess = ; RoomNo { _roomNo; } } DevNo { _devNo; } } 删除接收到的一个包 void RemoveBufferData( count) { 0; i < count; i++if (_buffer.Count > ) { _buffer.RemoveAt(); } } } #region SocketClientHelper 构造函数 public SocketClientHelper(string serverIP,1)"> serverPort) { _serverIP = serverIP; _serverPort =#region 连接服务器 连接服务器 ConnectServer() { if (clientSocket == null || !clientSocket.Connected) { ) { clientSocket.Close(); clientSocket.dispose(); } string ip = ConfigurationManager.AppSettings[ServerIP]; string hostName = ConfigurationManager.AppSettings[HostNameint port = Convert.ToInt32(ConfigurationManager.AppSettings[ServerPort]); IPEndPoint ipep = if (hostName != ) { IPHostEntry host = Dns.GetHostEntry(hostName); IPAddress ipAddr = host.AddressList[]; ipep = IPEndPoint(ipAddr,port); } { ipep = IPEndPoint(IPAddress.Parse(ip),port); } clientSocket = Socket(ipep.AddressFamily,ProtocolType.Tcp); clientSocket.SendTimeout = ; clientSocket.ReceiveTimeout = ; clientSocket.SendBufferSize = ; clientSocket.ReceiveBufferSize = ; { clientSocket.Connect(ipep); } (Exception ex) { LogUtil.Error(ex); null || !clientSocket.Connected) ; _lastHeartbeat =]; _socketAsyncArgs = socketasynceventargs(); _socketAsyncArgs.SetBuffer(buffer,buffer.Length); _socketAsyncCompleted = (s,1)"> { ReceiveData(clientSocket,e); }; _socketAsyncArgs.Completed += _socketAsyncCompleted; clientSocket.ReceiveAsync(_socketAsyncArgs); } (Exception ex) { LogUtil.Error(ex); } 检测服务端 _checkServerTimer = System.Timers.Timer(); _checkServerTimer.AutoReset = ; _checkServerTimer.Interval = ; _checkServerTimer.Elapsed += 
CheckServer; _checkServerTimer.Start(); LogUtil.Log(已连接服务器连接服务器失败#region 检测服务端 检测服务端 void CheckServer( { DateTime Now = DateTime.Now; if (Now.Subtract(_lastHeartbeat).TotalSeconds > ) { LogUtil.Log(服务端已失去连接if (clientSocket.Connected) clientSocket.disconnect(); clientSocket.Close(); clientSocket.dispose(); _socketAsyncArgs.Completed -= _socketAsyncCompleted; _socketAsyncCompleted = ; _socketAsyncArgs.dispose(); _socketAsyncArgs = ; } (Exception ex) { LogUtil.Error(ex); } Thread.Sleep(3000int tryCount = while (!ConnectServer() && tryCount++ < 10000) 重连 { Thread.Sleep(); } RegisterToServer(_roomNo,_devNo); 重新注册 检测服务端出错 { _checkServerTimer.Start(); } } #region 断开服务器 断开服务器 disconnectServer() { ); clientSocket.Close(); clientSocket.dispose(); } LogUtil.Log(已断开服务器断开服务器失败#region 释放资源 if (heartbeatTimer != ) { heartbeatTimer.Stop(); heartbeatTimer.Close(); } if (_checkServerTimer != ) { _checkServerTimer.Stop(); _checkServerTimer.Close(); } } #region 心跳 StartHeartbeat() { heartbeatTimer = System.Timers.Timer(); heartbeatTimer.AutoReset = ; heartbeatTimer.Interval = 10000; heartbeatTimer.Elapsed += new System.Timers.ElapsedEventHandler((obj,eea) => (_lockSend) { ); SocketHelper.Send(clientSocket,bArrHeader); SocketHelper.Send(clientSocket,1)">0x00 }); } (Exception ex) { LogUtil.Error(向服务器发送心跳包出错: ex.Message); } { heartbeatTimer.Start(); } } }); heartbeatTimer.Start(); } #region 停止心跳 StopHeartbeat() { heartbeatTimer.Stop(); } #region 注册 注册 bool RegisterToServer( devNo) { _registerSuccess = ; SocketData data = ; data.socketRegisterData = SocketRegisterData(); data.socketRegisterData.RoomNo = roomNo; data.socketRegisterData.DevNo = devNo; _roomNo = roomNo; _devNo = devNo; Send(data); DateTime dt = DateTime.Now; while (!_registerSuccess && DateTime.Now.Subtract(dt).TotalMilliseconds < ) { Thread.Sleep( _registerSuccess; } ReceiveData(Socket socket,_buffer,1)">if (_buffer.Count < (socket.Connected) { socket.ReceiveAsync(e)) ReceiveData(socket,1)">]; copyTo(_buffer,1)"> 
(socket.Connected) { ]; copyTo(_buffer,1)">1 || bArrType[5) { } 心跳应答包、注册反馈包 3) 消息包 (socket.Connected) { ]; copyTo(_buffer,1)">0 || _buffer.Count < dataLength + (socket.Connected) { { LogUtil.Error(type错误,丢掉错误数据,重新接收); _buffer.Clear(); (socket.Connected) { { LogUtil.Error(不是0XFF,丢掉错误数据,重新接收); _buffer.Clear(); (socket.Connected) { ProcessSocketData(socket); } (socket.Connected) { SocketData ProcessSocketData(Socket socket) { ; SocketData data = ResolveBuffer(_buffer,1)">) RemoveBufferData(readLength); 1) 心跳应答 { _lastHeartbeat =LogUtil.Log("收到心跳应答包,服务端正常"); 消息数据 if (SocketReceivedEvent != ) { SocketReceivedEventArgs args = SocketReceivedEventArgs(data.MsgContent); args.Callback = CallbackSocket(socket); ThreadHelper.Run((obj) => { SocketReceivedEvent(this,obj as SocketReceivedEventArgs); } (Exception ex) { LogUtil.Error(ex); } },args); } } 注册反馈 { _registerSuccess = ; LogUtil.Log(收到注册反馈包,注册成功); } } Encoding.UTF8.GetString(dataBody); socketData.MsgContent = JsonConvert.DeserializeObject<WebApimsgContent>; } } Send(SocketData data) { Send(clientSocket,data); } Send(Socket socket,1)"> (_lockSend) { 发送header bool bl =0x00 }); 0x02 }); Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.socketRegisterData)); } } 0x04 }); Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.socketResult)); } }
SocketHelper代码(里面同步接收的方法Receive和ReceiveByte没有用到):
using System;
using System.Net.Sockets;

namespace Utils
{
    /// <summary>
    /// Synchronous send/receive helpers for a connected <see cref="Socket"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this listing was reconstructed from an extraction-damaged copy.
    /// Signatures and return values follow the surviving tokens and the call sites
    /// (<c>bl = SocketHelper.Send(socket, bytes)</c>); confirm against the upstream repository.
    /// </remarks>
    public static class SocketHelper
    {
        /// <summary>Largest slice handed to a single <c>Socket.Send</c> call.</summary>
        private const int SendChunkSize = 1024;

        #region Send
        /// <summary>
        /// Sends the whole buffer, in slices of at most 1024 bytes.
        /// </summary>
        /// <param name="socket">Connected socket to write to.</param>
        /// <param name="data">Bytes to send.</param>
        /// <returns>
        /// <c>true</c> when every byte was handed to the socket; <c>false</c> when the socket is
        /// null or not connected, or when sending threw (the exception is logged).
        /// </returns>
        public static bool Send(Socket socket, byte[] data)
        {
            try
            {
                if (socket == null || !socket.Connected)
                {
                    return false;
                }

                int sendTotal = 0;
                while (sendTotal < data.Length)
                {
                    int sendLength = data.Length - sendTotal;
                    if (sendLength > SendChunkSize)
                    {
                        sendLength = SendChunkSize;
                    }

                    // Socket.Send may accept fewer bytes than requested, so advance by its return value.
                    int sendOnce = socket.Send(data, sendTotal, sendLength, SocketFlags.None);
                    sendTotal += sendOnce;
                }

                return true;
            }
            catch (Exception ex)
            {
                LogUtil.Error(ex);
                return false;
            }
        }
        #endregion

        #region Receive
        /// <summary>
        /// Blocks until exactly <paramref name="length"/> bytes have been read.
        /// </summary>
        /// <param name="socket">Connected socket to read from.</param>
        /// <param name="length">Number of bytes to read.</param>
        /// <returns>
        /// The filled buffer, or <c>null</c> when the peer closed the connection before
        /// <paramref name="length"/> bytes arrived or when receiving threw.
        /// </returns>
        public static byte[] Receive(Socket socket, int length)
        {
            try
            {
                byte[] buffer = new byte[length];
                int receiveCount = 0;
                while (receiveCount < length)
                {
                    int revCount = socket.Receive(buffer, receiveCount, buffer.Length - receiveCount, SocketFlags.None);
                    if (revCount == 0)
                    {
                        // A zero-byte read on a stream socket means the remote side shut down.
                        // Retrying (as the earlier version did) would spin forever.
                        return null;
                    }

                    receiveCount += revCount;
                }

                return buffer;
            }
            catch (Exception)
            {
                // Best effort, as before: failures (including receive timeouts) are reported as null.
                return null;
            }
        }

        /// <summary>
        /// Reads a single byte.
        /// </summary>
        /// <param name="socket">Connected socket to read from.</param>
        /// <returns>The byte read, or <c>null</c> when the read failed or the peer closed the connection.</returns>
        public static byte? ReceiveByte(Socket socket)
        {
            // NOTE(review): the original body was mostly lost; delegating to Receive keeps the
            // visible contract (byte? result taken from a one-byte buffer).
            byte[] buffer = Receive(socket, 1);
            if (buffer == null)
            {
                return null;
            }

            return buffer[0];
        }
        #endregion

        #region IsZero
        /// <summary>
        /// Tells whether every byte in <paramref name="data"/> is zero, logging an error when so.
        /// </summary>
        /// <param name="data">Bytes to inspect.</param>
        /// <returns><c>false</c> as soon as a non-zero byte is found; otherwise <c>true</c>.</returns>
        public static bool IsZero(byte[] data)
        {
            foreach (byte b in data)
            {
                if (b != 0)
                {
                    return false;
                }
            }

            LogUtil.Error("接收的字节数组内容全是0");
            return true;
        }
        #endregion
    }
}
代码中接收消息是异步接收,提高性能,发送消息是同步发送,主要是为了和Android端对接方便,Android端按我这种方式发就可以了。
由于网络、客户端可能不在线等原因,消息不一定能送达,所以为了保证消息送达,需要使用数据库,将发送失败的消息存入数据库,定时重发,发送成功或者超时2天则删除失败记录,下面是自己画的时序图,可能画的不太专业:
业务相关代码:
MsgUtil代码:
PrisonWebApi.Controllers.Common; PrisonWebApi.DAL; System.ComponentModel.DataAnnotations; System.Timers; System.Web; Web API 消息工具类 MsgUtil { static WebApimsgDal m_WebApimsgDal = ; static System.Timers.Timer _timer; SocketServerHelper _socketServerHelper; #region Init 初始化 初始化 Init() { ThreadHelper.Run(() => { m_WebApimsgDal = ServiceHelper.Get<WebApimsgDal>(); int port = int.Parse(ConfigurationManager.AppSettings[SocketServerPort]); _socketServerHelper = SocketServerHelper(port); _socketServerHelper.ReceivedSocketResultEvent += _socketServerHelper_ReceivedSocketResultEvent; _socketServerHelper.StartServer(); _timer = System.Timers.Timer(); _timer.AutoReset = ; _timer.Interval = 40000; 注意,这个参数必须比Socket等待回调超时时间CallbackTimeout大 _timer.Elapsed += MsgTask; _timer.Start(); LogUtil.Log(Web API 消息工具类 初始化成功); },(ex) => { LogUtil.Error(Web API 消息工具类 初始化失败); }); } #region 定时任务 定时任务 void MsgTask( { m_WebApimsgDal.DeleteTimeoutMsg(); 删除超时的消息 List<WEBAPI_MSG> list = m_WebApimsgDal.GetMsgList(); foreach (WEBAPI_MSG msg list) { WebApimsgContent msgContent = JsonConvert.DeserializeObject<WebApimsgContent>(msg.MSGCONTENT); msgContent.callbackId = msg.ID; Send(msgContent,msg.RECEIVER,1)">); } if (list.Count > ) { LogUtil.Log(已重发" + list.Count.ToString() + 条消息); } } (Exception ex) { LogUtil.Error(ex); } { _timer.Start(); } }); } 接收数据 void _socketServerHelper_ReceivedSocketResultEvent(bool> func = (callbackId) =>if (m_WebApimsgDal.Exists(()callbackId)) { m_WebApimsgDal.DeleteById(()callbackId); } } (Exception ex) { LogUtil.Error(ex,1)">删除消息出错; }; if (e.socketResult != while (!func(e.socketResult.callbackId) && tryCount++ < #region Send 发送消息 Send 发送消息 ) { _socketServerHelper.Send(msgContent,roomNo,devNo,callback); } static SocketResult Send(WebApimsgContent msgContent,1)"> devNo) { _socketServerHelper.Send(msgContent,devNo); } 发送消息失败 dispose() { ThreadHelper.Run(() => { _timer.Stop(); _timer.Elapsed -= MsgTask; _timer.Close(); _timer.dispose(); _timer = ; 
_socketServerHelper.StopServer(); _socketServerHelper.ReceivedSocketResultEvent -= _socketServerHelper_ReceivedSocketResultEvent; LogUtil.Log(Web API 消息工具类 释放资源成功Web API 消息工具类 释放资源失败 } }
Web API 接口 MsgController 代码:
DBUtil; Swashbuckle.Swagger.Annotations; System.Globalization; System.Net.Http; System.Web; System.Web.Http; Utils; PrisonWebApi.Controllers.Common { Web API 消息 </summary> [RoutePrefix(api/msg)] MsgController : ApiController { #region 变量属性 private WebApimsgDal m_WebApimsgDal = ServiceHelper.Get<WebApimsgDal>private TwoCJsDal m_TwoCJsDal = ServiceHelper.Get<TwoCJsDal>private BackstageAppInstallDal m_BackstageAppInstallDal = ServiceHelper.Get<BackstageAppInstallDal>private RollCallDal m_RollCallDal = ServiceHelper.Get<RollCallDal>private RollCallConfirmDal m_RollCallConfirmDal = ServiceHelper.Get<RollCallConfirmDal>#region 发送消息 发送消息 <param name="data">POST数据</param> [HttpPost] [Route(SendMsg)] [SwaggerResponse(HttpStatusCode.OK,1)">返回JSON",1)">typeof(JsonResult<SendMsgData>))] public HttpResponseMessage SendMsg([FromBody] SendMsgData data) { JsonResult jsonResult = if (data == null || data.msgContent == ) { jsonResult = new JsonResult(请检查参数格式 ApiHelper.ToJson(jsonResult); } if (data.roomNo != null && data.devNos != 监室号和设备编码(指仓内屏或仓外屏的设备编码)不能都有值,应填写其中一个,或者都不填写string.IsNullOrWhiteSpace(data.msgContent.msgTime)) data.msgContent.msgTime = DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss); .IsNullOrWhiteSpace(data.devNos)) { string devNo in data.devNos.Split('')) { data.msgContent.callbackId = Guid.NewGuid().ToString(); MsgUtil.Send(data.msgContent,(socketResult) =>socketResult.success) { WEBAPI_MSG info = WEBAPI_MSG(); info.ID = Guid.NewGuid().ToString(); info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime,CultureInfo.InvariantCulture); info.RECEIVER = devNo; info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent); m_WebApimsgDal.Insert(info); } }); } } 消息发送失败); jsonResult = ApiHelper.ToJson(jsonResult); } } .IsNullOrWhiteSpace(data.roomNo)) { { data.msgContent.callbackId = Guid.NewGuid().ToString( data.roomNo; info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent); m_WebApimsgDal.Insert(info); } }); } (Exception ex) { LogUtil.Error(ex,1)">); jsonResult = 
ApiHelper.ToJson(jsonResult); } } { List<string> roomNoList = m_TwoCJsDal.GetRoomNoListAll(); string roomNo roomNoList) { data.msgContent.callbackId = Guid.NewGuid().ToString(); MsgUtil.Send(data.msgContent,1)"> { socketResult.success) { WEBAPI_MSG info = WEBAPI_MSG(); info.ID = Guid.NewGuid().ToString(); info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime,CultureInfo.InvariantCulture); info.RECEIVER = roomNo; info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent); m_WebApimsgDal.Insert(info); } }); } } ApiHelper.ToJson(jsonResult); } } } jsonResult = new JsonResult<CommonSubmitResult>( CommonSubmitResult() { msg = 消息发送成功 }); ApiHelper.ToJson(jsonResult); } #region APP安装消息反馈 APP安装消息反馈 InstallMsgFeedbacktypeof(JsonResult<CommonSubmitResult> HttpResponseMessage InstallMsgFeedback([FromBody] InstallMsgFeedbackData data) { JsonResult jsonResult = ApiHelper.ToJson(jsonResult); } BACKSTAGE_APP_INSTALL info = m_BackstageAppInstallDal.Get(data.id); if (info != (data.success) { info.STATUS = 1; m_BackstageAppInstallDal.Update(info); } jsonResult = CommonSubmitResult() { msg = 反馈成功 info.ID }); } { jsonResult = 反馈失败:安装记录不存在#region 发起点名成功反馈 发起点名成功反馈 RollCallMsgFeedback HttpResponseMessage RollCallMsgFeedback([FromBody] RollCallMsgFeedbackData data) { JsonResult jsonResult = ApiHelper.ToJson(jsonResult); } ROLL_CALL info = m_RollCallDal.Get(data.id); 2; info.UPDATE_TIME = DateTime.Now; m_RollCallDal.Update(info); } { info.STATUS = 3; info.ERROR_MSG = data.errorMsg; info.UPDATE_TIME = DateTime.Now; m_RollCallDal.Update(info); } jsonResult = 反馈失败:点名记录不存在#region 点名确认 点名确认 RollCallConfirm HttpResponseMessage RollCallConfirm([FromBody] RollCallConfirmData data) { JsonResult jsonResult = ApiHelper.ToJson(jsonResult); } ROLL_CALL_CONFIRM info = m_RollCallConfirmDal.Get(data.rollCallId,data.prisonerId); if (info == null) info = ROLL_CALL_CONFIRM(); info.ROLL_CALL_ID = data.rollCallId; info.PRISONERID = data.prisonerId; info.CONFIRM_TIME = DateTime.Now; info.UPDATE_TIME = 
DateTime.Now; info.STATUS = ; m_RollCallConfirmDal.InsertOrUpdate(info); jsonResult = 点名确认成功 info.ID }); } #region SendMsgData 发送消息数据 发送消息数据 </summary> [MyValidate] SendMsgData { 消息内容 [required] public WebApimsgContent msgContent { ; } 监室号(如果为空,并且devNos也为空,则发送到所有监室;如果为空,并且devNos不为空,则按devNos发送) string roomNo { 设备编码(逗号隔开)(仓内屏或仓外屏的设备编码) string devNos { ; } } APP安装消息反馈 InstallMsgFeedbackData { 安装记录ID string id { 安装是否成功 bool success { 安装失败原因 string errorMsg { 发起点名成功反馈 RollCallMsgFeedbackData { 点名ID 发起点名是否成功 发起点名失败原因 点名确认数据 RollCallConfirmData { string rollCallId { 在押人员ID string prisonerId { ; } } }
有个严重BUG说明一下,博客就不改了:
客户端或服务端离线的时候,服务端或客户端接收到长度为0的数据,造成死循环,需要加个判断(完整代码参见 https://gitee.com/s0611163/SocketHelper ):
// A completion that transferred zero bytes means the other side went offline.
// Release the client and stop here; re-arming the receive would loop forever.
if (e.BytesTransferred == 0)
{
    ReleaseClientSocket(clientSkt, skt);
    return;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。