Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);}

]$ bin/kafka-consumer-groups.sh --bootstrap-server 9.146.153.249:9092 --listCR
]$ bin/kafka-consumer-groups.sh --bootstrap-server 9.146.153.249:9092 --describe --group CRNote: This will not show information about old Zookeeper-based consumers.
]$ bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test --group test1
--describe 命令是可以看到详情的,如下所示:
# 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间session.timeout.ms=10000# 使用 Kafka 消费分组机制时,消费者发送心跳的间隔。这个值必须小于 session.timeout.ms,一般小于它的三分之一heartbeat.interval.ms=3000# 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者max.poll.interval.ms=300000
文档反馈