tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

librdkafka SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:40

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
本文着重介绍上述 librdkafka 客户端的关键参数和最佳实践,以及常见问题。

生产者实践

版本选择

在使用 librdkafka 时,librdkafka 会自动根据 Kafka 集群的版本选择适当的协议版本进行通信,由于 kafka 的版本迭代更新较快,通常情况下,使用最新的 librdkafka 版本可以获得最佳的兼容性和性能。

生产者参数与调优

生产者参数

librdkafka 主要涉及如下关键参数,相关的参数和默认值如下:

rd_kafka_conf_t *conf = rd_kafka_conf_new();

// Kafka集群的地址,多个地址用逗号分隔,默认为空
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// 发送消息的最大尝试次数,包括第一次尝试,默认为2
rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);

// 重试之间的回退时间(以毫秒为单位),默认为100
rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);

// 客户端请求超时时间(以毫秒为单位),默认为5000
rd_kafka_conf_set(conf, "request.timeout.ms", "5000", NULL, 0);

// 客户端发送缓冲区大小(以字节为单位),默认为131072
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);

// 客户端发送缓冲区中消息的最大数量,默认为100000
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);

// 客户端发送缓冲区中消息的最大总大小(以字节为单位),默认为1000000
rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);

// 客户端发送缓冲区的linger时间(以毫秒为单位),默认为0
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0", NULL, 0);

// 是否启用消息压缩,默认为0(不启用)
rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);

// 消息压缩级别,默认为0(自动选择)
rd_kafka_conf_set(conf, "compression.level", "0", NULL, 0);

// 客户端的ID,默认为rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);

// 生产者的最大并发请求数,即未收到broker响应的请求数,默认为1000000
rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);
rd_kafka_conf_set(conf, "max.in.flight", "1000000", NULL, 0);

// 客户端与Kafka集群的连接最大重试次数,默认为3次
rd_kafka_conf_set(conf, "broker.address.ttl", "3", NULL, 0);

// 客户端与Kafka集群的连接重试间隔(以毫秒为单位),默认为1000
rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);

// 客户端与Kafka集群的连接重试最大间隔(以毫秒为单位),默认为10000
rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);

// 客户端API版本的回退时间(以毫秒为单位),默认为10000
rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);

// 安全协议,默认为plaintext
rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);

// 客户端链接超时时间(以毫秒为单位),默认为30000,1.9版本以上可设置
rd_kafka_conf_set(conf, "socket.connection.setup.timeout.ms", "5000", NULL, 0);

// 其他SSL和SASL相关参数,请参考librdkafka官方文档

// 创建生产者实例
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);

参数说明调优

关于 max.in.flight.requests.per.connection 参数的优化
max.in.flight.requests.per.connection 在 librdkafka 中定义为单个连接能够并发发送请求的数量,默认值为1000000,max.in.flight 参数是max.in.flight.requests.per.connection 的 alias 参数,两者表示的意义一样。这个参数在标准的 Java SDK 里定义为 max.in.flight.requests.per.connection,默认值为5。该值过大容易造成服务端压力,从而引发稳定性问题,因此建议 librdkafka 的 SDK 值与开源 Java 的 SDK 保持一致,默认值设置5。
max.in.flight.requests.per.connection:5
max.in.flight:5
关于 acks 参数优化
acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为-1,表示消息发送给 Leader Broker 后,Leader 确认以及相应的 Follower 消息都写入完成后才返回。acks 参数还有以下可选值:0,1,-1。在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks 参数设置为-1,则可以确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐。
关于 buffering 参数优化(缓存)
默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。对 librdkafka,默认提供5ms的攒批时间积攒消息。如果消息较小,可以适当增加queue.buffering.max.ms 的时间。
关于压缩参数优化
librdkafka 支持如下压缩参数:none, gzip, snappy, lz4, zstd。
在 librdkafka客户端中,支持以下几种压缩算法:
none:不使用压缩算法。
gzip:使用 GZIP 压缩算法。
snappy:使用 Snappy 压缩算法。
lz4:使用 LZ4 压缩算法。
zstd:使用 ZSTD 压缩算法。
要在 Producer 客户端中使用压缩算法,需要在创建生产者时设置 compression.type 参数。例如,要使用 LZ4 压缩算法,可以将 compression.type 设置为 lz4,虽然压缩算法的 CPU 压缩和 CPU解压缩,发生客户端,是一种用计算换带宽的优化方式,但是由于 Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 Gzip 压缩,服务端的压缩计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致 Broker 消息处理能力偏低,导致带宽吞吐更低。这种情况建议可以使用如下方式进行使用:
在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
{"Compression","CompressionLZ4"}
在 Producer 端将 messageCompression 当成正常消息发送。
在 Consumer 端读取消息 key,获取使用的压缩方式,独立进行解压缩。

创建生产者实例

如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

// 生产者消息发送回调
void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
fprintf(stderr, "Message delivery failed: %s\\n", rd_kafka_err2str(rkmessage->err));
} else {
fprintf(stderr, "Message delivered (%zd bytes, partition %"PRId32")\\n",
rkmessage->len, rkmessage->partition);
}
}

int main() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();

// 设置Kafka集群的地址
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// 设置ack等于1,表示leader副本收到消息后即认为发送成功
rd_kafka_conf_set(conf, "acks", "1", NULL, 0);
// 设置5,表示单连接同时发送5个请求
rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", NULL, 0);
rd_kafka_conf_set(conf, "max.in.flight", "5", NULL, 0);

// 设置生产者消息发送回调
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

// 创建生产者实例
char errstr[512];
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
fprintf(stderr, "Failed to create producer: %s\\n", errstr);
return 1;
}

// 创建主题实例
rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);
if (!topic) {
fprintf(stderr, "Failed to create topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(producer);
return 1;
}

// 发送消息
const char *message = "Hello, Kafka!";
if (rd_kafka_produce(
topic,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
(void *)message,
strlen(message),
NULL,
0,
NULL) == -1) {
fprintf(stderr, "Failed to produce to topic %s: %s\\n", rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));
}

// 等待所有消息发送完成
while (rd_kafka_outq_len(producer) > 0) {
rd_kafka_poll(producer, 1000);
}

// 销毁主题实例
rd_kafka_topic_destroy(topic);

// 销毁生产者实例
rd_kafka_destroy(producer);

return 0;
}

消费者实践

消费者参数与调优

消费者参数

rd_kafka_conf_t *conf = rd_kafka_conf_new();

// 设置Kafka集群的地址
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// 设置消费组ID,默认为空
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

// 设置消费者的自动提交间隔(以毫秒为单位),默认为5000
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

// 设置消费者的自动提交开关,默认为true
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);

// 设置消费者的自动偏移量重置策略,默认为latest
rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);

// 设置客户端的ID,默认为rdkafka
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);
// 客户端链接超时时间(以毫秒为单位),默认为30000,1.9版本以上可设置
rd_kafka_conf_set(conf, "socket.connection.setup.timeout.ms", "5000", NULL, 0);

// 创建消费者实例
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

参数说明与调优

针对自动提交位点请求,建议 auto.commit.interval.ms 时间不要低于1000ms,因为频率过高的位点请求会导致 Broker CPU 很高,影响其他正常服务的读写。

创建消费者实例

提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

自动提交位点

自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

// 消费者消息处理回调
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] "
"offset %"PRId64": %s\\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset,
rd_kafka_message_errstr(rkmessage));
} else {
printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
}
}

int main() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();

// 设置Kafka集群的地址
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// 设置消费组ID
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

// 设置消费者的自动提交开关,默认为true
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);

// 设置消费者的自动提交间隔(以毫秒为单位),默认为5000
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

// 创建消费者实例
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!consumer) {
fprintf(stderr, "Failed to create consumer: %s\\n", errstr);
return 1;
}

// 订阅主题
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(consumer);
return 1;
}
rd_kafka_topic_partition_list_destroy(topics);

// 消费消息
while (1) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
if (rkmessage) {
msg_consume(rkmessage, NULL);
rd_kafka_message_destroy(rkmessage);
}
}

// 取消订阅
rd_kafka_unsubscribe(consumer);

// 销毁消费者实例
rd_kafka_destroy(consumer);

return 0;
}

手动提交位点

手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。
#include <stdio.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

// 消费者消息处理回调
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] "
"offset %"PRId64": %s\\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset,
rd_kafka_message_errstr(rkmessage));
} else {
printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
}
}

int main() {
rd_kafka_conf_t *conf = rd_kafka_conf_new();

// 设置Kafka集群的地址
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);

// 设置消费组ID
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);

// 关闭消费者的自动提交开关
rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);

// 创建消费者实例
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!consumer) {
fprintf(stderr, "Failed to create consumer: %s\\n", errstr);
return 1;
}

// 订阅主题
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(consumer);
return 1;
}
rd_kafka_topic_partition_list_destroy(topics);

// 消费消息并手动提交位点
int message_count = 0;
while (1) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
if (rkmessage) {
msg_consume(rkmessage, NULL);

// 每隔10条消息手动提交位点
if (++message_count % 10 == 0) {
if (rd_kafka_commit_message(consumer, rkmessage, 0) != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to commit offset for message: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
} else {
printf("Offset %"PRId64" committed\\n", rkmessage->offset);
}
}

rd_kafka_message_destroy(rkmessage);
}
}

// 取消订阅
rd_kafka_unsubscribe(consumer);

// 销毁消费者实例
rd_kafka_destroy(consumer);

return 0;
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈