tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Java SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:40

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
Kafka Clients:Kafka 自带的客户端,通过 Java 实现,是 Kafka 生产和消费标准协议的客户端。
本文着重介绍上述 Java 客户端的关键参数,实践教程以及常见问题。

生产者实践

版本选择

Kafka 客户端和集群之间的兼容性非常重要,通常情况下,较新版本的客户端可以兼容较旧版本的集群,但反之则不一定成立。一般情况下,CKafka实例 Broker 在部署后是明确的,因此可以直接根据 Broker 的版本选择相匹配的客户端的版本。
Java 生态中,广泛使用 Spring Kafka,其中 Spring Kafka 版本和 Kafka Broker 版本的对应关系,可以参见 Spring 官方网址的版本对应关系

生产者参数与调优

生产者参数

在使用 Kafka Client 客户端写入 Kafka 时候,需要配置如下关键参数,相关的参数和默认值如下:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); //Kafka集群的地址列表,格式为host1:port1,host2:port2。生产者会使用这个列表来找到集群并建立连接。
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置生产者关键参数以及默认值
props.put(ProducerConfig.ACKS_CONFIG, "1");//acks,默认值为1,消息确认的级别。0表示不等待确认;1表示等待Leader副本写入;all或者-1表示等待所有副本写入。
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);//batch.size,批量发送的大小,单位为字节。生产者会将多个消息打包成一个批次发送,以提高性能。默认大小16384字节。
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);//buffer.memory,生产者用于缓存待发送消息的内存大小,单位为字节。默认33554432,也就是32MB
props.put(ProducerConfig.CLIENT_ID_CONFIG, "");//client.id,客户端ID。这个ID可以用于在服务端日志中识别消息来源。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");//compression.type消息压缩类型。默认none不压缩,可选值为none、gzip、snappy、lz4、zstd。
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);//connections.max.idle.ms连接的最大空闲时间,单位为毫秒。超过这个时间的空闲连接会被关闭。默认540s
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);//delivery.timeout.ms消息的最大投递时间,单位为毫秒。超过这个时间的未确认消息会被认为发送失败。默认120s
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);//enable.idempotence,是否启用幂等性。如果启用,生产者会确保每个消息只被发送一次,即使在网络错误或重试的情况下。
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "");//interceptor.classes拦截器类列表。生产者会在发送消息前后调用这些拦截器。
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);//linger.ms延迟发送的时间,单位为毫秒。生产者会等待一段时间以便将更多消息打包成一个批次发送。
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);//max.block.ms,生产者在获取元数据或缓存空间时的最大阻塞时间,单位为毫秒。
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);//max.in.flight.requests.per.connection,每个连接上的最大未确认请求数。
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);//max.request.size,请求的最大大小,单位为字节
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);//metadata.max.age.ms元数据的最大寿命,单位为毫秒。超过这个时间的元数据会被刷新。
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");//metric.reporters度量报告器类列表。生产者会使用这些报告器来报告度量信息。
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");//partitioner.class分区器类。生产者会使用这个分区器来决定每个消息发送到哪个分区。
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);//receive.buffer.bytes接收缓冲区的大小,单位为字节。
props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);//send.buffer.bytes发送缓冲区的大小,单位为字节。
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);//reconnect.backoff.max.ms重连最大间隔时间,单位为毫秒。
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);//reconnect.backoff.ms重连间隔时间,单位毫秒
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);//request.timeout.ms请求的超时时间,单位为毫秒。
props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);//retries发送失败时的重试次数。
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);//retry.backoff.ms重试的间隔时间,单位为毫秒。
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);//transaction.timeout.ms事务的超时时间,单位为毫秒
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);//transactional.id事务ID。如果设置了这个参数,生产者会启用事务功能。
props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default");//client.dns.lookupDNS查找策略。可选值为default、use_all_dns_ips、resolve_canonical_bootstrap_servers_only。
props.put(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 5000); // socket.connection.setup.timeout.ms 控制连接初始化超时时间,单位为毫秒,2.7.0 以上版本可配置。
// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息
for (int i = 0; i < 100; i++) {
String key = "key-" + i;
String value = "value-" + i;

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// 发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("消息发送成功:key=" + key + ", value=" + value);
} else {
System.err.println("消息发送失败:" + exception.getMessage());
}
}
});
}

// 关闭生产者
producer.close();
}
}

参数说明调优

关于 acks 参数优化
acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为1,表示消息发送给 Leader Broker 后,Leader 确认消息写入后即返回。acks参数还有以下可选值:
0: 不等待任何确认,直接返回。
1: 等待 Leader 副本确认写入后返回。
-1或者 all: 等待 Leader 副本以及相关的 Follower 副本确认写入后返回。
由上可知,在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks 参数设置为-1,确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性,但是会牺牲写入吞吐和性能,时延会增加。
在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐量。
关于 Batch 相关参数优化
默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。在高吞吐场景下,可以配合计算和设置:
batch.size:默认16K。
linger.ms:默认为0,可以适当增加耗时,如设置100ms,尽可能聚合更多消息批量发送消息。
buffer.memory:默认32MB,对于大流量 Producer,在堆内存充足情况下可以设置更大,如设置256MB。
关于事务参数优化

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);//enable.idempotence,是否启用幂等性。如果启用,生产者会确保每个消息只被发送一次,即使在网络错误或重试的情况下。 //是否需要幂等,在事务场景下需要设置为true
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);//max.in.flight.requests.per.connection,每个连接上的最大未确认请求数。在事务场景不要超过5
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);//transactional.id事务ID。如果设置了这个参数,生产者会启用事务功能。
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);//transaction.timeout.ms事务的超时时间,单位为毫秒,可以适当延长事务时间。
需要强调,事务因为要保障消息的 exactly once 语义,因此会额外付出更多的计算资源。
对于事务场景,可以适当增加事务超时时间,容忍高吞吐场景下,写入延时带来的抖动。
关于压缩参数优化
Kafka Java Client 支持如下压缩参数:
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");//compression.type消息压缩类型。默认none不压缩,可选值为none、gzip、snappy、lz4、zstd。

目前支持以下几种压缩配置:
none:不使用压缩算法。
gzip:使用 GZIP 压缩算法。
snappy:使用 Snappy 压缩算法。
lz4:使用 LZ4 压缩算法。
zstd:使用 ZSTD 压缩算法。
要在 Kafka Java 客户端中使用压缩消息,需要在创建生产者时设置 compression.type 参数。例如,要使用 LZ4 压缩算法,可以将compression.type 设置为lz4。
Kafka 压缩消息是一种用计算换带宽的优化方式,虽然 Kafka 压缩消息的压缩和解压缩,发生在客户端,但是由于Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 gzip 压缩,服务端对该压缩消息校验的计算成本会非常大,在某种程度上可能会出现得不偿失的情况,因为计算的增加导致 Broker CPU 利用率很高,降低了其他请求的处理能力,导致整体性能更低。这种情况建议可以使用如下方式规避 Broker 的校验:
1. 在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 中存储压缩方式:
{"Compression","lz4"}
2. 在 Producer 端将 messageCompression 当成正常消息发送。
3. 在 Consumer 端读取消息 key,获取使用的压缩方式,独立进行解压缩。

创建生产者实例

如果应用程序需要更高的吞吐量,可以使用异步发送,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,可以使用同步发送,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。

同步发送

在 Kafka Java Client 客户端中,同步发送的示例代码如下:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerSyncExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) {
// 创建Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置生产者参数
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 同步发送消息
for (int i = 0; i < 10; i++) {
String key = "sync-key-" + i;
String value = "sync-value-" + i;

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

try {
// 发送消息并等待结果
RecordMetadata metadata = producer.send(record).get();
System.out.println("同步发送成功:key=" + key + ", value=" + value);
} catch (InterruptedException | ExecutionException e) {
System.err.println("同步发送失败:" + e.getMessage());
}
}

// 关闭生产者
producer.close();
}
}

异步发送

异步发送:异步发送消息时不会阻塞当前线程,生产者的吞吐量较高,但是需要通过回调函数来处理消息结果,示例如下:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerAsyncExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";

public static void main(String[] args) {
// 创建Kafka生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 设置生产者参数
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);

// 创建生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 异步发送消息
for (int i = 0; i < 10; i++) {
String key = "async-key-" + i;
String value = "async-value-" + i;

// 创建消息记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);

// 发送消息并设置回调函数
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("异步发送成功:key=" + key + ", value=" + value);
} else {
System.err.println("异步发送失败:" + exception.getMessage());
}
}
});
}

// 关闭生产者
producer.close();
}
}



消费者实践

消费者参数


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerDemo {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // "bootstrap.servers",Kafka集群的地址,没有默认值
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "key.deserializer",消息键的反序列化方式,没有默认值
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // "value.deserializer",消息值的反序列化方式,没有默认值
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // "group.id",消费者组ID,没有默认值
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // "auto.offset.reset",位点不存在时的处理方式,"latest"表示从最新的消息开始消费,默认值为"latest"
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // "enable.auto.commit",是否自动提交位点,默认值为"true"
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // "auto.commit.interval.ms",自动提交位点的间隔时间,单位为毫秒,默认值为"5000"
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // "session.timeout.ms",消费者组成员的会话超时时间,单位为毫秒,默认值为"10000"
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // "max.poll.records",单次poll的最大消息数,默认值为"500"
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // "max.poll.interval.ms",两次poll操作间的最大允许间隔时间,单位为毫秒,默认值为"300000"
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1"); // "fetch.min.bytes",服务器返回的最小数据,如果设置大于1,服务器会等待直到累计的数据量大于这个值,默认值为"1"
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800"); // "fetch.max.bytes",服务器返回的最大数据量,单位为字节,默认值为"52428800"
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // "fetch.max.wait.ms",服务器等待满足fetch.min.bytes条件的最大时间,单位为毫秒,默认值为"500"
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // "heartbeat.interval.ms",心跳间隔时间,单位为毫秒,默认值为"3000"
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id"); // "client.id",客户端ID,没有默认值
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"); // "request.timeout.ms",客户端请求超时时间,单位为毫秒,默认值为"30000"
properties.put(ConsumerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000"); // "socket.connection.setup.timeout.ms",控制连接初始化超时时间,单位为毫秒,2.7.0 以上版本可配置。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}

参数调优

1. 在使用 Kafka 消费者时,我们可以通过调整一些参数来优化性能。以下是一些常见的参数调优方案:
fetch.min.bytes:如果不确定消息最低大小,这个参数建议设置为1,如果明确消息最小值,可以设置该值为最小消息大小。
max.poll.records:这个参数可以根据应用的处理能力进行调整。如果您的应用可以处理更多的记录,可以将这个参数设置为更大的值,以减少 poll操作的次数。
auto.commit.interval.ms:这个参数可以根据您的应用的需求进行调整,一般自动提交位点场景,建议保持默认值5000ms。注意,过于频繁的位点提交会影响性能,额外占用 Broker 的计算资源。
client.id:可以为每个消费者设置一个唯一的 ID,以便在监控和日志中区分不同的消费者。
以上是一些常见的参数调优方案,但具体的最佳设置可能会根据您的应用的特性和需求有所不同。在调优参数时,请记住始终进行性能测试,以确保您的设置可以达到预期的效果。
2. 对于 rebalance 时间频繁和消费线程阻塞问题,参考以下说明进行参数优化:
session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> *<max.poll.interval.ms>的积。
max.poll.interval.ms:该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

创建消费者实例

Kafka Java Client 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

自动提交位点

自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。注意,自动提交位点时间间隔 auto.commit.interval.ms 不要设置太短,否则容易导致 Broker CPU 偏高,影响其他请求处理。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerAutoCommitExample {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
private static final String GROUP_ID = "test-group";

public static void main(String[] args) {
// 创建Kafka消费者配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 开启自动提交位点
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 自动提交间隔,单位:5000毫秒

// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC));

// 消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消费消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
// 关闭消费者
consumer.close();
}
}
}



手动提交位点

手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerManualCommitExample {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));

int count = 0;
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
count++;
if (count % 10 == 0) {
consumer.commitSync();
System.out.println("Committed offsets.");
}
}
}
} finally {
consumer.close();
}
}
}

Assign 消费

Kafka Java Client 的 assign 消费模式允许消费者直接指定订阅的分区,而不是通过订阅主题来自动分配分区。这种模式适用于需要手动控制消费分区的场景,例如:为了实现特定的负载均衡策略,或者在某些情况下跳过某些分区。一般流程为使用 assign 方法来手动指定消费者消费的分区,通过seek 方法来设置开始消费的位点,然后执行消费逻辑,使用示例如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerAssignAndSeekApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "my-topic";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

// 设置消费的起始位点
long startPosition0 = 10L;
long startPosition1 = 20L;
consumer.seek(partition0, startPosition0);
consumer.seek(partition1, startPosition1);

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync(); // 手动提交位点
}
} finally {
consumer.close();
}
}
}

Kafka Java Client 生产消费常见问题

Kafka Java Producer 无法成功发送消息
首先排查 Kafka 集群的 IP 和端口能够正常连接,若不能请先解决通信问题。
其次检查是否正确配置接入点,版本是否和 Broker 版本匹配,可以参考实践教程的发送 demo。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈