tencent cloud

消息队列 MQTT 版

产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

.NET

聚焦模式
字号
最后更新时间: 2026-01-30 15:23:30

功能概述

MQTTnet 是一个高性能的 .NET MQTT 客户端库,支持 MQTT 协议 3.1.1 和 5.0 版本。该库完全使用 C# 编写,提供了功能齐全的 MQTT 客户端和服务器实现。

主要特性

支持 .NET Framework 4.5.2+ 和 .NET Core/NET 5+
同时支持同步和异步操作
完整支持 MQTT v3.1.1 和 v5.0 协议
支持 TCP、WebSocket 和 TLS/SSL 加密连接
支持共享订阅、保留消息、遗嘱消息等高级特性

云资源准备

请您先参见相关文档完成云资源准备,获取以下信息:
MQTT Broker 地址
端口号(通常为 1883 或 8883)
用户名和密码(如需要认证)
TLS/SSL 证书(如使用加密连接)

环境准备

系统要求

.NET 6.0 或更高版本(推荐)
.NET Framework 4.5.2+ 或 .NET Core 2.0+

安装 MQTTnet SDK

方法一:使用 NuGet 包管理器
dotnet add package MQTTnet
方法二:通过 Visual Studio
1.1 右键单击项目 > "管理 NuGet 程序包"
1.2 搜索 "MQTTnet"
1.3 安装最新稳定版本
方法三:编辑 .csproj 文件

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MQTTnet" Version="4.3.7.1207" />
</ItemGroup>
</Project>

示例代码

MQTT5
MQTT5 TLS
MQTT3.1.1
MQTT3.1.1 TLS
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

class MQTT5_TCP_Sample
{
static async Task Main(string[] args)
{
// ============ 连接配置 ============
String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";
String clientId = "QuickStart";
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";
String pubTopic = "home/test";
String[] topicFilters = new String[] { pubTopic, "home/#", "home/+" };
int[] qos = new int[] { 1, 1, 1 };

// 解析URI
var uri = new Uri(serverUri);
// 创建客户端
var factory = new MqttFactory();
var client = factory.CreateMqttClient();

// 配置连接选项
var options = new MqttClientOptionsBuilder()
.WithTcpServer(uri.Host, uri.Port)
.WithCredentials(username, password)
.WithClientId(clientId)
.WithProtocolVersion(MqttProtocolVersion.V500)
.WithCleanStart(true)
.Build();

// 消息接收处理
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};

try
{
// 1. 连接
await client.ConnectAsync(options);
Console.WriteLine("已连接");

// 2. 订阅
for (int i = 0; i < topicFilters.Length; i++)
{
await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic(topicFilters[i]).WithQualityOfServiceLevel((MqttQualityOfServiceLevel)qos[i]))
.Build());
}
Console.WriteLine("已订阅");

// 3. 发布
for (int i = 1; i <= 16; i++)
{
await client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(pubTopic)
.WithPayload($"消息 #{i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());
await Task.Delay(500);
}
Console.WriteLine("发布完成");

// 4. 等待接收
await Task.Delay(2000);

// 5. 断开连接
await client.DisconnectAsync();
Console.WriteLine("已断开");
}
catch (Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
}
}

using System;
using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

class MQTT5_TLS_Sample
{
static async Task Main(string[] args)
{

// ============ 连接配置 ============
String serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";
String clientId = "QuickStart";
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";
String pubTopic = "home/test";
String[] topicFilters = new String[] { pubTopic, "home/#", "home/+" };
int[] qos = new int[] { 1, 1, 1 };

// CA证书路径(可选,用于验证服务器证书)
String caCertPath = null; // 例如: "/path/to/ca.crt"

// 解析URI
var uri = new Uri(serverUri);
// 创建客户端
var factory = new MqttFactory();
var client = factory.CreateMqttClient();

// 配置连接选项(包含TLS)
var options = new MqttClientOptionsBuilder()
.WithTcpServer(uri.Host, uri.Port)
.WithCredentials(username, password)
.WithClientId(clientId)
.WithProtocolVersion(MqttProtocolVersion.V500)
.WithCleanStart(true)
.WithTlsOptions(tls =>
{
tls.UseTls();
tls.WithSslProtocols(SslProtocols.Tls12 | SslProtocols.Tls13);
// 如果提供了CA证书,加载它
if (!string.IsNullOrEmpty(caCertPath))
{
var caCert = new X509Certificate2(caCertPath);
tls.WithClientCertificates(new[] { caCert });
}
// 证书验证回调
tls.WithCertificateValidationHandler(context =>
{
var certificate = context.Certificate as X509Certificate2;
// 测试环境: 接受所有证书
// return true;
// 生产环境: 严格证书验证
return ValidateServerCertificate(certificate, context.Chain, context.SslPolicyErrors);
});
})
.Build();

// 消息接收处理
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};

try
{
// 1. 连接
await client.ConnectAsync(options);
Console.WriteLine("已连接(TLS加密)");

// 2. 订阅
for (int i = 0; i < topicFilters.Length; i++)
{
await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic(topicFilters[i]).WithQualityOfServiceLevel((MqttQualityOfServiceLevel)qos[i]))
.Build());
}
Console.WriteLine("已订阅");

// 3. 发布
for (int i = 1; i <= 16; i++)
{
await client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(pubTopic)
.WithPayload($"消息 #{i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());
await Task.Delay(500);
}
Console.WriteLine("发布完成");

// 4. 等待接收
await Task.Delay(2000);

// 5. 断开连接
await client.DisconnectAsync();
Console.WriteLine("已断开");
}
catch (Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
}

/// <summary>
/// 验证服务器证书
/// </summary>
static bool ValidateServerCertificate(X509Certificate2 certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
// 如果没有错误,直接通过
if (sslPolicyErrors == SslPolicyErrors.None)
{
return true;
}

// 如果证书为空,拒绝
if (certificate == null)
{
Console.WriteLine("证书验证失败: 证书为空");
return false;
}

// 1. 检查证书有效期
DateTime now = DateTime.Now;
if (now < certificate.NotBefore || now > certificate.NotAfter)
{
Console.WriteLine($"证书验证失败: 证书已过期或尚未生效 (有效期: {certificate.NotBefore} - {certificate.NotAfter})");
return false;
}

// 2. 检查证书链
if (chain != null && chain.ChainStatus.Length > 0)
{
foreach (var status in chain.ChainStatus)
{
// 忽略离线吊销检查错误(可选)
if (status.Status == X509ChainStatusFlags.RevocationStatusUnknown ||
status.Status == X509ChainStatusFlags.OfflineRevocation)
{
continue;
}
if (status.Status != X509ChainStatusFlags.NoError)
{
Console.WriteLine($"证书验证失败: {status.StatusInformation}");
return false;
}
}
}

// 3. 检查证书主题名称(可选,根据实际需求)
// string expectedSubject = "CN=*.mqtt.tencenttdmq.com";
// if (!certificate.Subject.Contains(expectedSubject))
// {
// Console.WriteLine($"证书验证失败: 主题不匹配 (期望: {expectedSubject}, 实际: {certificate.Subject})");
// return false;
// }

Console.WriteLine("证书验证通过");
return true;
}
}



using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

class MQTT311_TCP_Sample
{
static async Task Main(string[] args)
{
// ============ 连接配置 ============
String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";
String clientId = "QuickStart";
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";
String pubTopic = "home/test";
String[] topicFilters = new String[] { pubTopic, "home/#", "home/+" };
int[] qos = new int[] { 1, 1, 1 };

// 解析URI
var uri = new Uri(serverUri);
// 创建客户端
var factory = new MqttFactory();
var client = factory.CreateMqttClient();

// 配置连接选项
var options = new MqttClientOptionsBuilder()
.WithTcpServer(uri.Host, uri.Port)
.WithCredentials(username, password)
.WithClientId(clientId)
.WithProtocolVersion(MqttProtocolVersion.V311)
.WithCleanSession(true)
.Build();

// 消息接收处理
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};

try
{
// 1. 连接
await client.ConnectAsync(options);
Console.WriteLine("已连接");

// 2. 订阅
for (int i = 0; i < topicFilters.Length; i++)
{
await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic(topicFilters[i]).WithQualityOfServiceLevel((MqttQualityOfServiceLevel)qos[i]))
.Build());
}
Console.WriteLine("已订阅");

// 3. 发布
for (int i = 1; i <= 16; i++)
{
await client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(pubTopic)
.WithPayload($"消息 #{i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());
await Task.Delay(500);
}
Console.WriteLine("发布完成");

// 4. 等待接收
await Task.Delay(2000);

// 5. 断开连接
await client.DisconnectAsync();
Console.WriteLine("已断开");
}
catch (Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
}
}


using System;
using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

class MQTT311_TLS_Sample
{
static async Task Main(string[] args)
{

// ============ 连接配置 ============
String serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";
String clientId = "QuickStart";
String username = "YOUR_USERNAME";
String password = "YOUR_PASSWORD";
String pubTopic = "home/test";
String[] topicFilters = new String[] { pubTopic, "home/#", "home/+" };
int[] qos = new int[] { 1, 1, 1 };

// CA证书路径(可选,用于验证服务器证书)
String caCertPath = null; // 例如: "/path/to/ca.crt"

// 解析URI
var uri = new Uri(serverUri);
// 创建客户端
var factory = new MqttFactory();
var client = factory.CreateMqttClient();

// 配置连接选项(包含TLS)
var options = new MqttClientOptionsBuilder()
.WithTcpServer(uri.Host, uri.Port)
.WithCredentials(username, password)
.WithClientId(clientId)
.WithProtocolVersion(MqttProtocolVersion.V311)
.WithCleanSession(true)
.WithTlsOptions(tls =>
{
tls.UseTls();
tls.WithSslProtocols(SslProtocols.Tls12 | SslProtocols.Tls13);
// 如果提供了CA证书,加载它
if (!string.IsNullOrEmpty(caCertPath))
{
var caCert = new X509Certificate2(caCertPath);
tls.WithClientCertificates(new[] { caCert });
}
// 证书验证回调
tls.WithCertificateValidationHandler(context =>
{
var certificate = context.Certificate as X509Certificate2;
// 测试环境: 接受所有证书
// return true;
// 生产环境: 严格证书验证
return ValidateServerCertificate(certificate, context.Chain, context.SslPolicyErrors);
});
})
.Build();

// 消息接收处理
client.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};

try
{
// 1. 连接
await client.ConnectAsync(options);
Console.WriteLine("已连接(TLS加密)");

// 2. 订阅
for (int i = 0; i < topicFilters.Length; i++)
{
await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic(topicFilters[i]).WithQualityOfServiceLevel((MqttQualityOfServiceLevel)qos[i]))
.Build());
}
Console.WriteLine("已订阅");

// 3. 发布
for (int i = 1; i <= 16; i++)
{
await client.PublishAsync(new MqttApplicationMessageBuilder()
.WithTopic(pubTopic)
.WithPayload($"消息 #{i}")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build());
await Task.Delay(500);
}
Console.WriteLine("发布完成");

// 4. 等待接收
await Task.Delay(2000);

// 5. 断开连接
await client.DisconnectAsync();
Console.WriteLine("已断开");
}
catch (Exception ex)
{
Console.WriteLine($"错误: {ex.Message}");
}
}

/// <summary>
/// 验证服务器证书
/// </summary>
static bool ValidateServerCertificate(X509Certificate2 certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
// 如果没有错误,直接通过
if (sslPolicyErrors == SslPolicyErrors.None)
{
return true;
}

// 如果证书为空,拒绝
if (certificate == null)
{
Console.WriteLine("证书验证失败: 证书为空");
return false;
}

// 1. 检查证书有效期
DateTime now = DateTime.Now;
if (now < certificate.NotBefore || now > certificate.NotAfter)
{
Console.WriteLine($"证书验证失败: 证书已过期或尚未生效 (有效期: {certificate.NotBefore} - {certificate.NotAfter})");
return false;
}

// 2. 检查证书链
if (chain != null && chain.ChainStatus.Length > 0)
{
foreach (var status in chain.ChainStatus)
{
// 忽略离线吊销检查错误(可选)
if (status.Status == X509ChainStatusFlags.RevocationStatusUnknown ||
status.Status == X509ChainStatusFlags.OfflineRevocation)
{
continue;
}
if (status.Status != X509ChainStatusFlags.NoError)
{
Console.WriteLine($"证书验证失败: {status.StatusInformation}");
return false;
}
}
}

// 3. 检查证书主题名称(可选,根据实际需求)
// string expectedSubject = "CN=*.mqtt.tencenttdmq.com";
// if (!certificate.Subject.Contains(expectedSubject))
// {
// Console.WriteLine($"证书验证失败: 主题不匹配 (期望: {expectedSubject}, 实际: {certificate.Subject})");
// return false;
// }

Console.WriteLine("证书验证通过");
return true;
}
}




参数说明

参数
说明
ADDRESS
broker 连接地址,在控制台目标集群基本信息 > 接入信息模块中复制。位置如下图所示。格式:mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883。

CLIENTID
客户端 ID,在控制台集群详情页客户端管理页面获取。

USERNAME
用户名,在控制台认证管理页面获取;
PASSWORD
密码,在控制台认证管理页面获取;

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈