This document uses the SDK for C++ as an example to describe how to send and receive messages with an open-source SDK, helping you better understand the message sending and receiving process.
// Client configuration information
ClientConfiguration config;
// Set the role token
AuthenticationPtr auth = pulsar::AuthToken::createWithToken(AUTHENTICATION);
config.setAuth(auth);
// Create a client
Client client(SERVICE_URL, config);
Parameter | Description |
---|---|
SERVICE_URL | Cluster access address, which can be viewed and copied on the Cluster page in the console. |
AUTHENTICATION | Role token, which can be copied in the Token column on the Role Management page. |
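Because the role token is a credential, one option is to read both values from the environment instead of hard-coding them in the source. The following is a minimal sketch under that assumption; the helper `envOr` and the environment variable names mentioned afterwards are hypothetical and are not part of the Pulsar SDK.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical helper (not part of the Pulsar SDK): returns the value of the
// environment variable `name`, or `fallback` if it is unset or empty.
std::string envOr(const char* name, const std::string& fallback) {
    const char* value = std::getenv(name);
    if (value == nullptr || *value == '\0') {
        return fallback;
    }
    return std::string(value);
}
```

With this helper, `SERVICE_URL` and `AUTHENTICATION` could be defined as, for example, `envOr("PULSAR_SERVICE_URL", "")` and `envOr("PULSAR_AUTH_TOKEN", "")` before the client is created. Both variable names are assumptions chosen for illustration.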
// Producer configurations
ProducerConfiguration producerConf;
producerConf.setBlockIfQueueFull(true);
producerConf.setSendTimeout(5000);
// Producer
Producer producer;
// Create a producer
Result result = client.createProducer(
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
"persistent://pulsar-xxx/sdk_cpp/topic1",
producerConf,
producer);
if (result != ResultOk) {
std::cout << "Error creating producer: " << result << std::endl;
return -1;
}
Note: You need to enter the complete path of the topic, i.e., `persistent://clusterid/namespace/Topic`, where the `clusterid/namespace/topic` part can be copied directly from the Topic page in the console.
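If the cluster ID, namespace, and topic name are kept as separate values, the complete path can be assembled in code. The following is a minimal sketch; `buildTopicName` is a hypothetical helper and is not part of the Pulsar SDK.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical helper (not part of the Pulsar SDK): joins the three parts of a
// persistent topic into `persistent://clusterId/namespace/topic`.
// Throws std::invalid_argument if any part is empty.
std::string buildTopicName(const std::string& clusterId,
                           const std::string& namespaceName,
                           const std::string& topic) {
    if (clusterId.empty() || namespaceName.empty() || topic.empty()) {
        throw std::invalid_argument("cluster ID, namespace, and topic must be non-empty");
    }
    return "persistent://" + clusterId + "/" + namespaceName + "/" + topic;
}
```

For the sample values used in this document, `buildTopicName("pulsar-xxx", "sdk_cpp", "topic1")` yields `persistent://pulsar-xxx/sdk_cpp/topic1`.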
// Message content
std::string content = "hello cpp client, this is a msg";
// Create a message object
Message msg = MessageBuilder().setContent(content)
    .setPartitionKey("mykey") // Business key, used to route the message to a partition
    .setProperty("x", "1") // Set a custom message property (key-value pair)
    .build();
// Send the message
Result result = producer.send(msg);
if (result != ResultOk) {
// The message failed to be sent
std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
} else {
// The message was successfully sent
std::cout << "The message " << content << " sent successfully" << std::endl;
}
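When a send fails, an application may want to try again a limited number of times rather than give up immediately. The following is a minimal sketch of such a retry loop; `sendWithRetry` is a hypothetical helper, not part of the Pulsar SDK, and it takes the send attempt as a callable so that it does not depend on the client library.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper (not part of the Pulsar SDK): calls `attempt` until it
// returns true or `maxAttempts` calls have been made, sleeping for `delay`
// between failed attempts. Returns the number of attempts used on success,
// or 0 if every attempt failed.
int sendWithRetry(const std::function<bool()>& attempt, int maxAttempts,
                  std::chrono::milliseconds delay) {
    for (int i = 1; i <= maxAttempts; ++i) {
        if (attempt()) {
            return i;
        }
        if (i < maxAttempts) {
            std::this_thread::sleep_for(delay);
        }
    }
    return 0;
}
```

With the producer above, the callable could be a lambda such as `[&] { return producer.send(msg) == ResultOk; }`. Keep in mind that retrying after a timeout may deliver the same message more than once, so consumers should tolerate duplicates.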
// Consumer configuration information
ConsumerConfiguration consumerConfiguration;
consumerConfiguration.setSubscriptionInitialPosition(pulsar::InitialPositionEarliest);
// Consumer
Consumer consumer;
// Subscribe to a topic
Result result = client.subscribe(
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
"persistent://pulsar-xxx/sdk_cpp/topic1",
// Subscription name
"sub_topic1",
consumerConfiguration,
consumer);
if (result != ResultOk) {
std::cout << "Failed to subscribe: " << result << std::endl;
return -1;
}
Note:
- You need to enter the complete path of the topic, i.e., `persistent://clusterid/namespace/Topic`, where the `clusterid/namespace/topic` part can be copied directly from the Topic page in the console.
- You need to enter the subscription name in the `subscriptionName` parameter, which can be viewed on the Consumption Management page.
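Message msg;
// Obtain the message (blocks until a message is available)
Result result = consumer.receive(msg);
if (result != ResultOk) {
    std::cout << "Failed to receive: " << result << std::endl;
    return -1;
}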
// Simulate the business processing logic
std::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << std::endl;
// Return `ack` as the acknowledgement
consumer.acknowledge(msg);
// Return `nack` if the consumption fails, and the message will be delivered again
// consumer.negativeAcknowledge(msg);
Note: The above is a brief introduction to publishing and subscribing to messages. For more operations, see Demo or Pulsar C++ client.