MQTT 物联网通信
大约 9 分钟约 2608 字
MQTT 物联网通信
简介
MQTT(Message Queuing Telemetry Transport)是物联网领域最流行的轻量级消息协议,适用于低带宽、不稳定网络环境。WPF 上位机通过 MQTT 与下位机(PLC、传感器、网关)通信,实现数据采集、指令下发和状态监控。.NET 使用 MQTTnet 库接入 MQTT Broker。
特点
MQTT 客户端
连接管理
/// <summary>
/// MQTT 客户端服务
/// </summary>
using MQTTnet;
using MQTTnet.Client;
public class MqttService : ObservableObject, IDisposable
{
private IMqttClient? _client;
private MqttClientOptions? _options;
[ObservableProperty]
private bool _isConnected;
[ObservableProperty]
private string _connectionStatus = "未连接";
public event Action<string, string>? MessageReceived;
public async Task ConnectAsync(string broker, int port, string clientId)
{
var factory = new MqttFactory();
_client = factory.CreateMqttClient();
_options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port)
.WithClientId(clientId)
.WithCleanSession(true)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithCredentials("username", "password")
.Build();
// 连接成功回调
_client.ConnectedAsync += e =>
{
IsConnected = true;
ConnectionStatus = $"已连接 {broker}:{port}";
return Task.CompletedTask;
};
// 断开连接回调
_client.DisconnectedAsync += async e =>
{
IsConnected = false;
ConnectionStatus = $"已断开:{e.Reason}";
// 自动重连
await Task.Delay(5000);
try
{
await _client.ConnectAsync(_options);
}
catch { }
};
// 消息接收回调
_client.ApplicationMessageReceivedAsync += e =>
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
MessageReceived?.Invoke(topic, payload);
return Task.CompletedTask;
};
await _client.ConnectAsync(_options);
}
// 订阅主题
public async Task SubscribeAsync(string topic)
{
if (_client?.IsConnected != true) return;
await _client.SubscribeAsync(new MqttTopicFilterBuilder()
.WithTopic(topic)
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build());
}
// 发布消息
public async Task PublishAsync(string topic, string payload, bool retain = false)
{
if (_client?.IsConnected != true) return;
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(retain)
.Build();
await _client.PublishAsync(message);
}
public async Task DisconnectAsync()
{
if (_client?.IsConnected == true)
await _client.DisconnectAsync();
}
public void Dispose()
{
_client?.Dispose();
}
}设备通信
上位机与设备通信
/// <summary>
/// 设备 MQTT 通信管理
/// </summary>
public class DeviceMqttManager
{
private readonly MqttService _mqtt;
private readonly ILogger<DeviceMqttManager> _logger;
// Topic 规范
// factory/{factoryId}/device/{deviceId}/status — 设备状态上报
// factory/{factoryId}/device/{deviceId}/data — 数据上报
// factory/{factoryId}/device/{deviceId}/command — 指令下发
// factory/{factoryId}/device/{deviceId}/response — 指令响应
public DeviceMqttManager(MqttService mqtt, ILogger<DeviceMqttManager> logger)
{
_mqtt = mqtt;
_logger = logger;
_mqtt.MessageReceived += OnMessageReceived;
}
// 订阅所有设备数据
public async Task SubscribeAllDevicesAsync(string factoryId)
{
await _mqtt.SubscribeAsync($"factory/{factoryId}/device/+/status");
await _mqtt.SubscribeAsync($"factory/{factoryId}/device/+/data");
await _mqtt.SubscribeAsync($"factory/{factoryId}/device/+/response");
}
// 发送指令到设备
public async Task SendCommandAsync(string factoryId, string deviceId, string command, object payload)
{
var topic = $"factory/{factoryId}/device/{deviceId}/command";
var json = JsonSerializer.Serialize(new
{
command,
payload,
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
});
await _mqtt.PublishAsync(topic, json);
}
// 处理接收到的消息
private void OnMessageReceived(string topic, string payload)
{
try
{
var parts = topic.Split('/');
string factoryId = parts[1];
string deviceId = parts[3];
string type = parts[4];
switch (type)
{
case "status":
HandleDeviceStatus(deviceId, payload);
break;
case "data":
HandleDeviceData(deviceId, payload);
break;
case "response":
HandleCommandResponse(deviceId, payload);
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "MQTT 消息处理失败:{Topic}", topic);
}
}
private void HandleDeviceStatus(string deviceId, string payload)
{
var status = JsonSerializer.Deserialize<DeviceStatus>(payload);
_logger.LogInformation("设备 {DeviceId} 状态:{Status}", deviceId, status?.State);
}
private void HandleDeviceData(string deviceId, string payload)
{
var data = JsonSerializer.Deserialize<Dictionary<string, double>>(payload);
// 更新 UI 数据
}
private void HandleCommandResponse(string deviceId, string payload)
{
var response = JsonSerializer.Deserialize<CommandResponse>(payload);
_logger.LogInformation("设备 {DeviceId} 响应:{Result}", deviceId, response?.Result);
}
}消息持久化与离线缓存
消息本地缓存
/// <summary>
/// MQTT 消息离线缓存 — 网络断开时暂存消息
/// </summary>
public class MqttMessageCache
{
private readonly ConcurrentQueue<MqttCachedMessage> _pendingQueue = new();
private readonly string _cacheFilePath;
private readonly ILogger<MqttMessageCache> _logger;
private readonly int _maxCacheSize = 10000;
public MqttMessageCache(string cacheDir, ILogger<MqttMessageCache> logger)
{
Directory.CreateDirectory(cacheDir);
_cacheFilePath = Path.Combine(cacheDir, "mqtt_offline_cache.json");
_logger = logger;
}
// 缓存消息(离线时调用)
public void Enqueue(string topic, string payload, MqttQualityOfServiceLevel qos)
{
if (_pendingQueue.Count >= _maxCacheSize)
{
_logger.LogWarning("离线消息缓存已满,丢弃最早的消息");
_pendingQueue.TryDequeue(out _);
}
_pendingQueue.Enqueue(new MqttCachedMessage
{
Topic = topic,
Payload = payload,
QoS = qos,
Timestamp = DateTimeOffset.UtcNow
});
}
// 网络恢复后批量发送缓存消息
public async Task FlushAsync(MqttService mqttService)
{
int sent = 0;
while (_pendingQueue.TryDequeue(out var message))
{
try
{
await mqttService.PublishAsync(message.Topic, message.Payload);
sent++;
}
catch (Exception ex)
{
_logger.LogError(ex, "缓存消息发送失败:{Topic}", message.Topic);
// 放回队列头部
_pendingQueue.Enqueue(message);
break;
}
}
if (sent > 0)
_logger.LogInformation("已发送 {Count} 条离线缓存消息", sent);
}
// 持久化到磁盘
public async Task SaveToFileAsync()
{
var messages = _pendingQueue.ToArray();
var json = JsonSerializer.Serialize(messages, new JsonSerializerOptions { WriteIndented = true });
await File.WriteAllTextAsync(_cacheFilePath, json);
}
// 从磁盘恢复
public async Task LoadFromFileAsync()
{
if (!File.Exists(_cacheFilePath)) return;
try
{
var json = await File.ReadAllTextAsync(_cacheFilePath);
var messages = JsonSerializer.Deserialize<MqttCachedMessage[]>(json);
if (messages != null)
{
foreach (var msg in messages)
_pendingQueue.Enqueue(msg);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "加载离线缓存失败");
}
}
}
public record MqttCachedMessage
{
public string Topic { get; set; } = "";
public string Payload { get; set; } = "";
public MqttQualityOfServiceLevel QoS { get; set; }
public DateTimeOffset Timestamp { get; set; }
}QoS 选择策略
/// <summary>
/// MQTT QoS 选择指南
/// </summary>
public static class MqttQoSStrategy
{
/*
* QoS 0 — 至多一次(At most once)
* 适用场景:传感器高频数据上报、状态广播、日志流
* 特点:最快、可能丢消息、不保证送达
*
* QoS 1 — 至少一次(At least once)
* 适用场景:设备控制指令、配方下发、告警通知
* 特点:保证送达、可能重复、需要去重处理
*
* QoS 2 — 恰好一次(Exactly once)
* 适用场景:计费数据、关键交易、不可重复的指令
* 特点:最慢、保证恰好一次、四次握手
*/
// 根据消息类型选择 QoS
public static MqttQualityOfServiceLevel SelectQoS(string messageType)
{
return messageType switch
{
"telemetry" => MqttQualityOfServiceLevel.AtMostOnce, // QoS 0
"status" => MqttQualityOfServiceLevel.AtMostOnce, // QoS 0
"command" => MqttQualityOfServiceLevel.AtLeastOnce, // QoS 1
"alarm" => MqttQualityOfServiceLevel.AtLeastOnce, // QoS 1
"recipe" => MqttQualityOfServiceLevel.ExactlyOnce, // QoS 2
"billing" => MqttQualityOfServiceLevel.ExactlyOnce, // QoS 2
_ => MqttQualityOfServiceLevel.AtMostOnce
};
}
}遗嘱消息与断线检测
Last Will 配置
/// <summary>
/// MQTT 遗嘱消息 — 设备异常断开时自动发布
/// </summary>
public static class MqttLastWillConfig
{
public static MqttClientOptions BuildWithLastWill(
string broker, int port, string clientId,
string factoryId, string deviceId)
{
var willMessage = new MqttApplicationMessageBuilder()
.WithTopic($"factory/{factoryId}/device/{deviceId}/status")
.WithPayload(JsonSerializer.Serialize(new
{
state = "offline",
reason = "unexpected_disconnect",
timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
}))
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true) // 保留消息,新订阅者也能收到
.Build();
return new MqttClientOptionsBuilder()
.WithTcpServer(broker, port)
.WithClientId(clientId)
.WithCleanSession(false) // 遗嘱消息需要 CleanSession=false
.WithKeepAlivePeriod(TimeSpan.FromSeconds(60))
.WithWillMessage(willMessage)
.Build();
}
}WPF ViewModel 集成
MQTT 数据绑定到 UI
/// <summary>
/// MQTT 数据监控 ViewModel
/// </summary>
public partial class DeviceMonitorViewModel : ObservableObject, IDisposable
{
private readonly MqttService _mqttService;
private readonly MqttMessageCache _messageCache;
[ObservableProperty]
private bool _isConnected;
[ObservableProperty]
private string _connectionStatus = "未连接";
[ObservableProperty]
private ObservableCollection<DeviceDataItem> _deviceDataList = new();
[ObservableProperty]
private string _lastMessage = "";
[ObservableProperty]
private int _messageCount;
// 消息节流 — 避免高频消息导致 UI 卡顿
private readonly SemaphoreSlim _throttle = new(1, 1);
private readonly int _throttleIntervalMs = 200;
public DeviceMonitorViewModel(MqttService mqttService, MqttMessageCache messageCache)
{
_mqttService = mqttService;
_messageCache = messageCache;
_mqttService.MessageReceived += OnMessageReceived;
_mqttService.PropertyChanged += (_, e) =>
{
if (e.PropertyName == nameof(MqttService.IsConnected))
IsConnected = _mqttService.IsConnected;
};
}
private async void OnMessageReceived(string topic, string payload)
{
// 节流处理高频消息
if (!await _throttle.WaitAsync(0))
return;
try
{
await App.Current.Dispatcher.InvokeAsync(() =>
{
LastMessage = $"[{topic}] {payload}";
MessageCount++;
// 解析并更新设备数据
var data = JsonSerializer.Deserialize<Dictionary<string, double>>(payload);
if (data != null)
{
var deviceId = topic.Split('/')[3];
var existing = DeviceDataList.FirstOrDefault(d => d.DeviceId == deviceId);
if (existing != null)
{
existing.Temperature = data.GetValueOrDefault("temperature", 0);
existing.Humidity = data.GetValueOrDefault("humidity", 0);
}
else
{
DeviceDataList.Add(new DeviceDataItem
{
DeviceId = deviceId,
Temperature = data.GetValueOrDefault("temperature", 0),
Humidity = data.GetValueOrDefault("humidity", 0)
});
}
}
}, DispatcherPriority.DataBind);
}
finally
{
_ = Task.Delay(_throttleIntervalMs).ContinueWith(_ => _throttle.Release());
}
}
[RelayCommand]
private async Task ConnectAsync()
{
await _mqttService.ConnectAsync("broker.example.com", 1883, "wpf_client_001");
await _mqttService.SubscribeAsync("factory/+/device/+/data");
}
public void Dispose()
{
_mqttService.MessageReceived -= OnMessageReceived;
_throttle.Dispose();
}
}
public class DeviceDataItem : ObservableObject
{
public string DeviceId { get; set; } = "";
[ObservableProperty]
private double _temperature;
[ObservableProperty]
private double _humidity;
}数据上报
设备端模拟
/// <summary>
/// 设备数据上报模拟
/// </summary>
public class DeviceSimulator
{
private readonly MqttService _mqtt;
private readonly string _deviceId;
private readonly string _factoryId;
private Timer? _reportTimer;
public DeviceSimulator(MqttService mqtt, string factoryId, string deviceId)
{
_mqtt = mqtt;
_factoryId = factoryId;
_deviceId = deviceId;
}
public void StartReporting(int intervalMs = 1000)
{
_reportTimer = new Timer(async _ =>
{
var data = new Dictionary<string, object>
{
["temperature"] = Random.Shared.Next(200, 350) / 10.0,
["humidity"] = Random.Shared.Next(400, 700) / 10.0,
["pressure"] = Random.Shared.Next(1000, 1020),
["timestamp"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
var json = JsonSerializer.Serialize(data);
await _mqtt.PublishAsync($"factory/{_factoryId}/device/{_deviceId}/data", json);
}, null, 0, intervalMs);
}
public void Stop() => _reportTimer?.Dispose();
}优点
缺点
总结
MQTT 是物联网通信的首选协议。.NET 使用 MQTTnet 库,核心操作:连接 Broker → 订阅 Topic → 接收/发布消息。Topic 设计建议层级化(factory/deviceId/data)。QoS 选择:状态用 0、指令用 1、关键数据用 2。上位机订阅设备数据 Topic,发布指令 Topic 实现双向通信。
关键知识点
- 先分清主题属于界面层、ViewModel 层、线程模型还是设备接入层。
- WPF 文章真正的价值在于把 UI、数据、命令、线程和资源关系讲清楚。
- 上位机场景里,稳定性和异常恢复常常比界面花哨更重要。
- WPF 主题往往要同时理解依赖属性、绑定、可视树和命令系统。
项目落地视角
- 优先明确 DataContext、绑定路径、命令源和线程切换位置。
- 涉及设备时,补齐超时、重连、日志、告警和资源释放策略。
- 复杂控件最好提供最小可运行页面,便于后续复用和排障。
- 优先确认 DataContext、绑定路径、命令触发点和资源引用来源。
常见误区
- 把大量逻辑堆进 code-behind,导致页面难测难维护。
- 在后台线程直接操作 UI,或忽略 Dispatcher 切换。
- 只验证正常路径,不验证设备掉线、权限缺失和资源泄漏。
- 在 code-behind 塞太多状态与业务逻辑。
进阶路线
- 继续向 MVVM 工具链、控件可复用性、性能优化和视觉树诊断深入。
- 把主题放回真实设备流程,思考启动、连接、采集、显示、告警和恢复。
- 沉淀成控件库、通信层和诊断工具,提高整套客户端的复用度。
- 继续补齐控件库、主题系统、诊断工具和启动性能优化。
适用场景
- 当你准备把《MQTT 物联网通信》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合桌面业务系统、监控看板、工控上位机和设备交互界面。
- 当系统需要同时处理复杂 UI、后台任务和硬件通信时,这类主题尤为关键。
落地建议
- 优先明确 View、ViewModel、服务层和设备层边界,避免代码隐藏过重。
- 涉及实时刷新时,提前设计 UI 线程切换、节流和资源释放。
- 对硬件通信、日志、告警和异常恢复建立标准流程。
排错清单
- 先检查 DataContext、绑定路径、INotifyPropertyChanged 和命令状态是否正常。
- 排查 Dispatcher 调用、死锁、后台线程直接操作 UI 的问题。
- 出现内存增长时,优先检查事件订阅、图像资源和窗口生命周期。
复盘问题
- 如果把《MQTT 物联网通信》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《MQTT 物联网通信》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《MQTT 物联网通信》最大的收益和代价分别是什么?
