yum install openssl openssl-develyum install cyrus-sasl{,-plain}
// producer.cpp#include <iostream>#include <string>#include "librdkafka/rdkafkacpp.h"int main() {std::string brokers = "$broker"; // Change to your Kafka address.std::string topic = "$topic"; // Topic name.std::string ca_file = "./CARoot.pem"; // CA certificate path (in the same directory as the program).std::string sasl_username = "$username"; // SASL username.std::string sasl_password = "$password"; // SASL password.std::string errstr;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));// ====== Global configuration. ======conf->set("metadata.broker.list", brokers, errstr);conf->set("security.protocol", "sasl_ssl", errstr);conf->set("ssl.ca.location", ca_file, errstr);conf->set("sasl.mechanisms", "PLAIN", errstr);conf->set("sasl.username", sasl_username, errstr);conf->set("sasl.password", sasl_password, errstr);// acks=1conf->set("acks", "1", errstr);// Retry mechanism.conf->set("message.send.max.retries", "5", errstr);conf->set("retry.backoff.ms", "1000", errstr);// max.in.flight.requests.per.connection=5conf->set("max.in.flight.requests.per.connection", "5", errstr);// ====== Create a producer. ======std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;return 1;}// ====== Create a topic object. ======std::unique_ptr<RdKafka::Topic> topic_obj(RdKafka::Topic::create(producer.get(), topic, tconf.get(), errstr));if (!topic_obj) {std::cerr << "Failed to create topic object: " << errstr << std::endl;return 1;}// ====== Send messages. ======for (int i = 1; i <= 5; ++i) {std::string msg = "Message from C++ on CentOS #" + std::to_string(i);RdKafka::ErrorCode resp = producer->produce(topic_obj.get(),RdKafka::Topic::PARTITION_UA,RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(msg.c_str()), msg.size(),nullptr, nullptr);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "Sent: " << msg << std::endl;}}// ====== Wait for sending completion. ======producer->flush(5000); // Wait up to 5 seconds.return 0;}
g++ -std=c++11 \\-I/usr/local/include \\-L/usr/local/lib \\producer.cpp \\-lrdkafka++ -lrdkafka \\-o producer
./producer
Parameter | Description |
broker | Access network. On the instance details page in the console, select the Access Mode module to copy the network information in the Network column. |
topic | Topic name. Copy the name on the Topic List page in the console. |
username | Username in the format of instance ID + # + configured username. Obtain the instance ID in the basic information area on the instance details page in the CKafka console, and choose ACL Policy Management > User Management in the console to create a user and set the username. |
password | Password is the configured user password. Choose ACL Policy Management > User Management in the console to create a user and set the password. |

// consumer.cpp#include <iostream>#include <string>#include "librdkafka/rdkafkacpp.h"class ConsumerCallback : public RdKafka::ConsumeCb {public:void consume_cb(RdKafka::Message &msg, void *opaque) override {if (msg.err()) {std::cerr << "Error: " << msg.errstr() << std::endl;} else {std::cout << "Received: " << std::string(static_cast<const char*>(msg.payload()), msg.len()) << std::endl;}}};int main() {std::string brokers = "$broker";std::string topic = "$topic";std::string group_id = "$group.id";std::string ca_file = "./CARoot.pem";std::string sasl_username = "$username";std::string sasl_password = "$password";std::string errstr;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));conf->set("metadata.broker.list", brokers, errstr);conf->set("security.protocol", "sasl_ssl", errstr);conf->set("ssl.ca.location", ca_file, errstr);conf->set("sasl.mechanisms", "PLAIN", errstr);conf->set("sasl.username", sasl_username, errstr);conf->set("sasl.password", sasl_password, errstr);conf->set("group.id", group_id, errstr);conf->set("auto.offset.reset", "earliest", errstr);conf->set("enable.auto.commit", "true", errstr);conf->set("auto.commit.interval.ms", "5000", errstr);conf->set("max.in.flight.requests.per.connection", "5", errstr);ConsumerCallback consume_callback;conf->set("consume.callback", &consume_callback, errstr);std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(conf.get(), errstr));if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;return 1;}RdKafka::ErrorCode resp = consumer->subscribe({topic});if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Subscribe failed: " << RdKafka::err2str(resp) << std::endl;return 1;}std::cout << "Consumer started, waiting for messages..." << std::endl;while (true) {consumer->consume(1000); // Wait up to 1 second each time.}return 0;}
g++ -std=c++11 -I/usr/local/include -L/usr/local/lib \\consumer.cpp -lrdkafka++ -lrdkafka -o consumer
./consumer
Parameter | Description |
broker | Access network. On the instance details page in the console, select the Access Mode module to copy the network information in the Network column. |
group.id | Consumer group name. You can define the name and see the consumer on the Consumer Group page after successful demo running. |
username | Username in the format of instance ID + # + configured username. Obtain the instance ID in the basic information area on the instance details page in the CKafka console, and choose ACL Policy Management > User Management in the console to create a user and set the username. |
password | Password is the configured user password. Choose ACL Policy Management > User Management in the console to create a user and set the password. |
topic1 topic2 | Topic name. Copy the name on the Topic List page in the console. |


Feedback