
<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.6</version></dependency></dependencies>
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.rocketmq.client.java.example;import java.time.Duration;import java.util.Collections;import java.util.List;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.consumer.FilterExpression;import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;import org.apache.rocketmq.client.apis.message.MessageId;import org.apache.rocketmq.client.apis.message.MessageView;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class SimpleConsumerExample {private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);private SimpleConsumerExample() {}@SuppressWarnings({"resource", "InfiniteLoopStatement"})public static void main(String[] args) throws ClientException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// Credential provider is optional for client configuration.String accessKey = "User A ak";String secretKey = "User A sk";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);String endpoints = "Tencent Cloud webpage access point";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints)// On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in// client configuration to solve the problem please if SSL is not essential.// .enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String consumerGroup = "consumption group";// Default consumption time, 30s, which means that for pulled messages, if consumption is not completed within 30s, the message will be pulled again by another client.// users are advised to configure based on own scenarioDuration awaitDuration = Duration.ofSeconds(30);String tag = "*";String topic = "topic name";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// In most case, you don't need to create too many consumers, singleton pattern is recommended.SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)// Set the consumer group name..setConsumerGroup(consumerGroup)// set await duration for long-polling..setAwaitDuration(awaitDuration)// Set the subscription for the consumer..setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();// Max message num for each long polling.int maxMessageNum = 16;// Set message invisible duration after it is received.Duration invisibleDuration = Duration.ofSeconds(15);// Receive message, multi-threading is more recommended.do {final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);log.info("Received {} message(s)", messages.size());for (MessageView message : messages) {final MessageId messageId = message.getMessageId();try {consumer.ack(message);log.info("Message is acknowledged successfully, messageId={}", messageId);} catch (Throwable t) {log.error("Message is failed to be acknowledged, messageId={}", messageId, t);}}} while (true);// Close the simple consumer when you don't need it anymore.// You could close it manually or add this into the JVM shutdown hook.// consumer.close();}}
Feedback