pip install 'pulsar-client==3.1.0'
import pulsar

# Create a client.
# SERVICE_URL and AUTHENTICATION are placeholders; substitute the values
# described in the parameter table below.
client = pulsar.Client(
    # The authorized role token.
    authentication=pulsar.AuthenticationToken(AUTHENTICATION),
    # The address used to access the service.
    service_url=SERVICE_URL,
)
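| Parameter | Description |
| --- | --- |
| SERVICE_URL | Address used to access the cluster. You can view and copy the address from the Cluster page in the console. |
| AUTHENTICATION | The authorized role token, passed to `pulsar.AuthenticationToken` when creating the client. |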
# Create a producer.
producer = client.create_producer(
    # The full topic path in the format of
    # persistent://cluster (tenant) ID/namespace/topic. You can copy the
    # clusterid/namespace/topic part from the Topic page in the console.
    topic='pulsar-xxx/sdk_python/topic1',
)
# Topic path format: persistent://clusterid/namespace/topic. You can copy the
# clusterid/namespace/topic part from the Topic page in the console.

# Send a message.
producer.send(
    # The message content (the string is encoded to UTF-8 bytes first).
    'Hello python client, this is a msg.'.encode('utf-8'),
    # The message parameters.
    properties={'k': 'v'},
    # The business key.
    partition_key='yourKey',
)
# Asynchronous sending callback.
def send_callback(send_result, msg_id):
    """Print the publish result and message ID passed to the callback."""
    print('Message published: result:{} msg_id:{}'.format(send_result, msg_id))


# Send a message.
producer.send_async(
    # The message content (the string is encoded to UTF-8 bytes first).
    'Hello python client, this is a async msg.'.encode('utf-8'),
    # The asynchronous callback.
    callback=send_callback,
    # The message configuration.
    properties={'k': 'v'},
    # The business key.
    partition_key='yourKey',
)
# Subscribe to messages.
consumer = client.subscribe(
    # The full topic path in the format of
    # persistent://cluster (tenant) ID/namespace/topic. You can copy the
    # clusterid/namespace/topic part from the Topic page in the console.
    topic='pulsar-xxx/sdk_python/topic1',
    # The subscription name.
    subscription_name='sub_topic1',
)
The full topic path has the format persistent://clusterid/namespace/topic. You can copy the clusterid/namespace/topic part from the Topic page in the console.
# Obtain messages.
msg = consumer.receive()
try:
    # Simulate the business logic.
    print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
    # If the consumption succeeds, reply ack.
    consumer.acknowledge(msg)
except Exception:
    # Catch Exception rather than using a bare except, so that
    # KeyboardInterrupt and SystemExit are not swallowed.
    # If consumption fails, the message will be redelivered.
    consumer.negative_acknowledge(msg)

Feedback