
POP Consumption Mode (5.x)
Last updated: 2026-01-23 17:52:23

Issue Background

RocketMQ is well known among customers and developers for its high performance, low latency, and resistance to message backlog. However, some customers using the RocketMQ 4.x client SDK (for example, the commonly used Push Consumer) report the following issues during message consumption:
The SDK takes on too many responsibilities, such as pulling messages, load balancing, managing consumer offsets, and rebalancing when new clients are added. Such a heavy client is difficult to implement and maintain across multiple programming languages.
The queue-exclusive load balancing policy can easily lead to consumption bottlenecks. Each queue on the broker can be assigned to only one consumer client within the same consumer group. Therefore, when the number of queues is fixed, simply increasing the number of consumer clients does not improve consumption performance. For example, if a topic has 10 queues, at most 10 clients in the group can consume messages (one queue per client). During peak business hours, even if a customer adds a new client to consume messages, the 11th client that comes online is not assigned a queue and cannot consume any messages.
A single client failure can cause message backlog. If a client hangs due to an exception but continues to send heartbeats to the server, the server still assigns queues to this client. Since the client cannot actually process messages, a backlog starts to accumulate in its queues. Furthermore, because each queue is bound to a single client, merely adding more clients cannot resolve this issue.
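The 10-queue example above can be sketched in a few lines of Java. This is an illustrative model only, not the RocketMQ client's actual rebalance code: the class name `QueueExclusiveAllocationSketch` and the round-robin assignment are assumptions chosen to show the effect of binding each queue to exactly one client.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: NOT the RocketMQ client's real rebalance logic.
// All names here are hypothetical.
class QueueExclusiveAllocationSketch {

    // Assigns every queue index to exactly one client (round-robin by client index).
    // Returns one list of queue indexes per client.
    static List<List<Integer>> allocate(int queueCount, int clientCount) {
        if (clientCount <= 0) {
            throw new IllegalArgumentException("clientCount must be positive");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < clientCount; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int q = 0; q < queueCount; q++) {
            // Each queue has a single owner, so extra clients receive nothing.
            assignment.get(q % clientCount).add(q);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 10 queues, 11 clients: the last client ends up with an empty list.
        List<List<Integer>> assignment = allocate(10, 11);
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("client-" + (c + 1) + " -> queues " + assignment.get(c));
        }
    }
}
```

With 10 queues and 11 clients, the first ten clients each own one queue and the 11th owns none, which is why scaling out clients beyond the queue count does not help under this policy.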

Solutions

To address these issues, RocketMQ 5.x introduced the POP consumption mode.
In POP mode, the consumer offset is managed by the server, allowing multiple clients to consume from the same queue. Clients using the POP mode pull messages from all queues, thereby solving the issues of single-client failure and consumption bottlenecks mentioned above.
Furthermore, as the server maintains the consumption information, the client SDK becomes more lightweight, facilitating easier multi-language porting.
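To make the idea concrete, here is a toy in-memory model of server-side bookkeeping in POP mode. It is a minimal sketch under stated assumptions, not the broker's implementation: the class `PopQueueSketch`, its methods, and the explicit `nowMs` clock parameter are all hypothetical. It shows that any client can receive from the same queue, and that a message received by a hung client becomes visible to other clients again once its invisible duration expires.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of POP-style delivery. All names are hypothetical; the real broker
// logic is considerably more involved.
class PopQueueSketch {
    // messageId -> time (in ms) at which the message becomes visible again.
    private final Map<String, Long> visibleAt = new LinkedHashMap<>();

    void publish(String messageId) {
        visibleAt.put(messageId, 0L);
    }

    // Any client may call this. Returned messages are hidden from all clients
    // until nowMs + invisibleMs, unless they are acknowledged first.
    List<String> receive(int maxMessages, long invisibleMs, long nowMs) {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> entry : visibleAt.entrySet()) {
            if (batch.size() >= maxMessages) {
                break;
            }
            if (entry.getValue() <= nowMs) {
                entry.setValue(nowMs + invisibleMs);
                batch.add(entry.getKey());
            }
        }
        return batch;
    }

    // Acknowledging a message removes it permanently.
    void ack(String messageId) {
        visibleAt.remove(messageId);
    }

    public static void main(String[] args) {
        PopQueueSketch queue = new PopQueueSketch();
        queue.publish("m1");
        queue.publish("m2");

        // Client A receives a message at t=0 and then hangs without acknowledging it.
        System.out.println("A received " + queue.receive(1, 15_000, 0));

        // Client B reads from the same queue and is not blocked by client A.
        List<String> forB = queue.receive(1, 15_000, 0);
        System.out.println("B received " + forB);
        forB.forEach(queue::ack);

        // After the invisible duration, A's unacknowledged message is delivered again.
        System.out.println("B received " + queue.receive(1, 15_000, 15_000));
    }
}
```

Because delivery state lives on the server side in this model, a client needs only receive and acknowledge operations, which is the reason the text gives for the lighter SDK.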


Sample Code

So, how do we use the POP consumption mode?
The 5.x gRPC SDK is required. Include the following dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.6</version>
    </dependency>
</dependencies>
Also, refer to the following demo from the open-source community (using Java code as an example):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.java.example;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumerExample {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    @SuppressWarnings({"resource", "InfiniteLoopStatement"})
    public static void main(String[] args) throws ClientException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Credential provider is optional for client configuration.
        String accessKey = "User AccessKey";
        String secretKey = "User SecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        String endpoints = "Tencent Cloud page access point";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            // On some Windows platforms, you may encounter SSL compatibility issues. If SSL is not essential,
            // try turning off the SSL option in the client configuration.
            // .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String consumerGroup = "Consumer group";
        // Long-polling wait time: the maximum time a receive request waits on the server
        // when no message is available. Configure this based on your actual scenario.
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "*";
        String topic = "Topic name";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the await duration for long polling.
            .setAwaitDuration(awaitDuration)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .build();
        // Maximum number of messages returned by each long-polling request.
        int maxMessageNum = 16;
        // Invisible duration: after a message is received, it stays invisible to other clients for this long.
        // If it is not acknowledged within this time, it can be received again, possibly by another client.
        Duration invisibleDuration = Duration.ofSeconds(15);
        // Receive messages. Using multiple threads is recommended.
        do {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            log.info("Received {} message(s)", messages.size());
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // Process the message here, then acknowledge it.
                    consumer.ack(message);
                    log.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        } while (true);
        // Close the simple consumer when you don't need it anymore.
        // You could close it manually or add this into the JVM shutdown hook.
        // consumer.close();
    }
}
In this scenario, a consumer within the same consumer group is no longer exclusively bound to specific queues. This also largely prevents the queue backlog issue seen in 4.x, which was caused by the blocking of a single consumer.