基于MQTTnet客户端的扩展,使订阅消息处理像MVC的Controller一样简单

勾炜
2023-12-01

初始想法

需要在.net平台下进行MQTT客户端编程时,开源且简单的MQTTnet无疑是首选。但是MQTTnet的客户端中在编写应用消息处理的相关方法就有点繁琐了,一般来说一个订阅主题都应该对应一个处理方法,最简单粗暴的方法就是向下面这样用 if/else 或者 switch 硬写,这样不仅代码丑陋还不便扩展。

public async Task Run(string clientId)
{
    var option = new MqttClientOptionsBuilder()
        .WithTcpServer(m_ip, 1883)
        .WithClientId(clientId)
        .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311);
    m_Client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(RecvMsg);
    await m_Client.ConnectAsync(option.Build());

    await m_Client.SubscribeAsync("test");
}    

private void RecvMsg(MqttApplicationMessageReceivedEventArgs args)
{
    if (!args.ProcessingFailed)
    {
        string topic = args.ApplicationMessage.Topic;
        switch (topic)
        {
            case "home/light": break;
            case "home/ac": break;
            case "home/fan": break;
        }
    }
}

那么既然一个主题对应一个方法,那么不就有点像MVC的Controller吗?能不能利用特性+反射将主题名映射到对应的方法,接收到消息之后匹配调用呢?如果实现了,那不就可以摆脱丑陋的 if/else 代码了吗?扩展时只需要继承相应的类,然后打上特性确定主题名,跟MVC的Controller异曲同工。

[MqttTopic("geely/car")]
public class CarHandler: TopicHandler
{
    [MqttTopic("speed")]
    public void SpeedSubscribe()
    {
        // 对应主题geely/car/speed
    }
}

考虑到大多数小伙伴可能时间比较宝贵,这里建议伸手党直接转跳最后一节获取源代码。只有一个类库,引入即可使用。

开始动手

TopicHandler基类和MqttTopic特性

参考MVC中的Controller都继承于ControllerBase,我们也需要一个基类用来向处理方法传递应用消息(ApplicationMessage)。在程序启动时扫描指定程序集里所有继承于TopicHandler的类并获取它里面定义的处理方法。

/// <summary>
/// MQTT主题订阅处理器
/// </summary>
public class TopicHandler
{
    /// <summary>
    /// 自己的Id
    /// </summary>
    public string ClietntId { get; internal set; }

    /// <summary>
    /// 消息
    /// </summary>
    public MqttApplicationMessage ApplicationMessage { get; internal set; }
}

众所周知,MVC中可以用HttpGet/HttpPost等特性指定路由和http方法,那么我们这里也需要一个MqttTopic特性来指定主题名和服务质量

/// <summary>
/// MQTT主题
/// </summary>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class MqttTopicAttribute : Attribute
{
    /// <summary>
    /// 主题名
    /// </summary>
    public string Topic { get; set; }

    /// <summary>
    /// 服务质量
    /// </summary>
    public MqttQualityOfServiceLevel QoS { get; set; }

    public MqttTopicAttribute(string topic, MqttQualityOfServiceLevel qos)
    {
        Topic = topic;
        QoS = qos;
    }

    public MqttTopicAttribute(string topic)
    {
        Topic = topic;
        QoS = MqttQualityOfServiceLevel.AtMostOnce;
    }
}

记录订阅主题和处理方法的映射关系

既然是参考MVC的Controller设计,那么在MVC的Controller中,一条路由只能对应一个Controller的方法,而且路由和方法在程序启动时就做好了绑定工作。我们的设计也参考以上的方案,一个订阅主题只对应一个处理方法,在程序启动时就指定需要加载的程序集,扫描指定程序集获取打上了MqttTopic特性的方法并记录下类型,方法和对应的主题名。当接收到主题的应用消息时,根据主题名去查找,找到类型和方法,随后创建对象并调用方法。不过既然涉及到对象的创建,这就带来另一个问题:如何进行TopicHandler派生类对象的创建?

解决对象创建问题

还是参考MVC中Controller的设计,众所周知,MVC中的Controller是通过依赖注入的方式创建的,每个请求都会创建一个对应的Controller,并且在Controller的构造方法中可以继续使用依赖注入获得其他对象。那么这里我们也采用依赖注入的形式,将每个TopicHandler以Scope的作用域自动注册进依赖注入容器,要用的时候再通过依赖注入容器去拿。这里就有一个硬性要求,就是你的程序中也必须使用依赖注入来管理对象的创建,原因是使用依赖注入有一个很基础的原则就是:必须从头到尾都使用依赖注入。如果你的程序没有使用依赖注入,并且也没有打算调整为使用依赖注入的话,那么非常抱歉浪费了你人生中宝贵的几分钟时间。

使用扩展方法

用过ASP.net Core MVC的朋友们都知道在程序启动时可以通过对 IServiceCollection 调用扩展方法来向API添加各种功能。

例如:

public void ConfigureServices(IServiceCollection services)
{
    string conStr = Configuration.GetSection("Connection")["MySQLConnection"];
    SurveyhelperContext.connectionString = conStr;
    string key = Configuration.GetValue<string>("SecurityKey");//ConfigHelper.Config["SecurityKey"];
    services.AddMemoryCache();    
    services.AddControllers()
        .AddNewtonsoftJson(options => { options.SerializerSettings.ContractResolver = new DefaultContractResolver(); });
}

所以继续参考MVC的设计我们也把上面的内容做成扩展方法 UseMqttTopicHandler ,在程序启动时对 ServiceCollection 调用即可。

// 使用依赖注入
ServiceCollection collection = new ServiceCollection();
// 1. 配置扩展处理器
collection.UseMqttTopicHandler(option => {
    // 2. 添加当前程序集的所有TopicHandler
    option.AddMqttTopicHandlers(this.GetType().Assembly);
});
// 构建依赖注入容器
var service = collection.BuildServiceProvider();

最终效果及完整代码

最终效果

public async Task Run()
{
    string ip = "127.0.0.1";
    int port = 1883;
    string clientId = "TestClient";

    // 使用依赖注入
    ServiceCollection collection = new ServiceCollection();
    // 1. 配置扩展处理器
    collection.UseMqttTopicHandler(option => {
        // 2. 添加当前程序集的所有TopicHandler
        option.AddMqttTopicHandlers(this.GetType().Assembly);
    });
    // 构建依赖注入容器
    var service = collection.BuildServiceProvider();

    // 配置客户端
    MqttClientOptionsBuilder option = new MqttClientOptionsBuilder();
    option.WithTcpServer(ip, port)
        .WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
        .WithClientId(clientId);

    MqttFactory mqttFactory = new MqttFactory();
    var client = mqttFactory.CreateMqttClient();
    // 3. 配置扩展后即可取得处理器,然后设置处理器
    var handler = service.GetRequiredService<IMqttApplicationMessageReceivedHandler>();
    client.ApplicationMessageReceivedHandler = handler;

    await client.ConnectAsync(option.Build());

    // 4. 订阅处理器对应的主题
    await client.SubscribeTopicsAsync();

    Console.WriteLine("输入回车退出");
    Console.ReadLine();

    await client.DisconnectAsync();
    service.Dispose();
}

使用步骤大致如上,这里要注意的是:客户端的ApplicationMessageReceivedHandler必须通过依赖注入获取(获取我们事先通过扩展方法注入进去的类型)。

Github

MQTTnet.Client.Extensions

Gitee

Github连不上的朋友们可以试试Gitee,后续如果有更新我将优先往Gitee推送

MQTTnet.Client.Extensions

如果这篇博文有幸帮助到你,请不要吝啬你的点赞和星星。

 类似资料: