tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

Go SDK

聚焦模式
字号
最后更新时间: 2026-01-30 15:23:30

功能概述

Eclipse Paho MQTT Go Client 为 Eclipse Paho 项目下的 Go 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。

云资源准备

请您先参见 创建资源 操作步骤完成云资源准备。

环境准备

安装 Eclipse Paho SDK:
MQTT 5.0
MQTT 3.1.1
go get github.com/eclipse/paho.golang

go get github.com/eclipse/paho.mqtt.golang

示例代码

MQTT 5.0
MQTT 3.1.1
package main

import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
)

// 接入点, 从控制台获取
const accessPoint = "mqtt://mqtt-xxx-sh-public.mqtt.tencenttdmq.com:1883"

// Change this to something random if using a public test server
const clientID = "PahoGoClient"

const topic = "PahoGoTestTopic"

// 用户名, 从控制台获取
const username = "YOUR_USERNAME"

// 密码, 从控制台获取
var password = []byte("YOUR_PASSWORD")

func main() {
// App will run until cancelled by user (e.g. ctrl-c)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

// We will connect to the Eclipse test server (note that you may see messages that other users publish)
u, err := url.Parse(accessPoint)
if err != nil {
panic(err)
}

cliCfg := autopaho.ClientConfig{
ServerUrls: []*url.URL{u},
ConnectUsername: username,
ConnectPassword: password,

// Keepalive message should be sent every 60 seconds
KeepAlive: 60,

// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
CleanStartOnInitialConnection: false,

// SessionExpiryInterval - Seconds that a session will survive after disconnection.
// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
// the server will not queue messages while it is down. The specific setting will depend upon your needs
// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 259200 = 3 days)
// MQTT server permits expiry interval up to 3 days
SessionExpiryInterval: 60,
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
// the connection drops)
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: topic, QoS: 1},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
}
fmt.Println("mqtt subscription made")
},
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\\n", err) },
// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
ClientConfig: paho.ClientConfig{
// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
ClientID: clientID,
// OnPublishReceived is a slice of functions that will be called when a message is received.
// You can write the function(s) yourself or use the supplied Router
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
func(pr paho.PublishReceived) (bool, error) {
fmt.Printf("received message on topic %s; body: %s (retain: %t)\\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)
return true, nil
}},
OnClientError: func(err error) { fmt.Printf("client error: %s\\n", err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
fmt.Printf("server requested disconnect: %s\\n", d.Properties.ReasonString)
} else {
fmt.Printf("server requested disconnect; reason code: %d\\n", d.ReasonCode)
}
},
},
}

c, err := autopaho.NewConnection(ctx, cliCfg) // starts process; will reconnect until context cancelled
if err != nil {
panic(err)
}
// Wait for the connection to come up
if err = c.AwaitConnection(ctx); err != nil {
panic(err)
}

ticker := time.NewTicker(time.Second)
msgCount := 0
defer ticker.Stop()
for {
select {
case <-ticker.C:
msgCount++
// Publish a test message (use PublishViaQueue if you don't want to wait for a response)
if _, err = c.Publish(ctx, &paho.Publish{
QoS: 1,
Topic: topic,
Payload: []byte("TestMessage: " + strconv.Itoa(msgCount)),
}); err != nil {
if ctx.Err() == nil {
panic(err) // Publish will exit when context cancelled or if something went wrong
}
}
continue
case <-ctx.Done():
}
break
}

fmt.Println("signal caught - exiting")
<-c.Done() // Wait for clean shutdown (cancelling the context triggered the shutdown)
}

package main

import (
"fmt"
"log"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)
// 集群接入点, 从控制台获取
const accessPoint = "mqtt-xxx-sh-public.mqtt.tencenttdmq.com:1883"

// 用户名, 从控制台获取
const username = "your-username"

// 密码, 从控制台获取
const password = "your-password"

// 客户端标识, 需保持集群唯一, 一般为产品序列号, 车辆VIN码等
const clientId = "VIN0001"

// 发送消息主题
const topic = "testtopic/1"

// 订阅表达式
const topicFilter = "testtopic/#"

// 发送、订阅QoS
const qos = 1

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("Received a message:")
fmt.Printf("TOPIC: %s\\n", msg.Topic())
fmt.Printf("MSG: %s\\n", msg.Payload())
}

func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker(accessPoint).SetClientID(clientId)

opts.SetKeepAlive(60 * time.Second)
opts.SetUsername(username)
opts.SetPassword(password)

// 设置消息回调处理函数
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

// 订阅主题
if token := c.Subscribe(topicFilter, qos, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// 发布消息
token := c.Publish(topic, qos, false, "Hello World")
token.Wait()

time.Sleep(6 * time.Second)

// 取消订阅
if token := c.Unsubscribe(topicFilter); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// 断开连接
c.Disconnect(250)
time.Sleep(1 * time.Second)
}



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈