tencent cloud

POP Consumption Mode (5.X)
Last updated: 2025-07-15 17:46:17
POP Consumption Mode (5.X)
Last updated: 2025-07-15 17:46:17

Issue Background

RocketMQ is known by many customers and developers for its high performance, low delay, and backlog resistance characteristics. However, during the usage of the RocketMQ 4.x client SDK, some customers report that the consumer client encounters issues while consuming messages, such as the commonly used Push Consumer.
The SDK takes on too many functions, such as pulling messages, load balancing, message offset management, and Rebalance when adding new clients, which is unfriendly to multi-language developers.
The queue-exclusive load balancing policy can cause consumption bottlenecks: Each queue on the Broker can only be assigned to one consumer client in the same Group. So when the queue count is fixed, simply adding more consumer clients won't enhance consumption performance. Assuming a Topic has 10 queues, the Group can have up to 10 clients consuming messages (meaning each client can consume at most one queue). During peak business periods, even if the customer wants to add a new client to consume messages, the newly added 11th client will be unable to consume messages.
A single client exception leads to accumulation. If a client hangs due to an exception, since the heartbeat with the server is not disconnected, the client will still be allocated to the queue for consumption. However, because the client is actually unable to consume messages, exceptions accumulate. Due to the previous reason, simply increasing the number of clients does not resolve the issue.

Solution

Given the above reasons, 5.x introduced the POP consumption mode.
In POP mode, the consumer offset is managed by the server, so multiple clients can consume the same queue. Clients using POP mode pull messages from all queues, addressing the issues of single client exceptions and consumption bottlenecks.
Meanwhile, the server maintains consumption information, making the client SDK more lightweight and easier to port to multiple languages.


Sample Code

How to use POP consumption mode?
Require the use of gRPC SDK 5.x with the following dependencies:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.6</version>
</dependency>
</dependencies>
Meanwhile, refer to the open-source community DEMO as follows (for example, Java code):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.java.example;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumerExample {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

private SimpleConsumerExample() {
}

@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
String accessKey = "User A ak";
String secretKey = "User A sk";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);

String endpoints = "Tencent Cloud webpage access point";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not essential.
// .enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "consumption group";
// Default consumption time, 30s, which means that for pulled messages, if consumption is not completed within 30s, the message will be pulled again by another client.
// users are advised to configure based on own scenario
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "*";
String topic = "topic name";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// set await duration for long-polling.
.setAwaitDuration(awaitDuration)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
// Receive message, multi-threading is more recommended.
do {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
} while (true);
// Close the simple consumer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}
In such cases, consumers in the same consumption group will not be bound to queues one-to-one, and the problem of queue accumulation due to a single consumer blocking, as seen in earlier 4.x versions, can be avoided to the maximum extent.

Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback