librdkafka SDK
Last updated:2026-01-20 17:10:14

Background

TDMQ CKafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It provides features such as high throughput, low latency, scalability, and fault tolerance.
This article covers the key parameters and best practices for the librdkafka client, along with common issues.

Producer Practices

Version Selection

librdkafka automatically negotiates the protocol version it uses based on the version of the Kafka cluster it connects to. Because Kafka releases frequently, using the latest librdkafka version generally gives the best compatibility and performance.
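To confirm which librdkafka release an application is actually linked against, the library's rd_kafka_version() returns the version as an integer that reads as hexadecimal MM.mm.rr.xx (major, minor, revision, pre-release ID, where 0xff marks a final release). The following is a minimal sketch of decoding such a value. It does not link librdkafka itself, and the names rdkafka_version, decode_rdkafka_version, and format_rdkafka_version are hypothetical helpers introduced only for illustration.

```c
#include <stdio.h>

/* Hypothetical helper type: the fields of a decoded librdkafka version. */
struct rdkafka_version {
    int major;
    int minor;
    int revision;
    int prerelease; /* 0xff means a final release */
};

/* Decode the hex-encoded MM.mm.rr.xx integer (the format rd_kafka_version() returns). */
static struct rdkafka_version decode_rdkafka_version(int v) {
    struct rdkafka_version out;
    out.major      = (v >> 24) & 0xff;
    out.minor      = (v >> 16) & 0xff;
    out.revision   = (v >> 8) & 0xff;
    out.prerelease = v & 0xff;
    return out;
}

/* Write "major.minor.revision" into buf; returns the snprintf() result. */
static int format_rdkafka_version(int v, char *buf, size_t len) {
    struct rdkafka_version d = decode_rdkafka_version(v);
    return snprintf(buf, len, "%d.%d.%d", d.major, d.minor, d.revision);
}
```

In an application, the argument would typically be the return value of rd_kafka_version(); the readable string is also available directly from rd_kafka_version_str().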

Producer Parameters and Tuning

Producer Parameters

The key librdkafka producer parameters and their default values are listed below. Defaults can differ between librdkafka versions, so check the CONFIGURATION.md file of the release you use.

rd_kafka_conf_t *conf = rd_kafka_conf_new();

// Kafka cluster address(es), multiple addresses separated by commas, default is empty
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// Number of times to retry sending a failing message; default is 2147483647 in recent librdkafka releases (older releases used 2)
rd_kafka_conf_set(conf, "message.send.max.retries", "2147483647", NULL, 0);

// Backoff time between retries in milliseconds, default is 100
rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);

// Acknowledgment timeout of a producer request in milliseconds; default is 30000 in recent librdkafka releases (older releases used 5000)
rd_kafka_conf_set(conf, "request.timeout.ms", "30000", NULL, 0);

// Maximum total size of messages in the producer queue, in kilobytes (not bytes); default is 1048576 in recent librdkafka releases
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "1048576", NULL, 0);

// Maximum number of messages in the client send buffer, default is 100000
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);

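// Maximum total size of messages in the client send buffer in bytes, default is 1000000
// Note: verify that your librdkafka version recognizes this property name; rd_kafka_conf_set() returns
// RD_KAFKA_CONF_UNKNOWN for unrecognized names, and queue.buffering.max.kbytes is the documented size limit
rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);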

// Linger time for the client send buffer in milliseconds (alias: linger.ms), default is 5
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "5", NULL, 0);

// Compression codec for messages (none, gzip, snappy, lz4, zstd), default is none (compression disabled)
rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);

// Message compression level, default is -1 (codec-dependent default level)
rd_kafka_conf_set(conf, "compression.level", "-1", NULL, 0);

// Client ID, default is rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);

// Maximum number of concurrent requests for the producer, i.e., the number of unacknowledged requests by the broker, default is 1000000
rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);
rd_kafka_conf_set(conf, "max.in.flight", "1000000", NULL, 0);

// How long to cache resolved broker addresses, in milliseconds, default is 1000
rd_kafka_conf_set(conf, "broker.address.ttl", "1000", NULL, 0);

// Initial wait before reconnecting to a broker, in milliseconds, default is 100 in recent librdkafka releases;
// the wait grows exponentially up to reconnect.backoff.max.ms
rd_kafka_conf_set(conf, "reconnect.backoff.ms", "100", NULL, 0);

// Maximum connection retry interval to the Kafka cluster for the client in milliseconds, default is 10000
rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);

// Timeout for broker API version requests in milliseconds, default is 10000
rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);

// Security protocol, default is plaintext
rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);

// For other SSL and SASL related parameters, see the librdkafka official documentation

// Create a producer instance.
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);

Parameter Description and Tuning

Optimization of the max.in.flight.requests.per.connection Parameter
In librdkafka, max.in.flight.requests.per.connection is the maximum number of in-flight requests (sent but not yet acknowledged) on a single broker connection, with a default value of 1000000. max.in.flight is an alias for the same parameter. In the standard Java SDK, max.in.flight.requests.per.connection defaults to 5. Setting this value too high can put pressure on the server and lead to stability issues. It is therefore recommended to align librdkafka with the open-source Java SDK and set the value to 5.
max.in.flight.requests.per.connection:5
max.in.flight:5
Optimization of the acks Parameter
The acks parameter controls the acknowledgment mechanism when a producer sends messages. Its default value is -1, meaning a send is acknowledged only after the leader broker has written the message and all in-sync follower replicas have replicated it. Supported values are 0 (no acknowledgment), 1 (leader acknowledgment only), and -1 (all in-sync replicas). In cross-AZ scenarios and for topics with many replicas, the acks value affects both message reliability and throughput. Therefore:
For online business messages where throughput requirements are modest, set acks to -1 so that a send is acknowledged only after all in-sync replicas have received the message, which improves reliability.
For log collection, big data, or offline computing, where high throughput (the amount of data written to Kafka per second) is required, set acks to 1 to improve throughput.
Optimization of the Buffering Parameters
For the same amount of data, sending one network request instead of many reduces compute and network overhead and thereby increases overall write throughput.
You can therefore tune queue.buffering.max.ms to optimize the client's sending throughput. By default, librdkafka accumulates messages for 5 ms before sending a batch. If messages are small, you can increase the queue.buffering.max.ms value appropriately.
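To make the effect of the batching time concrete, the arithmetic can be sketched as follows. This is a simplified model, not librdkafka's actual batching logic: it assumes a steady message rate to a single partition and ignores size and count limits that can close a batch early. The function names msgs_per_batch and requests_per_sec are hypothetical.

```c
/* Approximate number of messages accumulated in one batch, assuming a
 * steady arrival rate and no size or count limit closing the batch early. */
static long msgs_per_batch(long msgs_per_sec, long linger_ms) {
    long n = msgs_per_sec * linger_ms / 1000;
    return n < 1 ? 1 : n; /* at least one message per request */
}

/* Approximate number of produce requests per second under the same model. */
static long requests_per_sec(long msgs_per_sec, long linger_ms) {
    long batch = msgs_per_batch(msgs_per_sec, linger_ms);
    return (msgs_per_sec + batch - 1) / batch; /* round up */
}
```

Under these assumptions, at 10000 messages per second a batching time of 5 ms gives about 50 messages per batch and 200 requests per second, while 20 ms gives about 200 messages per batch and 50 requests per second, at the cost of up to 20 ms of added latency.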
Optimization of the Compression Parameters
The librdkafka client supports the following compression algorithms:
none: No compression algorithm is used.
gzip: Uses the GZIP compression algorithm.
snappy: Uses the Snappy compression algorithm.
lz4: Uses the LZ4 compression algorithm.
zstd: Uses the ZSTD compression algorithm.
To use a compression algorithm in the producer client, set the compression.type parameter (an alias for compression.codec in librdkafka) when creating the producer. For example, to use LZ4, set compression.type to lz4.
Compression and decompression happen on the client side, trading compute resources for bandwidth. However, brokers incur additional compute cost to validate compressed messages. This is especially true for Gzip, where the server-side overhead can be significant. In some cases the trade-off is not worthwhile: the extra computation reduces the broker's message processing capacity and ultimately lowers throughput. For such scenarios, consider the following approach:
In the producer, compress each message payload independently in the application to produce a compressed data packet, messageCompression, and store the compression method in the message key:
{"Compression","CompressionLZ4"}
In the producer, send messageCompression as a normal (uncompressed from the broker's point of view) message.
In the consumer, read the message key to determine the compression method, then decompress the payload in the application.
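The key handling in the steps above can be sketched as follows. Only the key string CompressionLZ4 comes from this article; the enum, the other key strings, and the function names codec_to_key and codec_from_key are hypothetical, and the actual compression and decompression calls are left out. The lookup compares lengths explicitly because a Kafka message key is a byte buffer with a length, not a NUL-terminated string.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical set of application-level codecs. */
enum payload_codec { CODEC_NONE, CODEC_LZ4, CODEC_GZIP, CODEC_UNKNOWN };

struct codec_name {
    enum payload_codec codec;
    const char *key;
};

/* "CompressionLZ4" follows the article; the other key strings are assumed. */
static const struct codec_name CODEC_NAMES[] = {
    { CODEC_NONE, "CompressionNone" },
    { CODEC_LZ4,  "CompressionLZ4"  },
    { CODEC_GZIP, "CompressionGZIP" },
};

#define CODEC_COUNT (sizeof(CODEC_NAMES) / sizeof(CODEC_NAMES[0]))

/* Producer side: key string to attach to a message compressed with `codec`. */
static const char *codec_to_key(enum payload_codec codec) {
    for (size_t i = 0; i < CODEC_COUNT; i++) {
        if (CODEC_NAMES[i].codec == codec) {
            return CODEC_NAMES[i].key;
        }
    }
    return NULL;
}

/* Consumer side: map a received key buffer back to a codec. */
static enum payload_codec codec_from_key(const void *key, size_t key_len) {
    if (key == NULL) {
        return CODEC_UNKNOWN;
    }
    for (size_t i = 0; i < CODEC_COUNT; i++) {
        size_t n = strlen(CODEC_NAMES[i].key);
        if (n == key_len && memcmp(key, CODEC_NAMES[i].key, n) == 0) {
            return CODEC_NAMES[i].codec;
        }
    }
    return CODEC_UNKNOWN;
}
```

On the consumer, the arguments would be the key and key_len fields of the received rd_kafka_message_t. One design point to weigh: librdkafka's default partitioner hashes non-empty keys, so a constant key may route all such messages to a single partition unless the partition is chosen explicitly.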

Create Producer Instance

If the application requires higher throughput, use asynchronous sending to increase message delivery speed, and send messages in batches to reduce network overhead and I/O. If the application demands higher reliability, use a synchronous sending pattern (waiting for each message's delivery result) to confirm successful delivery. The ACK mechanism and the transaction mechanism can also be used to guarantee message reliability and consistency. For specific parameter tuning, see Producer Parameters and Tuning.
#include <inttypes.h>
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

// Delivery report callback: invoked from rd_kafka_poll() once per produced message
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    (void)rk;
    (void)opaque;
    if (rkmessage->err) {
        fprintf(stderr, "Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    } else {
        fprintf(stderr, "Message delivered (%zu bytes, partition %" PRId32 ")\n",
                rkmessage->len, rkmessage->partition);
    }
}

int main(void) {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

    // Set acks=1: a message is considered sent once the leader replica has received it.
    rd_kafka_conf_set(conf, "acks", "1", NULL, 0);
    // Allow at most 5 in-flight requests per connection.
    // max.in.flight is an alias of the same parameter, so setting either one is sufficient.
    rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", NULL, 0);
    rd_kafka_conf_set(conf, "max.in.flight", "5", NULL, 0);

    // Set the delivery report callback
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    // Create a producer instance. On success, the producer takes ownership of conf.
    char errstr[512];
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    // Create a topic instance.
    rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);
    if (!topic) {
        fprintf(stderr, "Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    // Send a message. RD_KAFKA_MSG_F_COPY makes librdkafka copy the payload.
    const char *message = "Hello, Kafka!";
    if (rd_kafka_produce(
            topic,
            RD_KAFKA_PARTITION_UA,
            RD_KAFKA_MSG_F_COPY,
            (void *)message,
            strlen(message),
            NULL,
            0,
            NULL) == -1) {
        fprintf(stderr, "Failed to produce to topic %s: %s\n",
                rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));
    }

    // Wait for all messages to be delivered; polling also serves the delivery callbacks.
    while (rd_kafka_outq_len(producer) > 0) {
        rd_kafka_poll(producer, 1000);
    }

    // Destroy the topic instance.
    rd_kafka_topic_destroy(topic);

    // Destroy the producer instance.
    rd_kafka_destroy(producer);

    return 0;
}

Consumer Practices

Consumer Parameters and Tuning

Consumer Parameters

rd_kafka_conf_t *conf = rd_kafka_conf_new();

// Set the Kafka cluster address
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// Set the consumer group ID; default is empty.
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

// Set the auto-commit interval for consumers in milliseconds; default is 5000
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

// Set the auto-commit switch for consumers; default is true
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);

// Set the auto offset reset policy for the consumer; default is latest
rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);

// Set the client ID; default is rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);

// Create a consumer instance.
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

Parameter Description and Tuning

For automatic offset commits, it is recommended not to set auto.commit.interval.ms below 1000 ms. Committing offsets too frequently can drive up Broker CPU usage and affect the reads and writes of other services.
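The effect of the commit interval on broker load can be estimated with simple arithmetic. The sketch below assumes that each consumer sends roughly one offset commit request per interval; the function names commit_requests_per_sec and clamp_commit_interval_ms are hypothetical.

```c
/* Approximate offset commit requests per second across all consumers,
 * assuming each consumer sends one commit request per interval. */
static double commit_requests_per_sec(int consumers, int interval_ms) {
    if (consumers <= 0 || interval_ms <= 0) {
        return 0.0;
    }
    return consumers * 1000.0 / interval_ms;
}

/* Raise a requested interval to the recommended minimum of 1000 ms. */
static int clamp_commit_interval_ms(int requested_ms) {
    return requested_ms < 1000 ? 1000 : requested_ms;
}
```

Under this assumption, 200 consumers with a 100 ms interval generate about 2000 commit requests per second, while the same consumers with the default 5000 ms interval generate about 40.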

Create Consumer Instance

librdkafka provides a subscription-based model for creating consumers, with two ways to commit offsets: automatic offset commit and manual offset commit.

Automatic Offset Commit

Automatic offset commit: After polling messages, the consumer automatically commits offsets without manual intervention. This approach offers simplicity and ease of use but may lead to duplicate message consumption or message loss.
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

// Cleared by the signal handler so that the poll loop can exit and clean up
static volatile sig_atomic_t run = 1;

static void stop(int sig) {
    (void)sig;
    run = 0;
}

// Consumer message handling callback
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    (void)opaque;
    if (rkmessage->err) {
        // rkt can be NULL for errors that are not tied to a topic
        fprintf(stderr, "%% Consume error for topic \"%s\" [%" PRId32 "] offset %" PRId64 ": %s\n",
                rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "(none)",
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%" PRId32 "] at offset %" PRId64 ": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main(void) {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

    // Set the consumer group ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

    // Set the auto-commit switch for consumers; default is true
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);

    // Set the auto-commit interval for consumers in milliseconds; default is 5000
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

    // Create a consumer instance. On success, the consumer takes ownership of conf.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    // Subscribe to a topic.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    rd_kafka_topic_partition_list_destroy(topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
        rd_kafka_destroy(consumer);
        return 1;
    }

    // Consume messages until interrupted (for example, with Ctrl+C)
    signal(SIGINT, stop);
    while (run) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            rd_kafka_message_destroy(rkmessage);
        }
    }

    // Unsubscribe and leave the consumer group
    rd_kafka_unsubscribe(consumer);
    rd_kafka_consumer_close(consumer);

    // Destroy the consumer instance.
    rd_kafka_destroy(consumer);

    return 0;
}

Manual Offset Commit

Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

// Cleared by the signal handler so that the poll loop can exit and clean up
static volatile sig_atomic_t run = 1;

static void stop(int sig) {
    (void)sig;
    run = 0;
}

// Consumer message handling callback
static void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    (void)opaque;
    if (rkmessage->err) {
        // rkt can be NULL for errors that are not tied to a topic
        fprintf(stderr, "%% Consume error for topic \"%s\" [%" PRId32 "] offset %" PRId64 ": %s\n",
                rkmessage->rkt ? rd_kafka_topic_name(rkmessage->rkt) : "(none)",
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%" PRId32 "] at offset %" PRId64 ": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main(void) {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

    // Set the consumer group ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

    // Disable the consumer's auto-commit feature
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);

    // Create a consumer instance. On success, the consumer takes ownership of conf.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    // Subscribe to a topic.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    rd_kafka_topic_partition_list_destroy(topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
        rd_kafka_destroy(consumer);
        return 1;
    }

    // Consume messages and manually commit offsets until interrupted
    signal(SIGINT, stop);
    int message_count = 0;
    while (run) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (!rkmessage) {
            continue;
        }
        msg_consume(rkmessage, NULL);

        // Commit synchronously after every 10 successfully received messages
        if (!rkmessage->err && ++message_count % 10 == 0) {
            err = rd_kafka_commit_message(consumer, rkmessage, 0);
            if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                fprintf(stderr, "Failed to commit offset for message: %s\n", rd_kafka_err2str(err));
            } else {
                printf("Offset %" PRId64 " committed\n", rkmessage->offset);
            }
        }

        rd_kafka_message_destroy(rkmessage);
    }

    // Unsubscribe and leave the consumer group
    rd_kafka_unsubscribe(consumer);
    rd_kafka_consumer_close(consumer);

    // Destroy the consumer instance.
    rd_kafka_destroy(consumer);

    return 0;
}
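
The example above commits purely by message count. On a low-traffic topic, that can leave processed offsets uncommitted for a long time. One possible variation, sketched here as an assumption rather than a recommended implementation, is to commit when either a message count or a time limit is reached. The commit_policy type and its functions are hypothetical, and the caller supplies the current time in milliseconds.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical commit policy: commit every max_messages messages or
 * every max_interval_ms milliseconds, whichever comes first. */
struct commit_policy {
    int max_messages;
    int64_t max_interval_ms;
    int pending;            /* messages processed since the last commit */
    int64_t last_commit_ms; /* time of the last commit decision */
};

static void commit_policy_init(struct commit_policy *p, int max_messages,
                               int64_t max_interval_ms, int64_t now_ms) {
    p->max_messages = max_messages;
    p->max_interval_ms = max_interval_ms;
    p->pending = 0;
    p->last_commit_ms = now_ms;
}

/* Record one processed message. Returns true when the caller should commit;
 * in that case the counter and the timer are reset. */
static bool commit_policy_on_message(struct commit_policy *p, int64_t now_ms) {
    p->pending++;
    if (p->pending >= p->max_messages ||
        now_ms - p->last_commit_ms >= p->max_interval_ms) {
        p->pending = 0;
        p->last_commit_ms = now_ms;
        return true;
    }
    return false;
}
```

In the poll loop, the count check would be replaced by a call to commit_policy_on_message(), followed by rd_kafka_commit_message() when it returns true. Note that this sketch resets its state before the commit result is known, so a failed commit is only retried at the next threshold.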
