在上一节基础上,实现自定义协议,使用MessagePack序列化。
1上引入:Nuget: MessagePack,MessagePackAnalyzer
2..在类库项目,新建一个枚举(标识命令),一个消息接口,一个消息实现。
public enum COMMAND
{
/// <summary>
/// 心跳
/// </summary>
HeartBeat = 1000,
/// <summary>
/// 消息
/// </summary>
Message = 1,
/// <summary>
/// 上线通知
/// </summary>
OnlineNotify = 1001,
/// <summary>
/// 下线通知
/// </summary>
OfflineNotify = 1002,
/// <summary>
/// 空消息
/// </summary>
NULL = 0,
}
public interface IMessage
{
COMMAND Command { set; get; }
string Content { get; set; }
}
[MessagePackObject] //messagepack必须
public class Message : IMessage
{
[Key(0)]//排序
public COMMAND Command { get; set; }
[Key(1)]
public string Content { get; set; }
}
3.新建一个MessagePack序列化操作类。
public static class MessagePackHelper
{
public static byte[] SerializeToBinary<T>(T obj) => MessagePackSerializer.Serialize(obj);
public static T DeserializeWithBinary<T>(byte[] data) => MessagePackSerializer.Deserialize<T>(data);
public static string DeserializeJson(byte[] data) => MessagePackSerializer.ToJson(data);
public static string DeserializeJson<T>(T data) => MessagePackSerializer.ToJson(data);
public static byte[] SerializeJson(string data) => MessagePackSerializer.FromJson(data);
public static T DeserializeToJson<T>(string data) => MessagePackSerializer.Deserialize<T>(SerializeJson(data));
}
4.修改编码解码器
编码器:
public class ServerMessagePackEncoder<T> : MessageToByteEncoder<T>
{
protected override void Encode(IChannelHandlerContext context, T message, IByteBuffer output)
{
//序列化类
byte[] messageBytes = MessagePackHelper.SerializeToBinary(message);
IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
initialMessage.WriteBytes(messageBytes);
output.WriteBytes(initialMessage);
}
}
public class ClientMessagePackEncoder<T> : MessageToByteEncoder<T>
{
protected override void Encode(IChannelHandlerContext context, T message, IByteBuffer output)
{
//序列化类
byte[] messageBytes = MessagePackHelper.SerializeToBinary(message);
IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
initialMessage.WriteBytes(messageBytes);
output.WriteBytes(initialMessage);
}
}
解码器:
public class CommonServerDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] array = new byte[input.ReadableBytes];
input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
input.Clear();
var temp = MessagePackHelper.DeserializeWithBinary<Message>(array);
output.Add(temp);
}
}
public class CommonClientDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] array = new byte[input.ReadableBytes];
input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
input.Clear();
var temp = MessagePackHelper.DeserializeWithBinary<Message>(array);
output.Add(temp);
}
}
5.服务端里修改:
pipeline.AddLast(new ServerMessagePackEncoder<Message>());
pipeline.AddLast(new CommonServerDecoder());
客户端里修改:
pipeline.AddLast(new ClientMessagePackEncoder<Message>());
pipeline.AddLast(new CommonClientDecoder());
6.服务端接收和发送:
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is Message oo)
{
switch(oo.Command)
{
case COMMAND.Message:
Console.WriteLine($"解码器方式,从客户端接收:{oo.Content}:{DateTime.Now}");
Message ms = new Message { Command = COMMAND.Message, Content = "服务端从客户端接收到内容后返回,我是服务端" };
context.WriteAndFlushAsync(ms);
return;
default:
Console.WriteLine($"解码器方式,客户端心跳:{oo.Content}");
Message hb = new Message { Command = COMMAND.Message, Content = "服务端接收到心跳连接" };
context.WriteAndFlushAsync(hb);//写入输出流
return;
}
}
}
将服务端所有要发送的位置,改成上面的写法。
客户端接收和发送:
public override void ChannelActive(IChannelHandlerContext context)
{
Console.WriteLine("我是客户端.");
Console.WriteLine($"连接至服务端{context}.");
Message ms = new Message { Command = COMMAND.Message, Content = "客户端1。" };
context.WriteAndFlushAsync(ms);
}
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is Message oo)
{
Console.WriteLine($"Decoder方式,从服务端接收:{oo.Content}");
}
}
这样就使用,自定义协议加MessagePack序列化,实现了上一节一样的效果。
附加:List方式接收
如果要实现List接收。就在解码器里将消息加入List。
List<Message> tl = new List<Message> { temp, new Message { Command = COMMAND.Message, Content = "888" } };
output.Add(tl);
接收的时候做下面的判断:
if (message is List<Message> lo)
{
lo.ForEach(s => Console.WriteLine($"Decoder方式,从服务端接收:{s.Content}"));
}
项目下载地址:项目下载