需要在.net平台下进行MQTT客户端编程时,开源且简单的MQTTnet无疑是首选。但是MQTTnet的客户端中在编写应用消息处理的相关方法就有点繁琐了,一般来说一个订阅主题都应该对应一个处理方法,最简单粗暴的方法就是向下面这样用 if/else 或者 switch 硬写,这样不仅代码丑陋还不便扩展。
public async Task Run(string clientId)
{
var option = new MqttClientOptionsBuilder()
.WithTcpServer(m_ip, 1883)
.WithClientId(clientId)
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311);
m_Client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(RecvMsg);
await m_Client.ConnectAsync(option.Build());
await m_Client.SubscribeAsync("test");
}
private void RecvMsg(MqttApplicationMessageReceivedEventArgs args)
{
if (!args.ProcessingFailed)
{
string topic = args.ApplicationMessage.Topic;
switch (topic)
{
case "home/light": break;
case "home/ac": break;
case "home/fan": break;
}
}
}
那么既然一个主题对应一个方法,那么不就有点像MVC的Controller吗?能不能利用特性+反射将主题名映射到对应的方法,接收到消息之后匹配调用呢?如果实现了,那不就可以摆脱丑陋的 if/else 代码了吗?扩展时只需要继承相应的类,然后打上特性确定主题名,跟MVC的Controller异曲同工。
[MqttTopic("geely/car")]
public class CarHandler: TopicHandler
{
[MqttTopic("speed")]
public void SpeedSubscribe()
{
// 对应主题geely/car/speed
}
}
考虑到大多数小伙伴可能时间比较宝贵,这里建议伸手党直接转跳最后一节获取源代码。只有一个类库,引入即可使用。
参考MVC中的Controller都继承于ControllerBase,我们也需要一个基类用来向处理方法传递应用消息(ApplicationMessage)。在程序启动时扫描指定程序集里所有继承于TopicHandler的类并获取它里面定义的处理方法。
/// <summary>
/// MQTT主题订阅处理器
/// </summary>
public class TopicHandler
{
/// <summary>
/// 自己的Id
/// </summary>
public string ClietntId { get; internal set; }
/// <summary>
/// 消息
/// </summary>
public MqttApplicationMessage ApplicationMessage { get; internal set; }
}
众所周知,MVC中可以用HttpGet/HttpPost等特性指定路由和http方法,那么我们这里也需要一个MqttTopic特性来指定主题名和服务质量
/// <summary>
/// MQTT主题
/// </summary>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method)]
public class MqttTopicAttribute : Attribute
{
/// <summary>
/// 主题名
/// </summary>
public string Topic { get; set; }
/// <summary>
/// 服务质量
/// </summary>
public MqttQualityOfServiceLevel QoS { get; set; }
public MqttTopicAttribute(string topic, MqttQualityOfServiceLevel qos)
{
Topic = topic;
QoS = qos;
}
public MqttTopicAttribute(string topic)
{
Topic = topic;
QoS = MqttQualityOfServiceLevel.AtMostOnce;
}
}
既然是参考MVC的Controller设计,那么在MVC的Controller中,一条路由只能对应一个Controller的方法,而且路由和方法在程序启动时就做好了绑定工作。我们的设计也参考以上的方案,一个订阅主题只对应一个处理方法,在程序启动时就指定需要加载的程序集,扫描指定程序集获取打上了MqttTopic特性的方法并记录下类型,方法和对应的主题名。当接收到主题的应用消息时,根据主题名去查找,找到类型和方法,随后创建对象并调用方法。不过既然涉及到对象的创建,这就带来另一个问题:如何进行TopicHandler派生类对象的创建?
还是参考MVC中Controller的设计,众所周知,MVC中的Controller是通过依赖注入的方式创建的,每个请求都会创建一个对应的Controller,并且在Controller的构造方法中可以继续使用依赖注入获得其他对象。那么这里我们也采用依赖注入的形式,将每个TopicHandler以Scope的作用域自动注册进依赖注入容器,要用的时候再通过依赖注入容器去拿。这里就有一个硬性要求,就是你的程序中也必须使用依赖注入来管理对象的创建,原因是使用依赖注入有一个很基础的原则就是:必须从头到尾都使用依赖注入。如果你的程序没有使用依赖注入,并且也没有打算调整为使用依赖注入的话,那么非常抱歉浪费了你人生中宝贵的几分钟时间。
用过ASP.net Core MVC的朋友们都知道在程序启动时可以通过对 IServiceCollection 调用扩展方法来向API添加各种功能。
例如:
public void ConfigureServices(IServiceCollection services)
{
string conStr = Configuration.GetSection("Connection")["MySQLConnection"];
SurveyhelperContext.connectionString = conStr;
string key = Configuration.GetValue<string>("SecurityKey");//ConfigHelper.Config["SecurityKey"];
services.AddMemoryCache();
services.AddControllers()
.AddNewtonsoftJson(options => { options.SerializerSettings.ContractResolver = new DefaultContractResolver(); });
}
所以继续参考MVC的设计我们也把上面的内容做成扩展方法 UseMqttTopicHandler ,在程序启动时对 ServiceCollection 调用即可。
// 使用依赖注入
ServiceCollection collection = new ServiceCollection();
// 1. 配置扩展处理器
collection.UseMqttTopicHandler(option => {
// 2. 添加当前程序集的所有TopicHandler
option.AddMqttTopicHandlers(this.GetType().Assembly);
});
// 构建依赖注入容器
var service = collection.BuildServiceProvider();
最终效果
public async Task Run()
{
string ip = "127.0.0.1";
int port = 1883;
string clientId = "TestClient";
// 使用依赖注入
ServiceCollection collection = new ServiceCollection();
// 1. 配置扩展处理器
collection.UseMqttTopicHandler(option => {
// 2. 添加当前程序集的所有TopicHandler
option.AddMqttTopicHandlers(this.GetType().Assembly);
});
// 构建依赖注入容器
var service = collection.BuildServiceProvider();
// 配置客户端
MqttClientOptionsBuilder option = new MqttClientOptionsBuilder();
option.WithTcpServer(ip, port)
.WithProtocolVersion(Formatter.MqttProtocolVersion.V311)
.WithClientId(clientId);
MqttFactory mqttFactory = new MqttFactory();
var client = mqttFactory.CreateMqttClient();
// 3. 配置扩展后即可取得处理器,然后设置处理器
var handler = service.GetRequiredService<IMqttApplicationMessageReceivedHandler>();
client.ApplicationMessageReceivedHandler = handler;
await client.ConnectAsync(option.Build());
// 4. 订阅处理器对应的主题
await client.SubscribeTopicsAsync();
Console.WriteLine("输入回车退出");
Console.ReadLine();
await client.DisconnectAsync();
service.Dispose();
}
使用步骤大致如上,这里要注意的是:客户端的ApplicationMessageReceivedHandler必须通过依赖注入获取(获取我们事先通过扩展方法注入进去的类型)。
Github连不上的朋友们可以试试Gitee,后续如果有更新我将优先往Gitee推送
如果这篇博文有幸帮助到你,请不要吝啬你的点赞和星星。