tencent cloud

消息队列 MQTT 版

产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

Dart SDK

聚焦模式
字号
最后更新时间: 2026-01-30 15:23:30

功能概述

Dart/Flutter 平台提供了两个主要的 MQTT 客户端库:

mqtt_client

用于 MQTT 3.1/3.1.1 协议的客户端库,可用于 Flutter、Dart VM 以及 Web 平台。提供了 MqttServerClient 和 MqttBrowserClient 用于服务端和浏览器环境。

mqtt5_client

专门支持 MQTT 5.0 协议的客户端库,提供了完整的 MQTT 5.0 特性支持,包括增强的认证、消息属性、主题别名等。同样支持服务器和浏览器环境。

云资源准备

请您先参见 创建资源 操作步骤完成云资源准备。

环境准备

选择合适的客户端库

根据您需要使用的 MQTT 协议版本选择对应的库:
使用 MQTT 3.1.1(推荐用于生产环境)
dependencies:
mqtt_client: ^10.11.0

使用 MQTT 5.0
dependencies:
mqtt5_client: ^4.15.2

然后运行:
flutter pub get
# 或
dart pub get

示例代码

MQTT 5.0
MQTT 5.0 TLS
MQTT 3
MQTT 3 TLS

import 'dart:async';
import 'dart:convert';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';

Future<void> main() async {
// 从MQTT控制台获取接入点
final server = 'mqtt-xxx.mqtt.tencenttdmq.com';
final port = 1883;

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
final clientId = 'QuickStartMqtt5';

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
final username = 'YOUR_USERNAME';
final password = 'YOUR_PASSWORD';

// 确认一级主题 "home" 在MQTT控制台已经创建
final pubTopic = 'home/test';
final topicFilters = ['home/test', 'home/#', 'home/+'];
final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];

final total = 16;

final client = MqttServerClient.withPort(server, clientId, port);
client.logging(on: true);
client.keepAlivePeriod = 60;
client.autoReconnect = true;

// 设置连接消息
final connMessage = MqttConnectMessage()
.withClientIdentifier(clientId)
.authenticateAs(username, password)
.startClean();
client.connectionMessage = connMessage;

// 连接回调
client.onConnected = () {
print('Connected to $server');
// Subscribe
for (var i = 0; i < topicFilters.length; i++) {
client.subscribe(topicFilters[i], qos[i]);
print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');
}
};

client.onDisconnected = () {
print('Disconnected');
};

client.onAutoReconnect = () {
print('Auto reconnecting...');
};

client.onAutoReconnected = () {
print('Auto reconnected');
};

try {
print('Connecting to MQTT broker...');
await client.connect();
} catch (e) {
print('Exception: $e');
client.disconnect();
return;
}

if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('MQTT client connected');

// 订阅消息回调
client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final recMessage = c[0].payload as MqttPublishMessage;
final topic = c[0].topic;
final payload = recMessage.payload.message;
final content = payload != null ? utf8.decode(payload.toList()) : '';
print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');
});

// 发布消息
for (var i = 0; i < total; i++) {
final builder = MqttPayloadBuilder();
builder.addString('Hello MQTT 5.0 - $i');
print('Prepare to publish message $i');
client.publishMessage(pubTopic, qos[0], builder.payload!);
print('Published message $i');
await Future.delayed(Duration(seconds: 3));
}

await Future.delayed(Duration(seconds: 3));
client.disconnect();
} else {
print('Connection failed - status is ${client.connectionStatus}');
client.disconnect();
}
}


import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';

Future<void> main() async {
// 从MQTT控制台获取接入点
final server = 'mqtt-xxx.mqtt.tencenttdmq.com';
final port = 8883;

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
final clientId = 'QuickStartMqtt5Tls';

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
final username = 'YOUR_USERNAME';
final password = 'YOUR_PASSWORD';

// 确认一级主题 "home" 在MQTT控制台已经创建
final pubTopic = 'home/test';
final topicFilters = ['home/test', 'home/#', 'home/+'];
final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];

final total = 16;

final client = MqttServerClient.withPort(server, clientId, port);
client.logging(on: true);
client.keepAlivePeriod = 60;
client.autoReconnect = true;
client.secure = true;

// 配置 TLS/SSL
SecurityContext context = SecurityContext.defaultContext;
client.securityContext = context;
client.onBadCertificate = (dynamic certificate) => true; // 开发环境使用,生产环境应验证证书

// 设置连接消息
final connMessage = MqttConnectMessage()
.withClientIdentifier(clientId)
.authenticateAs(username, password)
.startClean();
client.connectionMessage = connMessage;

// 连接回调
client.onConnected = () {
print('Connected to $server');
// Subscribe
for (var i = 0; i < topicFilters.length; i++) {
client.subscribe(topicFilters[i], qos[i]);
print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');
}
};

client.onDisconnected = () {
print('Disconnected');
};

client.onAutoReconnect = () {
print('Auto reconnecting...');
};

client.onAutoReconnected = () {
print('Auto reconnected');
};

try {
print('Connecting to MQTT broker...');
await client.connect();
} catch (e) {
print('Exception: $e');
client.disconnect();
return;
}

if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('MQTT client connected');

// 订阅消息回调
client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final recMessage = c[0].payload as MqttPublishMessage;
final topic = c[0].topic;
final payload = recMessage.payload.message;
final content = payload != null ? utf8.decode(payload.toList()) : '';
print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');
});

// 发布消息
for (var i = 0; i < total; i++) {
final builder = MqttPayloadBuilder();
builder.addString('Hello MQTT 5.0 TLS - $i');
print('Prepare to publish message $i');
client.publishMessage(pubTopic, qos[0], builder.payload!);
print('Published message $i');
await Future.delayed(Duration(seconds: 3));
}

await Future.delayed(Duration(seconds: 3));
client.disconnect();
} else {
print('Connection failed - status is ${client.connectionStatus}');
client.disconnect();
}
}


import 'dart:async';
import 'dart:convert';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

Future<void> main() async {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
final server = 'mqtt-xxx.mqtt.tencenttdmq.com';
final port = 1883;

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
final clientId = 'QuickStart';

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
final username = 'YOUR_USERNAME';
final password = 'YOUR_PASSWORD';

// 确认一级主题 "home" 在MQTT控制台已经创建
final pubTopic = 'home/test';
final topicFilters = ['home/test', 'home/#', 'home/+'];
final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];

final total = 16;

final client = MqttServerClient.withPort(server, clientId, port);
client.logging(on: true);
client.setProtocolV311(); // 使用 MQTT 3.1.1 协议
client.keepAlivePeriod = 60;
client.autoReconnect = true;
client.connectTimeoutPeriod = 3000;

// 设置连接消息
final connMessage = MqttConnectMessage()
.withClientIdentifier(clientId)
.authenticateAs(username, password)
.startClean()
.withWillQos(MqttQos.atLeastOnce);
client.connectionMessage = connMessage;

// 连接回调
client.onConnected = () {
print('Connected to $server');
// Subscribe
for (var i = 0; i < topicFilters.length; i++) {
client.subscribe(topicFilters[i], qos[i]);
print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');
}
};

client.onDisconnected = () {
print('Disconnected');
};

client.onAutoReconnect = () {
print('Auto reconnecting...');
};

client.onAutoReconnected = () {
print('Auto reconnected');
};

try {
print('Connecting to MQTT broker...');
await client.connect();
} catch (e) {
print('Exception: $e');
client.disconnect();
return;
}

if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('MQTT client connected');

// 订阅消息回调
client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final recMessage = c[0].payload as MqttPublishMessage;
final topic = c[0].topic;
final payload = recMessage.payload.message;
final content = payload != null ? utf8.decode(payload.toList()) : '';
print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');
});

// 发布消息
for (var i = 0; i < total; i++) {
final builder = MqttClientPayloadBuilder();
builder.addString('Hello MQTT $i');
print('Prepare to publish message $i');
client.publishMessage(pubTopic, MqttQos.atLeastOnce, builder.payload!);
print('Published message $i');
await Future.delayed(Duration(seconds: 3));
}

await Future.delayed(Duration(seconds: 3));
client.disconnect();
} else {
print('Connection failed - status is ${client.connectionStatus}');
client.disconnect();
}
}




import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

Future<void> main() async {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
final server = 'mqtt-xxx.mqtt.tencenttdmq.com';
final port = 8883;

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
final clientId = 'ClientQuickStartTls';

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
final username = 'YOUR_USERNAME';
final password = 'YOUR_PASSWORD';

// 确认一级主题 "home" 在MQTT控制台已经创建
final pubTopic = 'home/test';
final topicFilters = ['home/test', 'home/#', 'home/+'];
final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];

final total = 16;

final client = MqttServerClient.withPort(server, clientId, port);
client.logging(on: true);
client.setProtocolV311(); // 使用 MQTT 3.1.1 协议
client.keepAlivePeriod = 60;
client.autoReconnect = true;
client.connectTimeoutPeriod = 3000;
client.secure = true;

// 配置 TLS/SSL
SecurityContext context = SecurityContext.defaultContext;
client.securityContext = context;
client.onBadCertificate = (dynamic certificate) => true; // 开发环境使用,生产环境应验证证书

// 设置连接消息
final connMessage = MqttConnectMessage()
.withClientIdentifier(clientId)
.authenticateAs(username, password)
.startClean()
.withWillQos(MqttQos.atLeastOnce);
client.connectionMessage = connMessage;

// 连接回调
client.onConnected = () {
print('Connected to $server');
// Subscribe
for (var i = 0; i < topicFilters.length; i++) {
client.subscribe(topicFilters[i], qos[i]);
print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');
}
};

client.onDisconnected = () {
print('Disconnected');
};

client.onAutoReconnect = () {
print('Auto reconnecting...');
};

client.onAutoReconnected = () {
print('Auto reconnected');
};

try {
print('Connecting to MQTT broker...');
await client.connect();
} catch (e) {
print('Exception: $e');
client.disconnect();
return;
}

if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('MQTT client connected');

// 订阅消息回调
client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {
final recMessage = c[0].payload as MqttPublishMessage;
final topic = c[0].topic;
final payload = recMessage.payload.message;
final content = payload != null ? utf8.decode(payload.toList()) : '';
print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');
});

// 发布消息
for (var i = 0; i < total; i++) {
final builder = MqttClientPayloadBuilder();
builder.addString('Hello MQTT $i');
print('Prepare to publish message $i');
client.publishMessage(pubTopic, MqttQos.atLeastOnce, builder.payload!);
print('Published message $i');
await Future.delayed(Duration(seconds: 3));
}

await Future.delayed(Duration(seconds: 3));
client.disconnect();
} else {
print('Connection failed - status is ${client.connectionStatus}');
client.disconnect();
}
}




帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈