rd_kafka_conf_t *conf = rd_kafka_conf_new();
// Kafka cluster address(es); multiple addresses are separated by commas. Default is empty.
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
// Maximum number of attempts for message delivery, including the initial attempt; default is 2
rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);
// Backoff time between retries, in milliseconds; default is 100
rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);
// Client request timeout, in milliseconds; default is 5000
rd_kafka_conf_set(conf, "request.timeout.ms", "5000", NULL, 0);
// Maximum total size of messages allowed on the producer queue, in kilobytes
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);
// Maximum number of messages allowed on the producer queue; default is 100000
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);
// Maximum total size of messages in the client send buffer, in bytes; default is 1000000
rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);
// Linger time for the producer queue, in milliseconds; default is 0
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0", NULL, 0);
// Message compression codec; default is none (no compression)
rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);
// Message compression level; default is 0 (automatically selected)
rd_kafka_conf_set(conf, "compression.level", "0", NULL, 0);
// Client ID; default is rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);
// Maximum number of in-flight requests per connection, i.e., requests awaiting broker acknowledgment; default is 1000000
rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);
rd_kafka_conf_set(conf, "max.in.flight", "1000000", NULL, 0);
// How long to cache broker address resolution results, in milliseconds; default is 1000
rd_kafka_conf_set(conf, "broker.address.ttl", "1000", NULL, 0);
// Backoff time before reconnecting to the Kafka cluster, in milliseconds; default is 1000
rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);
// Maximum backoff time before reconnecting to the Kafka cluster, in milliseconds; default is 10000
rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);
// Timeout for broker API version requests, in milliseconds; default is 10000
rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);
// Security protocol; default is plaintext
rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);
// For other SSL and SASL related parameters, see the official librdkafka documentation.

// Create a producer instance.
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
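The listing above passes NULL for the error-string arguments, so a misspelled or unsupported property is silently ignored. A minimal sketch (the buffer name is illustrative) of how the same rd_kafka_conf_set() calls can surface configuration errors through the return code and an error buffer:

char conf_errstr[512];
// rd_kafka_conf_set() returns RD_KAFKA_CONF_OK on success; anything else indicates
// an unknown property or an invalid value, described in conf_errstr.
if (rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                      conf_errstr, sizeof(conf_errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "Configuration error: %s\n", conf_errstr);
}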
max.in.flight.requests.per.connection:5
max.in.flight:5
{"Compression","CompressionLZ4"}#include <stdio.h>#include <string.h>#include <librdkafka/rdkafka.h>// Producer message sending callbackvoid dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stderr, "Message delivery failed: %s\\n", rd_kafka_err2str(rkmessage->err));} else {fprintf(stderr, "Message delivered (%zd bytes, partition %"PRId32")\\n",rkmessage->len, rkmessage->partition);}}int main() {rd_kafka_conf_t *conf = rd_kafka_conf_new();// Set the Kafka cluster addressrd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);// Set ack=1, which means the message is considered successfully sent after the leader replica receives it.rd_kafka_conf_set(conf, "acks", "1", NULL, 0);// Set to 5, which means a single connection sends 5 requests simultaneously.rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", NULL, 0);rd_kafka_conf_set(conf, "max.in.flight", "5", NULL, 0);// Set the producer message sending callbackrd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);// Create a producer instance.char errstr[512];rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!producer) {fprintf(stderr, "Failed to create producer: %s\\n", errstr);return 1;}// Create a topic instance.rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);if (!topic) {fprintf(stderr, "Failed to create topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));rd_kafka_destroy(producer);return 1;}// Send messages.const char *message = "Hello, Kafka!";if (rd_kafka_produce(topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY,(void *)message,strlen(message),NULL,0,NULL) == -1) {fprintf(stderr, "Failed to produce to topic %s: %s\\n", rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));}// Wait for all messages to be sent.while (rd_kafka_outq_len(producer) > 0) {rd_kafka_poll(producer, 1000);}// Terminate the topic instance.rd_kafka_topic_destroy(topic);// Terminate the producer instance.rd_kafka_destroy(producer);return 0;}
rd_kafka_conf_t *conf = rd_kafka_conf_new();
// Set the Kafka cluster address
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
// Set the consumer group ID; default is empty.
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
// Auto-commit interval for the consumer, in milliseconds; default is 5000
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);
// Auto-commit switch for the consumer; default is true
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
// Offset reset policy when no committed offset is available; default is latest
rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);
// Client ID; default is rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);

// Create a consumer instance.
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

// Consumer message handling callback
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition, rkmessage->offset,
               (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    // Set the consumer group ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    // Auto-commit switch for the consumer; default is true
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
    // Auto-commit interval for the consumer, in milliseconds; default is 5000
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

    // Create a consumer instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }
    // Redirect the main event queue to the consumer queue so rd_kafka_consumer_poll() serves all events.
    rd_kafka_poll_set_consumer(consumer);

    // Subscribe to a topic.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    // Consume messages.
    while (1) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            rd_kafka_message_destroy(rkmessage);
        }
    }

    // Unsubscribe.
    rd_kafka_unsubscribe(consumer);
    // Destroy the consumer instance.
    rd_kafka_destroy(consumer);
    return 0;
}
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

// Consumer message handling callback
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition, rkmessage->offset,
               (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    // Set the consumer group ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    // Disable the consumer's auto-commit feature
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);

    // Create a consumer instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }
    // Redirect the main event queue to the consumer queue so rd_kafka_consumer_poll() serves all events.
    rd_kafka_poll_set_consumer(consumer);

    // Subscribe to a topic.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    // Consume messages and manually commit offsets.
    int message_count = 0;
    while (1) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            // Manually commit the offset every 10 messages (synchronous commit).
            if (++message_count % 10 == 0) {
                rd_kafka_resp_err_t commit_err = rd_kafka_commit_message(consumer, rkmessage, 0);
                if (commit_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                    fprintf(stderr, "Failed to commit offset for message: %s\n", rd_kafka_err2str(commit_err));
                } else {
                    printf("Offset %"PRId64" committed\n", rkmessage->offset);
                }
            }
            rd_kafka_message_destroy(rkmessage);
        }
    }

    // Unsubscribe.
    rd_kafka_unsubscribe(consumer);
    // Destroy the consumer instance.
    rd_kafka_destroy(consumer);
    return 0;
}
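The consumer examples above poll in an endless loop, so the unsubscribe and destroy calls after the loop are never reached. A minimal shutdown sketch, assuming a SIGINT handler that clears a run flag (the flag and handler names are illustrative, and consumer/msg_consume come from the examples above), showing how to leave the poll loop and close the consumer cleanly with rd_kafka_consumer_close():

#include <signal.h>

static volatile sig_atomic_t run = 1;
static void stop(int sig) { run = 0; }

/* In main(), after subscribing: */
signal(SIGINT, stop);
while (run) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
    if (rkmessage) {
        msg_consume(rkmessage, NULL);
        rd_kafka_message_destroy(rkmessage);
    }
}
// Leave the consumer group; with auto-commit enabled this also commits final offsets.
rd_kafka_consumer_close(consumer);
rd_kafka_destroy(consumer);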