tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

Java SDK

聚焦模式
字号
最后更新时间: 2026-01-30 15:23:30

功能概述

Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如 Android)。
Eclipse Paho Java Client 提供了异步(MqttAsyncClient)和同步(MqttClient)两套 API。

云资源准备

请您先参见 创建资源 操作步骤完成云资源准备。

环境准备

通过 Maven 安装 Paho Java

MQTT 5 Paho SDK
MQTT 3.1.1 Paho SDK
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>

示例代码

MQTT 5
MQTT 5 TLS
MQTT 3.1.1
MQTT 3.1.1 TLS
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import java.nio.ByteBuffer;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

/**
 * Quick-start sample for MQTT 5 over plain TCP using the Eclipse Paho synchronous client.
 *
 * <p>The program connects with a username and password, subscribes to the configured topic
 * filters from the {@code connectComplete} callback, publishes {@code total} QoS 1 messages
 * (one every three seconds), then disconnects and closes the client.
 */
public class QuickStart {
    /**
     * Runs the sample.
     *
     * @param args unused
     * @throws MqttException if creating the client, connecting, publishing, disconnecting or
     *     closing fails
     * @throws InterruptedException if the thread is interrupted while sleeping between publishes
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        // Get the access point from the MQTT console:
        // users whose VPC is connected via Private Link use the private-network access point;
        // users connecting over the public network must make sure the public-network security
        // policy allows it and that the machine running this program has public network access.
        String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";

        // A valid Client Identifier consists of digits 0-9, lowercase letters a-z and uppercase
        // letters A-Z, with a total length of 1-23 characters.
        // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
        String clientId = "QuickStart";

        // Create an account on the console's Authentication tab, then copy its username and password.
        String username = "YOUR_USERNAME";
        String password = "YOUR_PASSWORD";

        // MQTT topic
        String pubTopic = "home/test";
        String[] topicFilters = new String[]{pubTopic, "home/#", "home/+"};
        int[] qos = new int[]{1, 1, 1};

        int total = 16;

        MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
        client.setTimeToWait(3000);
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName(username);
        options.setPassword(password.getBytes(StandardCharsets.UTF_8));
        options.setCleanStart(true);
        options.setAutomaticReconnect(true);

        // Register the callback before connecting so that connectComplete is delivered for the
        // initial connection as well.
        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                System.out.println("Disconnected: " + response.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException e) {
                e.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                byte[] payload = message.getPayload();
                String content;
                // A payload of exactly four bytes is printed as an int (ByteBuffer's default
                // big-endian order); anything else is decoded as UTF-8 text.
                if (4 == payload.length) {
                    ByteBuffer buf = ByteBuffer.wrap(payload);
                    content = String.valueOf(buf.getInt());
                } else {
                    content = new String(payload, StandardCharsets.UTF_8);
                }
                System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",
                    topic, message.getQos(), content);
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                // Nothing to do for this sample.
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // Parenthesize the conditional: '+' binds tighter than '?:', so without the
                // parentheses the " to <uri>" suffix is lost when reconnect is true.
                System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
                try {
                    // Subscribe
                    client.subscribe(topicFilters, qos);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void authPacketArrived(int i, MqttProperties properties) {
                // Not used by this sample.
            }
        });

        client.connect(options);

        for (int i = 0; i < total; i++) {
            String msg = "Hello MQTT " + i;
            MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            message.setQos(1);
            System.out.printf("Prepare to publish message %d%n", i);
            client.publish(pubTopic, message);
            System.out.printf("Published message %d%n", i);
            TimeUnit.SECONDS.sleep(3);
        }
        // Give the last published message time to be delivered back to this client.
        TimeUnit.SECONDS.sleep(3);

        client.disconnect();
        // Release the resources held by the client once it is disconnected.
        client.close();
    }
}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

/**
 * Quick-start sample for MQTT 5 over TLS using the Eclipse Paho synchronous client.
 *
 * <p>The program connects with a username and password over an {@code ssl://} endpoint,
 * subscribes to the configured topic filters from the {@code connectComplete} callback and
 * prints the per-filter reason codes, publishes {@code total} QoS 1 messages, then disconnects
 * and closes the client.
 */
public class QuickStartTls {

    /**
     * Builds a socket factory from a "TLS" {@link SSLContext} initialized with the JDK's default
     * key managers, trust managers and secure random source.
     *
     * @return the socket factory of the initialized context
     * @throws NoSuchAlgorithmException if no provider supports the "TLS" protocol
     * @throws KeyManagementException if initializing the context fails
     */
    public static SSLSocketFactory buildSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, null, null);
        return ctx.getSocketFactory();
    }

    /**
     * Runs the sample.
     *
     * @param args unused
     * @throws Exception if building the TLS socket factory or any MQTT operation fails, or if the
     *     thread is interrupted while sleeping between publishes
     */
    public static void main(String[] args) throws Exception {
        // Get the access point from the MQTT console:
        // users whose VPC is connected via Private Link use the private-network access point;
        // users connecting over the public network must make sure the public-network security
        // policy allows it and that the machine running this program has public network access.
        String serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";

        // A valid Client Identifier consists of digits 0-9, lowercase letters a-z and uppercase
        // letters A-Z, with a total length of 1-23 characters.
        // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
        String clientId = "QuickStartTls";

        // Create an account on the console's Authentication tab, then copy its username and password.
        String username = "YOUR_USERNAME";
        String password = "YOUR_PASSWORD";

        // MQTT topic
        String topicName = "home/test";
        String[] topicFilters = new String[]{topicName, "home/#", "home/+"};
        int[] qos = new int[]{1, 1, 1};

        int total = 16;

        MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
        client.setTimeToWait(3000);
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setSocketFactory(buildSSLSocketFactory());
        // NOTE(review): hostname verification is switched off here, so the host name in the
        // server certificate is not checked. Confirm whether it can be enabled for production use.
        options.setHttpsHostnameVerificationEnabled(false);
        options.setUserName(username);
        options.setPassword(password.getBytes(StandardCharsets.UTF_8));
        options.setCleanStart(true);
        options.setAutomaticReconnect(true);

        // Register the callback before connecting so that connectComplete is delivered for the
        // initial connection as well.
        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                System.out.println("Disconnected: " + response.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException e) {
                e.printStackTrace();
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                byte[] payload = message.getPayload();
                String content;
                // A payload of exactly four bytes is printed as an int (ByteBuffer's default
                // big-endian order); anything else is decoded as UTF-8 text.
                if (4 == payload.length) {
                    ByteBuffer buf = ByteBuffer.wrap(payload);
                    content = String.valueOf(buf.getInt());
                } else {
                    content = new String(payload, StandardCharsets.UTF_8);
                }
                System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",
                    topic, message.getQos(), content);
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                // Nothing to do for this sample.
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // Parenthesize the conditional: '+' binds tighter than '?:', so without the
                // parentheses the " to <uri>" suffix is lost when reconnect is true.
                System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
                try {
                    // Subscribe
                    IMqttToken token = client.subscribe(topicFilters, qos);
                    int[] reasonCodes = token.getReasonCodes();
                    for (int i = 0; i < reasonCodes.length; i++) {
                        System.out.printf("Subscribed to topic %s with QoS=%d, Granted-QoS: %d%n",
                            topicFilters[i], qos[i], reasonCodes[i]);
                    }

                    if (token.isComplete()) {
                        // Defensive: do not dereference the response properties if the token
                        // carries none.
                        MqttProperties responseProperties = token.getResponseProperties();
                        if (responseProperties != null) {
                            printUserProperties(responseProperties.getUserProperties());
                        }
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void authPacketArrived(int i, MqttProperties properties) {
                // Not used by this sample.
            }
        });

        client.connect(options);

        for (int i = 0; i < total; i++) {
            String msg = "Hello MQTT " + i;
            MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
            message.setQos(1);
            System.out.printf("Prepare to publish message %d%n", i);
            client.publish(topicName, message);
            System.out.printf("Published message %d%n", i);
            TimeUnit.SECONDS.sleep(3);
        }
        // Give the last published message time to be delivered back to this client.
        TimeUnit.SECONDS.sleep(3);

        client.disconnect();
        // Release the resources held by the client once it is disconnected.
        client.close();
    }

    /**
     * Prints each user property as {@code key = value}; does nothing when the list is null.
     *
     * @param userProperties the user properties to print, may be {@code null}
     */
    static void printUserProperties(List<UserProperty> userProperties) {
        if (null == userProperties) {
            return;
        }
        for (UserProperty userProperty : userProperties) {
            System.out.printf("User property: %s = %s%n", userProperty.getKey(), userProperty.getValue());
        }
    }
}


import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Quick-start sample for MQTT 3.1.1 over plain TCP using the Eclipse Paho synchronous client.
 *
 * <p>The program connects with a username and password, subscribes to the configured topic
 * filters from the {@code connectComplete} callback, publishes {@code total} QoS 1 messages
 * (one every three seconds) and then disconnects. The client is closed by try-with-resources.
 */
public class QuickStart {
    /**
     * Runs the sample.
     *
     * @param args unused
     * @throws MqttException if creating the client, connecting, publishing or disconnecting fails
     * @throws InterruptedException if the thread is interrupted while sleeping between publishes
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        // Get the access point from the MQTT console:
        // users whose VPC is connected via Private Link use the private-network access point;
        // users connecting over the public network must make sure the public-network security
        // policy allows it and that the machine running this program has public network access.
        String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";

        // A valid Client Identifier consists of digits 0-9, lowercase letters a-z and uppercase
        // letters A-Z, with a total length of 1-23 characters.
        // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
        String clientId = "QuickStart";

        // Create an account on the console's Authentication tab, then copy its username and password.
        String username = "YOUR_USERNAME";
        String password = "YOUR_PASSWORD";

        // Make sure the first-level topic "home" has already been created in the MQTT console.
        String pubTopic = "home/test";
        String[] topicFilters = new String[]{pubTopic, "home/#", "home/+"};
        int[] qos = new int[]{1, 1, 1};

        int total = 16;

        try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
            client.setTimeToWait(3000);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            options.setCleanSession(true);
            options.setAutomaticReconnect(true);

            // Register the callback before connecting so that connectComplete is delivered for
            // the initial connection as well.
            MqttCallback callback = new MqttCallback(client, topicFilters, qos);
            client.setCallback(callback);

            client.connect(options);

            for (int i = 0; i < total; i++) {
                String msg = "Hello MQTT " + i;
                MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
                message.setQos(1);
                System.out.printf("Prepare to publish message %d%n", i);
                client.publish(pubTopic, message);
                System.out.printf("Published message %d%n", i);
                TimeUnit.SECONDS.sleep(3);
            }
            // Give the last published message time to be delivered back to this client.
            TimeUnit.SECONDS.sleep(3);

            client.disconnect();
        }
    }

    /**
     * Callback that logs client events and subscribes to the given topic filters whenever
     * {@code connectComplete} is invoked.
     */
    static class MqttCallback implements MqttCallbackExtended {

        private final MqttClient client;

        private final String[] topicFilters;
        private final int[] qos;

        /**
         * @param client the client used to issue the subscription
         * @param topicFilters the topic filters to subscribe to
         * @param qos the QoS requested for each topic filter, in the same order
         */
        public MqttCallback(MqttClient client, String[] topicFilters, int[] qos) {
            this.client = client;
            this.topicFilters = topicFilters;
            this.qos = qos;
        }

        /** Prints the topic, QoS, duplicate/retained flags and UTF-8 decoded payload. */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            System.out.printf("Message arrived, topic=%s, QoS=%d, Dup=%s, Retained=%s, content=[%s]%n",
                topic, message.getQos(), message.isDuplicate(), message.isRetained(),
                new String(message.getPayload(), StandardCharsets.UTF_8));
        }

        /** Prints the message of the cause that broke the connection. */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("connectionLost: " + cause.getMessage());
        }

        /** Logs the (re)connection and subscribes to the configured topic filters. */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            // Parenthesize the conditional: '+' binds tighter than '?:', so without the
            // parentheses the " to <uri>" suffix is lost when reconnect is true.
            System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
            try {
                client.subscribe(topicFilters, qos);
                System.out.printf("Subscribed %d topics%n", topicFilters.length);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /** Prints the packet id of the message whose delivery completed. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.printf("Delivery completed: packet-id=%d%n", token.getMessageId());
        }
    }
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;

/**
 * Quick-start sample for MQTT 3.1.1 over TLS using the Eclipse Paho synchronous client.
 *
 * <p>The program connects with a username and password over an {@code ssl://} endpoint,
 * subscribes to the configured topic filters from the {@code connectComplete} callback,
 * publishes {@code total} QoS 1 messages (one per second) and then disconnects. The client is
 * closed by try-with-resources.
 */
public class QuickStartTls {

    /**
     * Builds a socket factory from a "TLS" {@link SSLContext} initialized with the JDK's default
     * key managers, trust managers and secure random source.
     *
     * @return the socket factory of the initialized context
     * @throws NoSuchAlgorithmException if no provider supports the "TLS" protocol
     * @throws KeyManagementException if initializing the context fails
     */
    public static SSLSocketFactory buildSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, null, null);
        return ctx.getSocketFactory();
    }

    /**
     * Runs the sample.
     *
     * @param args unused
     * @throws MqttException if creating the client, connecting, publishing or disconnecting fails
     * @throws InterruptedException if the thread is interrupted while sleeping between publishes
     * @throws NoSuchAlgorithmException if the TLS context cannot be created
     * @throws KeyManagementException if the TLS context cannot be initialized
     */
    public static void main(String[] args) throws MqttException, InterruptedException, NoSuchAlgorithmException, KeyManagementException {

        // Get the access point from the MQTT console:
        // users whose VPC is connected via Private Link use the private-network access point;
        // users connecting over the public network must make sure the public-network security
        // policy allows it and that the machine running this program has public network access.
        String serverUri = "ssl://mqtt-xxxx.mqtt.tencenttdmq.com:8883";
        // A valid Client Identifier consists of digits 0-9, lowercase letters a-z and uppercase
        // letters A-Z, with a total length of 1-23 characters.
        // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
        String clientId = "ClientQuickStartTls";

        // Create an account on the console's Authentication tab, then copy its username and password.
        String username = "YOUR_USERNAME";
        String password = "YOUR_PASSWORD";

        // Make sure the first-level topic "home" has already been created in the MQTT console.
        String topic = "home/test";

        String[] topicFilters = new String[]{"home/test"};
        int[] qos = new int[]{1};

        int total = 16;

        try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
            client.setTimeToWait(3000);
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setSocketFactory(buildSSLSocketFactory());
            // NOTE(review): hostname verification is switched off here, so the host name in the
            // server certificate is not checked. Confirm whether it can be enabled for production use.
            options.setHttpsHostnameVerificationEnabled(false);
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            options.setCleanSession(true);
            options.setAutomaticReconnect(true);

            // The callback must be registered BEFORE connect(): the subscription is issued from
            // connectComplete, which would be missed for the initial connection if the callback
            // were installed afterwards.
            client.setCallback(new MqttCallback(client, topicFilters, qos));

            client.connect(options);

            for (int i = 0; i < total; i++) {
                String msg = "Hello MQTT " + i;
                // Encode explicitly as UTF-8 to match the decoding done in messageArrived.
                MqttMessage message = new MqttMessage(msg.getBytes(StandardCharsets.UTF_8));
                message.setQos(1);
                System.out.printf("Prepare to publish message %d%n", i);
                client.publish(topic, message);
                System.out.printf("Published message %d%n", i);
                TimeUnit.SECONDS.sleep(1);
            }

            // Give the last published message time to be delivered back to this client.
            TimeUnit.SECONDS.sleep(3);

            client.disconnect();
        }
    }

    /**
     * Callback that logs client events and subscribes to the given topic filters whenever
     * {@code connectComplete} is invoked.
     */
    static class MqttCallback implements MqttCallbackExtended {

        private final MqttClient client;

        private final String[] topicFilters;
        private final int[] qos;

        /**
         * @param client the client used to issue the subscription
         * @param topicFilters the topic filters to subscribe to
         * @param qos the QoS requested for each topic filter, in the same order
         */
        public MqttCallback(MqttClient client, String[] topicFilters, int[] qos) {
            this.client = client;
            this.topicFilters = topicFilters;
            this.qos = qos;
        }

        /** Prints the topic, QoS and UTF-8 decoded payload of an incoming message. */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),
                new String(message.getPayload(), StandardCharsets.UTF_8));
        }

        /** Prints the message of the cause that broke the connection. */
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("connectionLost: " + cause.getMessage());
        }

        /** Logs the (re)connection and subscribes to the configured topic filters. */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            // Parenthesize the conditional: '+' binds tighter than '?:', so without the
            // parentheses the server URI is lost when reconnect is true.
            System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
            try {
                client.subscribe(topicFilters, qos);
                System.out.printf("Subscribed %d topics%n", topicFilters.length);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        /** Prints the packet id of the message whose delivery completed. */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.printf("Delivery completed: packet-id=%d%n", token.getMessageId());
        }
    }
}
参数
说明
pubTopic
待发布消息的目标主题。
消息主题可以包含多级(level), 通过'/'隔开。第一级主题(Topic)需要在控制台 Topic Tab 页创建。详情参见 Topic Names and Topic Filters
topicFilters
一个或者多个订阅表达式。
订阅表达式可以包含通配符(Wildcards),详情参见 Topic Names and Topic Filters
qos
服务质量数组,数组长度必须与 topicFilters 数组的长度保持一致。
最常用的 QoS 为1, 即至少投递一次(At-Least-Once),详情参见 Quality of Service levels and protocol flows
serverUri
MQTT 实例接入点,在控制台目标集群基本信息 > 接入信息模块复制。位置如下图所示。
标准接入点为: tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883
TLS 接入点为: ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883
标准 WebSocket 接入点: ws://mqtt-xxx.mqtt.tencenttdmq.com:80/mqtt
TLS WebSocket 接入点: wss://mqtt-xxx.mqtt.tencenttdmq.com:443/mqtt



clientId
设备唯一标识符: 例如车辆车架号 VIN、产品序列号等。
合法的 Client Identifier 包含: 数字0-9, 小写英文字母 a-z, 大写英文字母 A-Z; 总长度为1-23个字符。详情参见 Client Identifier
username
连接用户名,在控制台集群详情页认证管理页面复制。



password
与连接用户名匹配的密码,在控制台集群详情页认证管理页面复制。



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈