1.解决思路:在server端以下socket通讯各异步过程中,规避使用相同的saea对象;
接受连接(_listenSocket.AcceptAsync(acceptSaea)),
接收数据(token.UserSocket.ReceiveAsync(recvSaea)),
发送数据(token.UserSocket.SendAsync(sendSaea))
即上述acceptSaea、recvSaea、sendSaea是不同的saea引用.
下面稍详细分享一下.
2.如本例server类中,ProcessAccept是accept连接后的回调处理,采用从池中pop出saea对象,
_stackPool.TryPop(out recvSaea);
Console.WriteLine($"接受连接[ {ipClient} ],总池容量:[ {_stackPool.Count} ]");
saea对象会传递到ProcessReceive数据接收方法中,
bool willRaiseEvent = token.UserSocket.ReceiveAsync(recvSaea);
if (!willRaiseEvent)
ProcessReceive(recvSaea);
private void ProcessReceive(SocketAsyncEventArgs e)
{
...
}
在ProcessReceive方法的最后将其回收(不使用池时,将其释放掉),
_stackPool.Push(e);
Console.WriteLine($"ProcessReceive->回收SAEA,总池容量:[ {_stackPool.Count} ]");
虽然ProcessReceive中仍需要saea对象作为发送时的参数,这时用新的saea对象就可以避免saea重复使用报错的问题,
_stackPool.TryPop(out sendSaea);
Console.WriteLine($"接收数据,总池容量:[ {_stackPool.Count} ]");
sendSaea.UserToken = token;
byte[] buffer = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, buffer, 0, e.BytesTransferred);
sendSaea.SetBuffer(buffer, 0, buffer.Length);
bool willRaiseEvent = token.UserSocket.SendAsync(sendSaea);
if (!willRaiseEvent)
ProcessSend(sendSaea);
ProcessReceive中向客户端回复数据后,会触发ProcessSend方法;ProcessSend为了继续接收客户端数据,仍然从池中取出新的saea用于接收,
private void ProcessSend(SocketAsyncEventArgs e)
{
...
_stackPool.TryPop(out recvSaea);
Console.WriteLine($"回复数据,总池容量:[ {_stackPool.Count} ]");
recvSaea.UserToken = token;
bool willRaiseEvent = token.UserSocket.ReceiveAsync(recvSaea);
if (!willRaiseEvent)
ProcessReceive(recvSaea);
...
}
ProcessSend的最后将ProcessReceive传递过来的saea对象回收(不使用池时,将其释放掉),
private void ProcessSend(SocketAsyncEventArgs e)
{
...
_stackPool.Push(e);
Console.WriteLine($"ProcessSend->回收SAEA,总池容量:[ {_stackPool.Count} ]");
}
3.本例中涉及的类:AsyncUserToken、SocketAsyncEventArgsPool、Server、Client;
4.自定义AsyncUserToken类时,一般至少有一个socket属性,用于存放当前socket连接;
/// <summary>
/// Per-connection state carried in <c>SocketAsyncEventArgs.UserToken</c>.
/// </summary>
public class AsyncUserToken
{
    /// <summary>Socket of the connection this token belongs to.</summary>
    public Socket UserSocket { get; set; }

    /// <summary>Identifier string for the connection.</summary>
    public string ID { get; set; }
}
5.SocketAsyncEventArgsPool连接池,看具体设计决定是否使用;
同时,一般IOCP范例中还会使用BufferManager进行内存分配管理,本例未使用;如具体设计中使用,应考虑好在初始化时分配固定存储空间之后,后续如何动态扩容的问题.
以下附本例各文件源码.
6.服务端源码:
(1)AsyncUserToken.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleSaeaSvr
{
/// <summary>
/// Per-connection state carried in <c>SocketAsyncEventArgs.UserToken</c>.
/// </summary>
public class AsyncUserToken
{
    /// <summary>Socket of the connection this token belongs to.</summary>
    public Socket UserSocket { get; set; }

    /// <summary>Identifier string for the connection.</summary>
    public string ID { get; set; }
}
}
(2) SocketAsyncEventArgsPool.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
/// <summary>
/// https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.socketasynceventargs.aspx
/// </summary>
namespace ConsoleSaeaSvr
{
/// <summary>
/// A LIFO collection of reusable <see cref="SocketAsyncEventArgs"/> objects,
/// backed by a <see cref="ConcurrentStack{T}"/>.
/// </summary>
public class SocketAsyncEventArgsPool
{
    // Assigned once in the constructor and never replaced.
    private readonly ConcurrentStack<SocketAsyncEventArgs> m_pool;

    /// <summary>Creates an empty pool.</summary>
    public SocketAsyncEventArgsPool()
    {
        m_pool = new ConcurrentStack<SocketAsyncEventArgs>();
    }

    /// <summary>Adds an instance to the pool.</summary>
    /// <param name="item">The instance to add; must not be null.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="item"/> is null. This type derives from
    /// <see cref="ArgumentException"/>, so callers catching the base type still work.
    /// </exception>
    public void Push(SocketAsyncEventArgs item)
    {
        if (item == null)
        {
            throw new ArgumentNullException(nameof(item), "Items added to a SocketAsyncEventArgsPool cannot be null");
        }

        m_pool.Push(item);
    }

    /// <summary>Attempts to remove the most recently pushed instance.</summary>
    /// <param name="saea">The removed instance, or null when the pool is empty.</param>
    /// <returns>true when an instance was removed; false when the pool was empty.</returns>
    public bool TryPop(out SocketAsyncEventArgs saea)
    {
        return m_pool.TryPop(out saea);
    }

    /// <summary>Number of instances currently held by the pool.</summary>
    public int Count
    {
        get { return m_pool.Count; }
    }
}
}
(3)Server.cs
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleSaeaSvr
{
/// <summary>
/// TCP echo server built on <see cref="SocketAsyncEventArgs"/> (SAEA).
/// The accept, receive and send operations each use their own SAEA instance, so a
/// single instance is never handed to two asynchronous operations at the same time.
/// </summary>
public class Server
{
    /// <summary>Backlog passed to <see cref="Socket.Listen(int)"/>.</summary>
    private const int ListenBacklog = 100;

    /// <summary>Number of SAEA objects created by each call to <see cref="Init"/>.</summary>
    private readonly int _numInitPoolCount;

    /// <summary>Size in bytes of the buffer attached to every pooled SAEA.</summary>
    private readonly int _receiveBufferSize;

    /// <summary>Pool of idle SAEA objects used for receive and send operations.</summary>
    private readonly SocketAsyncEventArgsPool _stackPool;

    /// <summary>Socket used to listen for incoming connection requests.</summary>
    private Socket _listenSocket;

    /// <summary>Creates a server; call <see cref="Init"/> and then <see cref="Start"/>.</summary>
    /// <param name="numInitPoolCount">How many SAEA objects each <see cref="Init"/> call adds to the pool.</param>
    /// <param name="receivesBufferSize">Buffer size in bytes for each pooled SAEA.</param>
    /// <exception cref="ArgumentOutOfRangeException">Either argument is zero or negative.</exception>
    public Server(int numInitPoolCount, int receivesBufferSize)
    {
        if (numInitPoolCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(numInitPoolCount), numInitPoolCount, "Pool batch size must be positive.");
        }
        if (receivesBufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(receivesBufferSize), receivesBufferSize, "Buffer size must be positive.");
        }

        _numInitPoolCount = numInitPoolCount;
        _receiveBufferSize = receivesBufferSize;
        _stackPool = new SocketAsyncEventArgsPool();
    }

    /// <summary>
    /// Binds and listens on <paramref name="localEndPoint"/>, posts the first accept and
    /// then blocks until a key is pressed. The listening socket is closed on the way out.
    /// </summary>
    /// <param name="localEndPoint">The endpoint on which the server listens for connection requests.</param>
    /// <exception cref="ArgumentNullException"><paramref name="localEndPoint"/> is null.</exception>
    public void Start(IPEndPoint localEndPoint)
    {
        if (localEndPoint == null)
        {
            throw new ArgumentNullException(nameof(localEndPoint));
        }

        _listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
        try
        {
            _listenSocket.Bind(localEndPoint);
            _listenSocket.Listen(ListenBacklog);

            // Post the first accept on the listening socket.
            StartAccept(null);

            Console.WriteLine("Server Started : Press any key to terminate the server process....");
            Console.ReadKey();
        }
        finally
        {
            // Release the listening port; a pending accept then completes with an error
            // and the accept path stops re-posting.
            _listenSocket.Close();
        }
    }

    /// <summary>
    /// Adds one batch of SAEA objects to the pool, each with its own buffer and the
    /// receive/send completion handler attached.
    /// </summary>
    public void Init()
    {
        for (int i = 0; i < _numInitPoolCount; i++)
        {
            _stackPool.Push(CreateSaea());
        }
        Console.WriteLine($"初始化总池容量[ {_stackPool.Count} ]");
    }

    /// <summary>Creates an SAEA wired to <see cref="IO_Completed"/> with a full-size buffer.</summary>
    private SocketAsyncEventArgs CreateSaea()
    {
        SocketAsyncEventArgs saea = new SocketAsyncEventArgs();
        saea.Completed += IO_Completed;
        saea.SetBuffer(new byte[_receiveBufferSize], 0, _receiveBufferSize);
        return saea;
    }

    /// <summary>
    /// Takes an SAEA from the pool, growing the pool when it is empty, and attaches
    /// <paramref name="token"/> to it. Never returns null.
    /// </summary>
    private SocketAsyncEventArgs RentSaea(AsyncUserToken token)
    {
        SocketAsyncEventArgs saea;
        if (!_stackPool.TryPop(out saea))
        {
            Init();
            if (!_stackPool.TryPop(out saea))
            {
                // Other threads drained the fresh batch before we could pop from it.
                saea = CreateSaea();
            }
        }
        saea.UserToken = token;
        return saea;
    }

    /// <summary>Detaches the connection token and puts the SAEA back into the pool.</summary>
    /// <param name="saea">The instance whose operation has completed.</param>
    /// <param name="stage">Name of the stage returning the instance; used in the log line.</param>
    private void ReturnSaea(SocketAsyncEventArgs saea, string stage)
    {
        // Do not keep a finished connection reachable through an idle pooled object.
        saea.UserToken = null;
        _stackPool.Push(saea);
        Console.WriteLine($"{stage}->回收SAEA,总池容量:[ {_stackPool.Count} ]");
    }

    /// <summary>
    /// Makes the whole buffer available for the next receive. A pooled SAEA that was
    /// last used for a send still has its offset/count narrowed to that message.
    /// </summary>
    private void ResetReceiveBuffer(SocketAsyncEventArgs saea)
    {
        byte[] buffer = saea.Buffer;
        if (buffer == null || buffer.Length == 0)
        {
            saea.SetBuffer(new byte[_receiveBufferSize], 0, _receiveBufferSize);
        }
        else
        {
            saea.SetBuffer(0, buffer.Length);
        }
    }

    /// <summary>
    /// Posts accept operations on the listening socket. Accepts that complete
    /// synchronously are handled in a loop instead of by recursion, so a burst of
    /// ready connections cannot grow the call stack.
    /// </summary>
    /// <param name="acceptSaea">The SAEA to reuse for the accept, or null to create one.</param>
    private void StartAccept(SocketAsyncEventArgs acceptSaea)
    {
        if (acceptSaea == null)
        {
            acceptSaea = new SocketAsyncEventArgs();
            acceptSaea.Completed += AcceptEventArgs_Completed;
        }

        while (true)
        {
            // The socket must be cleared because the context object is being reused.
            acceptSaea.AcceptSocket = null;

            bool willRaiseEvent;
            try
            {
                willRaiseEvent = _listenSocket.AcceptAsync(acceptSaea);
            }
            catch (ObjectDisposedException)
            {
                // The listening socket was closed; stop accepting.
                return;
            }

            if (willRaiseEvent)
            {
                // AcceptEventArgs_Completed continues from here.
                return;
            }
            if (!HandleAccept(acceptSaea))
            {
                return;
            }
        }
    }

    /// <summary>
    /// Completion callback for <see cref="Socket.AcceptAsync(SocketAsyncEventArgs)"/>.
    /// </summary>
    private void AcceptEventArgs_Completed(object sender, SocketAsyncEventArgs e)
    {
        ProcessAccept(e);
    }

    /// <summary>Handles one completed accept and then posts the next accept.</summary>
    private void ProcessAccept(SocketAsyncEventArgs e)
    {
        if (HandleAccept(e))
        {
            StartAccept(e);
        }
    }

    /// <summary>
    /// Handles one completed accept: on success, posts the first receive on the new
    /// connection using an SAEA from the pool.
    /// </summary>
    /// <returns>false when accepting should stop (the accept was aborted); otherwise true.</returns>
    private bool HandleAccept(SocketAsyncEventArgs e)
    {
        Socket client = e.AcceptSocket;
        if (e.SocketError != SocketError.Success || client == null)
        {
            if (client != null)
            {
                client.Close();
            }
            Console.WriteLine($"接受连接失败:[ {e.SocketError} ]");
            // An aborted accept means the listening socket is going away.
            return e.SocketError != SocketError.OperationAborted;
        }

        AsyncUserToken token = new AsyncUserToken();
        token.UserSocket = client;

        SocketAsyncEventArgs recvSaea = RentSaea(token);
        Console.WriteLine($"接受连接[ {client.RemoteEndPoint} ],总池容量:[ {_stackPool.Count} ]");
        ResetReceiveBuffer(recvSaea);

        // As soon as the client is connected, post a receive on the connection.
        if (!client.ReceiveAsync(recvSaea))
        {
            ProcessReceive(recvSaea);
        }
        return true;
    }

    /// <summary>
    /// Called whenever a receive or send operation completes on a pooled SAEA.
    /// </summary>
    /// <param name="e">The SAEA associated with the completed operation.</param>
    private void IO_Completed(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Receive:
                ProcessReceive(e);
                break;
            case SocketAsyncOperation.Send:
                ProcessSend(e);
                break;
            default:
                throw new ArgumentException("The last operation completed on the socket was not a receive or send");
        }
    }

    /// <summary>
    /// Handles a completed receive. If data arrived it is echoed back to the client;
    /// otherwise the connection is closed.
    /// </summary>
    private void ProcessReceive(SocketAsyncEventArgs e)
    {
        RunEchoLoop(e, true);
    }

    /// <summary>
    /// Handles a completed send by posting another receive on the same connection.
    /// </summary>
    private void ProcessSend(SocketAsyncEventArgs e)
    {
        RunEchoLoop(e, false);
    }

    /// <summary>
    /// Alternates between the receive and send stages for as long as the next
    /// operation completes synchronously. Iterating here replaces the mutual recursion
    /// between the two stages, which could otherwise deepen the stack without bound.
    /// </summary>
    /// <param name="completed">The SAEA whose operation has just completed.</param>
    /// <param name="isReceive">true when <paramref name="completed"/> finished a receive; false for a send.</param>
    private void RunEchoLoop(SocketAsyncEventArgs completed, bool isReceive)
    {
        while (completed != null)
        {
            completed = isReceive ? HandleReceiveCompleted(completed) : HandleSendCompleted(completed);
            isReceive = !isReceive;
        }
    }

    /// <summary>Receive stage: echoes the received bytes using a separate SAEA.</summary>
    /// <returns>The send SAEA when the send completed synchronously; otherwise null.</returns>
    private SocketAsyncEventArgs HandleReceiveCompleted(SocketAsyncEventArgs e)
    {
        AsyncUserToken token = (AsyncUserToken)e.UserToken;
        SocketAsyncEventArgs completedSend = null;

        if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
        {
            SocketAsyncEventArgs sendSaea = RentSaea(token);
            Console.WriteLine($"接收数据,总池容量:[ {_stackPool.Count} ]");

            // Copy into the send SAEA's own pooled buffer and narrow offset/count to the
            // message. Swapping in a per-message array would shrink the pooled buffer
            // for every later receive that reuses this object.
            int count = e.BytesTransferred;
            if (sendSaea.Buffer == null || sendSaea.Buffer.Length < count)
            {
                sendSaea.SetBuffer(new byte[Math.Max(count, _receiveBufferSize)], 0, count);
            }
            else
            {
                sendSaea.SetBuffer(0, count);
            }
            Array.Copy(e.Buffer, e.Offset, sendSaea.Buffer, 0, count);

            if (!token.UserSocket.SendAsync(sendSaea))
            {
                completedSend = sendSaea;
            }
        }
        else
        {
            CloseClientSocket(e);
        }

        // The receive SAEA is finished and its data has been copied; recycle it.
        ReturnSaea(e, "ProcessReceive");
        return completedSend;
    }

    /// <summary>Send stage: posts the next receive using a separate SAEA.</summary>
    /// <returns>The receive SAEA when the receive completed synchronously; otherwise null.</returns>
    private SocketAsyncEventArgs HandleSendCompleted(SocketAsyncEventArgs e)
    {
        AsyncUserToken token = (AsyncUserToken)e.UserToken;
        SocketAsyncEventArgs completedReceive = null;

        if (e.SocketError == SocketError.Success)
        {
            // Read the next block of data sent from the client.
            SocketAsyncEventArgs recvSaea = RentSaea(token);
            Console.WriteLine($"回复数据,总池容量:[ {_stackPool.Count} ]");
            ResetReceiveBuffer(recvSaea);

            if (!token.UserSocket.ReceiveAsync(recvSaea))
            {
                completedReceive = recvSaea;
            }
        }
        else
        {
            CloseClientSocket(e);
        }

        // The send SAEA is finished; recycle it.
        ReturnSaea(e, "ProcessSend");
        return completedReceive;
    }

    /// <summary>
    /// Shuts down and closes the client socket referenced by the SAEA's user token.
    /// Does nothing when <paramref name="e"/> is null.
    /// </summary>
    private void CloseClientSocket(SocketAsyncEventArgs e)
    {
        if (e == null)
        {
            return;
        }

        AsyncUserToken token = e.UserToken as AsyncUserToken;
        Socket socket = token == null ? null : token.UserSocket;

        if (e.AcceptSocket != null)
        {
            e.AcceptSocket.Close();
            e.AcceptSocket = null;
        }

        if (socket != null)
        {
            try
            {
                socket.Shutdown(SocketShutdown.Send);
            }
            catch (SocketException)
            {
                // Best effort: the peer may already have dropped the connection.
            }
            catch (ObjectDisposedException)
            {
                // Best effort: the socket was already closed.
            }
            finally
            {
                socket.Close();
            }
        }

        Console.WriteLine("关闭一个连接");
    }
}
}
(4)Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleSaeaSvr
{
/// <summary>Console host for the echo server.</summary>
internal class Program
{
    /// <summary>
    /// Creates a server with a pool batch of 100 and 1024-byte buffers, fills the
    /// pool, and listens on 127.0.0.1:3000.
    /// </summary>
    static void Main(string[] args)
    {
        const string ip = "127.0.0.1";
        const int port = 3000;

        var listenEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
        var server = new Server(100, 1024);

        server.Init();
        server.Start(listenEndPoint);
    }
}
}
7.客户端源码
(1)Client.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleSaeaClient
{
/// <summary>
/// TCP client built on <see cref="SocketAsyncEventArgs"/> (SAEA). Connecting,
/// receiving and each send use separate SAEA instances.
/// </summary>
public class Client
{
    /// <summary>The underlying socket, created in the constructor.</summary>
    public Socket m_socket;

    private readonly IPEndPoint m_endPoint;
    private readonly SocketAsyncEventArgs m_connectSAEA;

    /// <summary>Whether the underlying socket reports itself as connected.</summary>
    public bool Connected
    {
        get
        {
            return m_socket.Connected;
        }
    }

    /// <summary>Creates a client for the given server address; does not connect yet.</summary>
    /// <param name="ip">Server IP address in a form accepted by <see cref="IPAddress.Parse(string)"/>.</param>
    /// <param name="port">Server port.</param>
    public Client(string ip, int port)
    {
        m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPAddress iPAddress = IPAddress.Parse(ip);
        m_endPoint = new IPEndPoint(iPAddress, port);
        m_connectSAEA = new SocketAsyncEventArgs { RemoteEndPoint = m_endPoint };
        // Subscribe once here; subscribing inside Connect() would stack up duplicate
        // handlers if Connect() were called more than once.
        m_connectSAEA.Completed += OnConnectedCompleted;
    }

    /// <summary>Starts an asynchronous connect to the server.</summary>
    public void Connect()
    {
        // ConnectAsync returns false when it completed synchronously; in that case the
        // Completed event is not raised and the handler has to be called directly.
        if (!m_socket.ConnectAsync(m_connectSAEA))
        {
            OnConnectedCompleted(m_socket, m_connectSAEA);
        }
    }

    /// <summary>
    /// Connect completion handler: on success, starts the receive loop with a
    /// dedicated receive SAEA.
    /// </summary>
    public void OnConnectedCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError != SocketError.Success)
        {
            Console.WriteLine("Client : 连接服务器失败 [ {0} ]", e.SocketError);
            return;
        }

        Socket socket = (sender as Socket) ?? m_socket;
        string iPRemote = socket.RemoteEndPoint.ToString();
        Console.WriteLine("Client : 连接服务器 [ {0} ] 成功", iPRemote);

        SocketAsyncEventArgs receiveSAEA = new SocketAsyncEventArgs();
        byte[] receiveBuffer = new byte[1024 * 4];
        receiveSAEA.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
        receiveSAEA.Completed += OnReceiveCompleted;
        receiveSAEA.RemoteEndPoint = m_endPoint;
        StartReceive(socket, receiveSAEA);
    }

    /// <summary>
    /// Posts receives until one is left pending. Receives that complete synchronously
    /// are handled here in a loop, because no Completed event is raised for them.
    /// </summary>
    private void StartReceive(Socket socket, SocketAsyncEventArgs e)
    {
        try
        {
            while (!socket.ReceiveAsync(e))
            {
                if (!HandleReceived(socket, e))
                {
                    e.Dispose();
                    return;
                }
            }
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed (for example by DisConnect); stop receiving.
            e.Dispose();
        }
    }

    /// <summary>Receive completion handler for asynchronously completed receives.</summary>
    private void OnReceiveCompleted(object sender, SocketAsyncEventArgs e)
    {
        Socket socket = (sender as Socket) ?? m_socket;
        if (HandleReceived(socket, e))
        {
            StartReceive(socket, e);
        }
        else
        {
            e.Dispose();
        }
    }

    /// <summary>Processes one completed receive.</summary>
    /// <returns>true when another receive should be posted; false when receiving has ended.</returns>
    private bool HandleReceived(Socket socket, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.OperationAborted)
        {
            return false;
        }

        if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
        {
            string ipAdress = socket.RemoteEndPoint.ToString();
            // Decode only the bytes of this receive, starting at the SAEA's own offset.
            string msg = Encoding.Default.GetString(e.Buffer, e.Offset, e.BytesTransferred);
            Console.WriteLine("Client : receive message[ {0} ],from Server[ {1} ]", msg, ipAdress);
            return true;
        }

        // Zero bytes means the connection ended: either a graceful close (Success)
        // or a reset by the server.
        if (e.BytesTransferred == 0
            && (e.SocketError == SocketError.Success || e.SocketError == SocketError.ConnectionReset))
        {
            Console.WriteLine("Client: 服务器断开连接 ");
        }
        return false;
    }

    /// <summary>Sends <paramref name="msg"/> encoded with <see cref="Encoding.Default"/>.</summary>
    /// <param name="msg">The text to send.</param>
    public void Send(string msg)
    {
        byte[] sendBuffer = Encoding.Default.GetBytes(msg);
        if (m_socket == null)
        {
            return;
        }

        // One SAEA per send: a shared instance cannot be reused while a previous send
        // is still in flight.
        SocketAsyncEventArgs sendSAEA = new SocketAsyncEventArgs();
        sendSAEA.Completed += OnSendCompleted;
        sendSAEA.SetBuffer(sendBuffer, 0, sendBuffer.Length);

        bool willRaiseEvent;
        try
        {
            willRaiseEvent = m_socket.SendAsync(sendSAEA);
        }
        catch
        {
            sendSAEA.Dispose();
            throw;
        }

        if (!willRaiseEvent)
        {
            // Completed synchronously: no event will be raised.
            OnSendCompleted(m_socket, sendSAEA);
        }
    }

    /// <summary>Send completion handler: logs the sent message and releases the send SAEA.</summary>
    private void OnSendCompleted(object sender, SocketAsyncEventArgs e)
    {
        try
        {
            if (e.SocketError != SocketError.Success) return;
            Socket socket = (sender as Socket) ?? m_socket;
            string sendMsg = Encoding.Default.GetString(e.Buffer, e.Offset, e.Count);
            Console.WriteLine("Client : Send message [ {0} ] to Serer[ {1} ]", sendMsg, socket.RemoteEndPoint.ToString());
        }
        finally
        {
            e.Dispose();
        }
    }

    /// <summary>
    /// Shuts down and closes the socket. Safe to call when the socket is not
    /// connected or has already been closed.
    /// </summary>
    public void DisConnect()
    {
        if (m_socket == null)
        {
            return;
        }

        try
        {
            m_socket.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // Best effort: the socket is not connected or the peer already reset it.
        }
        catch (ObjectDisposedException)
        {
            // Already closed by an earlier call.
        }
        finally
        {
            m_socket.Close();
        }
    }
}
}
(2)Program.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleSaeaClient
{
/// <summary>Console driver that connects five clients and sends one message from each.</summary>
internal class Program
{
    /// <summary>Upper bound, in milliseconds, on how long a task waits for its client to connect.</summary>
    private const int ConnectTimeoutMs = 5000;

    private static List<Client> _clients = new List<Client>();

    /// <summary>
    /// Creates the clients, waits for a key press, then connects each client on its own
    /// task and sends one message once the connection is up. A second key press
    /// disconnects all clients and ends the program.
    /// </summary>
    static void Main(string[] args)
    {
        string ip = "127.0.0.1";
        int port = 3000;
        for (int i = 0; i < 5; i++)
        {
            _clients.Add(new Client(ip, port));
        }

        Console.WriteLine($"按键任意键开始...");
        Console.ReadKey();

        foreach (Client client in _clients)
        {
            Task.Run(() =>
            {
                if (!client.Connected)
                    client.Connect();

                // Wait for the connection itself rather than for a fixed delay: sending
                // on a socket that is not connected yet fails.
                if (!SpinWait.SpinUntil(() => client.Connected, ConnectTimeoutMs))
                {
                    Console.WriteLine("Client : 连接超时,未发送数据");
                    return;
                }

                client.Send($"Task{Task.CurrentId}->Hello World");
            });
            // Stagger the start of the clients.
            Thread.Sleep(100);
        }

        Console.ReadKey();

        // Release the sockets before the process exits.
        foreach (Client client in _clients)
        {
            client.DisConnect();
        }
    }
}
}