Tencent Cloud

TDMQ for CKafka


Kafka Python SDK

Last updated: 2026-01-20 17:10:13

Background

CKafka's Python client has the following main libraries:
kafka-python: This is a pure Python implementation of a Kafka client that supports Kafka 0.8.2 and later versions. It provides APIs for producers, consumers, and managing Kafka clusters. This library is easy to use but may not perform as well as clients based on librdkafka.
Installation method: pip install kafka-python
confluent-kafka-python: This library is implemented based on the high-performance C library librdkafka. It supports Kafka 0.9 and later versions, providing APIs for producers, consumers, and managing Kafka clusters. This library offers better performance but may require installing additional dependencies.
Installation method: pip install confluent-kafka
aiokafka: This is an asynchronous Kafka client based on kafka-python, using the asyncio library. This library is suitable for scenarios requiring asynchronous programming.
Installation method: pip install aiokafka
pykafka: This is a Python client that supports Kafka version 0.8.x. It provides APIs for producers, consumers, and managing Kafka clusters. This library is no longer actively maintained but is still suitable for scenarios requiring support for older versions of Kafka.
Installation method: pip install pykafka
When selecting a Python Kafka client, choose an appropriate library based on your application requirements and Kafka version. For most scenarios, kafka-python or confluent-kafka-python is recommended as they support newer Kafka versions and offer more comprehensive features. If your application requires asynchronous programming, consider using aiokafka.
This article focuses on the usage of kafka-python. Official documentation can be found at kafka-python.

Producer Practices

Version Selection

When using kafka-python, the kafka-python library must be installed first. You can install it using the following command:
pip install kafka-python

Producer Parameters and Tuning

Producer Parameters

The kafka-python producer involves the following key parameters, shown here with typical values:
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Broker list used to bootstrap the connection to the Kafka cluster. Default: 'localhost:9092'
    client_id=None,  # Custom client ID used to identify the client in Kafka server logs. Default: None
    key_serializer=None,  # Callable used to serialize message keys into bytes. Default: None
    value_serializer=None,  # Callable used to serialize message values into bytes. Default: None
    compression_type=None,  # Message compression type: 'gzip', 'snappy', 'lz4', or None (no compression). Default: None
    retries=0,  # Number of times to resend failed messages. Default: 0
    batch_size=16384,  # Per-partition batch size in bytes. Default: 16384
    linger_ms=0,  # Maximum time (ms) to wait for more messages before sending a batch. Default: 0
    partitioner=None,  # Callable used to determine the partition for a message. Default: None
    buffer_memory=33554432,  # Total memory (bytes) used to buffer unsent messages. Default: 33554432
    connections_max_idle_ms=540000,  # Maximum idle time for connections in milliseconds. Default: 540000
    max_block_ms=60000,  # Maximum time (ms) send() may block when the buffer memory limit is reached. Default: 60000
    max_request_size=1048576,  # Maximum size (bytes) of requests sent to the broker. Default: 1048576
    metadata_max_age_ms=300000,  # Maximum time (ms) metadata is cached locally. Default: 300000
    retry_backoff_ms=100,  # Wait time (ms) between retries. Default: 100
    request_timeout_ms=30000,  # Maximum time (ms) the client waits for a request response. Default: 30000
    receive_buffer_bytes=32768,  # Network receive buffer (SO_RCVBUF) size in bytes. Default: None (system default)
    send_buffer_bytes=131072,  # Network send buffer (SO_SNDBUF) size in bytes. Default: None (system default)
    acks='all',  # Message acknowledgment mechanism: 0, 1, or 'all' (equivalent to -1)
    transactional_id=None,  # Transactional ID uniquely identifying a producer in a transaction. Default: None
    transaction_timeout_ms=60000,  # Transaction timeout in milliseconds. Default: 60000
    enable_idempotence=False,  # Whether to enable idempotence. Default: False
    security_protocol='PLAINTEXT',  # Security protocol: 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', or 'SASL_SSL'. Default: 'PLAINTEXT'
)

Parameter Description and Tuning

About acks Parameter Optimization
The acks parameter controls the acknowledgment mechanism for produced messages and accepts the values 0, 1, and -1 (equivalent to 'all'). With acks set to -1, a send is considered successful only after the leader broker has written the message and all in-sync follower replicas have replicated it. In cross-AZ scenarios, and for topics with many replicas, the acks value trades message reliability against throughput. Therefore:
For online business messages where throughput requirements are modest, set acks to -1 so that a message is acknowledged only after all replicas have received it, improving reliability.
For log collection, big data, or offline computing scenarios that require high throughput (i.e., the volume of data written to Kafka per second), set acks to 1 so that the leader alone acknowledges writes.
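The two profiles above can be sketched as plain configuration dictionaries that would be unpacked into KafkaProducer(**config); the bootstrap address and retry count are placeholder assumptions, not recommended values:

```python
# Illustrative acks profiles for the two scenarios described above.
reliable_config = {
    'bootstrap_servers': 'localhost:9092',  # placeholder address
    'acks': 'all',  # wait for all in-sync replicas: highest reliability
    'retries': 3,   # retry transient failures so acks='all' is not undermined
}

throughput_config = {
    'bootstrap_servers': 'localhost:9092',  # placeholder address
    'acks': 1,      # leader-only acknowledgment: lower latency, higher throughput
}

# Each dict would be passed as KafkaProducer(**reliable_config), etc.
print(reliable_config['acks'], throughput_config['acks'])
```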
About buffer_memory Parameter Optimization (Caching)
For a given amount of data, sending it in a single network request rather than many small ones reduces computational and network overhead, thereby increasing overall write throughput.
This parameter can therefore be configured to optimize the client's send throughput. kafka-python's default batching wait is linger_ms=0, i.e., no time is spent accumulating messages. Increasing it moderately, for example to 100 ms, lets multiple messages be aggregated and sent in batches, improving throughput. If bandwidth is high and single-machine memory is sufficient, it is also recommended to increase buffer_memory.
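As a sketch of the batching advice above, the following configuration dictionary (the specific values are illustrative assumptions, not library defaults) could be merged into the producer's keyword arguments:

```python
# Illustrative batching-oriented producer settings.
batching_config = {
    'linger_ms': 100,                    # wait up to 100 ms so multiple messages share one request
    'batch_size': 64 * 1024,             # allow larger per-partition batches (64 KB)
    'buffer_memory': 64 * 1024 * 1024,   # raise the total send buffer to 64 MB
}

# Rough sanity check: how many full batches the buffer can hold at once.
batches_in_buffer = batching_config['buffer_memory'] // batching_config['batch_size']
print(batches_in_buffer)  # → 1024
```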
About Compression Parameter Optimization
Kafka Python Client supports the following compression parameters: none, gzip, snappy, lz4.
none: No compression.
gzip: Uses GZIP compression.
snappy: Uses Snappy compression.
lz4: Uses LZ4 compression.
To use message compression in the Producer client, set the compression_type parameter when creating the producer. For example, to use the LZ4 compression algorithm, set compression_type to lz4. Although compression and decompression occur on the client side—trading computational resources for bandwidth optimization—Brokers incur additional computational costs for validating compressed messages. Under low traffic conditions, compression is not recommended, especially gzip compression, as the computational cost for Broker validation can be substantial. In some cases, this may result in diminishing returns where increased computation reduces the Broker's capacity to handle other requests, ultimately lowering bandwidth throughput. For such scenarios, consider the following approach:
In the producer, compress each message independently to produce a compressed payload, messageCompression, and record the compression method in the message key, e.g.:
{"Compression": "lz4"}
On the producer side, send messageCompression as a normal message.
On the consumer side, read the message key to determine the compression method used and decompress independently.
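The steps above can be sketched as follows. This is a minimal local round-trip using gzip from the standard library so the sketch is self-contained; the text's example uses lz4, which works the same way via the third-party lz4 package. The function names and the exact key format are illustrative assumptions:

```python
import gzip

# Producer side: compress the payload yourself and record the codec
# in the message key, following the {"Compression": ...} convention.
def compress_message(payload: bytes) -> tuple[bytes, bytes]:
    key = b'{"Compression": "gzip"}'
    return key, gzip.compress(payload)

# Consumer side: inspect the key to choose the decompression method.
def decompress_message(key: bytes, value: bytes) -> bytes:
    if b'"gzip"' in key:
        return gzip.decompress(value)
    return value  # uncompressed or unknown codec: pass through as-is

key, value = compress_message(b'hello ckafka')
# In a real application the pair would be sent as
# producer.send(topic, key=key, value=value); here we only
# verify the round trip locally.
assert decompress_message(key, value) == b'hello ckafka'
```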

Create Producer Instance

If the application requires higher reliability, use a synchronous producer to confirm that each message is delivered successfully; the acks acknowledgment mechanism and transactions can further guarantee reliability and consistency (for parameter tuning, see Producer Parameters and Tuning). If the application requires higher throughput, use an asynchronous producer together with batch sending to increase delivery speed while reducing network overhead and I/O consumption. Example:
from kafka import KafkaProducer
import sys

# Parameter Configuration
BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC = 'test_topic'
SYNC = True
ACKS = 1  # Leader replica acknowledgment is sufficient for write confirmation
LINGER_MS = 500  # Delay sending by up to 500 ms to batch messages
BATCH_SIZE = 16384  # Message batch size: 16 KB

def create_producer(servers, acks, linger_ms, batch_size):
    return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)

def send_message_sync(producer, topic, message):
    future = producer.send(topic, message)
    result = future.get(timeout=10)
    print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")

def send_message_async(producer, topic, message):
    def on_send_success(record_metadata):
        print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")

    def on_send_error(excp):
        print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)
        print(excp, file=sys.stderr)

    future = producer.send(topic, message)
    future.add_callback(on_send_success).add_errback(on_send_error)

def main():
    producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)
    messages = ['Hello Kafka', 'Async vs Sync', 'Demo']

    if SYNC:
        for message in messages:
            send_message_sync(producer, TOPIC, message.encode('utf-8'))
    else:
        for message in messages:
            send_message_async(producer, TOPIC, message.encode('utf-8'))

    producer.flush()

if __name__ == '__main__':
    main()



Consumer Practices

Consumer Parameters and Tuning

Consumer Parameters

from kafka import KafkaConsumer

# Create a KafkaConsumer object to connect to the Kafka cluster and consume messages
consumer = KafkaConsumer(
    'topic_name',  # Topic(s) to subscribe to
    bootstrap_servers=['localhost:9092'],  # Bootstrap servers for the Kafka cluster
    group_id=None,  # Consumer group ID, used for dynamic partition assignment (if enabled) and for fetching and committing offsets. If None, automatic partition assignment (via the group coordinator) and offset commits are disabled.
    client_id='kafka-python-{version}',  # Client ID. Default: 'kafka-python-{version}'
    api_version=None,  # Kafka API version to use. If None, the client probes the broker via API requests to determine supported features.
    enable_auto_commit=True,  # Whether to commit offsets automatically. Default: True
    auto_commit_interval_ms=5000,  # Interval for auto-committing offsets. Default: 5000 ms (5 seconds)
    auto_offset_reset='latest',  # Where to start consuming in a partition when no committed offset exists. Default: 'latest'
    fetch_min_bytes=1,  # Minimum bytes to fetch per request. Default: 1 byte
    fetch_max_wait_ms=500,  # Maximum wait when no new data is available. Default: 500 ms
    fetch_max_bytes=52428800,  # Maximum bytes to fetch per request. Default: 52428800 bytes (50 MB)
    max_poll_interval_ms=300000,  # Maximum delay between poll() calls. Default: 300000 ms (5 minutes). A consumer that exceeds it is considered failed and is removed from the group; its partitions are reassigned to other consumers, triggering a rebalance.
    retry_backoff_ms=100,  # Retry backoff interval. Default: 100 ms
    reconnect_backoff_max_ms=1000,  # Maximum interval (ms) for reconnecting to a broker after repeated failures. The interval grows exponentially with each consecutive failure up to this maximum, after which reconnection attempts continue at this fixed rate.
    request_timeout_ms=305000,  # Client request timeout in milliseconds
    session_timeout_ms=10000,  # Timeout for failure detection under Kafka group management. Consumers send periodic heartbeats to indicate liveness; if the broker receives none within this window, it removes the consumer from the group and initiates a rebalance.
    heartbeat_interval_ms=3000,  # Expected interval (ms) between heartbeats to the group coordinator. Must be lower than session_timeout_ms, and typically should not exceed 1/3 of that value.
    receive_buffer_bytes=32768,  # TCP receive buffer (SO_RCVBUF) size in bytes. Default: None (system default)
    send_buffer_bytes=131072,  # TCP send buffer (SO_SNDBUF) size in bytes. Default: None (system default)
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")

Parameter Description and Tuning

1. max_poll_interval_ms specifies the maximum delay allowed between two poll() calls and serves as the consumer's liveness check. If a consumer does not poll within max_poll_interval_ms, Kafka considers it dead and triggers a rebalance. Tune this value to the actual consumption speed: set too small, it causes frequent rebalances and extra load on Kafka; set too large, it delays the detection of genuinely stuck consumers and so affects message consumption. For high-throughput scenarios with long per-batch processing, increase this value accordingly.
2. For auto-committed offsets, it is recommended not to set auto_commit_interval_ms below 1000 ms. Excessively frequent offset requests drive up broker CPU usage, affecting the reads and writes of other services.
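The two recommendations above can be sketched as a small set of consumer overrides; the specific values are illustrative assumptions for a slow batch-processing workload, not library defaults:

```python
# Illustrative consumer tuning, to be merged into KafkaConsumer(**kwargs).
consumer_tuning = {
    'max_poll_interval_ms': 600000,    # allow up to 10 min between polls for slow batch processing
    'auto_commit_interval_ms': 5000,   # commit offsets every 5 s; keep this >= 1000 ms
}

# Guard against the anti-pattern described above: overly frequent commits.
assert consumer_tuning['auto_commit_interval_ms'] >= 1000
print(consumer_tuning)
```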

Create Consumer Instance

Kafka Python provides a subscription-based model for creating consumers, offering both manual and automatic offset commit methods.

Automatic Offset Commit

Automatic offset commit: After polling messages, the consumer automatically commits offsets without manual intervention. This approach offers simplicity and ease of use but may lead to duplicate message consumption or message loss. It is recommended to set the commit interval to 5 seconds.

# auto_commit_consumer_interval.py
from kafka import KafkaConsumer
from time import sleep

consumer = KafkaConsumer(
'your_topic_name',
bootstrap_servers=['localhost:9092'],
group_id='auto_commit_group',
auto_commit_interval_ms=5000 # Set auto-commit offset interval to 5000 milliseconds (5 seconds)
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    sleep(1)

Manual Offset Commit

Manual offset commit: After processing messages, the consumer needs to manually commit offsets. The advantage of this approach is precise control over offset commits, preventing duplicate message consumption or message loss. However, note that overly frequent manual offset commits can cause high CPU usage on the Broker, impacting performance. As message volume increases, high CPU consumption may affect other features of the Broker. Therefore, it is recommended to commit offsets at certain message intervals.

# manual_commit_consumer.py
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from time import sleep

consumer = KafkaConsumer(
'your_topic_name',
bootstrap_servers=['localhost:9092'],
group_id='manual_commit_group',
enable_auto_commit=False
)

count = 0
for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
    count += 1

    # Commit every 10 messages to avoid overly frequent offset requests
    if count % 10 == 0:
        try:
            consumer.commit()
        except KafkaError as e:
            print(f"Error while committing offset: {e}")

    sleep(1)

