TDMQ for CKafka


librdkafka SDK

Last updated: 2026-01-20 17:10:14

Background

TDMQ for CKafka is a distributed streaming platform for building real-time data pipelines and streaming applications. It provides high throughput, low latency, scalability, and fault tolerance.
This document introduces the key parameters and best practices of the librdkafka client, as well as common issues.

Producer Practices

Version Selection

librdkafka automatically selects the appropriate protocol version for communication based on the version of the Kafka cluster. Since Kafka versions are updated frequently, using the latest librdkafka release generally provides the best compatibility and performance.

Producer Parameters and Tuning

Producer Parameters

librdkafka primarily involves the following key parameters. The related parameters and their default values are as follows:

rd_kafka_conf_t *conf = rd_kafka_conf_new();

// Kafka cluster address(es), multiple addresses separated by commas, default is empty
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// Maximum number of attempts for message delivery, including the initial attempt, default is 2
rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);

// Backoff time between retries in milliseconds, default is 100
rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);

// Client request timeout in milliseconds, default is 30000
rd_kafka_conf_set(conf, "request.timeout.ms", "30000", NULL, 0);

// Maximum total message size allowed on the client send buffer, in kilobytes (note: not bytes), default is 1048576
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "1048576", NULL, 0);

// Maximum number of messages in the client send buffer, default is 100000
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);

// Maximum total size of messages in the client send buffer in bytes, default is 1000000
rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);

// Linger time for the client send buffer (alias linger.ms) in milliseconds, default is 5
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "5", NULL, 0);

// Message compression codec, default is none (no compression)
rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);

// Message compression level, default is -1 (use the codec-dependent default level)
rd_kafka_conf_set(conf, "compression.level", "-1", NULL, 0);

// Client ID, default is rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);

// Maximum number of concurrent requests for the producer, i.e., the number of unacknowledged requests by the broker, default is 1000000
rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);
rd_kafka_conf_set(conf, "max.in.flight", "1000000", NULL, 0);

// How long to cache broker address resolution results, in milliseconds, default is 1000
rd_kafka_conf_set(conf, "broker.address.ttl", "1000", NULL, 0);

// Connection retry interval to the Kafka cluster for the client in milliseconds, default is 1000
rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);

// Maximum connection retry interval to the Kafka cluster for the client in milliseconds, default is 10000
rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);

// Timeout for broker API version requests in milliseconds, default is 10000
rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);

// Security protocol, default is plaintext
rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);

// For other SSL and SASL related parameters, see the librdkafka official documentation

// Create a producer instance (on failure, errstr receives a human-readable error).
char errstr[512];
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

Parameter Description and Tuning

Optimization of the max.in.flight.requests.per.connection Parameter
max.in.flight.requests.per.connection is defined in librdkafka as the number of requests that can be in flight concurrently on a single connection, with a default value of 1000000. The max.in.flight parameter is an alias for max.in.flight.requests.per.connection with the same meaning. In the standard Java SDK, the same parameter defaults to 5. Setting this value too high can put pressure on the server and cause stability issues. It is therefore recommended to align librdkafka with the open-source Java SDK by setting the value to 5.
max.in.flight.requests.per.connection:5
max.in.flight:5
On acks Parameter Optimization
The acks parameter controls the acknowledgment mechanism for messages sent by the producer. Its default value is -1, meaning the send returns only after the message has been written to the leader Broker and fully replicated to the corresponding follower replicas. The acks parameter supports the values 0, 1, and -1. In cross-AZ scenarios and for topics with a high number of replicas, the acks value affects both message reliability and throughput. Therefore:
In scenarios involving online business messages where throughput requirements are not high, setting the acks parameter to -1 ensures that messages are returned only after being received and acknowledged by all replicas, thereby enhancing message reliability.
In scenarios such as log collection, big data, or offline computing where high throughput (i.e., the amount of data written to Kafka per second) is required, setting acks to 1 can improve throughput.
About buffering Parameter Optimization (Caching)
When transmitting the same amount of data, sending a single batched request over the network instead of many small requests reduces the associated computational and network resource consumption, thereby increasing overall write throughput.
This parameter can therefore be tuned to optimize the client's send throughput. librdkafka accumulates messages for a default batching (linger) time of 5 ms; if messages are small, you can increase the queue.buffering.max.ms value appropriately.
About Compression Parameter Optimization
The librdkafka client supports the following values for the compression parameter:
none: No compression algorithm is used.
gzip: Uses the GZIP compression algorithm.
snappy: Uses the Snappy compression algorithm.
lz4: Uses the LZ4 compression algorithm.
zstd: Uses the ZSTD compression algorithm.
To use a compression algorithm in the producer client, set the compression.codec parameter (compression.type is an accepted alias) when creating the producer; for example, to use LZ4, set it to lz4. Although compression and decompression occur on the client side, trading computational resources for bandwidth, Brokers incur additional computational cost validating compressed messages; with gzip in particular, the server-side overhead can be significant. In some cases this results in diminishing returns: the added computation reduces the Broker's message processing capacity and ultimately lowers throughput. For such scenarios, consider the following approach:
On the producer side, compress each message independently to produce a compressed data packet, messageCompression, and store the compression method in the message's key:
{"Compression","CompressionLZ4"}
On the producer side, send messageCompression as a normal message.
On the consumer side, read the message key to determine the compression method used and decompress accordingly.

Create Producer Instance

If the application requires higher throughput, use an asynchronous producer to increase message delivery speed, and send messages in batches to reduce network overhead and I/O consumption. If the application demands higher reliability, use a synchronous producer to ensure successful delivery; in addition, the acks acknowledgment mechanism and the transaction mechanism can be used to guarantee message reliability and consistency. For specific parameter tuning, refer to Producer Parameters and Tuning.
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

// Producer message delivery report callback
void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    } else {
        fprintf(stderr, "Message delivered (%zu bytes, partition %"PRId32")\n",
                rkmessage->len, rkmessage->partition);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

    // Set acks=1: the message is considered successfully sent once the leader replica receives it.
    rd_kafka_conf_set(conf, "acks", "1", NULL, 0);
    // Set to 5: a single connection carries at most 5 in-flight requests.
    rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", NULL, 0);
    rd_kafka_conf_set(conf, "max.in.flight", "5", NULL, 0);

    // Set the producer delivery report callback
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    // Create a producer instance.
    char errstr[512];
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
    }

    // Create a topic handle.
    rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);
    if (!topic) {
        fprintf(stderr, "Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    // Send a message.
    const char *message = "Hello, Kafka!";
    if (rd_kafka_produce(
            topic,
            RD_KAFKA_PARTITION_UA,
            RD_KAFKA_MSG_F_COPY,
            (void *)message,
            strlen(message),
            NULL,
            0,
            NULL) == -1) {
        fprintf(stderr, "Failed to produce to topic %s: %s\n",
                rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));
    }

    // Wait for all messages to be delivered.
    while (rd_kafka_outq_len(producer) > 0) {
        rd_kafka_poll(producer, 1000);
    }

    // Destroy the topic handle.
    rd_kafka_topic_destroy(topic);

    // Destroy the producer instance.
    rd_kafka_destroy(producer);

    return 0;
}

Consumer Practices

Consumer Parameters and Tuning

Consumer Parameters

rd_kafka_conf_t *conf = rd_kafka_conf_new();

// Set the Kafka cluster address
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// Set the consumer group ID; default is empty.
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

// Set the auto-commit interval for consumers in milliseconds; default is 5000
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

// Set the auto-commit switch for consumers; default is true
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);

// Set the auto offset reset policy for the consumer; default is latest
rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);

// Set the client ID; default is rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);

// Create a consumer instance.
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

Parameter Description and Tuning

For auto-commit offset requests, it is recommended not to set auto.commit.interval.ms below 1000 ms. Excessively frequent offset commit requests cause high Broker CPU usage and can affect the read and write operations of other normal services.

Create Consumer Instance

librdkafka provides a subscription-based model for creating consumers, with two methods of offset commit: automatic offset commit and manual offset commit.

Automatic Offset Commit

Automatic offset commit: after polling messages, the consumer commits offsets automatically, without manual intervention. This approach is simple and easy to use, but because the commit is decoupled from message processing, a crash after processing but before the next commit can cause duplicate consumption, and a commit that lands before processing completes can cause message loss.
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

// Consumer message handling callback
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

    // Set the consumer group ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

    // Enable auto-commit (default is true)
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);

    // Set the auto-commit interval in milliseconds (default is 5000)
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

    // Create a consumer instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }

    // Redirect the main event queue to the consumer queue so that
    // rd_kafka_consumer_poll() also serves rebalance and error events.
    rd_kafka_poll_set_consumer(consumer);

    // Subscribe to a topic.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    // Consume messages.
    while (1) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            rd_kafka_message_destroy(rkmessage);
        }
    }

    // Leave the group and commit final offsets (unreachable in this endless loop).
    rd_kafka_consumer_close(consumer);

    // Destroy the consumer instance.
    rd_kafka_destroy(consumer);

    return 0;
}

Manual Offset Commit

Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

// Consumer message handling callback
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    // Set the Kafka cluster address
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

    // Set the consumer group ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

    // Disable the consumer's auto-commit feature
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);

    // Create a consumer instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }

    // Redirect the main event queue to the consumer queue.
    rd_kafka_poll_set_consumer(consumer);

    // Subscribe to a topic.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    // Consume messages and commit offsets manually.
    int message_count = 0;
    while (1) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);

            // Commit offsets synchronously every 10 messages.
            if (++message_count % 10 == 0) {
                rd_kafka_resp_err_t err = rd_kafka_commit_message(consumer, rkmessage, 0);
                if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                    fprintf(stderr, "Failed to commit offset: %s\n", rd_kafka_err2str(err));
                } else {
                    printf("Offset %"PRId64" committed\n", rkmessage->offset);
                }
            }

            rd_kafka_message_destroy(rkmessage);
        }
    }

    // Leave the group and commit final offsets (unreachable in this endless loop).
    rd_kafka_consumer_close(consumer);

    // Destroy the consumer instance.
    rd_kafka_destroy(consumer);

    return 0;
}

