import uuidfrom kafka import KafkaConsumer,TopicPartition,OffsetAndMetadataconsumer = KafkaConsumer(# Topic name provided by the cls kafka protocol consumption console, such as XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX, can be copied from the console'Your consumption topics',group_id = 'your consumer group name',auto_offset_reset='earliest',# Service address + port, public network port 9096, private network port 9095, example is intranet consumption, please fill in according to your actual situationbootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',# username is logset ID, such as ca5cXXXXdd2e-4ac0af12-92d4b677d2c6sasl_plain_username = "${logsetID}",The password is a string composed of the user's SecretId#SecretKey, such as AKID********************************#XXXXuXtymIXT0Lac. Be careful not to lose the #. Use sub-account keys. When the root account authorizes the sub-account, follow the principle of least privilege. The actions and resources in the sub-account access policy should be configured to the minimum range to fulfill the operations.sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (0,10,1))print('begin')for message in consumer:print('begins')print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))print('end')
from kafka import KafkaConsumerimport threadingTOPIC_NAME = 'Your consumption topics'GROUP_ID = 'your consumer group name'# Service address + port, public network port 9096, private network port 9095, example is intranet consumption, please fill in according to your actual situationBOOTSTRAP_SERVERS = ''kafkaconsumer-${region}.cls.tencentyun.com:9095''def consume_messages(thread_id):# Create a Kafka consumer instanceconsumer = KafkaConsumer(TOPIC_NAME,group_id=GROUP_ID,bootstrap_servers=BOOTSTRAP_SERVERS,value_deserializer=lambda m: m.decode('utf-8'),auto_offset_reset='earliest',security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',sasl_plain_username = "${logsetID}"",sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (2, 5, 1))try:for message in consumer:print(f"Thread {thread_id}: partition = {message.partition}, offset = {message.offset}, value = {message.value}")except KeyboardInterrupt:passfinally:# Stop the consumer.consumer.close()if __name__ == "__main__":Start 3 consumer threads. This is an example. Please configure according to actual conditions.num_consumers = 3threads = []for i in range(num_consumers):thread = threading.Thread(target=consume_messages, args=(i,))threads.append(thread)thread.start()# Wait for all threads to completefor thread in threads:thread.join()
${SecretId}#${SecretKey} is followed by (; semicolon). Do not leave any field out, otherwise an error will be reported.<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.0</version></dependency>import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerGroupTest {public static void consume() {Properties props = new Properties();String logset_id = "${logsetID}";// Topic name for display on the page of kafka protocol consumption in the CLS consoleString topic_id = "Your consumption topics";String accessKeyID = System.getenv("${SecretId}");String accessKeySecret = System.getenv("${SecretKey}");String groupId = "your consumer group name";// Service address + port, public network port 9096, private network port 9095, example is intranet consumption, please fill in according to your actual situationString hosts = "kafkaconsumer-${region}.cls.tencentyun.com:9095";props.put("bootstrap.servers", hosts);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");// Kafka consumer configurationprops.put("group.id", groupId);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000");props.put("session.timeout.ms", "10000");props.put("auto.offset.reset", "earliest");props.put("max.poll.interval.ms", "120000");props.put("heartbeat.interval.ms", "3000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// Create a Kafka consumer instanceKafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);consumer.subscribe(Collections.singletonList(topic_id));while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());}}}public static void main(String[] args){consume();}}
${SecretId}#${SecretKey} is followed by (; semicolon). Do not leave any field out, otherwise an error will be reported.package mainimport ("context""fmt""github.com/Shopify/sarama""log""os""os/signal""syscall")func main() {// Create Sarama consumer configuration//TOPIC_NAME is your consumption topic, view it in the console.topicName := "${TOPIC_NAME}"//GROUP_ID is your consumer group namegroupID := "${GROUP_ID}"//BOOTSTRAP_SERVERS is the consumption service host address and port, public network port 9096, private network port 9095, such as kafkaconsumer-${region}.cls.tencentyun.com:9095endpoint := "${BOOTSTRAP_SERVERS"config := sarama.NewConfig()config.Net.SASL.Enable = trueconfig.Net.SASL.User = "${logsetID}"config.Net.SASL.Password = "${SecretId}#${SecretKey}"config.Net.SASL.Mechanism = sarama.SASLTypePlaintextconfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobinconfig.Version = sarama.V1_1_1_0config.Consumer.Offsets.Initial = sarama.OffsetNewest// Create Sarama consumersarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)consumer, err := sarama.NewConsumerGroup([]string{endpoint}, groupID, config)if err != nil {log.Fatal(err)}defer consumer.Close()// Process received messageshandler := &ConsumerGroupHandler{}signals := make(chan os.Signal, 1)signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)go func() {for {err := consumer.Consume(context.Background(), []string{topicName}, handler)if err != nil {log.Fatal(err)}if handler.ready {break}}}()<-signalsfmt.Println("Exiting...")}// ConsumerGroupHandler implements the sarama.ConsumerGroupHandler APItype ConsumerGroupHandler struct {ready bool}// Setup is called before the consumer group starts upfunc (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {h.ready = truereturn nil}// Cleanup is called after the consumer group stopsfunc (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {h.ready = falsereturn nil}// ConsumeClaim consumes messages from the Claimfunc (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {fmt.Printf("Received message: %s\\n", string(message.Value))session.MarkMessage(message, "")}return nil}
Feedback