MQTT协议
MQTT协议是基于TCP传输协议之上的应用层协议,全程Message Queuing Telemetry Transport。主要用于物联网设备间的通信,在低带宽、不稳定网络环境下的优势非常明显。 当然普通的通信业务开发也是完全可以使用的。MQTT协议采用客户端-服务端架构模式,实现了发布/订阅模式。本篇文章的代码,我们也是主要来实现MQTT的发布订阅功能。本片文章的实现代码已经上传到仓库:https://gitee.com/jiamingyuanjin_admin/mqttapp
MQTT由IBM在1999年开发,并在2013年成为OASIS标准。MQTT协议非常适合物联网(IoT)设备之间的通信。
MQTT协议优点
1. 轻量级:MQTT协议头非常小,适合资源受限的设备和低带宽网络。
2. 发布/订阅模式:支持一对多的消息传递,简化了消息的分发。
3. QoS(服务质量)级别:提供三种服务质量级别(0、1、2),确保消息的可靠传输。
4. 持久会话:支持持久会话,客户端可以在断开连接后重新连接并继续接收未接收的消息。
5. 保留消息:支持保留消息,新的订阅者可以立即接收到最新的消息。
6. 低功耗:适合电池供电的设备,支持长时间的连接保持和低功耗操作。
MQTT协议缺点
1. 安全性:默认情况下,MQTT没有内置的加密和认证机制,需要额外配置TLS/SSL来确保安全。
2. 复杂性:对于简单的点对点通信,MQTT的发布/订阅模式可能显得过于复杂。
3. 依赖于Broker:MQTT通信依赖于Broker,如果Broker出现故障,整个系统的通信将受到影响。
MQTT协议使用场景
1. 物联网(IoT):MQTT广泛应用于物联网设备之间的通信,如智能家居、工业自动化、环境监测等。
2. 移动应用:适用于需要低带宽和低功耗的移动应用,如实时消息推送、位置跟踪等。
3. 远程监控:用于远程监控和控制系统,如远程医疗、智能农业等。
4. 车联网:用于车辆之间和车辆与基础设施之间的通信,如车载信息系统、自动驾驶等。
5. 智能城市:用于智能城市中的各种传感器和设备之间的通信,如智能交通、智能照明等。
MQTT核心角色
想要实现MQTT完整的功能,需要有三个角色:消息发布者(publisher)、代理服务器(broker)、消息订阅者(subscriber)。其中消息发布者负责将消息发布到代理服务器,消息订阅者可以订阅指定主题的消息。当发布者将消息发布到代理服务器时,服务器会通知所有的订阅者。
MQTT有一个很重要的概念:主题Topic,主题负责定义消息的数据结构。发布者就是将消息内容发布到一个主题,订阅者订阅主题。
MQTTnet
MQTT本身是一个标准的国际通信协议,类似于Http协议。任何开发语言都可以实现该协议。在C#中,微软官方帮助我们实现了该协议,其开源地址为:https://github.com/dotnet/MQTTnet
微软在该项目中,给我们提供了两个核心的类:MqttServer
和MqttClient
。其中MqttServer
作为代理服务器使用,负责接受和转发客户端的消息。消息发布者(publisher)和消息订阅者(subscriber)都用MqttClient
实现。本质上这两个都是一个客户端,连接到MqttServer
服务器。 同时,微软也给我们提供了具体的案例代码,强烈建议大家按照官方的案例代码实现自己的MQTT服务器和MQTT客户端。

MQTT服务端
MqttServer是我们启动MQTT服务器或者MQTT代理的核心类。这个类提供了一系列丰富的事件,我们可以非常方便的处理自定义的业务逻辑。下面是这些事件的详细含义:
1. ApplicationMessageEnqueuedOrDroppedAsync:当应用消息被加入队列或被丢弃时触发。 2. ApplicationMessageNotConsumedAsync:应用消息未被消费时触发。 3. ClientAcknowledgedPublishPacketAsync:当客户端确认发布数据包时触发。 4. ClientConnectedAsync:当客户端连接到服务器时触发。 5. ClientDisconnectedAsync:当客户端断开连接时触发。 6. ClientSubscribedTopicAsync:当客户端订阅某个主题时触发。 7. ClientUnsubscribedTopicAsync:当客户端取消订阅某个主题时触发。 8. InterceptingClientEnqueueAsync:当客户端的消息被加入队列时触发。 9. InterceptingInboundPacketAsync: 当拦截到入站数据包时触发。 10. InterceptingOutboundPacketAsync:当拦截到出站数据包时触发。 11. InterceptingPublishAsync:当拦截到发布消息时触发。 12. InterceptingSubscriptionAsync:当拦截到订阅请求时触发。 13. InterceptingUnsubscriptionAsync:当拦截到取消订阅请求时触发。 14. LoadingRetainedMessageAsync:当加载保留消息时触发。 15. PreparingSessionAsync:当准备会话时触发。 16. QueuedApplicationMessageOverwrittenAsync:当队列中的应用消息被覆盖时触发。 17. RetainedMessageChangedAsync:当保留消息发生变化时触发。 18. RetainedMessagesClearedAsync:当保留消息被清除时触发。 19. SessionDeletedAsync:当会话被删除时触发。 20. StartedAsync:当服务器启动时触发。 21. StoppedAsync:当服务器停止时触发。 22. ValidatingConnectionAsync:当验证客户端连接时触发。
MqttServer定义了一个MqttServer
用于启动MQTT服务器。下面的代码 ,我们实现一个MQTTServer,同时监听了客户端连接事件、客户端断开连接事件、客户端发布消息事件、客户端订阅主题事件、客户端取消订阅主题事件。在构建MQTT Server的时候,需要一系列的服务端启动参数,这些参数由MqttServerOptions
类表示,并且通过非常经典的一种设置模式:构建者模式来构建该类。
internal class Program { static async Task Main(string[] args) { await InitServer(); } static async Task InitServer() { var mqttServerOptions = new MqttServerOptionsBuilder() .WithDefaultEndpoint() .Build(); var mqttServer = new MqttServerFactory().CreateMqttServer(mqttServerOptions); //客户端连接事件 mqttServer.ClientConnectedAsync += e => { Console.WriteLine($"客户端 {e.ClientId} 连接成功......"); return Task.CompletedTask; }; //客户端断开连接事件 mqttServer.ClientDisconnectedAsync += e => { Console.WriteLine($"客户端 {e.ClientId} 断开连接......"); return Task.CompletedTask; }; //客户端发布消息事件,客户端每次 mqttServer.InterceptingPublishAsync += e => { Console.WriteLine($"客户端 senderid:{e.ClientId} 发布主题:{e.ApplicationMessage.Topic}......"); return Task.CompletedTask; }; //客户端订阅主题事件 mqttServer.ClientSubscribedTopicAsync += e => { Console.WriteLine($"客户端 {e.ClientId} 订阅事件 :{e.TopicFilter.Topic}......"); return Task.CompletedTask; }; //客户端取消订阅事件 mqttServer.ClientUnsubscribedTopicAsync += e => { Console.WriteLine($"客户端 {e.ClientId} 取消订阅事件 :{e.TopicFilter}......"); return Task.CompletedTask; }; await mqttServer.StartAsync(); Console.WriteLine("MQTT server 启动成功......"); Console.WriteLine("按下任意键退出......"); Console.ReadKey(); await mqttServer.StopAsync(); } }
MQTT发布者
MQTT客户端通过MqttClient
来实现,微软官方用IMqttClient
接口来表示,这意味着,我们完全可以在官方的基础上实现我们自己的MQTT客户端类。当然官方实现的默认MTQQ客户端类MqttClient
一般也基本上满足我们的业务需求。
和MqttServer
一样,在构建MqttClient
客户端类的时候,同样也需要客户端的启动参数,该参数用MqttClientOptions
表示,同样通过构建者模式来构建该类的实例对象。
下面是我们实现的MQTT发布者类,该类间隔1s就会向服务器发布主题消息,通过调用PublishAsync
方法向MQTT服务器发送主题消息。
internal class Program { static async Task Main(string[] args) { await Publish_Application_Message(); } public static async Task Publish_Application_Message() { var mqttFactory = new MqttClientFactory(); var mqttClient = mqttFactory.CreateMqttClient(); var mqttClientOptions = new MqttClientOptionsBuilder() .WithTcpServer("127.0.0.1") .Build(); //客户端向服务器发起连接 await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None); Console.WriteLine("连接MQTT服务器成功..."); while (true) { var applicationMessage = new MqttApplicationMessageBuilder() .WithTopic("mytopic") .WithPayload("time:" + DateTime.Now.ToString()) .Build(); var result = await mqttClient.PublishAsync(applicationMessage, CancellationToken.None); Console.WriteLine("成功向MQTT服务器发布消息....."); await Task.Delay(1000); } } }
MQTT订阅者
实现了MQTT的发布者之后,我们来实现MQTT的最后一个角色:订阅者。当订阅者订阅了某个主题的时候,如果发布者发送消息的服务器,服务器会将这些消息全部通知 给已订阅的订阅者。
internal class Program { static async Task Main(string[] args) { await Handle_Received_Application_Message(); } public static async Task Handle_Received_Application_Message() { var mqttFactory = new MqttClientFactory(); using (var mqttClient = mqttFactory.CreateMqttClient()) { var mqttClientOptions = new MqttClientOptionsBuilder().WithTcpServer("127.0.0.1").Build(); mqttClient.ApplicationMessageReceivedAsync += e => { Console.WriteLine($"接收到消息:clientid:{e.ClientId},topic:{e.ApplicationMessage.Topic},message:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload.First.Span)}"); return Task.CompletedTask; }; await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None); var mqttSubscribeOptions = mqttFactory. CreateSubscribeOptionsBuilder() .WithTopicFilter("mytopic") .Build(); await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None); Console.WriteLine("MQTT客户端成功订阅主题:mytopic......"); Console.WriteLine("按下任意键退出......"); Console.ReadLine(); var mqttUnSubscribeOptions = mqttFactory. CreateUnsubscribeOptionsBuilder() .WithTopicFilter("mytopic") .Build(); //取消订阅 await mqttClient.UnsubscribeAsync(mqttUnSubscribeOptions); Console.ReadLine(); } } }
运行项目
我这里的三个角色分别用三个控制台程序表示:

接下来我们运行MQTTServer、MQTTPublisher和MQTTSubscriber。看下运行效果:
服务端运行结果如下:可以看到客户端成功连接,并且不断的在发布消息到服务端。

发布者运行效果如下,可以看到发布者间隔1s向服务端发送消息。

订阅者运行效果如下,可以看到订阅者订阅了相同名称的主题之后,会不断的接收到发布者发布的消息。
