我们游戏中断线和超时这一块出问题非常多。
这一次我专门抽出了一周时间,了解了一下 C# 的 socket 相关函数,并重写了网络模块。
首要目标是避免另开线程,使用非阻塞 IO(NIO)。其次是在模块内部处理超时、断线和其它 IO 异常,重连 3 次失败后,才提交给游戏自己处理。重连时,自动登录并续发消息。
这里本来还需要添加一个补发消息的机制(补发断线时在网络传输中丢失的消息),但测试发现,只做续发消息就已经解决了原来 99% 的问题。所以补发消息先放一放,先去重构其它地方。
重构后测试的结果非常好,以前的各种网络问题都没有复现,断线对玩家来说是不可见的,只有长时间断线(三次重连失败)才会给玩家一个提示框,让玩家手动点击去重连。
//----------------------------------------------
// NET        : NetManager
// AUTHOR: zhu kun qian
//----------------------------------------------

using System.IO;

/// <summary>
/// A single network message, used both for outgoing requests and for
/// messages decoded from the server.
/// </summary>
public class NetMsg
{
    // Original author's open question: is a command id really needed here?
    // It is kept as a live field because NetManager and NetManagerSocket both
    // read and write netMsg.cmdType.
    public cmd.CmdType cmdType;

    // Sequence id of the message (NetManager.send assigns ++loop).
    public uint loop;

    // Protobuf-encoded payload only (no framing).
    public byte[] data;

    // Complete wire frame, built lazily by NetManagerSocket.encode
    // the first time the message is sent.
    public byte[] totalData;

    // Stream view over the payload; filled in by NetManagerSocket.readMsg
    // for received messages.
    public MemoryStream netdata;

    // TODO: the payload is currently copied out of the read buffer. Avoiding
    // that array copy is a possible future optimisation, deliberately not
    // addressed yet.
}
//----------------------------------------------
// NET        : NetManager
// AUTHOR: zhu kun qian
//----------------------------------------------

using System.Net;
using System.Net.Sockets;
using System.Runtime.Remoting.Messaging;
using UnityEngine;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;

/*
 * Thin wrapper around a TCP socket that never blocks the calling thread.
 *
 * Socket.Connect blocks, so the connection is started with ConnectAsync
 * (BeginConnect would work as well). Once the connection is established the
 * socket is switched to non-blocking mode and all reads/writes are plain
 * synchronous calls guarded by Socket.Available.
 *
 * Wire format (see encode/readMsg):
 *   [2-byte length]  or  [2-byte 0][4-byte length]   length = payload + 10
 *   [4-byte loop id][2-byte command id][payload][4-byte trailer (written as 0)]
 * All integers go through the project's Endian.Swap* helpers
 * (presumably conversion to network byte order -- confirm against Endian).
 *
 * A NetManagerSocket is single-use: after a disconnect or an error the owner
 * creates a new instance instead of reconnecting this one.
 */
public class NetManagerSocket
{
    public string ip;
    public int port;

    private Socket socket;

    // Starting size of the receive buffer; it grows on demand for larger frames.
    private const int InitialReadBufferSize = 64 * 1024;

    // Smallest valid value of the length field: loop(4) + cmd(2) + trailer(4).
    private const int MinFrameLength = 10;

    // Smallest complete frame on the wire: 2-byte length prefix + MinFrameLength.
    private const int MinPacketSize = 12;

    private byte[] readBuf = new byte[InitialReadBufferSize];
    private int readBufIndex = 0; // number of bytes of readBuf already consumed by the decoder
    private int available = 0;    // number of bytes of readBuf already filled from the socket

    // Last socket error. The owner (NetManager) polls this field and also
    // writes it (e.g. to force a reconnect on send timeout).
    public SocketError socketError = SocketError.Success;

    // ---------------------------------------
    // -- decoder state
    private int length = 0;   // value of the current frame's length field
    private int needRead = 0; // bytes of the current frame still to be received
    // ---------------------------------------

    /// <summary>
    /// Creates a new TCP socket and starts an asynchronous connect to
    /// <see cref="ip"/>:<see cref="port"/>. Returns immediately; the result is
    /// observed through <see cref="isConnected"/> and <see cref="socketError"/>.
    /// </summary>
    public void connect()
    {
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 5000); // 5 second send timeout
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, false);
        socket.NoDelay = true;

        // The socket stays in BLOCKING mode while the async connect is in
        // flight (the author found the async connect required this) and is
        // switched to non-blocking in connectCallback2 once connected.
        socket.Blocking = true;
        socketError = SocketError.Success;
        Debug.Log("dontLinger:" + socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger));
        try
        {
            // Alternatives that were tried and rejected:
            //   socket.Connect(ip, port);          -- blocks the caller
            //   socket.BeginConnect(ip, port, new AsyncCallback(connectCallback), socket);
            // MSDN states that Connect on a non-blocking TCP socket throws a
            // SocketException; the author did not observe that in practice.
            SocketAsyncEventArgs eventArgs = new SocketAsyncEventArgs();
            eventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
            eventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(connectCallback2);
            eventArgs.UserToken = socket;

            // ConnectAsync returns false when the operation completed
            // synchronously; in that case the Completed event is NOT raised,
            // so the completion handler has to be invoked by hand.
            if (!socket.ConnectAsync(eventArgs))
            {
                connectCallback2(socket, eventArgs);
            }
            // Open question from the author: should a connect timeout be
            // handled here? (NetManager currently applies its own.)
        }
        catch (Exception ex)
        {
            Debug.LogError("socket connection exception:" + ex);
            socketError = SocketError.ConnectionRefused;
        }
    }

    // Completion handler for ConnectAsync: records a failure, or switches the
    // freshly connected socket to non-blocking mode.
    private void connectCallback2(object src, SocketAsyncEventArgs result)
    {
        Debug.Log("connectCallback2, isCompleted:" + result + " " + result.SocketError);
        if (result.SocketError != SocketError.Success)
        {
            socketError = result.SocketError;
            return;
        }

        Socket connectedSocket = (Socket)src;
        connectedSocket.Blocking = false;
    }

    // Completion handler for the BeginConnect variant. Only referenced from
    // the commented-out alternative in connect(); kept for reference.
    void connectCallback(IAsyncResult result)
    {
        Debug.Log("connectCallback22, isCompleted:" + result.IsCompleted);
        try
        {
            Socket connectedSocket = (Socket)result.AsyncState;
            connectedSocket.EndConnect(result);
            Debug.Log("socket is connected:" + connectedSocket.Connected);
            connectedSocket.Send(new byte[] { });
        }
        catch (Exception e)
        {
            Debug.LogError("error:" + e);
            socketError = SocketError.ConnectionRefused;
        }
    }

    /// <summary>Closes and drops the underlying socket, if any.</summary>
    public void disconnect()
    {
        if (socket != null)
        {
            socket.Close();
            socket = null;
        }
    }

    /// <summary>
    /// Tries to read one message without blocking. Only as many bytes as the
    /// socket reports in Available are requested, so the synchronous Receive
    /// calls do not wait.
    /// </summary>
    /// <returns>
    /// A decoded message, or null when no complete message is available yet
    /// or an error occurred (check <see cref="socketError"/>).
    /// </returns>
    public NetMsg read()
    {
        if (socket == null || socket.Available <= 0)
        {
            return null;
        }

        if (available == 0)
        {
            // Start of a new frame: wait until at least a minimal packet has
            // arrived so the header can be read in one go.
            if (socket.Available < MinPacketSize)
            {
                return null;
            }

            if (!readHeaderBytes(2))
            {
                return null;
            }
            length = readUshort();
            if (length == 0)
            {
                // A zero short length marks the extended form: the real
                // length follows as a 4-byte integer.
                if (!readHeaderBytes(4))
                {
                    return null;
                }
                length = readInt();
            }

            if (length < MinFrameLength)
            {
                // Corrupt or hostile stream; readMsg would otherwise try to
                // allocate a negative-sized array. Surface it as a socket
                // error so the owner drops the connection.
                Debug.LogError("socket read invalid message length:" + length);
                socketError = SocketError.MessageSize;
                return null;
            }
            needRead = length;
        }

        return readBody();
    }

    // Reads exactly `count` header bytes. Returns false on a socket error or
    // on a short read (not expected, since the caller has already checked
    // Available >= MinPacketSize).
    private bool readHeaderBytes(int count)
    {
        int received = socketRead(count);
        if (socketError != SocketError.Success)
        {
            return false;
        }
        if (received != count)
        {
            socketError = SocketError.NoData;
            Debug.LogError("socket Read error:" + socketError);
            return false;
        }
        return true;
    }

    // Receives as much of the current frame's remainder as the socket has
    // buffered; decodes and returns the message once the frame is complete.
    private NetMsg readBody()
    {
        int toRead = Math.Min(socket.Available, needRead);
        int received = socketRead(toRead);
        if (socketError != SocketError.Success)
        {
            return null;
        }

        needRead -= received;
        if (needRead > 0)
        {
            // Frame still incomplete; continue on a later call.
            return null;
        }
        return readMsg();
    }

    // Resets the decoder after a complete message has been consumed.
    private void readReset()
    {
        available = 0;
        readBufIndex = 0;
        length = 0;
        needRead = 0;
    }

    // Decodes the frame currently held in readBuf (header already consumed)
    // into a NetMsg and resets the decoder state.
    private NetMsg readMsg()
    {
        NetMsg netMsg = new NetMsg();
        UInt32 loop = readUInt();
        short cmdId = readShort();

        // length covers loop(4) + cmd(2) + payload + trailer(4).
        int payloadLength = length - MinFrameLength;
        byte[] protoData = null;
        if (cmdId > 0)
        {
            protoData = new byte[payloadLength];
            Array.Copy(readBuf, readBufIndex, protoData, 0, payloadLength);
        }
        else
        {
            // A non-positive command id marks a compressed payload; the real
            // id is its negation.
            MemoryStream inms = new MemoryStream(readBuf, readBufIndex, payloadLength);
            MemoryStream outms = new MemoryStream();
            SevenZipTool.Unzip(inms, outms);
            protoData = outms.ToArray();
            cmdId = (short)-cmdId;
        }

        netMsg.loop = loop;
        netMsg.cmdType = (cmd.CmdType)cmdId;
        netMsg.data = protoData;
        netMsg.netdata = new MemoryStream(protoData);
        readReset();
        return netMsg;
    }

    private short readShort()
    {
        short ret = BitConverter.ToInt16(readBuf, readBufIndex);
        readBufIndex += 2;
        return Endian.SwapInt16(ret);
    }

    private ushort readUshort()
    {
        ushort ret = BitConverter.ToUInt16(readBuf, readBufIndex);
        readBufIndex += 2;
        return Endian.SwapUInt16(ret);
    }

    private int readInt()
    {
        int ret = BitConverter.ToInt32(readBuf, readBufIndex);
        readBufIndex += 4;
        return Endian.SwapInt32(ret);
    }

    private uint readUInt()
    {
        uint ret = BitConverter.ToUInt32(readBuf, readBufIndex);
        readBufIndex += 4;
        return Endian.SwapUInt32(ret);
    }

    // Grows readBuf (preserving its contents) so that it can hold at least
    // `required` bytes. The buffer never shrinks again.
    private void ensureReadCapacity(int required)
    {
        if (required <= readBuf.Length)
        {
            return;
        }
        int newSize = Math.Max(required, readBuf.Length * 2);
        Array.Resize(ref readBuf, newSize);
    }

    // Receives up to readLen bytes from the socket and appends them to
    // readBuf. Returns the number of bytes actually stored (0 on error, in
    // which case socketError is set).
    private int socketRead(int readLen)
    {
        // Frames can be larger than the initial 64 KB buffer (encode supports
        // a 4-byte length), so make room before receiving.
        ensureReadCapacity(available + readLen);

        int received = socket.Receive(readBuf, available, readLen, SocketFlags.None, out socketError);
        if (socketError != SocketError.Success)
        {
            Debug.LogError("socket Read error:" + socketError);
            return 0;
        }
        available += received;
        return received;
    }

    /// <summary>
    /// Sends the part of <paramref name="netMsg"/> starting at
    /// <paramref name="offset"/> (the message is encoded on first use).
    /// </summary>
    /// <returns>
    /// -1 when the whole message has now been sent, 0 on a socket error
    /// (see <see cref="socketError"/>), otherwise the number of bytes sent
    /// by this call.
    /// </returns>
    public int send(NetMsg netMsg, int offset)
    {
        if (netMsg.totalData == null)
        {
            encode(netMsg);
        }

        int remaining = netMsg.totalData.Length - offset;
        // NOTE(review): on a non-blocking socket a full kernel send buffer is
        // reported as SocketError.WouldBlock, which is treated as an error
        // here and makes the owner reconnect -- confirm that is intended.
        int sendNum = socket.Send(netMsg.totalData, offset, remaining, SocketFlags.None, out socketError);
        if (socketError != SocketError.Success)
        {
            Debug.LogError("socket send error:" + socketError);
            return 0;
        }
        if (sendNum == remaining)
        {
            return -1; // marker: message fully sent
        }
        return sendNum;
    }

    // Builds the complete wire frame for netMsg and stores it in totalData.
    private void encode(NetMsg netMsg)
    {
        MemoryStream outstream = new MemoryStream();
        byte[] chunk = null;

        int totalLength = netMsg.data.Length + MinFrameLength;
        if (totalLength > 65535)
        {
            // Extended form: a zero short followed by the 4-byte length.
            chunk = BitConverter.GetBytes(Endian.SwapInt16((short)0));
            outstream.Write(chunk, 0, chunk.Length);
            chunk = BitConverter.GetBytes(Endian.SwapInt32(totalLength));
            outstream.Write(chunk, 0, chunk.Length);
        }
        else
        {
            chunk = BitConverter.GetBytes(Endian.SwapInt16((short)totalLength));
            outstream.Write(chunk, 0, chunk.Length);
        }

        chunk = BitConverter.GetBytes(Endian.SwapUInt32(netMsg.loop));
        outstream.Write(chunk, 0, chunk.Length);

        chunk = BitConverter.GetBytes(Endian.SwapInt16((short)netMsg.cmdType));
        outstream.Write(chunk, 0, chunk.Length);

        outstream.Write(netMsg.data, 0, netMsg.data.Length);

        // 4-byte trailer, always written as zero.
        chunk = BitConverter.GetBytes(Endian.SwapInt32((int)0));
        outstream.Write(chunk, 0, chunk.Length);

        netMsg.totalData = outstream.ToArray();
    }

    /// <summary>True when a socket exists and reports itself connected.</summary>
    public bool isConnected()
    {
        return socket != null && socket.Connected;
    }
}
//----------------------------------------------
// NET        : NetManager
// AUTHOR: zhu kun qian
//----------------------------------------------

using System;
using System.Linq;
using System.Net.Sockets;
using UnityEngine;
using System.Collections.Generic;
using System.IO;

/// <summary>
/// Frame-driven network manager. <see cref="onUpdate"/> is expected to be
/// called every frame; it polls the socket, queues received messages, sends
/// pending messages and transparently reconnects (re-sending the token login)
/// on errors and timeouts. Only after <c>maxReconnectTimes</c> failed
/// reconnects is the problem handed to the game via
/// <see cref="reconnectErrorCallback"/>.
/// </summary>
public class NetManager
{
    public uint loop; // message sequence counter, incremented for every message created by send

    public string ip = "";
    public int port = 0;

    private NetManagerSocket socket = null;

    private int maxQuerySize = 100;    // soft limit on queued outgoing messages (only logged when exceeded)
    private int maxReconnectTimes = 3; // automatic reconnect attempts before giving up

    // --- sending
    public Queue<NetMsg> msgQuery = new Queue<NetMsg>(); // outgoing message queue
    public NetMsg reconnectTokenLogin = null;            // pending token-login message; sent ahead of the queue
    public bool loginSuccess = false;                    // queued messages are only sent while this is true; set to true by code outside this class
    private int sendBytes = 0;                           // bytes of the current message already sent
    private float sendBytesTimeout = 0f;                 // time the current partial send started (0 = no timer running)

    // --- receiving
    private Queue<NetMsg> receviedMsgQueue = new Queue<NetMsg>(); // messages received from the server

    private int reconnectTryTimes = 0; // reconnect attempts made in the current outage
    private float connectTimeout = 0f; // time the current connect attempt was first seen pending (0 = not yet)

    // Invoked when the problem cannot be handled internally (reconnecting
    // maxReconnectTimes times has failed).
    public EventDelegate.Callback reconnectErrorCallback;

    // A reconnect always uses a brand-new NetManagerSocket rather than reusing the old one.
    public static NetManager Instance = null;

    public NetManager()
    {
        Instance = this;
    }

    /// <summary>True when a socket exists and is connected.</summary>
    public bool isConnected()
    {
        return socket != null && socket.isConnected();
    }

    /// <summary>
    /// Starts a new asynchronous connection to ip:port, closing any existing
    /// socket first, and resets the per-connection send/connect state.
    /// </summary>
    public void connect()
    {
        if (socket != null)
        {
            Debug.LogError("socket is not closed,try close");
            socket.disconnect();
        }
        Debug.Log("start connect ip:" + ip + " port:" + port);

        socket = new NetManagerSocket();
        socket.ip = ip;
        socket.port = port;
        socket.connect();

        sendBytes = 0;
        sendBytesTimeout = 0f;
        connectTimeout = 0f;
    }

    /// <summary>Closes and drops the current socket (logs an error if there is none).</summary>
    public void disconnect()
    {
        if (socket == null)
        {
            Debug.LogError("socket is null");
            return;
        }
        socket.disconnect();
        socket = null;
    }

    /// <summary>
    /// Per-frame pump; never blocks. Handles socket errors and the connect
    /// timeout, reads at most one message and advances at most one send.
    /// </summary>
    public void onUpdate()
    {
        if (socket == null)
        {
            return;
        }

        if (socket.socketError != SocketError.Success)
        {
            // A socket error does not need to wait for a timeout: reconnect at once.
            Debug.LogError("socket error:" + socket.socketError);
            tryReconnect();
            return;
        }

        if (!socket.isConnected())
        {
            // Connect still pending: start the timer on the first frame,
            // reconnect once it has been pending for more than 5 seconds.
            if (connectTimeout == 0f)
            {
                connectTimeout = Time.realtimeSinceStartup;
            }
            else if (Time.realtimeSinceStartup - connectTimeout > 5)
            {
                tryReconnect();
            }
            return;
        }

        NetMsg received = socket.read();
        if (received != null)
        {
            receviedMsgQueue.Enqueue(received);
            if (loginSuccess)
            {
                // Logged in and receiving traffic: the link has recovered, so
                // the next outage gets a fresh set of reconnect attempts
                // instead of sharing one budget for the whole session.
                reconnectTryTimes = 0;
            }
        }

        // The token login always goes first; regular messages only flow
        // while logged in.
        NetMsg outgoing = null;
        if (reconnectTokenLogin != null)
        {
            outgoing = reconnectTokenLogin;
        }
        else if (loginSuccess)
        {
            lock (msgQuery)
            {
                if (msgQuery.Count > 0)
                {
                    outgoing = msgQuery.Peek();
                }
            }
        }

        if (outgoing != null)
        {
            socketSend(outgoing);
        }
    }

    // Drops the current socket and either reconnects (re-sending the token
    // login) or, once the attempt budget is used up, hands the failure to
    // reconnectErrorCallback.
    private void tryReconnect()
    {
        if (reconnectTryTimes >= maxReconnectTimes)
        {
            // Tear down first and reset the counter, so a callback that
            // immediately reconnects keeps its new socket and starts with a
            // full set of attempts.
            disconnect();
            reconnectTryTimes = 0;
            if (reconnectErrorCallback != null)
            {
                reconnectErrorCallback();
            }
            return;
        }

        Debug.LogError("socket connect timeout, try reconnect:" + reconnectTryTimes + " " + socket.socketError);
        reconnectTryTimes++;
        disconnect();
        connect();

        // A reconnect has to log in again (message 505 per the original note).
        LoginState.Instance.loginWithTokenSub(true);
    }

    /// <summary>Returns the next received message, or null if none is queued.</summary>
    public NetMsg readMsg()
    {
        if (receviedMsgQueue.Count == 0)
        {
            return null;
        }
        return receviedMsgQueue.Dequeue();
    }

    // Advances the send of netMsg by one non-blocking socket write and
    // applies the 5 second timeout to messages that stay partially sent.
    private void socketSend(NetMsg netMsg)
    {
        int num = socket.send(netMsg, sendBytes);

        if (num == -1)
        {
            // Completely sent: retire the message.
            if (cmd.CmdType.tokenLogin == netMsg.cmdType)
            {
                reconnectTokenLogin = null;
            }
            else
            {
                lock (msgQuery)
                {
                    msgQuery.Dequeue();
                }
            }
            sendBytes = 0;
            sendBytesTimeout = 0f;
            return;
        }

        if (num > 0)
        {
            sendBytes += num;
        }

        // Not finished yet. The timer is keyed on itself rather than on
        // sendBytes, so it also expires when no byte at all gets through.
        if (sendBytesTimeout == 0f)
        {
            sendBytesTimeout = Time.realtimeSinceStartup;
        }
        else if (Time.realtimeSinceStartup - sendBytesTimeout > 5)
        {
            Debug.LogError("socket timeout.try reconnect");
            if (socket.socketError != SocketError.Success)
            {
                Debug.LogError("socket error:" + socket.socketError);
            }
            // Flag the socket; the next onUpdate sees the error and reconnects.
            socket.socketError = SocketError.TimedOut;
        }
    }

    /// <summary>Alias of <see cref="send{T}"/>.</summary>
    public void SendCmd<T>(cmd.CmdType cmdType, T protoObj)
    {
        send(cmdType, protoObj);
    }

    /// <summary>
    /// Serializes <paramref name="protoObj"/> with protobuf and schedules it
    /// for sending from <see cref="onUpdate"/>. A tokenLogin message is not
    /// queued: it replaces <see cref="reconnectTokenLogin"/> and clears
    /// <see cref="loginSuccess"/>.
    /// </summary>
    public void send<T>(cmd.CmdType cmdType, T protoObj)
    {
        NetMsg netMsg = new NetMsg();
        netMsg.loop = ++loop;
        netMsg.cmdType = cmdType;

        using (MemoryStream outms = new MemoryStream())
        {
            ProtoBuf.Serializer.Serialize(outms, protoObj);
            netMsg.data = outms.ToArray();
        }

        if (cmdType == cmd.CmdType.tokenLogin)
        {
            reconnectTokenLogin = netMsg;
            loginSuccess = false;
            return;
        }

        // TODO (original author): sending happens in onUpdate, so this lock
        // can probably go once it is confirmed that no other thread touches
        // the queue.
        int queued;
        lock (msgQuery)
        {
            // Actually sent from onUpdate; the few milliseconds of delay are acceptable.
            msgQuery.Enqueue(netMsg);
            queued = msgQuery.Count;
        }

        if (queued > maxQuerySize)
        {
            Debug.LogError("msgQuery more than max size:" + queued);
        }
    }
}
2015.01.19记:我重构时,未添加断线补发未送达消息的机制;而我们客户端写的逻辑不严谨,在登录游戏按钮上连续多次点击,会导致登录卡住。我这里只是通过禁止多次点击来避免这个问题,如果有断线补发未送达消息的机制,也不会出现这个问题。总之,不要指望客户端代码很健壮,最好还是添加上断线补发消息。
0 条评论。