Release Notes
Broker Release Notes
Announcement
# Install one of the available Python Kafka client libraries:
#   pip install kafka-python
#   pip install confluent-kafka
#   pip install aiokafka
#   pip install pykafka
# This example uses kafka-python.
from kafka import KafkaProducer

# Reference configuration for KafkaProducer: every keyword below is shown
# with its described default and a one-line explanation.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',   # Broker list used to bootstrap the connection to the cluster. Default 'localhost:9092'.
    client_id=None,                       # Custom client id used to identify this client in broker logs. Default None.
    key_serializer=None,                  # Callable that serializes message keys into bytes. Default None.
    value_serializer=None,                # Callable that serializes message values into bytes. Default None.
    compression_type=None,                # Message compression: 'gzip', 'snappy', 'lz4', or None (no compression). Default None.
    retries=0,                            # Number of times to resend failed messages. Default 0.
    batch_size=16384,                     # Size for batching messages, in bytes. Default 16384 (16 KB).
    linger_ms=0,                          # Maximum wait for more messages before sending a batch, in ms. Default 0.
    partitioner=None,                     # Callable that chooses the partition for a message. Default None.
    buffer_memory=33554432,               # Total memory (bytes) used to buffer unsent messages. Default 33554432 (32 MB).
    connections_max_idle_ms=540000,       # Maximum idle time for connections, in ms. Default 540000.
    max_block_ms=60000,                   # Maximum time send() blocks when the buffer memory limit is reached, in ms. Default 60000.
    max_request_size=1048576,             # Maximum size (bytes) of requests sent to the broker. Default 1048576 (1 MB).
    metadata_max_age_ms=300000,           # Maximum time metadata is cached locally, in ms. Default 300000.
    retry_backoff_ms=100,                 # Wait time between retries, in ms. Default 100.
    request_timeout_ms=30000,             # Maximum time the client waits for a request response, in ms. Default 30000.
    receive_buffer_bytes=32768,           # Network receive buffer size, in bytes. Default 32768.
    send_buffer_bytes=131072,             # Network send buffer size, in bytes. Default 131072.
    acks='all',                           # Acknowledgment mechanism: 0, 1, or 'all'.
                                          # NOTE(review): source text claims 'all' is the default — kafka-python's
                                          # documented default is acks=1; confirm before relying on it.
    transactional_id=None,                # Unique id for a producer participating in transactions. Default None.
    transaction_timeout_ms=60000,         # Transaction timeout, in ms. Default 60000.
    enable_idempotence=False,             # Whether to enable idempotent writes. Default False.
    security_protocol='PLAINTEXT',        # 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', or 'SASL_SSL'. Default 'PLAINTEXT'.
)
# Example broker-side property from the original page: {"Compression", "lz4"}
from kafka import KafkaProducer
import sys

# --- Parameter configuration ---
BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC = 'test_topic'
SYNC = True         # True: block on each send; False: fire-and-forget with callbacks.
ACKS = '1'          # leader replica acknowledgment is sufficient for write confirmation.
LINGER_MS = 500     # Delays sending by 500ms to allow batching.
BATCH_SIZE = 16384  # Message batch size 16KB.


def create_producer(servers, acks, linger_ms, batch_size):
    """Build a KafkaProducer with the batching/acknowledgment settings above."""
    return KafkaProducer(
        bootstrap_servers=servers,
        acks=acks,
        linger_ms=linger_ms,
        batch_size=batch_size,
    )


def send_message_sync(producer, topic, message):
    """Send one message and block until the broker acknowledges it (or 10 s elapse)."""
    future = producer.send(topic, message)
    result = future.get(timeout=10)  # Raises on failure; blocks at most 10 seconds.
    print(f"Sent message: {message} to topic: {topic}, "
          f"partition: {result.partition}, offset: {result.offset}")


def send_message_async(producer, topic, message):
    """Send one message without blocking; report the outcome via callbacks."""

    def on_send_success(record_metadata):
        print(f"Sent message: {message} to topic: {topic}, "
              f"partition: {record_metadata.partition}, offset: {record_metadata.offset}")

    def on_send_error(excp):
        print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)
        print(excp, file=sys.stderr)

    future = producer.send(topic, message)
    future.add_callback(on_send_success).add_errback(on_send_error)


def main():
    """Send a few demo messages either synchronously or asynchronously."""
    producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)
    messages = ['Hello Kafka', 'Async vs Sync', 'Demo']
    try:
        if SYNC:
            for message in messages:
                send_message_sync(producer, TOPIC, message.encode('utf-8'))
        else:
            for message in messages:
                send_message_async(producer, TOPIC, message.encode('utf-8'))
        producer.flush()  # Drain async/batched sends before exiting.
    finally:
        # Fix: the original never closed the producer, leaking its sockets
        # and background sender thread (and skipping flush on a send error).
        producer.close()


if __name__ == '__main__':
    main()
from kafka import KafkaConsumer

# Create a KafkaConsumer connected to the cluster. Each keyword below is
# shown with its described default and a one-line explanation.
# Fix: in the original, a missing comma after max_poll_interval_ms=300000 let
# the following keyword arguments be swallowed by its trailing comment.
consumer = KafkaConsumer(
    'topic_name',                         # Topic(s) to subscribe to.
    bootstrap_servers=['localhost:9092'], # Bootstrap servers for the Kafka cluster.
    group_id=None,                        # Consumer-group id for dynamic partition assignment and offset commits;
                                          # None disables group management and offset committing.
    client_id='kafka-python-{version}',   # Client id; default is 'kafka-python-{version}'.
    api_version=None,                     # Kafka API version; None makes the client probe brokers to pick one.
    enable_auto_commit=True,              # Whether to automatically commit offsets. Default True.
    auto_commit_interval_ms=5000,         # Auto-commit interval; default 5 s (5000 ms).
    auto_offset_reset='latest',           # Starting position when no committed offset exists. Default 'latest'.
    fetch_min_bytes=1,                    # Minimum bytes to fetch per request. Default 1 byte.
    fetch_max_wait_ms=500,                # Wait up to 500 ms (default) when there is no new data.
    fetch_max_bytes=52428800,             # Maximum bytes to fetch per request. Default 52428800 (50 MB).
    max_poll_interval_ms=300000,          # Maximum delay between poll() calls; exceeding it removes the consumer
                                          # from the group and triggers a rebalance, with other consumers taking
                                          # over its partitions. Default 300000 ms (5 minutes).
                                          # (Heartbeat-based liveness is governed by session_timeout_ms below.)
    retry_backoff_ms=100,                 # Retry backoff interval; default 100 ms.
    reconnect_backoff_max_ms=1000,        # Cap (ms) on the reconnect backoff, which grows exponentially with each
                                          # consecutive failure; once reached, reconnects continue at this fixed rate.
    request_timeout_ms=305000,            # Client request timeout in ms.
    session_timeout_ms=10000,             # Group-management failure detection: consumers heartbeat periodically, and
                                          # if the broker receives none within this timeout it removes the consumer
                                          # from the group and initiates a rebalance.
    heartbeat_interval_ms=3000,           # Expected time between heartbeats to the group coordinator; must be lower
                                          # than session_timeout_ms, typically no more than 1/3 of it.
    receive_buffer_bytes=32768,           # TCP receive buffer (SO_RCVBUF) size in bytes. Default 32768.
    send_buffer_bytes=131072,             # TCP send buffer (SO_SNDBUF) size in bytes. Default 131072.
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, "
          f"Offset: {message.offset}, Value: {message.value}")
# auto_commit_consumer_interval.py
from kafka import KafkaConsumer
from time import sleep

# Consumer that relies on automatic offset commits (the library default),
# with the commit timer tuned to fire every 5 seconds.
consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='auto_commit_group',
    auto_commit_interval_ms=5000,  # Set auto-commit offset interval to 5000 milliseconds (5 seconds).
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, "
          f"Offset: {message.offset}, Value: {message.value}")
    sleep(1)  # Simulate per-message processing time.
# manual_commit_consumer.py
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from time import sleep

# Consumer that commits offsets manually, once per 10 processed messages,
# instead of on the library's auto-commit timer.
consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    enable_auto_commit=False,  # We decide exactly when offsets are committed.
)

count = 0
try:
    for message in consumer:
        print(f"Topic: {message.topic}, Partition: {message.partition}, "
              f"Offset: {message.offset}, Value: {message.value}")
        count += 1
        if count % 10 == 0:  # Commit after each batch of 10 processed messages.
            try:
                consumer.commit()
            except KafkaError as e:
                print(f"Error while committing offset: {e}")
        sleep(1)
finally:
    # Fix: the original committed only on multiples of 10, so on shutdown up to
    # 9 already-processed messages were never committed and would be
    # re-delivered after a restart. Commit the trailing partial batch and
    # close the consumer to release its sockets and heartbeat thread.
    try:
        if count % 10 != 0:
            consumer.commit()
    except KafkaError as e:
        print(f"Error while committing offset: {e}")
    consumer.close()
Feedback