
SASL_SSL Access in the Public Network
Last updated:2026-01-05 15:16:59

Scenarios

This document uses the Python client as an example to describe how to access TDMQ for CKafka (CKafka) over the public network using the SASL_SSL method, and how to send and receive messages.

Prerequisites

Operation Steps

Step 1: Preparations

1. Create an access point.
1.1 On the Instance List page, click the target instance ID to go to the instance details page.
1.2 Choose Basic Info > Access Mode, and click Add a routing policy. In the pop-up window, choose Routing Type: Public Network Domain Name Access > Access Method: SASL_SSL.

2. Create a role. Choose ACL Policy Management > User Management. On the displayed page, create a role and set the password.

3. Create a Topic. On the Topic List page in the console, create a topic (see Creating a Topic).
4. Configure the ACL policy.
Grant the created role read/write permissions on the topic (see Configuring Topic Read/Write Permissions).
5. Add Python dependency libraries.
Run the following command to perform the installation:
pip install kafka-python

Step 2: Producing Messages

1. Modify the configuration parameters in the message production program producer.py.
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['xx.xx.xx.xx:port'],
    api_version=(1, 1),

    # SASL_SSL access in the public network.
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="instanceId#username",
    sasl_plain_password="password",
    ssl_cafile="CARoot.pem",
    ssl_check_hostname=False,
)

# Serialize the message as JSON and encode it to bytes before sending.
message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value=msg)
print("produce message " + message + " success.")
# close() waits for pending messages to be sent before returning.
producer.close()
Parameter | Description
bootstrap_servers | Access network. On the Basic Info page of the instance in the console, select the Access Mode module and copy the network information from the Network column.
sasl_plain_username | Username, in the format of instance ID + # + username. The instance ID can be obtained from the basic information on the instance details page in the CKafka console. Choose ACL Policy Management > User Management to create a user and set the username.
sasl_plain_password | User password. On the instance details page in the CKafka console, choose ACL Policy Management > User Management to create a user and set the password.
topic_name | Topic name. Copy the name on the Topic List page in the console.
CARoot.pem | The certificate path required when the SASL_SSL access method is used.
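The connection parameters described above can be collected in one place so that they are written only once. The following is a minimal sketch, not part of the official demo: the helper name sasl_ssl_config and its arguments are hypothetical, and the values passed in are the same placeholders used in producer.py. It only uses keyword arguments that already appear in the demo code.

```python
def sasl_ssl_config(bootstrap_server, instance_id, username, password,
                    cafile="CARoot.pem"):
    """Return keyword arguments for a SASL_SSL connection (hypothetical helper)."""
    return {
        "bootstrap_servers": [bootstrap_server],
        "api_version": (1, 1),
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        # The username is built as instance ID + # + username.
        "sasl_plain_username": f"{instance_id}#{username}",
        "sasl_plain_password": password,
        "ssl_cafile": cafile,
        "ssl_check_hostname": False,
    }


if __name__ == "__main__":
    # Placeholder values; replace them with the values from the console.
    config = sasl_ssl_config("xx.xx.xx.xx:port", "instanceId", "username", "password")
    print(config["sasl_plain_username"])
```

The resulting dictionary could then be unpacked into the client constructor, for example KafkaProducer(**config).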
2. Run producer.py.
3. View the running results.


4. On the Topic List page in the CKafka console, select the target topic, and choose More > Message Query to view the message just sent.

Step 3: Consuming Messages

1. Modify the configuration parameters in the message consumption program consumer.py.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'topic_name',
    group_id="group_id",
    bootstrap_servers=['xx.xx.xx.xx:port'],
    api_version=(1, 1),

    # SASL_SSL access in the public network.
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="instanceId#username",
    sasl_plain_password="password",
    ssl_cafile="CARoot.pem",
    ssl_check_hostname=False,
)

# Iterating over the consumer blocks and yields messages as they arrive.
for message in consumer:
    print("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
          (message.topic, message.partition, message.offset, message.value))
Parameter | Description
bootstrap_servers | Access network. On the Basic Info page of the instance in the console, select the Access Mode module and copy the network information from the Network column.
group_id | Consumer group ID. Define the group ID according to business requirements.
sasl_plain_username | Username, in the format of instance ID + # + username. The instance ID can be obtained from the basic information on the instance details page in the CKafka console. Choose ACL Policy Management > User Management to create a user and set the username.
sasl_plain_password | User password. On the instance details page in the CKafka console, choose ACL Policy Management > User Management to create a user and set the password.
topic_name | Topic name. Copy the name on the Topic List page in the console.
CARoot.pem | The certificate path required when the SASL_SSL access method is used.
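Because producer.py sends json.dumps(message).encode(), message.value arrives at the consumer as JSON-encoded bytes. The following is a minimal sketch of decoding such a value, assuming the message was written by the producer from Step 2; the function name decode_value is hypothetical and not part of the official demo.

```python
import json


def decode_value(raw):
    """Turn JSON-encoded bytes (as sent by producer.py) back into a Python object."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    # Simulate the bytes that producer.py puts on the topic.
    raw = json.dumps("Hello World! Hello Ckafka!").encode()
    print(decode_value(raw))
```

In kafka-python, a function like this can be passed to KafkaConsumer through the value_deserializer argument, or called on message.value inside the loop.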
2. Run consumer.py.
3. View the running results.


4. On the Consumer Group page in the CKafka console, select the target consumer group name, enter the topic name in the Topic Name area, and click View Details to view consumption details.

Issue Troubleshooting

SSL Certificate Errors

If the following SSL CERTIFICATE_VERIFY_FAILED error is reported when the above demo is used, check whether the downloaded certificate file (SSL Certificates) is correct. If the error persists, submit a ticket and contact backend engineers for troubleshooting.
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/sender.py", line 160, in run_once
    self._client.poll(timeout_ms=poll_timeout_ms)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 602, in poll
    self._poll(timeout / 1000)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/client_async.py", line 648, in _poll
    conn.connect()
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 429, in connect
    if self._try_handshake():
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/conn.py", line 508, in _try_handshake
    self._sock.do_handshake()
  File "/root/anaconda3/envs/py39/lib/python3.9/ssl.py", line 1343, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate signature failure (_ssl.c:1133)
WARNING:kafka.conn:SSL connection closed by server during handshake.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=ckafka-xxx.ap-beijing.ckafka.tencentcloudmq.com:50001 <handshake> [IPv4('x.x.x.x', 50001)]>: Closing connection. KafkaConnectionError: SSL connection closed by server during handshake
^CTraceback (most recent call last):
  File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 49, in <module>
    main()
  File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 43, in main
    send_message(producer, 'skdy_osr_1005', message)
  File "/var/user/ckafka/python-demo/kafka-python/users-test/sasl_ssl-producer.py", line 32, in send_message
    future = producer.send(topic, value = msg)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 576, in send
    self._wait_on_metadata(topic, self.config['max_block_ms']/1000.0)
  File "/root/anaconda3/envs/py39/lib/python3.9/site-packages/kafka/producer/kafka.py", line 699, in _wait_on_metadata
    metadata_event.wait(max_wait - elapsed)
  File "/root/anaconda3/envs/py39/lib/python3.9/threading.py", line 581, in wait
    signaled = self._cond.wait(timeout)
  File "/root/anaconda3/envs/py39/lib/python3.9/threading.py", line 316, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
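As a first local check of the certificate file, it may help to confirm that Python's standard ssl module can load it at all. The following is a minimal sketch under that assumption; the function name ca_file_loads is hypothetical. It only detects a missing or unparsable file. A file that loads successfully can still be the wrong certificate for the instance, so a passing check does not rule out the error above.

```python
import ssl


def ca_file_loads(cafile):
    """Return True if the ssl module can load cafile as a CA certificate file."""
    try:
        ssl.create_default_context(cafile=cafile)
    except OSError as exc:
        # ssl.SSLError (unparsable file) and FileNotFoundError (missing file)
        # are both subclasses of OSError.
        print(f"Cannot load {cafile}: {exc}")
        return False
    return True


if __name__ == "__main__":
    # CARoot.pem is the placeholder path used in the demo code.
    print(ca_file_loads("CARoot.pem"))
```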
