游戏中重构了网络模块

我们游戏中断线和超时这一块出问题非常多。

这一次我专门抽出了一周时间,了解了下c#下的socket相关函数重写了网络模块。

首要目标是避免另开线程,使用nio。然后是内部处理超时,断线和其它IO异常,重连3次失败后,才提交给游戏自己处理。重连时,自动登陆并续发消息。

这里本来还需要添加一个补发消息的机制(断线后网络传输中导致丢失的消息),但是测试只有续发消息,已经解决掉了原来99%问题。所以补发消息先放一下,先再重构其它地方。

重构后测试的结果非常好,以前各种网络问题都没有复现,断线对玩家来说是不可见的,只要长时间断线(三次重连失败)才会给玩家一个提示框,让玩家手动点击去重连。

//----------------------------------------------
//           NET        : NetManager
//           AUTHOR: zhu kun qian
//----------------------------------------------

using System.IO;

public class NetMsg
{
    // 有必要用id吗?
    // 
    public cmd.CmdType cmdType;
    public uint loop;// 唯一的循环id
    public byte[] data;// protobuf二进制数据
    public byte[] totalData;//完整的消息二进制数据
    public MemoryStream netdata;

    // 这里能避免一次array copy不?
    // TODO:暂不考虑这避免arry copy的消耗,以后有时间时,可以考虑处理下。
}

 

//----------------------------------------------
//           NET        : NetManager
//           AUTHOR: zhu kun qian
//----------------------------------------------

using System.Net;
using System.Net.Sockets;
using System.Runtime.Remoting.Messaging;
using UnityEngine;
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;

/*
 * 没有使用任何阻塞函数。
 * 
 * connect是block函数,所以改为使用ConnectAsync, beginConnect也是异步,也可以使用
 * 
 * 连接成功后,socket修改为non-blocking
*/
public class NetManagerSocket
{

    // 如果断开连接,直接new socket

    public string ip;
    public int port;

    private Socket socket;
    private byte[] readBuf = new byte[64 * 1024];// 最高缓存接收64k数据。

    private int readBufIndex = 0;//已经从readBuf中读取的字节数
    private int available = 0;// readbuf已经从socket中接收的字节数

    public SocketError socketError = SocketError.Success;// 这里应该是传递到上层,还是在这里处理?

    // ---------------------------------------
    // -- decoder相关
    private int length = 0;// protobuf消息体中的长度
    private int needRead = 0;// protobuf消息体中剩余需要读取的长度
    // ---------------------------------------

    public void connect()
    {
        socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 5000);// 设置5秒发送超时
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, false);
        socket.NoDelay = true;
        // socket.Blocking = false;
        socket.Blocking = true;//设置为非阻塞

        socketError = SocketError.Success;

        Debug.Log("dontLinger:" + socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger));
        try
        {
            // 这里需要修改为异步connect
            // http://msdn.microsoft.com/en-us/library/d7ew360f%28v=vs.110%29.aspx
            // Connect method will block, unless you specifically set the Blocking property to false prior to calling Connect.
            // If you are using a connection-oriented protocol like TCP and you do disable blocking, Connect will throw a SocketException because it needs time to make the connection
            // 结论:很奇怪,并没有按文档上描述的执行,设置为blocking,并没有抛出异常。

            // socket.Connect(ip, port);

            // The asynchronous BeginConnect operation must be completed by calling the EndConnect method.
            // IAsyncResult result= socket.BeginConnect(ip, port, new AsyncCallback(connectCallback), socket);

            // Debug.Log("asyncResult is completed:"+result.IsCompleted);

            // 异步处理必须设置为blocking,why?
            //*
            SocketAsyncEventArgs eventArgs = new SocketAsyncEventArgs();
            eventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
            eventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(connectCallback2);
            eventArgs.UserToken = socket;

            socket.ConnectAsync(eventArgs);

            // */
            // 这里需要添加超时处理吗?
        }
        catch (Exception ex)
        {
            Debug.LogError("socket connection exception:" + ex);
            socketError = SocketError.ConnectionRefused;
        }
    }

    private void connectCallback2(object src, SocketAsyncEventArgs result)
    {
        Debug.Log("connectCallback2, isCompleted:" + result + " " + result.SocketError);
        if (result.SocketError != SocketError.Success)
        {
            socketError = result.SocketError;
        }
        else
        {
            Socket socket = (Socket)src;
            socket.Blocking = false;
        }
    }

    void connectCallback(IAsyncResult result)
    {
        Debug.Log("connectCallback22, isCompleted:" + result.IsCompleted);
        // connected = true;
        try
        {
            Socket socket = (Socket)result.AsyncState;
            socket.EndConnect(result);

            Debug.Log("socket is connected:" + socket.Connected);
            socket.Send(new byte[] { });
        }
        catch (Exception e)
        {
            Debug.LogError("error:" + e);
            socketError = SocketError.ConnectionRefused;
        }
    }

    public void disconnect()
    {
        if (socket != null)
        {
            socket.Close();
            socket = null;
        }
    }

    //

    public NetMsg read()
    {
        // 这里同步读也可以,但只读数据接收到足够一个消息的情况
        // 使用availed预判进行同步读,不会阻塞主线程
        // 解析消息放在这里。
        if (socket != null && socket.Available > 0)
        {
            if (available == 0)
            {
                if (socket.Available < 12)
                {
                    // 一个数据包,最小为12字节
                    return null;
                }
                // 开始从socket中读入一条数据
                // 如果足够,就直接解析为消息返回
                // 如果不够,就将数据放在cache中

                socketRead(2);
                if (socketError != SocketError.Success)
                {
                    return null;
                }

                length = readUshort();

                if (length == 0)
                {
                    socketRead(4);
                    if (socketError != SocketError.Success)
                    {
                        return null;
                    }
                    length = readInt();
                }
                int socketAvailable = socket.Available;
                needRead = length;
                if (socketAvailable < needRead)
                {
                    // 不足时,socket有多少缓存数据就读多少
                    socketRead(socketAvailable);
                    if (socketError != SocketError.Success)
                    {
                        return null;
                    }

                    needRead = needRead - socketAvailable;
                    return null;
                }
                else
                {
                    // 数据足够,解析数据
                    socketRead(needRead);
                    if (socketError != SocketError.Success)
                    {
                        return null;
                    }
                    return readMsg();
                }
            }
            else
            {
                // 继续从socket中接收数据
                int socketAvailable = socket.Available;
                if (socketAvailable < needRead)
                {
                    // 数据依旧不足
                    socketRead(socketAvailable);
                    if (socketError != SocketError.Success)
                    {
                        return null;
                    }

                    needRead = needRead - socketAvailable;
                    return null;
                }
                else
                {
                    // 数据足够,解析数据
                    socketRead(needRead);
                    if (socketError != SocketError.Success)
                    {
                        return null;
                    }

                    return readMsg();
                }
            }
        }
        return null;
    }

    private void readReset()
    {
        // 读入一个完整消息后,重置数据
        available = 0;
        readBufIndex = 0;
        length = 0;
        needRead = 0;
    }
    private NetMsg readMsg()
    {
        NetMsg netMsg = new NetMsg();

        UInt32 loop = readUInt();
        short cmdId = readShort();

        byte[] protoData = null;
        if (cmdId > 0)
        {
            protoData = new byte[length - 10];
            Array.Copy(readBuf, readBufIndex, protoData, 0, length - 10);
        }
        else
        {
            // 有压缩
            MemoryStream inms = new MemoryStream(readBuf, readBufIndex, length - 10);
            MemoryStream outms = new MemoryStream();
            SevenZipTool.Unzip(inms, outms);
            protoData = outms.ToArray();
            cmdId = (short)-cmdId;
        }
        

        netMsg.loop = loop;
        netMsg.cmdType = (cmd.CmdType)cmdId;
        netMsg.data = protoData;
        netMsg.netdata=new MemoryStream(protoData);
        readReset();
        return netMsg;
    }

    private short readShort()
    {
        short ret = BitConverter.ToInt16(readBuf, readBufIndex);
        readBufIndex += 2;
        return Endian.SwapInt16(ret);
    }
    private ushort readUshort()
    {
        ushort ret = BitConverter.ToUInt16(readBuf, readBufIndex);
        readBufIndex += 2;
        return Endian.SwapUInt16(ret);
    }

    private int readInt()
    {
        int ret = BitConverter.ToInt32(readBuf, readBufIndex);
        readBufIndex += 4;
        return Endian.SwapInt32(ret);
    }

    private uint readUInt()
    {
        uint ret = BitConverter.ToUInt32(readBuf, readBufIndex);
        readBufIndex += 4;
        return Endian.SwapUInt32(ret);

    }
    private void socketRead(int readLen)
    {    //从socket中读入数据入在readBuf中
        socket.Receive(readBuf, available, readLen, SocketFlags.None, out socketError);
        if (socketError != SocketError.Success)
        {
            Debug.LogError("socket Read error:" + socketError);
        }
        available += readLen;
    }

    public int send(NetMsg netMsg, int offset)
    {
        if (netMsg.totalData == null)
        {
            encode(netMsg);
        }

        int sendNum = socket.Send(netMsg.totalData, offset, netMsg.totalData.Length - offset, SocketFlags.None, out socketError);
        if (socketError != SocketError.Success)
        {
            Debug.LogError("socket send error:" + socketError);
            return 0;
        }

        if (sendNum + offset == netMsg.totalData.Length)
        {
            return -1;//标志,全部发送完成
        }
        return sendNum;
    }


    private void encode(NetMsg netMsg)
    {
        MemoryStream outstream = new MemoryStream();
        byte[] _t = null;

        int totalLength = netMsg.data.Length + 10;
        if (totalLength > 65535)
        {
            _t = BitConverter.GetBytes(Endian.SwapInt16((short)0));
            outstream.Write(_t, 0, _t.Length);
            _t = BitConverter.GetBytes(Endian.SwapInt32(totalLength));
            outstream.Write(_t, 0, _t.Length);
        }
        else
        {
            _t = BitConverter.GetBytes(Endian.SwapInt16((short)totalLength));
            outstream.Write(_t, 0, _t.Length);
        }

        _t = BitConverter.GetBytes(Endian.SwapUInt32(netMsg.loop));
        outstream.Write(_t, 0, _t.Length);
        _t = BitConverter.GetBytes(Endian.SwapInt16((short)netMsg.cmdType));
        outstream.Write(_t, 0, _t.Length);
        outstream.Write(netMsg.data, 0, netMsg.data.Length);
        _t = BitConverter.GetBytes(Endian.SwapInt32((int)0));
        outstream.Write(_t, 0, _t.Length);

        _t = outstream.ToArray();
        netMsg.totalData = _t;
    }

    public bool isConnected()
    {
        return socket != null && (socket.Connected);
    }
}
//----------------------------------------------
//           NET        : NetManager
//           AUTHOR: zhu kun qian
//----------------------------------------------

using System;
using System.Linq;
using System.Net.Sockets;

using UnityEngine;
using System.Collections.Generic;
using System.IO;


public class NetManager
{
    public uint loop;// 永不重复,一直加1

    public string ip = "";
    public int port = 0;

    private NetManagerSocket socket = null;

    private int maxQuerySize = 100;//接收和发送最多cache 100条消息
    private int maxReconnectTimes = 3;//重连最多重试3次

    // 发送相关
    public Queue<NetMsg> msgQuery = new Queue<NetMsg>(); // 发送消息队列
    public NetMsg reconnectTokenLogin = null;//重连发送的登陆消息
    public bool loginSuccess = false;

    private int sendBytes = 0;
    private float sendBytesTimeout = 0f; // 发送消息超时

    // 接收的消息队列
    private Queue<NetMsg> receviedMsgQueue = new Queue<NetMsg>();// 从服务器接收到的消息

    private int reconnectTryTimes = 0; // 重连次数
    private float connectTimeout = 0f; // 连接超时
    public EventDelegate.Callback reconnectErrorCallback; // 如果出现内部无法处理的得连(1、连试3次无法重连成功 2、累积的消息数量过量,需要重启游戏客户端)
    

    // 如果断线后,是使用原socket重连,还是使用新的socket?新new出来
    public static NetManager Instance = null;
    public NetManager()
    {
        Instance = this;
    }

    public bool isConnected()
    {
        return socket != null && socket.isConnected();
    }
    public void connect()
    {
        if (socket != null)
        {
            Debug.LogError("socket is not closed,try close");
            socket.disconnect();
        }

        Debug.Log("start connect ip:" + ip + " port:" + port);

        socket = new NetManagerSocket();
        socket.ip = ip;
        socket.port = port;
        socket.connect();

        sendBytes = 0;
        sendBytesTimeout = 0f;

        connectTimeout = 0f;
    }

    public void disconnect()
    {
        if (socket == null)
        {
            Debug.LogError("socket is null");
            return;
        }
        socket.disconnect();
        socket = null;
    }

    public void onUpdate()
    {
        // 每祯执行,没有阻塞
        if (socket != null)
        {
            // 改为在socket中处理呢
            if (socket.socketError != SocketError.Success)
            {
                // 如果遇到sokcet错误,不需要等等超时,立即重连
                Debug.LogError("socket error:" + socket.socketError);
                tryReconnect();
                return;
            }

            if (socket.isConnected())
            {
                NetMsg msg = socket.read();
                if (msg != null)
                {
                    receviedMsgQueue.Enqueue(msg);
                }
                NetMsg netMsg = null;
                if (reconnectTokenLogin != null)
                {
                    netMsg = reconnectTokenLogin;
                }
                else
                {
                    if (loginSuccess)
                    {
                        lock (msgQuery)
                        {
                            if (msgQuery.Count > 0)
                            {
                                netMsg = msgQuery.First();
                            }
                        }
                    }
                }
                if (netMsg != null)
                {
                    socketSend(netMsg);
                }
            }
            else
            {
                if (connectTimeout == 0f)
                {
                    connectTimeout = Time.realtimeSinceStartup;
                }
                else if (Time.realtimeSinceStartup - connectTimeout > 5)
                {
                    // 连接5秒超时,处理重连
                    tryReconnect();
                }
            }
        }
    }

    private void tryReconnect()
    {
        if (reconnectTryTimes >= 3)
        {
            // 跳出错误处理回调,交给外部处理
            if (reconnectErrorCallback != null)
            {
                reconnectErrorCallback();
            }
            disconnect();
            return;
        }
        Debug.LogError("socket connect timeout, try reconnect:" + reconnectTryTimes + " " + socket.socketError);
        reconnectTryTimes++;
        disconnect();
        connect();
        // 重试需要,重新发送登陆消息505
        LoginState.Instance.loginWithTokenSub(true);

    }

    public NetMsg readMsg()
    {
        if (receviedMsgQueue.Count == 0)
        {
            return null;
        }
        return receviedMsgQueue.Dequeue();
    }

    // true:超时
    private void socketSend(NetMsg netMsg)
    {
        // 发送数据
        bool newMsg = false;// 新发送的消息
        if (sendBytes == 0)
        {
            newMsg = true;

        }
        int num = socket.send(netMsg, sendBytes);
        if (num > 0)
        {
            sendBytes += num;
        }
        if (num == -1)
        {
            // 全部发送完成
            if (cmd.CmdType.tokenLogin == netMsg.cmdType)
            {
                reconnectTokenLogin = null;
            }
            else
            {
                lock (msgQuery)
                {
                    msgQuery.Dequeue();
                }
            }
            sendBytes = 0;
            sendBytesTimeout = 0f;
        }
        else
        {
            // 未发送完成,处理超时逻辑
            if (newMsg)
            {
                sendBytesTimeout = Time.realtimeSinceStartup;
            }
            else
            {
                // 检查时间是否超时
                if (Time.realtimeSinceStartup - sendBytesTimeout > 5)
                {
                    // 超过5秒
                    Debug.LogError("socket timeout.try reconnect");
                    // 重连重发
                    if (socket.socketError != SocketError.Success)
                    {
                        Debug.LogError("socket error:" + socket.socketError);
                    }
                    socket.socketError = SocketError.TimedOut;
                }
            }
        }
    }

    public void SendCmd<T>(cmd.CmdType cmdType, T protoObj)
    {
        send(cmdType,protoObj);
    }
    public void send<T>(cmd.CmdType cmdType, T protoObj)
    {
        NetMsg netMsg = new NetMsg();
        netMsg.loop = ++loop;
        netMsg.cmdType = cmdType;

        MemoryStream outms = new MemoryStream();
        ProtoBuf.Serializer.Serialize(outms, protoObj);
        netMsg.data = outms.ToArray();

        // todo:因为放在onupdate中,感觉这个lock也是可以避免掉的。暂时先加上,以后测试后再考虑去掉。
        // 只要能确认不会多线程操作,就可以去掉这个lock
        if (cmdType == cmd.CmdType.tokenLogin)
        {
            reconnectTokenLogin = netMsg;
            loginSuccess = false;
            return;
        }
        lock (msgQuery)
        {
            msgQuery.Enqueue(netMsg);
            // 在onupdate中发送,这样只差3ms,是可以接受的
        }

        if (msgQuery.Count > maxQuerySize)
        {
            Debug.LogError("msgQuery more than max size:" + msgQuery.Count);
        }
    }
}








2015.01.19记:我重构时,未添加断线补发未送达的消息,我们客户端写的逻辑不严谨,在登陆游戏按钮上连续多次点击,会导致卡住登陆。我这里只是使用禁止多次点击来避免这个问题,如果使用有断线补发未送达的消息也可以不会出现这个问题。总之,不要依赖客户端代码很强壮,最好还是添加上断线补发消息。

发表评论?

0 条评论。

发表评论


注意 - 你可以用以下 HTML tags and attributes:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>


Warning: Use of undefined constant XML - assumed 'XML' (this will throw an Error in a future version of PHP) in /opt/wordpress/wp-content/plugins/wp-syntaxhighlighter/wp-syntaxhighlighter.php on line 1048