当前位置: 首页 > 工具软件 > MessagePack > 使用案例 >

DotNetty系列四:自定义协议,序列化类库MessagePack

陈志
2023-12-01

在上一节基础上,实现自定义协议,使用MessagePack序列化。

1上引入:Nuget:  MessagePack,MessagePackAnalyzer

2..在类库项目,新建一个枚举(标识命令),一个消息接口,一个消息实现。

    public enum COMMAND
    {
        /// <summary>
        /// 心跳
        /// </summary>
        HeartBeat = 1000,
        /// <summary>
        /// 消息
        /// </summary>
        Message = 1,
        /// <summary>
        /// 上线通知
        /// </summary>
        OnlineNotify = 1001,
        /// <summary>
        /// 下线通知
        /// </summary>
        OfflineNotify = 1002,
        /// <summary>
        /// 空消息
        /// </summary>
        NULL = 0,
    }

    public interface IMessage
    {
        COMMAND Command { set; get; }
        string Content { get; set; }
    }

    [MessagePackObject] //messagepack必须
    public class Message : IMessage
    {
        [Key(0)]//排序
        public COMMAND Command { get; set; }
        [Key(1)]
        public string Content { get; set; }
    }

3.新建一个MessagePack序列化操作类。

public static class MessagePackHelper
    {
        public static byte[] SerializeToBinary<T>(T obj) => MessagePackSerializer.Serialize(obj);

        public static T DeserializeWithBinary<T>(byte[] data) => MessagePackSerializer.Deserialize<T>(data);

        public static string DeserializeJson(byte[] data) => MessagePackSerializer.ToJson(data);

        public static string DeserializeJson<T>(T data) => MessagePackSerializer.ToJson(data);

        public static byte[] SerializeJson(string data) => MessagePackSerializer.FromJson(data);

        public static T DeserializeToJson<T>(string data) => MessagePackSerializer.Deserialize<T>(SerializeJson(data));
   }

4.修改编码解码器

编码器:

    public class ServerMessagePackEncoder<T> : MessageToByteEncoder<T>
    {
        protected override void Encode(IChannelHandlerContext context, T message, IByteBuffer output)
        {
            //序列化类
            byte[] messageBytes = MessagePackHelper.SerializeToBinary(message);
            IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
            initialMessage.WriteBytes(messageBytes);

            output.WriteBytes(initialMessage);
        }

    }

    public class ClientMessagePackEncoder<T> : MessageToByteEncoder<T>
    {
        protected override void Encode(IChannelHandlerContext context, T message, IByteBuffer output)
        {
            //序列化类
            byte[] messageBytes = MessagePackHelper.SerializeToBinary(message);
            IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
            initialMessage.WriteBytes(messageBytes);

            output.WriteBytes(initialMessage);
        }
    }

解码器:

    public class CommonServerDecoder : ByteToMessageDecoder
    {
        protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
        {
            byte[] array = new byte[input.ReadableBytes];
            input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
            input.Clear();
            var temp = MessagePackHelper.DeserializeWithBinary<Message>(array);
            output.Add(temp);
        }
    }

    public class CommonClientDecoder : ByteToMessageDecoder
    {
        protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
        {
            byte[] array = new byte[input.ReadableBytes];
            input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
            input.Clear();
            var temp = MessagePackHelper.DeserializeWithBinary<Message>(array);
            output.Add(temp);
        }
    }

5.服务端里修改:

                        pipeline.AddLast(new ServerMessagePackEncoder<Message>());
                        pipeline.AddLast(new CommonServerDecoder());

客户端里修改:

                        pipeline.AddLast(new ClientMessagePackEncoder<Message>());
                        pipeline.AddLast(new CommonClientDecoder());

6.服务端接收和发送:

        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            if (message is Message oo)
            {
                switch(oo.Command)
                {
                    case COMMAND.Message:
                        Console.WriteLine($"解码器方式,从客户端接收:{oo.Content}:{DateTime.Now}");
                        Message ms = new Message { Command = COMMAND.Message, Content = "服务端从客户端接收到内容后返回,我是服务端" };
                        context.WriteAndFlushAsync(ms);
                        return;
                    default:
                        Console.WriteLine($"解码器方式,客户端心跳:{oo.Content}");
                        Message hb = new Message { Command = COMMAND.Message, Content = "服务端接收到心跳连接" };
                        context.WriteAndFlushAsync(hb);//写入输出流
                        return;
                }
            }
        }
将服务端所有要发送的位置,改成上面的写法。

客户端接收和发送:

        public override void ChannelActive(IChannelHandlerContext context)
        {
            Console.WriteLine("我是客户端.");
            Console.WriteLine($"连接至服务端{context}.");

            Message ms = new Message { Command = COMMAND.Message, Content = "客户端1。" };
            context.WriteAndFlushAsync(ms);
        }

        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            if (message is Message oo)
            {
                Console.WriteLine($"Decoder方式,从服务端接收:{oo.Content}");
            }
        }

这样就使用,自定义协议加MessagePack序列化,实现了上一节一样的效果。

附加:List方式接收

如果要实现List接收。就在解码器里将消息加入List。

            List<Message> tl = new List<Message> { temp, new Message { Command = COMMAND.Message, Content = "888" } };
            output.Add(tl);

接收的时候做下面的判断:

            if (message is List<Message> lo)
            {
                lo.ForEach(s => Console.WriteLine($"Decoder方式,从服务端接收:{s.Content}"));
            }

项目下载地址:项目下载

 类似资料: