Configuring Point-to-Point Subscription

Last updated: 2026-04-01 16:37:51

Scenario Description

In addition to the publish/subscribe (Pub/Sub) message model defined by the MQTT standard protocol, TDMQ for MQTT supports Point-to-Point (P2P) mode.
Note:
This feature is currently in grayscale release (limited availability). To use it, contact us.

What Is P2P Messaging Mode?

Use the P2P messaging mode when a message must reach one specific consumer. The Pub/Sub messaging mode mainly serves 1:N (one publisher and multiple subscribers) and M:N scenarios; the P2P messaging mode adds an efficient one-to-one delivery path.
In P2P mode, the publisher names the target recipient explicitly, and only that designated client consumes the message. The sender specifies the recipient by publishing to a topic that complies with the P2P naming rule. The recipient can consume the message without subscribing in advance.
Because the recipient does not register a subscription relationship, P2P mode saves that cost and also reduces message delivery latency.

Publishing a P2P Message

String firstTopic = ...;
String targetClientId = ...;
String topic = firstTopic + "/p2p/" + targetClientId;
MqttMessage message = ...;
mqttClient.publish(topic, message);
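The same topic construction can be sketched in Go using only the standard library. This is a minimal illustration, not SDK code: P2PTopic is a hypothetical helper name, and its validation rules (non-empty parts, no wildcard characters) are assumptions added for safety rather than rules stated by TDMQ.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// P2PTopic builds "<firstTopic>/p2p/<targetClientID>", the pattern used in
// the snippet above. The validation below is an assumption for illustration.
func P2PTopic(firstTopic, targetClientID string) (string, error) {
	if firstTopic == "" || targetClientID == "" {
		return "", errors.New("firstTopic and targetClientID must be non-empty")
	}
	// MQTT does not allow wildcard characters in the topic of a published message.
	if strings.ContainsAny(firstTopic+targetClientID, "+#") {
		return "", errors.New("wildcards '+' and '#' are not allowed in a publish topic")
	}
	return firstTopic + "/p2p/" + targetClientID, nil
}

func main() {
	topic, err := P2PTopic("home", "mqttGolangClient")
	if err != nil {
		panic(err)
	}
	fmt.Println(topic) // home/p2p/mqttGolangClient
}
```

The resulting string is what a publisher would pass as the topic argument of its publish call.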

Subscribing to a P2P Message

The client receiving the message requires no subscription. It only needs to be properly initialized and connected to the target cluster to receive P2P messages.
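Since P2P messages arrive without a subscription, a receiving application may want to tell them apart from ordinary Pub/Sub messages by inspecting the topic. The following is a rough sketch under the assumption that the first-level topic itself never contains "/p2p/"; ParseP2PTopic and IsP2PFor are hypothetical helper names, not part of any SDK.

```go
package main

import (
	"fmt"
	"strings"
)

// ParseP2PTopic splits "<firstTopic>/p2p/<clientID>" into its two parts.
// ok is false when the topic does not follow that pattern.
func ParseP2PTopic(topic string) (firstTopic, clientID string, ok bool) {
	const sep = "/p2p/"
	i := strings.Index(topic, sep)
	if i <= 0 { // separator missing, or nothing before it
		return "", "", false
	}
	firstTopic, clientID = topic[:i], topic[i+len(sep):]
	if clientID == "" {
		return "", "", false
	}
	return firstTopic, clientID, true
}

// IsP2PFor reports whether topic is a P2P topic addressed to clientID.
func IsP2PFor(topic, clientID string) bool {
	_, target, ok := ParseP2PTopic(topic)
	return ok && target == clientID
}

func main() {
	fmt.Println(IsP2PFor("home/p2p/mqttGolangClient", "mqttGolangClient")) // true
	fmt.Println(IsP2PFor("home/room", "mqttGolangClient"))                 // false
}
```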

Notes on Using the Paho Golang SDK

After receiving a message, the Paho Golang SDK calls matchAndDispatch, which routes the message to a handler according to its topic. The SDK source for this function is:
// matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
	var wg sync.WaitGroup
	ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
	var ackInChan chan *PacketAndToken       // ACKs generated by ackFunc get put onto this channel

	stopAckCopy := make(chan struct{})    // Closure requests stop of go routine copying ackInChan to ackOutChan
	ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
	goRoutinesDone := make(chan struct{}) // closed on wg.Done()
	if order {
		ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
	} else {
		// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
		ackInChan = make(chan *PacketAndToken)
		go func() { // go routine to copy from ackInChan to ackOutChan until stopped
			for {
				select {
				case a := <-ackInChan:
					ackOutChan <- a
				case <-stopAckCopy:
					close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
					for {
						select {
						case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
							DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
						case <-goRoutinesDone:
							close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
							DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
							return
						}
					}
				}
			}
		}()
	}

	go func() { // Main go routine handling inbound messages
		for message := range messages {
			// DEBUG.Println(ROU, "matchAndDispatch received message")
			sent := false
			r.RLock()
			m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
			var handlers []MessageHandler
			for e := r.routes.Front(); e != nil; e = e.Next() {
				if e.Value.(*route).match(message.TopicName) {
					if order {
						handlers = append(handlers, e.Value.(*route).callback)
					} else {
						hd := e.Value.(*route).callback
						wg.Add(1)
						go func() {
							hd(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
							wg.Done()
						}()
					}
					sent = true
				}
			}
			if !sent {
				if r.defaultHandler != nil {
					if order {
						handlers = append(handlers, r.defaultHandler)
					} else {
						wg.Add(1)
						go func() {
							r.defaultHandler(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
							wg.Done()
						}()
					}
				} else {
					DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
				}
			}
			r.RUnlock()
			for _, handler := range handlers {
				handler(client, m)
				if !client.options.AutoAckDisabled {
					m.Ack()
				}
			}
			// DEBUG.Println(ROU, "matchAndDispatch handled message")
		}
		if order {
			close(ackOutChan)
		} else { // Ensure that nothing further will be written to ackOutChan before closing it
			close(stopAckCopy)
			<-ackCopyStopped
			close(ackOutChan)
			go func() {
				wg.Wait() // Note: If this remains running then the user has handlers that are not returning
				close(goRoutinesDone)
			}()
		}
		DEBUG.Println(ROU, "matchAndDispatch exiting")
	}()
	return ackOutChan
}
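A received message may not match any topic filter. This is the expected case for P2P messages, because the recipient has not subscribed to the P2P topic. As the code above shows, when no route matches and no defaultHandler is set, the message is passed to no handler and is not acknowledged. To handle such messages, set a default handler with ClientOptions.SetDefaultPublishHandler(messagePubHandler) so that the fallback logic works as expected: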
package main

import (
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler prints every message it receives. It is registered both as
// the per-subscription handler and as the default (fallback) handler.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v\n", err)
}

func sub(client mqtt.Client) {
	topic := "home/room"
	// Message handler per topic-filter
	token := client.Subscribe(topic, 1, messagePubHandler)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Failed to subscribe to topic %s: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}

func publish(client mqtt.Client) {
	num := 10
	for i := 0; i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("home/test", 0, false, text)
		token.Wait()
		time.Sleep(time.Second)
	}
}

func main() {
	// Acquire your instance access point from MQTT Console
	var broker = "mqtt-xxx.mqtt.tencenttdmq.com"
	var port = 1883
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("mqttGolangClient")
	opts.SetUsername("YOUR-USERNAME")
	opts.SetPassword("YOUR-PASSWORD")
	// The default handler must be set so that P2P messages, which match no
	// subscribed topic filter, still reach a handler.
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	sub(client)
	publish(client)
	// Keep the process alive so that incoming messages can be received.
	time.Sleep(time.Minute * 3)
}
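To see why the default handler matters for P2P, the fallback step of matchAndDispatch can be modeled without the SDK. The sketch below is a simplified stand-in, not the Paho implementation: Handler, route, matches, and dispatch are hypothetical names, ordering and acknowledgment are left out, and the filter matching covers only the '+' and '#' wildcards.

```go
package main

import (
	"fmt"
	"strings"
)

// Handler is a stand-in for the SDK's message handler type.
type Handler func(topic, payload string)

// route pairs a topic filter with the handler registered for it.
type route struct {
	filter  string
	handler Handler
}

// matches is a simplified MQTT topic-filter match supporting '+' and '#'.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // multi-level wildcard matches everything that remains
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// dispatch reports how a message was handled: "route", "default", or "dropped".
func dispatch(routes []route, def Handler, topic, payload string) string {
	sent := false
	for _, r := range routes {
		if matches(r.filter, topic) {
			r.handler(topic, payload)
			sent = true
		}
	}
	if sent {
		return "route"
	}
	if def != nil {
		def(topic, payload) // fallback used when no filter matched
		return "default"
	}
	return "dropped"
}

func main() {
	show := func(topic, payload string) { fmt.Printf("%s: %s\n", topic, payload) }
	routes := []route{{filter: "home/room", handler: show}}
	p2p := "home/p2p/mqttGolangClient"

	// Without a default handler, no handler runs for the P2P topic.
	fmt.Println(dispatch(routes, nil, p2p, "hello"))
	// With a default handler, the P2P message reaches the fallback.
	fmt.Println(dispatch(routes, show, p2p, "hello"))
}
```

In this model the client is subscribed only to home/room, so a message on the P2P topic is handled only when a default handler is supplied.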

