微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

C# .NET Socket SocketHelper 高性能 5000客户端 异步接收数据

    网上有很多Socket框架,但是我想,C#既然有Socket类,难道不是给人用的吗?

    写了一个SocketServerHelper和SocketClientHelper,分别只有5、6百行代码,比不上大神写的,和业务代码耦合也比较重,但对新手非常友好,容易看懂。

    支持返回值或回调,支持不定长度的数据包。客户端和服务端均支持断线重连。

    自己本机测试,5000个客户端并发发送消息正常,cpu压力有点大。由于局域网机子性能差,局域网只测试了500个客户端并发发送消息正常。

    短短1000多行代码,花了好多天心血,改了无数BUG,越写代码,越觉得自己资质平平,逻辑思维不够用。写Socket代码不像写一般的代码,实在不行加个try catch完事,这个东西既要稳定,又要性能,真的是每一个逻辑分支,每一个异常分支,都要想清楚,都要处理好,代码里我还是Exception用习惯了,没细分。

    有时候为了解决一个BUG,找了一整天,也找不出BUG在哪,现在终于测试难过了,达到了自己的预想。

    通过这几天的踩坑,测试,得出结论:

    1、Socket TCP 不会丢包,TCP是可靠的。(本机测试、局域网测试,可能没有遇到更恶劣的网络环境)

    2、Socket TCP 能够保证顺序,接收到的顺序和发送的顺序一致

    3、代码里有数据校验,但是错误的分支永远都不会走,校验是一定能通过的,不存在数据校验不通过,把错误的数据包简单丢弃的情况,否则说明代码写的还是有BUG

    以下是主要代码

    SocketServerHelper代码

using Models;
 Newtonsoft.Json;
 System;
 System.Collections.Concurrent;
 System.Collections.Generic;
 System.Configuration;
 System.Linq;
 System.Net;
 System.Net.sockets;
 System.Runtime.InteropServices;
 System.Text;
 System.Threading;
 System.Threading.Tasks;

namespace Utils
{
    /// <summary>
    /// Socket服务端帮助类
    </summary>
    public class SocketServerHelper
    {
        #region 变量
        private int _serverPort;
        private Socket serverSocket;
        private ConcurrentDictionary<ClientSocket,string> clientSocketList = new ConcurrentDictionary<ClientSocket,1)">string>();
        private ConcurrentDictionary<string,ClientSocket> _dictRoomNoClientSocket = new ConcurrentDictionary<();

        int _CallbackTimeout = 20;
        <summary>
         等待回调超时时间(单位:秒)
        </summary>
         CallbackTimeout
        {
            get { return _CallbackTimeout; }
            set { value = _CallbackTimeout; }
        }

        int _WaitResultTimeout =  等待返回结果超时时间(单位:秒)
         WaitResultTimeout
        {
             _WaitResultTimeout; }
             _WaitResultTimeout; }
        }

        object _lockSend = new objectevent EventHandler<ReceivedSocketResultEventArgs> ReceivedSocketResultEvent;

         System.Timers.Timer _checkClientTimer;
        #endregion

        #region SocketServerHelper 构造函数
        public SocketServerHelper( serverPort)
        {
            _serverPort = serverPort;
        }
        #region 启动服务
         启动服务
        bool StartServer()
        {
            try
            {
                IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any,_serverPort);
                serverSocket =  Socket(ipEndPoint.AddressFamily,SocketType.Stream,ProtocolType.Tcp);
                serverSocket.Bind(ipEndPoint);
                serverSocket.Listen(5000);
                Thread thread = new Thread(new ThreadStart(delegate ()
                {
                    while (true)
                    {
                        Socket client = null;
                        ClientSocket clientSocket = ;

                        
                        {
                            client = serverSocket.Accept();
                            client.SendTimeout = 20000;
                            client.ReceiveTimeout = ;
                            client.SendBufferSize = 10240;
                            client.ReceiveBufferSize = ;
                            clientSocket =  ClientSocket(client);
                            clientSocketList.TryAdd(clientSocket,);
                            LogUtil.Log("监听到新的客户端,当前客户端数:" + clientSocketList.Count);
                        }
                        catch (Exception ex)
                        {
                            LogUtil.Error(ex);
                            Thread.Sleep(1);
                            continue;
                        }

                        if (client == null) 
                        {
                            byte[] buffer = byte[];
                            socketasynceventargs args =  socketasynceventargs();
                            clientSocket.socketAsyncArgs = args;
                            clientSocket.socketAsyncCompleted = (s,e) =>
                            {
                                ReceiveData(clientSocket,e);
                            };
                            args.SetBuffer(buffer,0,buffer.Length);
                            args.Completed += clientSocket.socketAsyncCompleted;
                            client.ReceiveAsync(args);
                        }
                         (Exception ex)
                        {
                            LogUtil.Error(ex);
                        }
                    }
                }));
                thread.IsBackground = ;
                thread.Start();

                //检测客户端
                _checkClientTimer =  System.Timers.Timer();
                _checkClientTimer.AutoReset = false;
                _checkClientTimer.Interval = 1000;
                _checkClientTimer.Elapsed += CheckClient;
                _checkClientTimer.Start();

                LogUtil.Log(服务已启动");
                return ;
            }
             (Exception ex)
            {
                LogUtil.Error(ex,1)">启动服务出错;
            }
        }
        #region 检测客户端
         检测客户端
        void CheckClient( sender,System.Timers.ElapsedEventArgs e)
        {
            
            {
                foreach (ClientSocket clientSkt in clientSocketList.Keys.ToArray())
                {
                    Socket skt = clientSkt.socket;
                    ClientSocket temp;
                    string strTemp;

                    DateTime Now = DateTime.Now;
                    if (Now.Subtract(clientSkt.LastHeartbeat).TotalSeconds > 60)
                    {
                        clientSocketList.TryRemove(clientSkt,1)">out strTemp);
                        LogUtil.Log(客户端已失去连接,当前客户端数: clientSocketList.Count);
                        ActionUtil.TryDoAction(() => { if (skt.Connected) skt.disconnect(); });
                        ActionUtil.TryDoAction(() =>
                        {
                            skt.Close();
                            skt.dispose();
                            if (clientSkt.socketAsyncArgs != )
                            {
                                if (clientSkt.socketAsyncCompleted != )
                                {
                                    clientSkt.socketAsyncArgs.Completed -= clientSkt.socketAsyncCompleted;
                                }
                                clientSkt.socketAsyncArgs.dispose();
                            }
                            clientSkt.socketAsyncCompleted = ;
                            clientSkt.socketAsyncArgs = ;
                        });
                    }
                }

            }
            检测客户端出错);
            }
            finally
            {
                _checkClientTimer.Start();
            }
        }
        #region 接收数据
         处理接收的数据包
        void ReceiveData(ClientSocket clientSkt,socketasynceventargs e)
        {
            if (clientSkt == ;
            Socket skt = clientSkt.socket;

            
            {
                copyTo(e.Buffer,clientSkt.Buffer,e.BytesTransferred);

                #region 校验数据
                if (clientSkt.Buffer.Count < 4)
                {
                    if (skt.Connected)
                    {
                        if (!skt.ReceiveAsync(e)) ReceiveData(clientSkt,e);
                    }
                    ;
                }
                else
                {
                    byte[] bArrHeader = ];
                    copyTo(clientSkt.Buffer,bArrHeader,1)">0,string strHeader = Encoding.ASCII.GetString(bArrHeader);
                    if (strHeader.toupper() == 0XFF)
                    {
                        5)
                        {
                             (skt.Connected)
                            {
                                ;
                        }
                        byte[] bArrType = ];
                            copyTo(clientSkt.Buffer,bArrType,1)">4,bArrType.Length);
                            if (bArrType[0] == 0) { } 心跳包
                            else 2 || bArrType[4) 注册包、返回值包
                            {
                                9)
                                {
                                     (skt.Connected)
                                    {
                                        ;
                                }
                                
                                {
                                    byte[] bArrLength = ];
                                    copyTo(clientSkt.Buffer,bArrLength,1)">5,bArrLength.Length);
                                    int dataLength = BitConverter.ToInt32(bArrLength,1)">);
                                    if (dataLength == 0 || clientSkt.Buffer.Count < dataLength + )
                                    {
                                         (skt.Connected)
                                        {
                                            ;
                                    }
                                }
                            }
                            
                            {
                                LogUtil.Error(string.Format(type错误,丢掉错误数据,重新接收,roomNo={0},devNo={1}错误的数据丢掉
                                 (skt.Connected)
                                {
                                    ;
                            }
                        }
                    }
                    
                    {
                        LogUtil.Error(不是0XFF,丢掉错误数据,重新接收,roomNo={0},devNo={1}错误的数据丢掉
                         (skt.Connected)
                        {
                            ;
                    }
                }
                #endregion

                SocketData data = ;
                do
                {
                    data = ProcessSocketData(clientSkt);
                } while (data != );

                 (skt.Connected)
                {
                    处理接收的数据包 异常);
            }
        }

         字节数组转字符串
        string ByteArrToString(List<byte> byteList)
        {
            List<string> list = new List<();

            foreach (byte b  byteList)
            {
                list.Add(b.ToString(X2));
            }

            string.Join("  #region 处理接收的数据包
         SocketData ProcessSocketData(ClientSocket clientSkt)
        {
            int readLength = ;
            SocketData data = ResolveBuffer(clientSkt.Buffer,1)"> readLength);
            if (data != )
            {
                if (readLength > ) clientSkt.RemoveBufferData(readLength);
                if (data.Type == 0) 收到心跳包
                {
                    clientSkt.LastHeartbeat = DateTime.Now;

                    心跳应答
                    if (clientSkt.RoomNo != null || clientSkt.DevNo != lock (clientSkt.LockSend)
                        {
                            byte[] bArrHeader = Encoding.ASCII.GetBytes();
                            SocketHelper.Send(clientSkt.socket,bArrHeader);
                            SocketHelper.Send(clientSkt.socket,1)">byte[] { 0x01 });
                        }
                    }
                    
                    {
                        LogUtil.Log(没有注册信息);
                    }

                    LogUtil.Log("收到心跳包,客户端连接正常,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                }

                2) 收到注册
                {
                    if (data.socketRegisterData != null && clientSkt != )
                    {
                        ClientSocket temp;
                        if (data.socketRegisterData.RoomNo != null) _dictRoomNoClientSocket.TryRemove(data.socketRegisterData.RoomNo,1)"> temp);
                        if (data.socketRegisterData.DevNo != null) _dictDevNoClientSocket.TryRemove(data.socketRegisterData.DevNo,1)"> temp);
                        clientSkt.RoomNo = data.socketRegisterData.RoomNo;
                        clientSkt.DevNo = data.socketRegisterData.DevNo;
                        ) _dictRoomNoClientSocket.TryAdd(data.socketRegisterData.RoomNo,clientSkt);
                        ) _dictDevNoClientSocket.TryAdd(data.socketRegisterData.DevNo,clientSkt);
                        LogUtil.Log(收到注册包,roomNo=" + clientSkt.RoomNo + ,devNo= clientSkt.DevNo);

                        注册反馈
                        0x05 });
                        }
                    }
                }

                收到返回值包
                {
                    ThreadHelper.Run(() =>
                    {
                        if (data.socketResult != ) clientSkt.CallbackDict.TryAdd(data.socketResult.callbackId,data.socketResult);

                        if (ReceivedSocketResultEvent != )
                        {
                            ReceivedSocketResultEvent(null,1)"> Models.ReceivedSocketResultEventArgs(data.socketResult));
                        }
                    });

                    LogUtil.Log("收到返回值包,roomNo=" + clientSkt.RoomNo + ",devNo=" + clientSkt.DevNo);
                }
            }
             data;
        }
        #region ResolveBuffer
         解析字节数组
        private SocketData ResolveBuffer(List<byte> buffer,1)">out  readLength)
        {
            SocketData socketData = ;
            readLength = ;

            if (buffer.Count < ];
                copyTo(buffer,bArrHeader.Length);
                readLength += bArrHeader.Length;
                 Encoding.ASCII.GetString(bArrHeader);
                5) ;
                    ];
                    copyTo(buffer,bArrType.Length);
                    readLength += bArrType.Length;
                    byte bType = bArrType[];
                    socketData =  SocketData();
                    socketData.Type = bType;

                    if (socketData.Type == 29) ;
                        ];
                        copyTo(buffer,bArrLength.Length);
                        readLength += bArrLength.Length;
                        );

                        0 || buffer.Count < dataLength + byte[] dataBody = byte[dataLength];
                        copyTo(buffer,dataBody,1)">9,dataBody.Length);
                        readLength += dataBody.Length;
                        string jsonString = Encoding.UTF8.GetString(dataBody);
                        socketData.socketRegisterData = JsonConvert.DeserializeObject<SocketRegisterData>(jsonString);
                    }

                     Encoding.UTF8.GetString(dataBody);
                        socketData.socketResult = JsonConvert.DeserializeObject<SocketResult>(jsonString);
                    }
                }
                
                {
                    LogUtil.Error(不是0XFF);
                    ;
                }

            }
            解析字节数组 出错;
            }

             socketData;
        }
        #region copyTo
         数组复制
        void copyTo(byte[] bArrSource,List<byte> listTarget,1)">int sourceIndex,1)"> length)
        {
            for (int i = 0; i < length; i++if (sourceIndex + i < bArrSource.Length)
                {
                    listTarget.Add(bArrSource[sourceIndex + i]);
                }
            }
        }

        void copyTo(List<byte> listSource,1)">byte[] bArrTarget,1)">int targetIndex,1)">if (targetIndex + i < bArrTarget.Length && sourceIndex + i < listSource.Count)
                {
                    bArrTarget[targetIndex + i] = listSource[sourceIndex + i];
                }
            }
        }
        #region 停止服务
         停止服务
         StopServer()
        {
            foreach (ClientSocket clientSocket  clientSocketList.Keys.ToArray())
                {
                    Socket socket = clientSocket.socket;
                    ActionUtil.TryDoAction(() => { if (socket.Connected) socket.disconnect(); });
                    ActionUtil.TryDoAction(() =>
                    {
                        socket.Close();
                        socket.dispose();
                    });
                }
                clientSocketList.Clear();
                _dictDevNoClientSocket.Clear();
                _dictRoomNoClientSocket.Clear();
                if (serverSocket != )
                {
                    ActionUtil.TryDoAction(() => { if (serverSocket.Connected) serverSocket.disconnect(
                    {
                        serverSocket.Close();
                        serverSocket.dispose();
                    });
                }
                LogUtil.Log(服务已停止停止服务出错);
            }
        }
        #region 释放资源
         释放资源
         dispose()
        {
            if (_checkClientTimer != )
            {
                _checkClientTimer.Stop();
                _checkClientTimer.Close();
            }
        }
        #region Send
         Send 单个发送 并等待结果
        </summary>
        <returns>false:发送失败 true:发送成功,但接收端是否处理成功要等待返回结果</returns>
        public SocketResult Send(WebApimsgContent msgContent,1)">string roomNo,1)"> devNo)
        {
            SocketData data =  SocketData();
            data.Type = 3;
            data.MsgContent = msgContent;

            ClientSocket clientSocket = ;
            if (roomNo != null) _dictRoomNoClientSocket.TryGetValue(roomNo,1)"> clientSocket);
            if (devNo != null) _dictDevNoClientSocket.TryGetValue(devNo,1)"> clientSocket);

            if (clientSocket != if (.IsNullOrWhiteSpace(msgContent.callbackId))
                {
                    msgContent.callbackId = Guid.NewGuid().ToString(N);
                }

                Send(clientSocket,data);
                 WaitSocketResult(clientSocket,msgContent.callbackId);
            }
            
            {
                SocketResult socketResult =  SocketResult();
                socketResult.success = ;
                socketResult.errorMsg = 客户端不存在 socketResult;
            }
        }

         Send 单个发送
        void Send(WebApimsgContent msgContent,1)">string devNo,Action<SocketResult> callback = )
        {
            SocketData data = );
                }

                if (callback != )
                {
                    WaitCallback(clientSocket,msgContent.callbackId,callback);
                }

                Send(clientSocket,data);
            }
            ) callback(socketResult);
            }
        }

         等待回调
        void WaitCallback(ClientSocket clientSocket,1)">string callbackId,1)">)
        {
            DateTime dt = DateTime.Now.AddSeconds(_CallbackTimeout);
            System.Timers.Timer timer =  System.Timers.Timer();
            timer.AutoReset = ;
            timer.Interval = 100;
            timer.Elapsed += (s,1)">
                {
                    SocketResult socketResult;
                    if (!clientSocket.CallbackDict.TryGetValue(callbackId,1)">out socketResult) && DateTime.Now < dt)
                    {
                        timer.Start();
                        ;
                    }
                    SocketResult sktResult;
                    clientSocket.CallbackDict.TryRemove(callbackId,1)"> sktResult);
                    if (socketResult == )
                    {
                        socketResult =  SocketResult();
                        socketResult.success = ;
                        socketResult.errorMsg = 超时;
                    }

                    ) callback(socketResult);

                    timer.Close();
                }
                 (Exception ex)
                {
                    LogUtil.Error(WaitCallback error ex);
                }
            };
            timer.Start();
        }

         等待SocketResult
        private SocketResult WaitSocketResult(ClientSocket clientSocket,1)"> callbackId)
        {
            SocketResult socketResult;
            DateTime dt = DateTime.Now.AddSeconds(_WaitResultTimeout);
            while (!clientSocket.CallbackDict.TryGetValue(callbackId,1)"> dt)
            {
                Thread.Sleep(10);
            }
            SocketResult sktResult;
            clientSocket.CallbackDict.TryRemove(callbackId,1)"> sktResult);
            )
            {
                socketResult =  socketResult;
        }

         Send
        false:发送失败 true:发送成功,但不表示对方已收到 Send(ClientSocket clientSocket,SocketData data)
        {
            bool bl = ;
            Socket socket = clientSocket.socket;

             (clientSocket.LockSend)
            {
                "); 发送header
                bl = SocketHelper.Send(socket,bArrHeader);

                if (bl) bl = SocketHelper.Send(socket,1)">0x01 }); 发送type
0x03 }); 发送type

                    if (data.MsgContent != byte[] bArrData = if (bl) bArrData = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.MsgContent));
                        发送length
                        发送body
                    }
                }
            }
        }
        

    }

}
View Code

    SocketClientHelper代码

 Socket客户端帮助类
     SocketClientHelper
    {
         _serverIP;
         Socket clientSocket;
         socketasynceventargs _socketAsyncArgs;
        public EventHandler<socketasynceventargs> _socketAsyncCompleted { get; set; }
         System.Timers.Timer heartbeatTimer;
        event EventHandler<SocketReceivedEventArgs> SocketReceivedEvent;
         System.Timers.Timer _checkServerTimer;
         DateTime _lastHeartbeat;
        private List<byte> _buffer =  _roomNo;
         _devNo;
        bool _registerSuccess = ;

         RoomNo
        {
             _roomNo; }
        }

         DevNo
        {
             _devNo; }
        }

         删除接收到的一个void RemoveBufferData( count)
        {
            0; i < count; i++if (_buffer.Count > )
                {
                    _buffer.RemoveAt();
                }
            }
        }
        #region SocketClientHelper 构造函数
        public SocketClientHelper(string serverIP,1)"> serverPort)
        {
            _serverIP = serverIP;
            _serverPort =#region 连接服务器
         连接服务器
         ConnectServer()
        {
            if (clientSocket == null || !clientSocket.Connected)
                {
                    )
                    {
                        clientSocket.Close();
                        clientSocket.dispose();
                    }
                    string ip = ConfigurationManager.AppSettings[ServerIP];
                    string hostName = ConfigurationManager.AppSettings[HostNameint port = Convert.ToInt32(ConfigurationManager.AppSettings[ServerPort]);
                    IPEndPoint ipep = if (hostName != )
                    {
                        IPHostEntry host = Dns.GetHostEntry(hostName);
                        IPAddress ipAddr = host.AddressList[];
                        ipep =  IPEndPoint(ipAddr,port);
                    }
                    
                    {
                        ipep =  IPEndPoint(IPAddress.Parse(ip),port);
                    }
                    clientSocket =  Socket(ipep.AddressFamily,ProtocolType.Tcp);
                    clientSocket.SendTimeout = ;
                    clientSocket.ReceiveTimeout = ;
                    clientSocket.SendBufferSize = ;
                    clientSocket.ReceiveBufferSize = ;

                    
                    {
                        clientSocket.Connect(ipep);
                    }
                     (Exception ex)
                    {
                        LogUtil.Error(ex);
                        null || !clientSocket.Connected) ;

                    _lastHeartbeat =];
                        _socketAsyncArgs =  socketasynceventargs();
                        _socketAsyncArgs.SetBuffer(buffer,buffer.Length);
                        _socketAsyncCompleted = (s,1)">
                        {
                            ReceiveData(clientSocket,e);
                        };
                        _socketAsyncArgs.Completed += _socketAsyncCompleted;
                        clientSocket.ReceiveAsync(_socketAsyncArgs);
                    }
                     (Exception ex)
                    {
                        LogUtil.Error(ex);
                    }

                    检测服务端
                    _checkServerTimer =  System.Timers.Timer();
                    _checkServerTimer.AutoReset = ;
                    _checkServerTimer.Interval = ;
                    _checkServerTimer.Elapsed += CheckServer;
                    _checkServerTimer.Start();

                    LogUtil.Log(已连接服务器连接服务器失败#region 检测服务端
         检测服务端
        void CheckServer(
            {
                DateTime Now = DateTime.Now;
                if (Now.Subtract(_lastHeartbeat).TotalSeconds > )
                {
                    LogUtil.Log(服务端已失去连接if (clientSocket.Connected) clientSocket.disconnect();
                        clientSocket.Close();
                        clientSocket.dispose();
                        _socketAsyncArgs.Completed -= _socketAsyncCompleted;
                        _socketAsyncCompleted = ;
                        _socketAsyncArgs.dispose();
                        _socketAsyncArgs = ;
                    }
                     (Exception ex)
                    {
                        LogUtil.Error(ex);
                    }

                    Thread.Sleep(3000int tryCount = while (!ConnectServer() && tryCount++ < 10000) 重连
                    {
                        Thread.Sleep();
                    }
                    RegisterToServer(_roomNo,_devNo); 重新注册
检测服务端出错
            {
                _checkServerTimer.Start();
            }
        }
        #region 断开服务器
         断开服务器
         disconnectServer()
        {
            );
                    clientSocket.Close();
                    clientSocket.dispose();
                }
                LogUtil.Log(已断开服务器断开服务器失败#region 释放资源 
        if (heartbeatTimer != )
            {
                heartbeatTimer.Stop();
                heartbeatTimer.Close();
            }
            if (_checkServerTimer != )
            {
                _checkServerTimer.Stop();
                _checkServerTimer.Close();
            }
        }
        #region 心跳
         StartHeartbeat()
        {
            heartbeatTimer =  System.Timers.Timer();
            heartbeatTimer.AutoReset = ;
            heartbeatTimer.Interval = 10000;
            heartbeatTimer.Elapsed += new System.Timers.ElapsedEventHandler((obj,eea) => (_lockSend)
                {
                    );
                        SocketHelper.Send(clientSocket,bArrHeader);
                        SocketHelper.Send(clientSocket,1)">0x00 });
                    }
                     (Exception ex)
                    {
                        LogUtil.Error(向服务器发送心跳包出错: ex.Message);
                    }
                    
                    {
                        heartbeatTimer.Start();
                    }
                }
            });
            heartbeatTimer.Start();
        }
        #region 停止心跳
         StopHeartbeat()
        {
            heartbeatTimer.Stop();
        }
        #region 注册
         注册
        bool RegisterToServer( devNo)
        {
            _registerSuccess = ;
            SocketData data = ;
            data.socketRegisterData =  SocketRegisterData();
            data.socketRegisterData.RoomNo = roomNo;
            data.socketRegisterData.DevNo = devNo;
            _roomNo = roomNo;
            _devNo = devNo;
            Send(data);

            DateTime dt = DateTime.Now;
            while (!_registerSuccess && DateTime.Now.Subtract(dt).TotalMilliseconds < )
            {
                Thread.Sleep( _registerSuccess;
        }
         ReceiveData(Socket socket,_buffer,1)">if (_buffer.Count <  (socket.Connected)
                    {
                        socket.ReceiveAsync(e)) ReceiveData(socket,1)">];
                    copyTo(_buffer,1)"> (socket.Connected)
                            {
                                ];
                            copyTo(_buffer,1)">1 || bArrType[5) { } 心跳应答包、注册反馈包
                            3) 消息包
 (socket.Connected)
                                    {
                                        ];
                                    copyTo(_buffer,1)">0 || _buffer.Count < dataLength +  (socket.Connected)
                                        {
                                            
                            {
                                LogUtil.Error(type错误,丢掉错误数据,重新接收);
                                _buffer.Clear();  (socket.Connected)
                                {
                                    
                    {
                        LogUtil.Error(不是0XFF,丢掉错误数据,重新接收);
                        _buffer.Clear();  (socket.Connected)
                        {
                             ProcessSocketData(socket);
                }  (socket.Connected)
                {
                     SocketData ProcessSocketData(Socket socket)
        {
            ;
            SocketData data = ResolveBuffer(_buffer,1)">) RemoveBufferData(readLength);
                1) 心跳应答
                {
                    _lastHeartbeat =LogUtil.Log("收到心跳应答包,服务端正常");
消息数据
if (SocketReceivedEvent != )
                    {
                        SocketReceivedEventArgs args =  SocketReceivedEventArgs(data.MsgContent);
                        args.Callback =  CallbackSocket(socket);
                        ThreadHelper.Run((obj) =>
                            {
                                SocketReceivedEvent(this,obj as SocketReceivedEventArgs);
                            }
                             (Exception ex)
                            {
                                LogUtil.Error(ex);
                            }
                        },args);
                    }
                }

                注册反馈
                {
                    _registerSuccess = ;
                    LogUtil.Log(收到注册反馈包,注册成功);
                }
            }
             Encoding.UTF8.GetString(dataBody);
                        socketData.MsgContent = JsonConvert.DeserializeObject<WebApimsgContent>;
                }
            }
             Send(SocketData data)
        {
            Send(clientSocket,data);
        }

         Send(Socket socket,1)"> (_lockSend)
            {
                发送header
                bool bl =0x00 }); 0x02 });  Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.socketRegisterData));
                                            }
                }

                0x04 });  Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data.socketResult));
                        

    }
}
View Code

    SocketHelper代码(里面同步接收的方法Receive和ReceiveByte没有用到):

 Socket封装
    static  SocketHelper
    {
        bool Send(Socket socket,1)">[] data)
        {
            if (socket == null || !socket.Connected) ;

                int sendTotal = while (sendTotal < data.Length)
                {
                    int sendLength = data.Length - sendTotal;
                    if (sendLength > 1024) sendLength = 1024int sendOnce = socket.Send(data,sendTotal,sendLength,SocketFlags.None);
                    sendTotal += sendOnce;
                }
                 (Exception ex)
            {
                LogUtil.Error(ex);
                #region Receive
         Receive
        byte[] Receive(Socket socket,1)">[length];
                int receiveCount = while ((receiveCount = socket.Receive(buffer,length,SocketFlags.None)) == )
                {
                    Thread.Sleep();
                }
                while (receiveCount < length)
                {
                    int revCount = socket.Receive(buffer,receiveCount,buffer.Length - receiveCount,SocketFlags.None);
                    receiveCount += revCount;
                }
                 buffer;
            }
             (Exception ex)
            {
                ;
            }
        }

        byte? ReceiveByte(Socket socket)
        {
            ];
                1,1)">return buffer[];
            }
            #region IsZero
         IsZero
        bool IsZero( data)
            {
                if (b != ;
                }
            }
            LogUtil.Error(接收的字节数组内容全是0);
             bl;
        }
        

    }

}
View Code

    代码中接收消息是异步接收,提高性能,发送消息是同步发送,主要是为了和Android端对接方便,Android端按我这种方式发就可以了。

    下面是模拟500个客户端的程序代码下载链接

    SocketHelper批量客户端代码

     由于网络、客户端可能不在线等原因,消息不一定能送达,所以为了保证消息送达,需要使用数据库,将发送失败的消息存入数据库,定时重发,发送成功或者超时2天则删除失败记录,下面是自己画的时序图,可能画的不太专业:

    业务相关代码

    MsgUtil代码

 PrisonWebApi.Controllers.Common;
 PrisonWebApi.DAL;
 System.ComponentModel.DataAnnotations;
 System.Timers;
 System.Web;

 Web API 消息工具类
     MsgUtil
    {
        static WebApimsgDal m_WebApimsgDal = ;
        static System.Timers.Timer _timer;
         SocketServerHelper _socketServerHelper;
        #region Init 初始化
         初始化
         Init()
        {
            ThreadHelper.Run(() =>
            {
                m_WebApimsgDal = ServiceHelper.Get<WebApimsgDal>();
                int port = int.Parse(ConfigurationManager.AppSettings[SocketServerPort]);
                _socketServerHelper =  SocketServerHelper(port);
                _socketServerHelper.ReceivedSocketResultEvent += _socketServerHelper_ReceivedSocketResultEvent;
                _socketServerHelper.StartServer();

                _timer =  System.Timers.Timer();
                _timer.AutoReset = ;
                _timer.Interval = 40000; 注意,这个参数必须比Socket等待回调超时时间CallbackTimeout大
                _timer.Elapsed += MsgTask;
                _timer.Start();

                LogUtil.Log(Web API 消息工具类 初始化成功);
            },(ex) =>
            {
                LogUtil.Error(Web API 消息工具类 初始化失败);
            });
        }
        #region 定时任务
         定时任务
        void MsgTask(
                {
                    m_WebApimsgDal.DeleteTimeoutMsg(); 删除超时的消息
                    List<WEBAPI_MSG> list = m_WebApimsgDal.GetMsgList();
                    foreach (WEBAPI_MSG msg  list)
                    {
                        WebApimsgContent msgContent = JsonConvert.DeserializeObject<WebApimsgContent>(msg.MSGCONTENT);
                        msgContent.callbackId = msg.ID;

                        Send(msgContent,msg.RECEIVER,1)">);
                    }
                    if (list.Count > )
                    {
                        LogUtil.Log(已重发" + list.Count.ToString() + 条消息);
                    }
                }
                 (Exception ex)
                {
                    LogUtil.Error(ex);
                }
                
                {
                    _timer.Start();
                }
            });
        }
         接收数据
        void _socketServerHelper_ReceivedSocketResultEvent(bool> func = (callbackId) =>if (m_WebApimsgDal.Exists(()callbackId))
                    {
                        m_WebApimsgDal.DeleteById(()callbackId);
                    }
                }
                 (Exception ex)
                {
                    LogUtil.Error(ex,1)">删除消息出错;
            };

            if (e.socketResult != while (!func(e.socketResult.callbackId) && tryCount++ < #region Send 发送消息
         Send 发送消息
        )
        {
            _socketServerHelper.Send(msgContent,roomNo,devNo,callback);
        }

        static SocketResult Send(WebApimsgContent msgContent,1)"> devNo)
        {
             _socketServerHelper.Send(msgContent,devNo);
            }
            发送消息失败 dispose()
        {
            ThreadHelper.Run(() =>
            {
                _timer.Stop();
                _timer.Elapsed -= MsgTask;
                _timer.Close();
                _timer.dispose();
                _timer = ;

                _socketServerHelper.StopServer();
                _socketServerHelper.ReceivedSocketResultEvent -= _socketServerHelper_ReceivedSocketResultEvent;

                LogUtil.Log(Web API 消息工具类 释放资源成功Web API 消息工具类 释放资源失败

    }
}
View Code

    Web API 接口 MsgController 代码

 DBUtil;
 Swashbuckle.Swagger.Annotations;
 System.Globalization;
 System.Net.Http;
 System.Web;
 System.Web.Http;
 Utils;

 PrisonWebApi.Controllers.Common
{
     Web API 消息
    </summary>
    [RoutePrefix(api/msg)]
     MsgController : ApiController
    {
        #region 变量属性
        private WebApimsgDal m_WebApimsgDal = ServiceHelper.Get<WebApimsgDal>private TwoCJsDal m_TwoCJsDal = ServiceHelper.Get<TwoCJsDal>private BackstageAppInstallDal m_BackstageAppInstallDal = ServiceHelper.Get<BackstageAppInstallDal>private RollCallDal m_RollCallDal = ServiceHelper.Get<RollCallDal>private RollCallConfirmDal m_RollCallConfirmDal = ServiceHelper.Get<RollCallConfirmDal>#region 发送消息
         发送消息
        <param name="data">POST数据</param>
        [HttpPost]
        [Route(SendMsg)]
        [SwaggerResponse(HttpStatusCode.OK,1)">返回JSON",1)">typeof(JsonResult<SendMsgData>))]
        public HttpResponseMessage SendMsg([FromBody] SendMsgData data)
        {
            JsonResult jsonResult = if (data == null || data.msgContent == )
            {
                jsonResult = new JsonResult(请检查参数格式 ApiHelper.ToJson(jsonResult);
            }

            if (data.roomNo != null && data.devNos != 监室号和设备编码(指仓内屏或仓外屏的设备编码)不能都有值,应填写其中一个,或者都不填写string.IsNullOrWhiteSpace(data.msgContent.msgTime)) data.msgContent.msgTime = DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);

            .IsNullOrWhiteSpace(data.devNos))
            {
                string devNo in data.devNos.Split(''))
                    {
                        data.msgContent.callbackId = Guid.NewGuid().ToString();
                        MsgUtil.Send(data.msgContent,(socketResult) =>socketResult.success)
                            {
                                WEBAPI_MSG info =  WEBAPI_MSG();
                                info.ID = Guid.NewGuid().ToString();
                                info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime,CultureInfo.InvariantCulture);
                                info.RECEIVER = devNo;
                                info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent);
                                m_WebApimsgDal.Insert(info);
                            }
                        });
                    }
                }
                消息发送失败);
                    jsonResult =  ApiHelper.ToJson(jsonResult);
                }
            }
            .IsNullOrWhiteSpace(data.roomNo))
                {
                    
                    {
                        data.msgContent.callbackId = Guid.NewGuid().ToString( data.roomNo;
                                info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent);
                                m_WebApimsgDal.Insert(info);
                            }
                        });
                    }
                     (Exception ex)
                    {
                        LogUtil.Error(ex,1)">);
                        jsonResult =  ApiHelper.ToJson(jsonResult);
                    }
                }
                
                    {
                        List<string> roomNoList = m_TwoCJsDal.GetRoomNoListAll();
                        string roomNo  roomNoList)
                        {
                            data.msgContent.callbackId = Guid.NewGuid().ToString();
                            MsgUtil.Send(data.msgContent,1)">
                            {
                                socketResult.success)
                                {
                                    WEBAPI_MSG info =  WEBAPI_MSG();
                                    info.ID = Guid.NewGuid().ToString();
                                    info.MSGTIME = DateTime.ParseExact(data.msgContent.msgTime,CultureInfo.InvariantCulture);
                                    info.RECEIVER = roomNo;
                                    info.MSGCONTENT = JsonConvert.SerializeObject(data.msgContent);
                                    m_WebApimsgDal.Insert(info);
                                }
                            });
                        }
                    }
                     ApiHelper.ToJson(jsonResult);
                    }
                }
            }

            jsonResult = new JsonResult<CommonSubmitResult>( CommonSubmitResult()
            {
                msg = 消息发送成功
            });

             ApiHelper.ToJson(jsonResult);
        }
        #region APP安装消息反馈
         APP安装消息反馈
        InstallMsgFeedbacktypeof(JsonResult<CommonSubmitResult> HttpResponseMessage InstallMsgFeedback([FromBody] InstallMsgFeedbackData data)
        {
            JsonResult jsonResult =  ApiHelper.ToJson(jsonResult);
            }

            BACKSTAGE_APP_INSTALL info = m_BackstageAppInstallDal.Get(data.id);
            if (info !=  (data.success)
                {
                    info.STATUS = 1;
                    m_BackstageAppInstallDal.Update(info);
                }

                jsonResult =  CommonSubmitResult()
                {
                    msg = 反馈成功 info.ID
                });
            }
            
            {
                jsonResult = 反馈失败:安装记录不存在#region 发起点名成功反馈
         发起点名成功反馈
        RollCallMsgFeedback HttpResponseMessage RollCallMsgFeedback([FromBody] RollCallMsgFeedbackData data)
        {
            JsonResult jsonResult =  ApiHelper.ToJson(jsonResult);
            }

            ROLL_CALL info = m_RollCallDal.Get(data.id);
            2;
                    info.UPDATE_TIME = DateTime.Now;
                    m_RollCallDal.Update(info);
                }
                
                {
                    info.STATUS = 3;
                    info.ERROR_MSG = data.errorMsg;
                    info.UPDATE_TIME = DateTime.Now;
                    m_RollCallDal.Update(info);
                }

                jsonResult = 反馈失败:点名记录不存在#region 点名确认
         点名确认
        RollCallConfirm HttpResponseMessage RollCallConfirm([FromBody] RollCallConfirmData data)
        {
            JsonResult jsonResult =  ApiHelper.ToJson(jsonResult);
            }

            ROLL_CALL_CONFIRM info = m_RollCallConfirmDal.Get(data.rollCallId,data.prisonerId);
            if (info == null) info =  ROLL_CALL_CONFIRM();

            info.ROLL_CALL_ID = data.rollCallId;
            info.PRISONERID = data.prisonerId;
            info.CONFIRM_TIME = DateTime.Now;
            info.UPDATE_TIME = DateTime.Now;
            info.STATUS = ;
            m_RollCallConfirmDal.InsertOrUpdate(info);

            jsonResult = 点名确认成功 info.ID
            });

            

    }

    #region SendMsgData 发送消息数据
     发送消息数据
    </summary>
    [MyValidate]
     SendMsgData
    {
         消息内容
                [required]
        public WebApimsgContent msgContent { ; }

         监室号(如果为空,并且devNos也为空,则发送到所有监室;如果为空,并且devNos不为空,则按devNos发送)
        string roomNo {  设备编码(逗号隔开)(仓内屏或仓外屏的设备编码)
        string devNos { ; }
    }

     APP安装消息反馈
     InstallMsgFeedbackData
    {
         安装记录ID
        string id {  安装是否成功
        bool success {  安装失败原因
        string errorMsg {  发起点名成功反馈
     RollCallMsgFeedbackData
    {
         点名ID
         发起点名是否成功
         发起点名失败原因
         点名确认数据
     RollCallConfirmData
    {
        string rollCallId {  在押人员ID
        string prisonerId { ; }
    }
    

}
View Code

     有个严重BUG说明一下,博客就不改了:

 客户端或服务端离线的时候,服务端或客户端接收到长度为0的数据,造成死循环,需要加个判断(完整代码参见 https://gitee.com/s0611163/SocketHelper ):

if (e.BytesTransferred == )
{
    ReleaseClientSocket(clientSkt,skt);
    ;
}
View Code

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐