tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Kafka Python SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-20 15:59:40

背景

CKafka 的 Python 客户端有以下几个主要的库:
kafka-python:这是一个纯 Python 实现的 Kafka 客户端,支持 Kafka 0.8.2及更高版本。它提供了生产者、消费者和管理 Kafka 集群的 API。这个库易于使用,但性能可能不如基于 librdkafka 的客户端。
安装方法:pip install kafka-python
confluent-kafka-python:这个库是基于高性能的 C 库 librdkafka 实现的。它支持Kafka 0.9及更高版本,并提供了生产者、消费者和管理 Kafka 集群的 API。这个库性能更好,但可能需要安装额外的依赖。
安装方法:pip install confluent-kafka
aiokafka:这是一个基于 kafka-python 的异步 Kafka 客户端,使用 asyncio 库。这个库适用于需要异步编程的场景。
安装方法:pip install aiokafka
pykafka:这是一个支持 Kafka 0.8.x 版本的 Python 客户端。它提供了生产者、消费者和管理 Kafka 集群的 API。这个库已经不再积极维护,但仍然适用于需要支持较旧版本的 Kafka 的场景。
安装方法:pip install pykafka
在选择 Python Kafka 客户端时,请根据您的应用需求和 Kafka 版本选择合适的库。对于大多数场景,推荐使用 kafka-python 或 confluent-kafka-python,因为它们支持较新的 Kafka 版本并且功能更完善。如果您的应用需要异步编程,可以考虑使用 aiokafka。
本文重点介绍 kafka-python 的使用方式,官网文档参见 kafka-python

生产者实践

版本选择

在使用 kafka-python 时,需要先安装 kafka-python 库。可以使用以下命令进行安装:
pip install kafka-python

生产者参数与调优

生产者参数

Kafka Python 涉及如下关键参数,相关的参数和默认值如下:
from kafka import KafkaProducer

producer = KafkaProducer(
bootstrap_servers='localhost:9092', # 用于初始化连接到Kafka集群的broker列表,默认值为'localhost:9092'
client_id=None, # 自定义客户端ID,用于在Kafka服务端日志中识别客户端,默认值为None
key_serializer=None, # 用于将消息键序列化为字节的可调用对象,默认值为None
value_serializer=None, # 用于将消息值序列化为字节的可调用对象,默认值为None
compression_type=None, # 消息压缩类型,可选值为'gzip', 'snappy', 'lz4'None,表示不压缩,默认值为None
retries=0, # 重新发送失败的消息的次数,默认值为0
batch_size=16384, # 用于批处理消息的大小,单位为字节,默认值为16384
linger_ms=0, # 在批处理消息之前等待更多消息的最长时间,单位为毫秒,默认值为0
partitioner=None, # 用于确定消息分区的可调用对象,默认值为None
buffer_memory=33554432, # 用于缓冲待发送消息的内存总量,单位为字节,默认值为33554432
connections_max_idle_ms=540000, # 空闲连接的最长保持时间,单位为毫秒,默认值为540000
max_block_ms=60000, # 在达到缓冲区内存限制时,send()方法阻塞的最长时间,单位为毫秒,默认值为60000
max_request_size=1048576, # 发送到broker的请求的最大字节数,默认值为1048576
metadata_max_age_ms=300000, # 元数据在本地缓存中的最长存活时间,单位为毫秒,默认值为300000
retry_backoff_ms=100, # 两次重试之间的等待时间,单位为毫秒,默认值为100
request_timeout_ms=30000, # 客户端等待请求响应的最长时间,单位为毫秒,默认值为30000
receive_buffer_bytes=32768, # 用于接收数据的网络缓冲区大小,单位为字节,默认值为32768
send_buffer_bytes=131072, # 用于发送数据的网络缓冲区大小,单位为字节,默认值为131072
acks='all', # 消息确认机制,可选值为'0', '1','all',默认值为'all'
transactional_id=None, # 事务ID,用于标识生产者参与事务的唯一标识,默认值为None
transaction_timeout_ms=60000, # 事务超时时间,单位为毫秒,默认值为60000
enable_idempotence=False, # 是否启用幂等性,默认值为False
security_protocol='PLAINTEXT', # 安全协议类型,可选值为'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL',默认值为'PLAINTEXT'

参数说明调优

关于 acks 参数优化
acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为-1,表示消息发送给 Leader Broker 后,Leader 确认以及相应的 Follower 消息都写入完成后才返回。acks 参数还有以下可选值:0,1,-1。在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks参 数设置为-1,则可以确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐量。
关于 buffer_memory 参数优化(缓存)
默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。对于 Kafka Python Client,默认提供 linger_ms 为0ms 的攒批时间积攒消息,此处可以优化,适当增加值,例如设置100ms,进行聚合多个请求批量发送消息,提高吞吐。如果带宽较高,且单机内存充足,建议调大 buffer_memory 提高吞吐。
关于压缩参数优化
Kafka Python Client 支持如下压缩参数:none, gzip, snappy, lz4。
none:不使用压缩。
gzip:使用 GZIP 压缩。
snappy:使用 Snappy 压缩。
lz4:使用 LZ4 压缩。
要在 Producer 客户端中使用压缩消息,需要在创建生产者时设置 compression_type 参数。例如,要使用 LZ4 压缩算法,可以将 compression_type 设置为 lz4,虽然压缩消息的压缩和解压缩,发生客户端,是一种用计算换带宽的优化方式,但是由于 Broker 针对压缩消息存在校验行为会付出额外的计算成本,在低流量情况,不建议使用压缩,尤其是 gzip 压缩,Broker 校验计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致 Broker 处理其他请求的能力偏低,导致带宽吞吐更低。这种情况建议可以使用如下方式:
在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
{"Compression","lz4"}
在 Producer 端将 messageCompression 当成正常消息发送。
在 Consumer 端读取消息 key,获取使用的压缩方式,独立进行解压缩。

创建生产者实例

如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。示例如下:
from kafka import KafkaProducer
import sys

# 参数配置
BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC = 'test_topic'
SYNC = True
ACKS = '1' # leader副本确认写入即可
LINGER_MS = 500 # 延迟500ms发送
BATCH_SIZE = 16384 # 消息批次大小16KB

def create_producer(servers, acks, linger_ms, batch_size):
return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)

def send_message_sync(producer, topic, message):
future = producer.send(topic, message)
result = future.get(timeout=10)
print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")

def send_message_async(producer, topic, message):
def on_send_success(record_metadata):
print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")

def on_send_error(excp):
print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)
print(excp, file=sys.stderr)

future = producer.send(topic, message)
future.add_callback(on_send_success).add_errback(on_send_error)

def main():
producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)
messages = ['Hello Kafka', 'Async vs Sync', 'Demo']

if SYNC:
for message in messages:
send_message_sync(producer, TOPIC, message.encode('utf-8'))
else:
for message in messages:
send_message_async(producer, TOPIC, message.encode('utf-8'))

producer.flush()

if __name__ == '__main__':
main()



消费者实践

消费者参数与调优

消费者参数

from kafka import KafkaConsumer

# 创建一个KafkaConsumer对象,用于连接Kafka集群并消费消息
consumer = KafkaConsumer(
'topic_name', # 要订阅的主题列表
bootstrap_servers=['localhost:9092'], # Kafka集群的接入点
group_id=None, # 消费者组ID,用于将消费者分组,要加入动态分区分配(如果启用)并用于获取和提交偏移量的消费者组的名称。如果为 None,则禁用自动分区分配(通过组协调器)和偏移量提交。
client_id='kafka-python-{version}',#客户端Id,默认是kafka-python-{version}
api_version=None, #指定要使用的 Kafka API 版本。如果设置为 None,客户端将尝试通过API请求来启动不同版本的功能
enable_auto_commit=True, # 是否自动提交消费位置,默认为True
auto_commit_interval_ms=5000, # 自动提交消费位置的间隔,默认为5秒(5000毫秒)
auto_offset_reset='latest', # 消费者在读取的分区中的消费位置的策略,默认为'latest'(从最新的位置开始消费)
fetch_min_bytes=1, # 消费者在读取分区时的最小字节数,默认为1字节
fetch_max_wait_ms=500, # 在没有新的消费数据,默认等待500ms
fetch_max_bytes=52428800, # 消费者在读取分区时的最大字节数,默认为52428800字节(50MB)
max_poll_interval_ms=300000 # 消参数默认值为300000毫秒(5分钟)。如果消费者在5分钟内没有发送心跳信号,它将被认为已经失去连接,并将被从消费者组中移除。在这种情况下,其他消费者将接管被移除的消费者的分区并发起重平衡。
retry_backoff_ms=100, # 重试间隔时间,默认为100毫秒
reconnect_backoff_max_ms=1000, # 重新连接到多次连接失败的Broker的间隔时间最大值(以毫秒为单位)。如果连接失败,将在每次连续连接失败时呈指数增加,直至达到此最大值。一旦达到最大值,重新连接尝试将以该固定速率定期继续。
request_timeout_ms=305000, # 客户端请求超时(以毫秒为单位)
session_timeout_ms=10000, # session_timeout_ms (int) – 使用 Kafka 组管理工具时用于检测故障的超时时间。消费者定期发送心跳给Broker表明其活跃度。如果在此会话超时到期之前Broker没有收到心跳,则Broker将改消费组从组中删除该消费者并启动重新平衡。
heartbeat_interval_ms=3000, # 使用 Kafka 的组管理工具时,向消费者协调器发出心跳之间的预期时间(以毫秒为单位)。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于 session_timeout_ms,但通常不应高于该值的 1/3。
receive_buffer_bytes=32768,#读取数据时使用的 TCP 接收缓冲区 (SO_RCVBUF) 的大小。默认值:无(依赖于系统默认值),默认为32768。
send_buffer_bytes=131072# 发送数据时使用的 TCP 发送缓冲区 (SO_SNDBUF) 的大小。默认值:无(依赖于系统默认值),131072。
)

for message in consumer:
print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

参数说明与调优

1. max_poll_interval_ms 是 Kafka Python Consumer 的一个配置参数,它用于指定 Consumer 在两次 poll 操作之间的最大延迟。这个参数的主要作用是控制 Consumer 的 存活,也就是判断 Consumer 是否还活着。如果 Consumer 在 max_poll_interval_ms 指定的时间内没有进行 poll 操作,那么 Kafka 认为这个 Consumer 已经挂掉,会触发 Consumer 的 rebalance 操作。这个参数的设置需要根据实际的消费速度来调整。如果设置得太小,可能会导致 Consumer 频繁地触发 rebalance 操作,增加了 Kafka 的负担;如果设置得太大,可能会导致 Consumer 在出现问题时不能及时被 Kafka 检测到,从而影响了消息的消费。建议在高吞吐下可以时长增加该值的设置。
2. 针对自动提交位点请求,建议 auto_commit_interval_ms 时间不要低于1000ms,因为频率过高的位点请求会导致 Broker CPU 很高,影响其他正常服务的读写。

创建消费者实例

Kafka Python 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

自动提交位点

自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。建议间隔5s提交位点。

# auto_commit_consumer_interval.py
from kafka import KafkaConsumer
from time import sleep

consumer = KafkaConsumer(
'your_topic_name',
bootstrap_servers=['localhost:9092'],
group_id='auto_commit_group',
auto_commit_interval_ms=5000 # 设置自动提交位点的间隔为5000毫秒(5秒)
)

for message in consumer:
print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
sleep(1)

手动提交位点

手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。

# manual_commit_consumer.py
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from time import sleep

consumer = KafkaConsumer(
'your_topic_name',
bootstrap_servers=['localhost:9092'],
group_id='manual_commit_group',
enable_auto_commit=False
)

count = 0
for message in consumer:
print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
count += 1

if count % 10 == 0:
try:
consumer.commit()
except KafkaError as e:
print(f"Error while committing offset: {e}")

sleep(1)


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈