```bash
# Common Python Kafka client libraries
pip install kafka-python
pip install confluent-kafka
pip install aiokafka
pip install pykafka
```

The examples below use kafka-python:

```bash
pip install kafka-python
```

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # broker list used to bootstrap the connection to the Kafka cluster; default: 'localhost:9092'
    client_id=None,                      # custom client ID used to identify this producer in Kafka server logs; default: None
    key_serializer=None,                 # callable that serializes message keys to bytes; default: None
    value_serializer=None,               # callable that serializes message values to bytes; default: None
    compression_type=None,               # message compression: 'gzip', 'snappy', 'lz4', or None (no compression); default: None
    retries=0,                           # number of times to retry failed sends; default: 0
    batch_size=16384,                    # batch size in bytes; default: 16384 (16 KB)
    linger_ms=0,                         # maximum time to wait for additional messages before sending a batch, in ms; default: 0
    partitioner=None,                    # callable that determines a message's partition; default: None
    buffer_memory=33554432,              # total memory for buffering messages awaiting dispatch, in bytes; default: 33554432 (32 MB)
    connections_max_idle_ms=540000,      # maximum time to keep idle connections open, in ms; default: 540000
    max_block_ms=60000,                  # maximum time send() may block when buffer_memory is exhausted, in ms; default: 60000
    max_request_size=1048576,            # maximum byte size of a request sent to the broker; default: 1048576
    metadata_max_age_ms=300000,          # maximum age of metadata in the local cache, in ms; default: 300000
    retry_backoff_ms=100,                # wait time between two retry attempts, in ms; default: 100
    request_timeout_ms=30000,            # maximum time to wait for a broker response, in ms; default: 30000
    receive_buffer_bytes=32768,          # network buffer size for receiving data, in bytes
    send_buffer_bytes=131072,            # network buffer size for sending data, in bytes
    acks='all',                          # acknowledgment mode: 0, 1, or 'all'; kafka-python's default is 1
    transactional_id=None,               # unique ID for a producer taking part in transactions; default: None
    transaction_timeout_ms=60000,        # transaction timeout, in ms; default: 60000
    enable_idempotence=False,            # whether to enable idempotent delivery; default: False
    security_protocol='PLAINTEXT',       # 'PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', or 'SASL_SSL'; default: 'PLAINTEXT'
)
```
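To make the serializer-related parameters above concrete, here is a minimal sketch of a producer that UTF-8-encodes keys, JSON-encodes values, and gzip-compresses batches. The broker address and the `user_events` topic are placeholders for illustration, not values from the surrounding examples.

```python
import json
from kafka import KafkaProducer

# Placeholder broker and topic; adjust to your environment.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=lambda k: k.encode('utf-8'),                # str key -> bytes
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # dict value -> JSON bytes
    compression_type='gzip',  # compress each batch with gzip
    retries=3,                # retry transient send failures up to 3 times
    acks='all',               # wait for all in-sync replicas to acknowledge
)

# The serializers run automatically inside send(); no manual encoding needed.
producer.send('user_events', key='user-42', value={'action': 'login'})
producer.flush()
```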
{"Compression","lz4"}from kafka import KafkaProducerimport sys# Parameter ConfigurationBOOTSTRAP_SERVERS = 'localhost:9092'TOPIC = 'test_topic'SYNC = TrueACKS = '1' # leader replica acknowledgement sufficesLINGER_MS = 500 # Delay sending for 500 msBATCH_SIZE = 16384 # Message batch size 16 KBdef create_producer(servers, acks, linger_ms, batch_size):return KafkaProducer(bootstrap_servers=servers, acks=acks, linger_ms=linger_ms, batch_size=batch_size)def send_message_sync(producer, topic, message):future = producer.send(topic, message)result = future.get(timeout=10)print(f"Sent message: {message} to topic: {topic}, partition: {result.partition}, offset: {result.offset}")def send_message_async(producer, topic, message):def on_send_success(record_metadata):print(f"Sent message: {message} to topic: {topic}, partition: {record_metadata.partition}, offset: {record_metadata.offset}")def on_send_error(excp):print(f"Error sending message: {message} to topic: {topic}", file=sys.stderr)print(excp, file=sys.stderr)future = producer.send(topic, message)future.add_callback(on_send_success).add_errback(on_send_error)def main():producer = create_producer(BOOTSTRAP_SERVERS, ACKS, LINGER_MS, BATCH_SIZE)messages = ['Hello Kafka', 'Async vs Sync', 'Demo']if SYNC:for message in messages:send_message_sync(producer, TOPIC, message.encode('utf-8'))else:for message in messages:send_message_async(producer, TOPIC, message.encode('utf-8'))producer.flush()if __name__ == '__main__':main()
```python
from kafka import KafkaConsumer

# Create a KafkaConsumer that connects to the Kafka cluster and consumes messages.
consumer = KafkaConsumer(
    'topic_name',                          # topic(s) to subscribe to
    bootstrap_servers=['localhost:9092'],  # access point(s) for the Kafka cluster
    group_id=None,                         # consumer group ID; required for dynamic partition assignment (if enabled) and for fetching and committing offsets. If None, auto partition assignment (via the group coordinator) and offset commits are disabled.
    client_id='kafka-python-{version}',    # client ID; default: 'kafka-python-{version}'
    api_version=None,                      # Kafka API version to use; if None, the client probes the broker via API requests to infer the version
    enable_auto_commit=True,               # whether to auto-commit consumer offsets; default: True
    auto_commit_interval_ms=5000,          # interval between automatic offset commits; default: 5000 ms (5 s)
    auto_offset_reset='latest',            # where to start reading a partition when there is no committed offset; default: 'latest' (consume from the newest position)
    fetch_min_bytes=1,                     # minimum amount of data the server should return per fetch request; default: 1 byte
    fetch_max_wait_ms=500,                 # how long the server may wait when there is not enough new data; default: 500 ms
    fetch_max_bytes=52428800,              # maximum amount of data the server should return per fetch request; default: 52428800 bytes (50 MB)
    max_poll_interval_ms=300000,           # maximum delay between calls to poll(); default: 300000 ms (5 minutes). If poll() is not called within this interval, the consumer is considered failed, other consumers take over its partitions, and a rebalance is triggered.
    retry_backoff_ms=100,                  # retry interval; default: 100 ms
    reconnect_backoff_max_ms=1000,         # maximum backoff, in ms, when reconnecting to a broker that has repeatedly failed; the backoff grows exponentially with each consecutive failure up to this cap, after which reconnection attempts continue at this fixed rate
    request_timeout_ms=305000,             # client request timeout, in ms
    session_timeout_ms=10000,              # failure-detection timeout when using Kafka's group management. Consumers send periodic heartbeats to the broker; if no heartbeat arrives before this timeout expires, the broker removes the consumer from the group and initiates a rebalance.
    heartbeat_interval_ms=3000,            # expected interval between heartbeats to the consumer coordinator when using group management, in ms. Heartbeats keep the session alive and facilitate rebalancing when consumers join or leave. Must be lower than session_timeout_ms, and typically no more than 1/3 of it.
    receive_buffer_bytes=32768,            # TCP receive buffer (SO_RCVBUF) size used when reading data; if not set, the system default is used (commonly 32768)
    send_buffer_bytes=131072,              # TCP send buffer (SO_SNDBUF) size used when sending data; if not set, the system default is used (commonly 131072)
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, "
          f"Offset: {message.offset}, Value: {message.value}")
```
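The consumer-side counterpart of the producer's serializers is a deserializer, and kafka-python also allows bypassing the group coordinator entirely with manual partition assignment. A sketch assuming JSON-encoded values and a single-partition `test_topic` (both placeholders):

```python
import json
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # bytes -> dict
    enable_auto_commit=False,
)

# assign() takes explicit partitions instead of subscribing by topic name,
# so no consumer group or rebalancing is involved.
tp = TopicPartition('test_topic', 0)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  # replay the partition from its first offset

for message in consumer:
    print(f"Offset: {message.offset}, Value: {message.value}")
```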
```python
# auto_commit_consumer_interval.py
from kafka import KafkaConsumer
from time import sleep

consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='auto_commit_group',
    auto_commit_interval_ms=5000,  # commit offsets automatically every 5000 ms (5 s)
)

for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, "
          f"Offset: {message.offset}, Value: {message.value}")
    sleep(1)
```
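To observe what auto-commit has actually recorded, the group's committed offset can be compared with the consumer's current read position. A sketch reusing the same placeholder topic and group as above:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='auto_commit_group',
)

consumer.poll(timeout_ms=1000)  # join the group and receive a partition assignment
for tp in consumer.assignment():
    committed = consumer.committed(tp)  # last offset committed for this group (None if nothing yet)
    position = consumer.position(tp)    # next offset this consumer will read
    print(f"{tp}: committed={committed}, position={position}")
```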
```python
# manual_commit_consumer.py
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from time import sleep

consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    enable_auto_commit=False,  # offsets are committed explicitly below
)

count = 0
for message in consumer:
    print(f"Topic: {message.topic}, Partition: {message.partition}, "
          f"Offset: {message.offset}, Value: {message.value}")
    count += 1
    if count % 10 == 0:  # commit once every 10 processed messages
        try:
            consumer.commit()
        except KafkaError as e:
            print(f"Error while committing offset: {e}")
    sleep(1)
```
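`consumer.commit()` blocks until the broker responds. When that latency matters, kafka-python also offers `commit_async()`, which commits in the background and reports the outcome through an optional callback. A sketch with the same placeholder topic and group:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'your_topic_name',
    bootstrap_servers=['localhost:9092'],
    group_id='manual_commit_group',
    enable_auto_commit=False,
)

def on_commit(offsets, response):
    # kafka-python invokes the callback with the committed offsets and either
    # the broker response or an exception instance on failure.
    if isinstance(response, Exception):
        print(f"Async commit failed: {response}")

for message in consumer:
    print(f"Offset: {message.offset}, Value: {message.value}")
    consumer.commit_async(callback=on_commit)  # non-blocking offset commit
```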