
SASL_PLAINTEXT Access in the Public Network
Last updated: 2026-01-05 15:16:59

Scenarios

This document describes how a Go client accesses TDMQ for CKafka (CKafka) over the public network using the SASL_PLAINTEXT method to send and receive messages.

Prerequisites

You have installed Go on your Linux server and downloaded the demo package that contains gokafkademo.

Operation Steps

Step 1: Preparing Go Dependency Libraries

1. Upload gokafkademo from the downloaded demo to the Linux server.
2. Log in to the Linux server, go to the gokafkademo directory, and run the following command to add dependency libraries.
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
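If your server uses Go modules, you may need to initialize a module in the gokafkademo directory before the go get command above will record the dependency. A minimal sketch, assuming gokafkademo as the module name (adjust to your environment):
cd gokafkademo
go mod init gokafkademo
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka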

Step 2: Preparing Configurations

Create the configuration file kafka.json.
{
    "topic": [
        "xxxx"
    ],
    "sasl": {
        "username": "yourUserName",
        "password": "yourPassword",
        "instanceId": "instanceId"
    },
    "bootstrapServers": [
        "xxx.ap-changsha-ec.ckafka.tencentcloudmq.com:6000"
    ],
    "consumerGroupId": "yourConsumerId"
}
Parameter descriptions:
topic: Topic name. Copy it from the Topic List page in the console.
sasl.username: Username. In the console, choose ACL Policy Management > User Management to create a user and set the username.
sasl.password: User password. In the console, choose ACL Policy Management > User Management to create a user and set the password.
sasl.instanceId: Instance ID. Obtain it from the basic information on the instance details page in the console.
bootstrapServers: Access address. On the instance details page in the console, find the Access Mode module and copy the address from the Network column.
consumerGroupId: Consumer group name. You can customize it; after the demo runs successfully, the consumer appears on the Consumer Group page.
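The producer and consumer programs below both load this file through config.ParseConfig, a small helper shipped with the demo. For reference, here is a minimal sketch of what that helper might look like, assuming struct and field names that match the accesses in the demo code (cfg.Servers, cfg.SASL, cfg.Topic, cfg.ConsumerGroupId); the actual package in gokafkademo may differ:
package config

import (
    "encoding/json"
    "os"
)

// SASL mirrors the "sasl" object in kafka.json.
type SASL struct {
    Username   string `json:"username"`
    Password   string `json:"password"`
    InstanceId string `json:"instanceId"`
}

// Config mirrors the top-level structure of kafka.json.
type Config struct {
    Topic           []string `json:"topic"`
    SASL            SASL     `json:"sasl"`
    Servers         []string `json:"bootstrapServers"`
    ConsumerGroupId string   `json:"consumerGroupId"`
}

// ParseConfig reads the JSON file at path and unmarshals it into a Config.
// os.ReadFile requires Go 1.16 or later; use ioutil.ReadFile on older versions.
func ParseConfig(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var cfg Config
    if err := json.Unmarshal(data, &cfg); err != nil {
        return nil, err
    }
    return &cfg, nil
}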

Step 3: Sending Messages

1. Write a message production program.
package main

import (
    "fmt"
    "gokafkademo/config"
    "log"
    "strings"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    cfg, err := config.ParseConfig("../config/kafka.json")
    if err != nil {
        log.Fatal(err)
    }

    p, err := kafka.NewProducer(&kafka.ConfigMap{
        // Set the access point. You can obtain the access point of the corresponding topic in the console.
        "bootstrap.servers": strings.Join(cfg.Servers, ","),
        // The SASL authentication mechanism is PLAIN.
        "sasl.mechanism": "PLAIN",
        // Configure the ACL policy locally.
        "security.protocol": "SASL_PLAINTEXT",
        // username is in the format "instance ID + # + configured username"; password is the configured user password.
        "sasl.username": fmt.Sprintf("%s#%s", cfg.SASL.InstanceId, cfg.SASL.Username),
        "sasl.password": cfg.SASL.Password,

        // The Kafka producer supports three acknowledgment mechanisms:
        // -1 or all: The broker responds to the producer only after the leader has received the data and synchronized it to the followers in all ISRs.
        //            This gives the highest reliability: as long as one in-sync replica is alive, no message is lost. Note that it does not guarantee
        //            that all replicas are written; use it together with the topic-level parameter min.insync.replicas.
        // 0: The producer sends the next (batch of) message(s) without waiting for any broker acknowledgment. Highest throughput, lowest reliability:
        //    data may be lost if the broker hosting the leader replica fails, because the producer is unaware of the failure.
        // 1: The producer sends the next (batch of) message(s) once the leader has acknowledged receipt. This balances throughput and reliability:
        //    messages may be lost if the broker hosting the leader replica fails before the data is replicated.
        // The default value is 1 if this setting is omitted. Choose a value based on your business requirements.
        "acks": 1,
        // The number of retries on request errors. Setting this greater than 0 is recommended to minimize message loss on failed requests.
        "retries": 0,
        // The wait time between a failed request and the next retry.
        "retry.backoff.ms": 100,
        // The timeout for producer network requests.
        "socket.timeout.ms": 6000,
        // The maximum interval between the client's internal reconnection attempts.
        "reconnect.backoff.max.ms": 3000,
    })
    if err != nil {
        log.Fatal(err)
    }

    defer p.Close()

    // Handle delivery reports from the producer's event channel.
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Send messages asynchronously (see the synchronous-delivery sketch after this step).
    topic := cfg.Topic[0] // use the first topic configured in kafka.json
    for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {
        _ = p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }

    // Wait up to 10 seconds for outstanding messages to be delivered before exiting.
    p.Flush(10 * 1000)
}
2. Compile and run the program to send messages.
go run main.go
3. View the running results. An example is as follows.
Delivered message to test[0]@628
Delivered message to test[0]@629
4. On the Topic Management page in the CKafka console, select the target topic, and choose More > Message Query to view the message just sent. 
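The demo above produces asynchronously and reads delivery reports from p.Events(). If you need to confirm each message synchronously, confluent-kafka-go also lets you pass a per-call delivery channel to Produce. A minimal sketch (the helper name sendOne is hypothetical):
// sendOne produces a single message and blocks until its delivery report arrives.
func sendOne(p *kafka.Producer, topic string, value []byte) error {
    deliveryChan := make(chan kafka.Event, 1)
    // Passing a channel here routes this message's delivery report to it instead of p.Events().
    err := p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          value,
    }, deliveryChan)
    if err != nil {
        return err
    }
    // Block until the broker acknowledges (or rejects) the message.
    m := (<-deliveryChan).(*kafka.Message)
    return m.TopicPartition.Error
}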

Step 4: Consuming Messages

1. Write a message consumption program.
package main

import (
"fmt"
"gokafkademo/config"
"log"
"strings"

"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

cfg, err := config.ParseConfig("../config/kafka.json")
if err != nil {
log.Fatal(err)
}

c, err := kafka.NewConsumer(&kafka.ConfigMap{
// Set the access point. You can obtain the access point of the corresponding topic in the console.
"bootstrap.servers": strings.Join(cfg.Servers, ","),
// The default SASL authentication mechanism type is PLAIN.
"sasl.mechanism": "PLAIN",
// Configure the ACL policy locally.
"security.protocol": "SASL_PLAINTEXT",
// username is in the format of instance ID + # + configured username, and password is the configured user password.
"sasl.username": fmt.Sprintf("%s#%s", cfg.SASL.InstanceId, cfg.SASL.Username),
"sasl.password": cfg.SASL.Password,
// The message consumer group that has been set.
"group.id": cfg.ConsumerGroupId,
"auto.offset.reset": "earliest",

// Consumer timeout interval when the Kafka consumer group mechanism is used. If the broker does not receive the heartbeat from the consumer within this interval, the consumer is considered to be failed, and the broker
// initiates the rebalancing process again. Currently, the value must be between the value (6000) of the broker configuration parameter group.min.session.timeout.ms and the value (300000) of group.max.session.timeout.ms.
"session.timeout.ms": 10000,
})

if err != nil {
log.Fatal(err)
}
// List of subscribed message topics.
err = c.SubscribeTopics([]string{"test", "test-topic"}, nil)
if err != nil {
log.Fatal(err)
}

for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))
} else {
// The client will automatically try to clear all errors.
fmt.Printf("Consumer error: %v (%v)\\n", err, msg)
}
}

c.Close()
}
2. Compile and run the program to consume messages.
go run main.go
3. View the running results. An example is as follows.
Message on test[0]@628: Confluent-Kafka
Message on test[0]@629: Golang Client Message
4. On the Consumer Group page in the CKafka console, select the target consumer group name, enter the topic name in the Topic Name area, and click View Details to view consumption details. 
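Because the consumption loop above never exits, the demo's final c.Close() is never reached. A common pattern, not part of the original demo, is to stop the loop on SIGINT or SIGTERM so the consumer can commit offsets and leave the group cleanly. A minimal sketch of a replacement loop (requires os, os/signal, syscall, and time in the import list):
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

run := true
for run {
    select {
    case sig := <-sigchan:
        fmt.Printf("Caught signal %v: terminating\n", sig)
        run = false
    default:
        // Poll with a short timeout instead of blocking forever.
        msg, err := c.ReadMessage(100 * time.Millisecond)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else if err.(kafka.Error).Code() != kafka.ErrTimedOut {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}

// Now reachable: commits final offsets and leaves the consumer group.
c.Close()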