C#中使用MQTTnet库实现MQTT通信
在C#中使用MQTTnet库实现MQTT通信的完整流程如下,涵盖客户端创建、连接、订阅/发布消息及错误处理:
一、环境准备
1. 安装MQTTnet库
通过NuGet安装:
bash
dotnet add package MQTTnet
2. 创建控制台应用
示例代码基于控制台应用演示。
二、客户端创建与连接
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
// 创建客户端工厂
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
// 配置连接参数
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com", 1883) // 默认公共代理
.WithClientId("CSharpClient_001") // 客户端ID
.WithCleanSession() // 断开后清除会话
.Build();
// 异步连接
await client.ConnectAsync(options);
三、订阅消息
// 订阅主题(示例:test/topic)
await client.SubscribeAsync(new[] {
new MqttTopicFilterBuilder()
.WithTopic("test/topic")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.Build()
});
// 处理消息到达事件
client.UseApplicationMessageReceivedHandler(e => {
Console.WriteLine(#34;Received: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
});
四、发布消息
// 发布消息到主题
var message = new MqttApplicationMessageBuilder()
.WithTopic("test/topic")
.WithPayload("Hello MQTTnet!")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true) // 保留消息
.Build();
await client.PublishAsync(message);
五、错误处理与断开连接
try {
// 主循环保持连接
while (true) {
await Task.Delay(Timeout.InfiniteTimeSpan);
}
} catch (Exception ex) {
Console.WriteLine(#34;Error: {ex.Message}");
} finally {
// 异步断开连接
await client.DisconnectAsync();
client.Dispose();
}
六、完整代码示例
using System.Text;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
class Program {
static async Task Main(string[] args) {
var factory = new MqttFactory();
var client = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com", 1883)
.WithClientId("CSharpClient_001")
.Build();
await client.ConnectAsync(options);
// 订阅
await client.SubscribeAsync(new[]
{ new MqttTopicFilterBuilder().WithTopic("test/#").Build() });
client.UseApplicationMessageReceivedHandler(e =>
Console.WriteLine(#34;Topic: {e.ApplicationMessage.Topic}, Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"));
// 发布
await client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic("test/topic")
.WithPayload("Hello World!")
.Build());
await Task.Delay(10000);
await client.DisconnectAsync();
}
}
七、关键配置说明
1. QoS级别
- AtMostOnce(0):可能丢失,性能最优
- AtLeastOnce(1):确保到达,可能重复
- ExactlyOnce(2):严格一次,开销最大
2. 保留消息
设置WithRetainFlag(true)后,代理会保留最后一条消息供新订阅者获取。
3. 心跳机制
通过WithKeepAlivePeriod()设置心跳间隔,默认30秒。
八、高级功能扩展
- TLS加密:使用.WithTls()配置SSL/TLS连接
- 认证:通过.WithCredentials(username, password)添加凭据
- Last Will:设置遗嘱消息(客户端异常断开时触发)
> 提示:实际应用中建议使用异步方法(如ConnectAsync)避免阻塞主线程。更多配置可参考。