当前位置: 首页 > 工具软件 > SAEA.Socket > 使用案例 >

C#-IOCP-SAEA使用(解决“现在已经正在使用此 SocketAsyncEventArgs“问题)

顾淳
2023-12-01

1.解决思路:在server端以下socket通讯各异步过程中,规避使用相同的saea对象;

接受连接(_listenSocket.AcceptAsync(acceptSaea)),

接收数据(token.UserSocket.ReceiveAsync(recvSaea)),

发送数据(token.UserSocket.SendAsync(sendSaea))

即上述acceptSaea、recvSaea、sendSaea是不同的saea引用.

下面稍详细分享一下.

2.如本例server类中,ProcessAccept是accept连接后的回调处理,采用从池中pop出saea对象,

_stackPool.TryPop(out recvSaea);
Console.WriteLine($"接受连接[ {ipClient} ],总池容量:[ {_stackPool.Count} ]");

saea对象会传递到ProcessReceive数据接收方法中,

bool willRaiseEvent = token.UserSocket.ReceiveAsync(recvSaea);
if (!willRaiseEvent)
    ProcessReceive(recvSaea);
private void ProcessReceive(SocketAsyncEventArgs e)
{
...
}

在ProcessReceive方法的最后中将其回收(不使用池时,将其释放掉),

_stackPool.Push(e);
Console.WriteLine($"ProcessReceive->回收SAEA,总池容量:[ {_stackPool.Count} ]");

虽然ProcessReceive中仍需要saea对象作为发送时的参数,这时用新的saea对象就可以避免saea重复使用报错的问题,

_stackPool.TryPop(out sendSaea);
Console.WriteLine($"接收数据,总池容量:[ {_stackPool.Count} ]");
sendSaea.UserToken = token;
byte[] buffer = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, buffer, 0, e.BytesTransferred);
sendSaea.SetBuffer(buffer, 0, buffer.Length);
bool willRaiseEvent = token.UserSocket.SendAsync(sendSaea);
if (!willRaiseEvent)
    ProcessSend(sendSaea);

ProcessReceive中回复发送给客户端数据后,会触发ProcessSend方法用于继续接收客户端数据,仍然使用新的saea用于继续接收数据,

private void ProcessSend(SocketAsyncEventArgs e)
{
...
    _stackPool.TryPop(out recvSaea);
    Console.WriteLine($"回复数据,总池容量:[ {_stackPool.Count} ]");
    recvSaea.UserToken = token;
    bool willRaiseEvent = token.UserSocket.ReceiveAsync(recvSaea);
    if (!willRaiseEvent)
        ProcessReceive(recvSaea);
...
}

 ProcessSend的最后将ProcessReceive转递过来的saea对象回收(不使用池时,将其释放掉),

private void ProcessSend(SocketAsyncEventArgs e)
{
...
_stackPool.Push(e);
Console.WriteLine($"ProcessSend->回收SAEA,总池容量:[ {_stackPool.Count} ]");
}

3.本例中涉及的类:AsyncUserToken、SocketAsyncEventArgsPool、Server、Client;

4.自定义AsyncUserToken类时,一般至少有一个socket属性,用于存放当前socket连接;

    public class AsyncUserToken
    {
        public Socket UserSocket { get; set; }
        public String ID { get; set; }
    }

5.SocketAsyncEventArgsPool连接池,看具体设计决定是否使用;

同时,一般IOCP范例中还会使用BufferMangager用于内存分配管理,本例未使用,如具体设计使用时,应考虑好如何在初始化分配固定存储空间后,后续如何动态增加的问题.

以下附本例各文件源码.

6.服务端源码:

(1)AsyncUserToken.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleSaeaSvr
{
    public class AsyncUserToken
    {
        public Socket UserSocket { get; set; }
        public String ID { get; set; }
    }
}

(2) SocketAsyncEventArgsPool.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

/// <summary>
/// https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.socketasynceventargs.aspx
/// </summary>
namespace ConsoleSaeaSvr
{
    /// <summary>
    /// Represents a collection of reusable SocketAsyncEventArgs objects.  
    /// 表示可重用的SocketAsyncEventArgs对象的集合。
    /// </summary>
    public class SocketAsyncEventArgsPool
    {
        private ConcurrentStack<SocketAsyncEventArgs> m_pool;

        public SocketAsyncEventArgsPool()
        {
            m_pool = new ConcurrentStack<SocketAsyncEventArgs>();
        }

        public void Push(SocketAsyncEventArgs item)
        {
            if (item == null) { throw new ArgumentException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
            m_pool.Push(item);
        }

        public bool TryPop(out SocketAsyncEventArgs saea)
        {
            return m_pool.TryPop(out saea);
        }

        public int Count
        {
            get { return m_pool.Count; }
        }
    }
}

(3)Server.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleSaeaSvr
{
    public class Server
    {
        /// <summary>
        /// 最大并发数
        /// </summary>
        private static int _numInitPoolCount;
        /// <summary>
        /// 用于每个Socket I/O 操作的缓冲区大小
        /// </summary>
        private static int _receiveBufferSize;
        /// <summary>
        /// the socket used to listen for incoming connection requests
        /// </summary>
        private Socket _listenSocket;
        private static SocketAsyncEventArgsPool _stackPool = null;
        private static ConcurrentDictionary<string, SocketAsyncEventArgs> _connectedPool = null;
        public Server(int numInitPoolCount, int receivesBufferSize)
        {
            _numInitPoolCount = numInitPoolCount;
            _receiveBufferSize = receivesBufferSize;
            _stackPool = new SocketAsyncEventArgsPool();
            _connectedPool = new ConcurrentDictionary<string, SocketAsyncEventArgs>();
        }
        /// <summary>
        /// Starts the server such that it is listening for incoming connection requests. 
        /// </summary>
        /// <param name="localEndPoint">The endpoint which the server will listening for connection requests on</param>
        public void Start(IPEndPoint localEndPoint)
        {
            // create the socket which listens for incoming connections
            _listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            _listenSocket.Bind(localEndPoint);
            // start the server with a listen backlog of 100 connections
            _listenSocket.Listen(100);
            // post accepts on the listening socket 在接收端口上接收
            StartAccept(null);
            //按任意键终止服务器进程....
            Console.WriteLine("Server Started : Press any key to terminate the server process....");
            Console.ReadKey();
        }
        public void Init()
        {
            // 分配一个大字节缓冲区, 所有的 I/O 操作都使用一个。 这样对抗内存分裂,预先分配SocketAsyncEventArgs对象的池
            for (int i = 0; i < _numInitPoolCount; i++)
            {
                // 预先分配一组可重用的SocketAsyncEventArgs
                SocketAsyncEventArgs saea = new SocketAsyncEventArgs();
                saea.AcceptSocket = null;
                saea.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                saea.UserToken = new AsyncUserToken();
                // 给SocketAsyncEventArg对象从缓冲池分配一个字节缓冲区
                saea.SetBuffer(new byte[_receiveBufferSize], 0, _receiveBufferSize);
                // add SocketAsyncEventArg to the pool
                _stackPool.Push(saea);
            }
            Console.WriteLine($"初始化总池容量[ {_stackPool.Count} ]");
        }
        /// <summary>
        /// Begins an operation to accept a connection request from the client 
        /// </summary>
        /// <param name="p">The context object to use when issuing the accept operation on the server's listening socket</param>
        private void StartAccept(SocketAsyncEventArgs acceptSaea)
        {
            if (acceptSaea == null)
            {
                acceptSaea = new SocketAsyncEventArgs();
                acceptSaea.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArgs_Completed);
            }
            else
                // socket must be cleared since the context object is being reused
                acceptSaea.AcceptSocket = null;
            bool willRaiseEvent = _listenSocket.AcceptAsync(acceptSaea);
            if (!willRaiseEvent)
                ProcessAccept(acceptSaea);
        }
        /// <summary>
        ///  This method is the callback method associated with Socket.
        ///  AcceptAsync operations and is invoked when an accept operation is complete
        /// </summary>
        private void AcceptEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            ProcessAccept(e);
        }
        private void ProcessAccept(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = new AsyncUserToken();
            token.UserSocket = e.AcceptSocket;
            string ipClient = e.AcceptSocket.RemoteEndPoint.ToString();
            // ReadEventArg object user token
            SocketAsyncEventArgs recvSaea;
            if (_stackPool.Count == 0) Init();
            _stackPool.TryPop(out recvSaea);
            Console.WriteLine($"接受连接[ {ipClient} ],总池容量:[ {_stackPool.Count} ]");
            recvSaea.UserToken = token;
            if (recvSaea.Buffer == null)
            {
                byte[] buffer = new byte[1024];
                recvSaea.SetBuffer(buffer, 0, buffer.Length);
            }
            // As soon as the client is connected, post a receive to the connection
            bool willRaiseEvent = token.UserSocket.ReceiveAsync(recvSaea);
            if (!willRaiseEvent)
                ProcessReceive(recvSaea);
            // Accept the next connection request
            StartAccept(e);
        }
        /// <summary>
        /// This method is called whenever a receive or send operation is completed on a socket 
        /// </summary>
        /// <param name="e">SocketAsyncEventArg associated with the completed receive operation</param>
        private void IO_Completed(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }
        }
        /// <summary>
        /// This method is invoked when an asynchronous receive operation completes. 
        /// If the remote host closed the connection, then the socket is closed.  
        /// If data was received then the data is echoed back to the client.
        /// </summary>
        /// <param name="e"></param>
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = (AsyncUserToken)e.UserToken;
            // e.BytesTransferred获取套接字操作中传输的字节数。 
            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
            {
                SocketAsyncEventArgs sendSaea;
                if (_stackPool.Count == 0) Init();
                _stackPool.TryPop(out sendSaea);
                Console.WriteLine($"接收数据,总池容量:[ {_stackPool.Count} ]");
                sendSaea.UserToken = token;
                byte[] buffer = new byte[e.BytesTransferred];
                Array.Copy(e.Buffer, e.Offset, buffer, 0, e.BytesTransferred);
                sendSaea.SetBuffer(buffer, 0, buffer.Length);
                bool willRaiseEvent = token.UserSocket.SendAsync(sendSaea);
                if (!willRaiseEvent)
                    ProcessSend(sendSaea);
            }
            else
                CloseClientSocket(e);
            _stackPool.Push(e);
            Console.WriteLine($"ProcessReceive->回收SAEA,总池容量:[ {_stackPool.Count} ]");
        }
        /// <summary>
        /// This method is invoked when an asynchronous send operation completes.  
        /// The method issues another receive on the socket to read any additional data sent from the client
        /// </summary>
        private void ProcessSend(SocketAsyncEventArgs e)
        {
            // done echoing data back to the client     完成回传数据回到客户端
            AsyncUserToken token = (AsyncUserToken)e.UserToken;
            if (e.SocketError == SocketError.Success)
            {
                // read the next block of data send from the client     读取从客户端发送的下一个数据块
                SocketAsyncEventArgs recvSaea;
                if (_stackPool.Count == 0) Init();
                _stackPool.TryPop(out recvSaea);
                Console.WriteLine($"回复数据,总池容量:[ {_stackPool.Count} ]");
                recvSaea.UserToken = token;
                bool willRaiseEvent = token.UserSocket.ReceiveAsync(recvSaea);
                if (!willRaiseEvent)
                    ProcessReceive(recvSaea);
            }
            else
                CloseClientSocket(e);
            _stackPool.Push(e);
            Console.WriteLine($"ProcessSend->回收SAEA,总池容量:[ {_stackPool.Count} ]");
        }
        private void CloseClientSocket(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = e.UserToken as AsyncUserToken;
            try
            {
                if (e != null)
                {
                    e.AcceptSocket?.Close();
                    e.AcceptSocket = null;
                }
                try { token?.UserSocket?.Shutdown(SocketShutdown.Send); } catch { }
            }
            finally
            {
                token?.UserSocket?.Close();
            }
            Console.WriteLine($"关闭一个连接");
        }
    }
}

(4)Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleSaeaSvr
{
    internal class Program
    {
        static void Main(string[] args)
        {
            string ip = "127.0.0.1";
            int port = 3000;
            IPAddress ipAdress = IPAddress.Parse(ip);
            IPEndPoint ipe = new IPEndPoint(ipAdress, port);
            Server server = new Server(100,1024);
            server.Init();
            server.Start(ipe);
        }
    }
}

7.客户端源码

(1)Client.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleSaeaClient
{
    public class Client
    {
        public Socket m_socket;
        IPEndPoint m_endPoint;
        private SocketAsyncEventArgs m_connectSAEA;
        private SocketAsyncEventArgs m_sendSAEA;
        public bool Connected
        {
            get
            {
                return m_socket.Connected;
            }
        }

        public Client(string ip, int port)
        {
            m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            IPAddress iPAddress = IPAddress.Parse(ip);
            m_endPoint = new IPEndPoint(iPAddress, port);
            m_connectSAEA = new SocketAsyncEventArgs { RemoteEndPoint = m_endPoint };
        }

        public void Connect()
        {
            m_connectSAEA.Completed += OnConnectedCompleted;
            m_socket.ConnectAsync(m_connectSAEA);
        }

        public void OnConnectedCompleted(object sender, SocketAsyncEventArgs e)
        {
            if (e.SocketError != SocketError.Success) return;
            Socket socket = sender as Socket;
            string iPRemote = socket.RemoteEndPoint.ToString();

            Console.WriteLine("Client : 连接服务器 [ {0} ] 成功", iPRemote);

            SocketAsyncEventArgs receiveSAEA = new SocketAsyncEventArgs();
            byte[] receiveBuffer = new byte[1024 * 4];
            receiveSAEA.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
            receiveSAEA.Completed += OnReceiveCompleted;
            receiveSAEA.RemoteEndPoint = m_endPoint;
            socket.ReceiveAsync(receiveSAEA);
        }

        private void OnReceiveCompleted(object sender, SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.OperationAborted) return;

            Socket socket = sender as Socket;

            if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
            {
                string ipAdress = socket.RemoteEndPoint.ToString();
                int lengthBuffer = e.BytesTransferred;
                byte[] receiveBuffer = e.Buffer;
                byte[] buffer = new byte[lengthBuffer];
                Buffer.BlockCopy(receiveBuffer, 0, buffer, 0, lengthBuffer);

                string msg = Encoding.Default.GetString(buffer);
                Console.WriteLine("Client : receive message[ {0} ],from Server[ {1} ]", msg, ipAdress);
                socket.ReceiveAsync(e);
            }
            else if (e.SocketError == SocketError.ConnectionReset && e.BytesTransferred == 0)
            {
                Console.WriteLine("Client: 服务器断开连接 ");
            }
            else
            {
                return;
            }
        }

        public void Send(string msg)
        {
            byte[] sendBuffer = Encoding.Default.GetBytes(msg);
            if (m_sendSAEA == null)
            {
                m_sendSAEA = new SocketAsyncEventArgs();
                m_sendSAEA.Completed += OnSendCompleted;
            }

            m_sendSAEA.SetBuffer(sendBuffer, 0, sendBuffer.Length);
            if (m_socket != null)
            {
                m_socket.SendAsync(m_sendSAEA);
            }
        }

        private void OnSendCompleted(object sender, SocketAsyncEventArgs e)
        {
            if (e.SocketError != SocketError.Success) return;
            Socket socket = sender as Socket;
            byte[] sendBuffer = e.Buffer;

            string sendMsg = Encoding.Default.GetString(sendBuffer);

            Console.WriteLine("Client : Send message [ {0} ] to Serer[ {1} ]", sendMsg, socket.RemoteEndPoint.ToString());
        }

        public void DisConnect()
        {
            if (m_socket != null)
            {
                try
                {
                    m_socket.Shutdown(SocketShutdown.Both);
                }
                catch (SocketException se)
                {
                }
                finally
                {
                    m_socket.Close();
                }
            }
        }
    }
}

(2)Program.cs

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleSaeaClient
{
    internal class Program
    {
        private static List<Client> _clients = new List<Client>();
        static void Main(string[] args)
        {
            string ip = "127.0.0.1";
            int port = 3000;
            for (int i = 0; i < 5; i++)
            {
                Client client = new Client(ip, port);
                _clients.Add(client);
            }
            Console.WriteLine($"按键任意键开始...");
            Console.ReadKey();
            foreach (Client client in _clients)
            {
                Task.Run(() =>
                {
                    if (!client.Connected)
                        client.Connect();
                    Thread.Sleep(100);
                    client.Send($"Task{Task.CurrentId}->Hello World");
                });
                Thread.Sleep(100);
            }
            Console.ReadKey();
        }
    }
}

 

 类似资料: