我们游戏中断线和超时这一块出问题非常多。
这一次我专门抽出了一周时间,了解了下c#下的socket相关函数重写了网络模块。
首要目标是避免另开线程,使用nio。然后是内部处理超时,断线和其它IO异常,重连3次失败后,才提交给游戏自己处理。重连时,自动登陆并续发消息。
这里本来还需要添加一个补发消息的机制(断线后网络传输中导致丢失的消息),但是测试只有续发消息,已经解决掉了原来99%问题。所以补发消息先放一下,先再重构其它地方。
重构后测试的结果非常好,以前各种网络问题都没有复现,断线对玩家来说是不可见的,只要长时间断线(三次重连失败)才会给玩家一个提示框,让玩家手动点击去重连。
//----------------------------------------------
// NET : NetManager
// AUTHOR: zhu kun qian
//----------------------------------------------
using System.IO;
public class NetMsg
{
// 有必要用id吗?
//
public cmd.CmdType cmdType;
public uint loop;// 唯一的循环id
public byte[] data;// protobuf二进制数据
public byte[] totalData;//完整的消息二进制数据
public MemoryStream netdata;
// 这里能避免一次array copy不?
// TODO:暂不考虑这避免arry copy的消耗,以后有时间时,可以考虑处理下。
}
//----------------------------------------------
// NET : NetManager
// AUTHOR: zhu kun qian
//----------------------------------------------
using System.Net;
using System.Net.Sockets;
using System.Runtime.Remoting.Messaging;
using UnityEngine;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
/*
* 没有使用任何阻塞函数。
*
* connect是block函数,所以改为使用ConnectAsync, beginConnect也是异步,也可以使用
*
* 连接成功后,socket修改为non-blocking
*/
public class NetManagerSocket
{
// 如果断开连接,直接new socket
public string ip;
public int port;
private Socket socket;
private byte[] readBuf = new byte[64 * 1024];// 最高缓存接收64k数据。
private int readBufIndex = 0;//已经从readBuf中读取的字节数
private int available = 0;// readbuf已经从socket中接收的字节数
public SocketError socketError = SocketError.Success;// 这里应该是传递到上层,还是在这里处理?
// ---------------------------------------
// -- decoder相关
private int length = 0;// protobuf消息体中的长度
private int needRead = 0;// protobuf消息体中剩余需要读取的长度
// ---------------------------------------
public void connect()
{
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 5000);// 设置5秒发送超时
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, false);
socket.NoDelay = true;
// socket.Blocking = false;
socket.Blocking = true;//设置为非阻塞
socketError = SocketError.Success;
Debug.Log("dontLinger:" + socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger));
try
{
// 这里需要修改为异步connect
// http://msdn.microsoft.com/en-us/library/d7ew360f%28v=vs.110%29.aspx
// Connect method will block, unless you specifically set the Blocking property to false prior to calling Connect.
// If you are using a connection-oriented protocol like TCP and you do disable blocking, Connect will throw a SocketException because it needs time to make the connection
// 结论:很奇怪,并没有按文档上描述的执行,设置为blocking,并没有抛出异常。
// socket.Connect(ip, port);
// The asynchronous BeginConnect operation must be completed by calling the EndConnect method.
// IAsyncResult result= socket.BeginConnect(ip, port, new AsyncCallback(connectCallback), socket);
// Debug.Log("asyncResult is completed:"+result.IsCompleted);
// 异步处理必须设置为blocking,why?
//*
SocketAsyncEventArgs eventArgs = new SocketAsyncEventArgs();
eventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
eventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(connectCallback2);
eventArgs.UserToken = socket;
socket.ConnectAsync(eventArgs);
// */
// 这里需要添加超时处理吗?
}
catch (Exception ex)
{
Debug.LogError("socket connection exception:" + ex);
socketError = SocketError.ConnectionRefused;
}
}
private void connectCallback2(object src, SocketAsyncEventArgs result)
{
Debug.Log("connectCallback2, isCompleted:" + result + " " + result.SocketError);
if (result.SocketError != SocketError.Success)
{
socketError = result.SocketError;
}
else
{
Socket socket = (Socket)src;
socket.Blocking = false;
}
}
void connectCallback(IAsyncResult result)
{
Debug.Log("connectCallback22, isCompleted:" + result.IsCompleted);
// connected = true;
try
{
Socket socket = (Socket)result.AsyncState;
socket.EndConnect(result);
Debug.Log("socket is connected:" + socket.Connected);
socket.Send(new byte[] { });
}
catch (Exception e)
{
Debug.LogError("error:" + e);
socketError = SocketError.ConnectionRefused;
}
}
public void disconnect()
{
if (socket != null)
{
socket.Close();
socket = null;
}
}
//
public NetMsg read()
{
// 这里同步读也可以,但只读数据接收到足够一个消息的情况
// 使用availed预判进行同步读,不会阻塞主线程
// 解析消息放在这里。
if (socket != null && socket.Available > 0)
{
if (available == 0)
{
if (socket.Available < 12)
{
// 一个数据包,最小为12字节
return null;
}
// 开始从socket中读入一条数据
// 如果足够,就直接解析为消息返回
// 如果不够,就将数据放在cache中
socketRead(2);
if (socketError != SocketError.Success)
{
return null;
}
length = readUshort();
if (length == 0)
{
socketRead(4);
if (socketError != SocketError.Success)
{
return null;
}
length = readInt();
}
int socketAvailable = socket.Available;
needRead = length;
if (socketAvailable < needRead)
{
// 不足时,socket有多少缓存数据就读多少
socketRead(socketAvailable);
if (socketError != SocketError.Success)
{
return null;
}
needRead = needRead - socketAvailable;
return null;
}
else
{
// 数据足够,解析数据
socketRead(needRead);
if (socketError != SocketError.Success)
{
return null;
}
return readMsg();
}
}
else
{
// 继续从socket中接收数据
int socketAvailable = socket.Available;
if (socketAvailable < needRead)
{
// 数据依旧不足
socketRead(socketAvailable);
if (socketError != SocketError.Success)
{
return null;
}
needRead = needRead - socketAvailable;
return null;
}
else
{
// 数据足够,解析数据
socketRead(needRead);
if (socketError != SocketError.Success)
{
return null;
}
return readMsg();
}
}
}
return null;
}
private void readReset()
{
// 读入一个完整消息后,重置数据
available = 0;
readBufIndex = 0;
length = 0;
needRead = 0;
}
private NetMsg readMsg()
{
NetMsg netMsg = new NetMsg();
UInt32 loop = readUInt();
short cmdId = readShort();
byte[] protoData = null;
if (cmdId > 0)
{
protoData = new byte[length - 10];
Array.Copy(readBuf, readBufIndex, protoData, 0, length - 10);
}
else
{
// 有压缩
MemoryStream inms = new MemoryStream(readBuf, readBufIndex, length - 10);
MemoryStream outms = new MemoryStream();
SevenZipTool.Unzip(inms, outms);
protoData = outms.ToArray();
cmdId = (short)-cmdId;
}
netMsg.loop = loop;
netMsg.cmdType = (cmd.CmdType)cmdId;
netMsg.data = protoData;
netMsg.netdata=new MemoryStream(protoData);
readReset();
return netMsg;
}
private short readShort()
{
short ret = BitConverter.ToInt16(readBuf, readBufIndex);
readBufIndex += 2;
return Endian.SwapInt16(ret);
}
private ushort readUshort()
{
ushort ret = BitConverter.ToUInt16(readBuf, readBufIndex);
readBufIndex += 2;
return Endian.SwapUInt16(ret);
}
private int readInt()
{
int ret = BitConverter.ToInt32(readBuf, readBufIndex);
readBufIndex += 4;
return Endian.SwapInt32(ret);
}
private uint readUInt()
{
uint ret = BitConverter.ToUInt32(readBuf, readBufIndex);
readBufIndex += 4;
return Endian.SwapUInt32(ret);
}
private void socketRead(int readLen)
{ //从socket中读入数据入在readBuf中
socket.Receive(readBuf, available, readLen, SocketFlags.None, out socketError);
if (socketError != SocketError.Success)
{
Debug.LogError("socket Read error:" + socketError);
}
available += readLen;
}
public int send(NetMsg netMsg, int offset)
{
if (netMsg.totalData == null)
{
encode(netMsg);
}
int sendNum = socket.Send(netMsg.totalData, offset, netMsg.totalData.Length - offset, SocketFlags.None, out socketError);
if (socketError != SocketError.Success)
{
Debug.LogError("socket send error:" + socketError);
return 0;
}
if (sendNum + offset == netMsg.totalData.Length)
{
return -1;//标志,全部发送完成
}
return sendNum;
}
private void encode(NetMsg netMsg)
{
MemoryStream outstream = new MemoryStream();
byte[] _t = null;
int totalLength = netMsg.data.Length + 10;
if (totalLength > 65535)
{
_t = BitConverter.GetBytes(Endian.SwapInt16((short)0));
outstream.Write(_t, 0, _t.Length);
_t = BitConverter.GetBytes(Endian.SwapInt32(totalLength));
outstream.Write(_t, 0, _t.Length);
}
else
{
_t = BitConverter.GetBytes(Endian.SwapInt16((short)totalLength));
outstream.Write(_t, 0, _t.Length);
}
_t = BitConverter.GetBytes(Endian.SwapUInt32(netMsg.loop));
outstream.Write(_t, 0, _t.Length);
_t = BitConverter.GetBytes(Endian.SwapInt16((short)netMsg.cmdType));
outstream.Write(_t, 0, _t.Length);
outstream.Write(netMsg.data, 0, netMsg.data.Length);
_t = BitConverter.GetBytes(Endian.SwapInt32((int)0));
outstream.Write(_t, 0, _t.Length);
_t = outstream.ToArray();
netMsg.totalData = _t;
}
public bool isConnected()
{
return socket != null && (socket.Connected);
}
}
//----------------------------------------------
// NET : NetManager
// AUTHOR: zhu kun qian
//----------------------------------------------
using System;
using System.Linq;
using System.Net.Sockets;
using UnityEngine;
using System.Collections.Generic;
using System.IO;
public class NetManager
{
public uint loop;// 永不重复,一直加1
public string ip = "";
public int port = 0;
private NetManagerSocket socket = null;
private int maxQuerySize = 100;//接收和发送最多cache 100条消息
private int maxReconnectTimes = 3;//重连最多重试3次
// 发送相关
public Queue<NetMsg> msgQuery = new Queue<NetMsg>(); // 发送消息队列
public NetMsg reconnectTokenLogin = null;//重连发送的登陆消息
public bool loginSuccess = false;
private int sendBytes = 0;
private float sendBytesTimeout = 0f; // 发送消息超时
// 接收的消息队列
private Queue<NetMsg> receviedMsgQueue = new Queue<NetMsg>();// 从服务器接收到的消息
private int reconnectTryTimes = 0; // 重连次数
private float connectTimeout = 0f; // 连接超时
public EventDelegate.Callback reconnectErrorCallback; // 如果出现内部无法处理的得连(1、连试3次无法重连成功 2、累积的消息数量过量,需要重启游戏客户端)
// 如果断线后,是使用原socket重连,还是使用新的socket?新new出来
public static NetManager Instance = null;
public NetManager()
{
Instance = this;
}
public bool isConnected()
{
return socket != null && socket.isConnected();
}
public void connect()
{
if (socket != null)
{
Debug.LogError("socket is not closed,try close");
socket.disconnect();
}
Debug.Log("start connect ip:" + ip + " port:" + port);
socket = new NetManagerSocket();
socket.ip = ip;
socket.port = port;
socket.connect();
sendBytes = 0;
sendBytesTimeout = 0f;
connectTimeout = 0f;
}
public void disconnect()
{
if (socket == null)
{
Debug.LogError("socket is null");
return;
}
socket.disconnect();
socket = null;
}
public void onUpdate()
{
// 每祯执行,没有阻塞
if (socket != null)
{
// 改为在socket中处理呢
if (socket.socketError != SocketError.Success)
{
// 如果遇到sokcet错误,不需要等等超时,立即重连
Debug.LogError("socket error:" + socket.socketError);
tryReconnect();
return;
}
if (socket.isConnected())
{
NetMsg msg = socket.read();
if (msg != null)
{
receviedMsgQueue.Enqueue(msg);
}
NetMsg netMsg = null;
if (reconnectTokenLogin != null)
{
netMsg = reconnectTokenLogin;
}
else
{
if (loginSuccess)
{
lock (msgQuery)
{
if (msgQuery.Count > 0)
{
netMsg = msgQuery.First();
}
}
}
}
if (netMsg != null)
{
socketSend(netMsg);
}
}
else
{
if (connectTimeout == 0f)
{
connectTimeout = Time.realtimeSinceStartup;
}
else if (Time.realtimeSinceStartup - connectTimeout > 5)
{
// 连接5秒超时,处理重连
tryReconnect();
}
}
}
}
private void tryReconnect()
{
if (reconnectTryTimes >= 3)
{
// 跳出错误处理回调,交给外部处理
if (reconnectErrorCallback != null)
{
reconnectErrorCallback();
}
disconnect();
return;
}
Debug.LogError("socket connect timeout, try reconnect:" + reconnectTryTimes + " " + socket.socketError);
reconnectTryTimes++;
disconnect();
connect();
// 重试需要,重新发送登陆消息505
LoginState.Instance.loginWithTokenSub(true);
}
public NetMsg readMsg()
{
if (receviedMsgQueue.Count == 0)
{
return null;
}
return receviedMsgQueue.Dequeue();
}
// true:超时
private void socketSend(NetMsg netMsg)
{
// 发送数据
bool newMsg = false;// 新发送的消息
if (sendBytes == 0)
{
newMsg = true;
}
int num = socket.send(netMsg, sendBytes);
if (num > 0)
{
sendBytes += num;
}
if (num == -1)
{
// 全部发送完成
if (cmd.CmdType.tokenLogin == netMsg.cmdType)
{
reconnectTokenLogin = null;
}
else
{
lock (msgQuery)
{
msgQuery.Dequeue();
}
}
sendBytes = 0;
sendBytesTimeout = 0f;
}
else
{
// 未发送完成,处理超时逻辑
if (newMsg)
{
sendBytesTimeout = Time.realtimeSinceStartup;
}
else
{
// 检查时间是否超时
if (Time.realtimeSinceStartup - sendBytesTimeout > 5)
{
// 超过5秒
Debug.LogError("socket timeout.try reconnect");
// 重连重发
if (socket.socketError != SocketError.Success)
{
Debug.LogError("socket error:" + socket.socketError);
}
socket.socketError = SocketError.TimedOut;
}
}
}
}
public void SendCmd<T>(cmd.CmdType cmdType, T protoObj)
{
send(cmdType,protoObj);
}
public void send<T>(cmd.CmdType cmdType, T protoObj)
{
NetMsg netMsg = new NetMsg();
netMsg.loop = ++loop;
netMsg.cmdType = cmdType;
MemoryStream outms = new MemoryStream();
ProtoBuf.Serializer.Serialize(outms, protoObj);
netMsg.data = outms.ToArray();
// todo:因为放在onupdate中,感觉这个lock也是可以避免掉的。暂时先加上,以后测试后再考虑去掉。
// 只要能确认不会多线程操作,就可以去掉这个lock
if (cmdType == cmd.CmdType.tokenLogin)
{
reconnectTokenLogin = netMsg;
loginSuccess = false;
return;
}
lock (msgQuery)
{
msgQuery.Enqueue(netMsg);
// 在onupdate中发送,这样只差3ms,是可以接受的
}
if (msgQuery.Count > maxQuerySize)
{
Debug.LogError("msgQuery more than max size:" + msgQuery.Count);
}
}
}
2015.01.19记:我重构时,未添加断线补发未送达的消息,我们客户端写的逻辑不严谨,在登陆游戏按钮上连续多次点击,会导致卡住登陆。我这里只是使用禁止多次点击来避免这个问题,如果使用有断线补发未送达的消息也可以不会出现这个问题。总之,不要依赖客户端代码很强壮,最好还是添加上断线补发消息。
0 条评论。