这是2014年在csdn上写的,现在迁过来
最近在GitHub上发现了FastSocket框架。初步用了一下发现实现很简单。几个Command类把需要执行的命令封闭起来,框架实现了协议,分包等功能。用户无需关心。用了一段时间发现少了两个比较主要的功能,心跳,回调。网上也没找到相关的文档。干脆自己实现。
一、首先是Server端。
第1步添加一个CallbackCommand
public class CallbackCommand : ICommand<AsyncBinaryCommandInfo>
{
public void ExecuteCommand(IConnection connection, AsyncBinaryCommandInfo commandInfo)
{
if (commandInfo.Buffer == null || commandInfo.Buffer.Length == 0)
{
connection.BeginDisconnect();
return;
}
commandInfo.Reply(connection, new byte[] { 0x00 });
var v = MyService._connection.Find(model => model.ConnectionID == connection.ConnectionID);
v.callBack = commandInfo;
}
public string Name { get { return "Callback"; } }
}
第2步 客户端连入的时候将connection信息记录进一个Hash表。
public override void OnConnected(IConnection connection)
{
base.OnConnected(connection);
_connection.Add(connection);
}
第3步 IConnection接口加入三个属性。
/// <summary>
/// 当前连接的最后心跳时间
/// </summary>
DateTime HeartTime { get; set; }
/// <summary>
/// 获取或设置与用户数据
/// </summary>
object UserData { get; set; }
object callBack { get; set; }
最后心跳时间、客户端接入的用户信息、以及回调对象。
第4步 添加两个子线程,用于测试回调及定时心跳,这两个子,这两个子线程 可以在服务启动的时候直接启动。
void TestSend() //主动推送消息到客户端
{
byte[] buffer = new byte[] { 0x00, 0x99, 0xAA, 0xFF };
Packet packet = PacketBuilder.ToAsyncBinary("TestSend", 0, buffer);
while (true)
{
foreach (var v in _connection)
{
try
{
((AsyncBinaryCommandInfo)v.callBack).Reply(v, buffer);
}
catch (Exception ex) { }
}
System.Threading.Thread.Sleep(1000);
}
}
/// <summary>
/// 心跳线程
/// </summary>
void SocketHeart()
{
while (!Globals.IsStop)
{
lock (_connection)
{
for (int i = _connection.Count - 1; i >= 0; i--)
if (_connection[i].HeartTime < DateTime.Now.AddSeconds(0 - (30 * 3)))//超过三个心跳包,干掉
{
MyService.WriteLog("Action:\"RemoveConnection\"", _connection[i]);
_connection.RemoveAt(i);
}
}
System.Threading.Thread.Sleep(1000 * 10);
}
}
Server端到这里就可以了。接下来的客户端。
客户端这里需要解释一下。客户端在发送一个指令到服务器端的同时,会启动一个监听,因为每一个指令都有一个唯一的SeqlID,客户端在发送后启动一个监听,收到来自Server端的回复,并检查SeqID相同以后会清除这个监听。因为回调不允许清除监听,否则将不能接收到消息。那我就从这个角度考虑,把问题解决掉。
问题的关键在于BaseSocketClient类的OnMessageReceied方法,其中有一个 this._requestCollection.Remove(response.SeqID); 其中 _requestCollection就是回调队列,Remove方法就是找到这个回调并加以删除。
protected override void OnMessageReceived(IConnection connection, MessageReceivedEventArgs e)
{
base.OnMessageReceived(connection, e);
int readlength;
TResponse response = null;
try
{
response = this._protocol.FindResponse(connection, e.Buffer, out readlength);
}
catch (Exception ex)
{
this.OnError(connection, ex);
connection.BeginDisconnect(ex);
e.SetReadlength(e.Buffer.Count);
return;
}
if (response != null)
{
this.OnResponse(connection, response)
<span style="">var request = this._requestCollection.Remove(response.SeqID);</span>
if (request == null)
ThreadPool.QueueUserWorkItem(_ =>
{
try { this.HandleUnknowResponse(connection, response); }
catch (Exception ex) { SocketBase.Log.Trace.Error(ex.Message, ex); }
});
else
ThreadPool.QueueUserWorkItem(_ =>
{
try { request.SetResult(response); }
catch (Exception ex) { SocketBase.Log.Trace.Error(ex.Message, ex); }
});
}
e.SetReadlength(readlength);
}
接下来找到Remove方法,将Remove加以修改。因为客户端启动的第一件事情就是执行回调操作,所以SeqID == 1 ,我们就把这个SeqID一直保留,同样找到对_dic执行Remove操作的地方加以修改。
public Request<TResponse> Remove(int seqID)
{
Request<TResponse> removed;
if (seqID == 1 && this._dic.ContainsKey(seqID)) return this._dic[seqID];
this._dic.TryRemove(seqID, out removed);
return removed;
}
倒数第二步。AsyncBinarySocketClient类里面添加回调操作。这里我做了一个回调事件。
public delegate void OnCallback(byte[] buffer);
public event OnCallback Callback = null;
并在Send方法里面添加该事件的实现。
public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, byte[] payload, Func<AsyncBinaryResponse, TResult> funcResultFactory, object asyncState = null)
{
if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName");
if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory");
var seqID = base.NextRequestSeqID();
var cmdLength = cmdName.Length;
var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + 6;
var sendBuffer = new byte[messageLength + 4];
//write message length
Buffer.BlockCopy(NetworkBitConverter.GetBytes(messageLength), 0, sendBuffer, 0, 4);
//write seqID.
Buffer.BlockCopy(NetworkBitConverter.GetBytes(seqID), 0, sendBuffer, 4, 4);
//write response flag length.
Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)cmdLength), 0, sendBuffer, 8, 2);
//write response flag
Buffer.BlockCopy(Encoding.ASCII.GetBytes(cmdName), 0, sendBuffer, 10, cmdLength);
//write body buffer
if (payload != null && payload.Length > 0)
Buffer.BlockCopy(payload, 0, sendBuffer, 10 + cmdLength, payload.Length);
var source = new TaskCompletionSource<TResult>(asyncState);
base.Send(new Request<Response.AsyncBinaryResponse>(consistentKey, seqID, cmdName, sendBuffer, ex => source.TrySetException(ex),
response =>
{
TResult result;
try { result = funcResultFactory(response); }
catch (Exception ex) { source.TrySetException(ex); return; }
source.TrySetResult(result);
<span style=""> if(cmdName == "Callback" && Callback != null)
{
Callback(result as byte[]);
}
</span> }));
return source.Task;
}
最后一步,客户端实现这个回调事件,并发送回调操作到服务器。
Globals.client.RegisterServerNode("127.0.0.1:8401", new System.Net.IPEndPoint(System.Net.IPAddress.Parse("127.0.0.1"), 8401));
Globals.client.Callback += client_Callback;
Globals.client.Send("Callback", new byte[] { 0x00 }, res => res.Buffer);