This document describes how to access CKafka to send/receive messages with the SDK for Python through SASL_SSL over public network.
Create an access point.
Route Type: Public domain name access
, Access Mode: SASL_SSL
.Create a role.
On the User Management tab page, create a role and set the password.
Create a topic.
Create a topic on the Topic Management tab page as instructed in Creating a topic.
Add the Python dependent library.
Run the following command to install the dependent library:
pip install kafka-python
producer.py
.producer = KafkaProducer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),
#
# Public network access through SASL_SSL
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value = msg)
print("produce message " + message + " success.")
producer.close()
Parameter | Description |
---|---|
bootstrap_servers |
Accessed network, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console.![]() |
sasl_plain_username |
Username in the format of instance ID + # + username . The instance ID can be obtained in Basic Info on the instance details** page in the CKafka console, and the username is set when the user is created in User Management. |
sasl_plain_password |
User password, which is set when the user is created in User Management on the instance details page in the CKafka console. |
topic_name |
Topic name, which can be copied from the Topic Management page in the console.![]() |
CARoot.pem |
Certificate path that is required when the access mode is SASL_SSL . |
Compile and run producer.py
.
View the execution result.
On the Topic Management tab page on the instance details page in the CKafka console, select the target topic, and click More > Message Query to view the message just sent.
consumer.py
.consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),
#
# Public network access through SASL_SSL
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
Parameter | Description |
---|---|
bootstrap_servers |
Accessed network, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console.![]() |
group_id |
Consumer group ID, which can be customized based on business requirements. |
sasl_plain_username |
Username in the format of instance ID + # + username . The instance ID can be obtained in Basic Info on the instance details** page in the CKafka console, and the username is set when the user is created in User Management. |
sasl_plain_password |
User password, which is set when the user is created in User Management on the instance details page in the CKafka console. |
topic_name |
Topic name, which can be copied from the Topic Management page in the console.![]() |
CARoot.pem |
Certificate path that is required when the access mode is SASL_SSL . |
Compile and run consumer.py
.
View the execution result.
On the Consumer Group tab page in the CKafka console, select the corresponding consumer group name, enter the topic name, and click View Details to view the consumption details.
Was this page helpful?