
Description | Message Throttling for Sending | Message Throttling for Consumption |
Throttling Trigger Scenario | All connections to the cluster can send a total of 500 converted messages per second. Requests will fail if the sending rate reaches the limit. | All consumer clients connected to the cluster can consume a total of 500 converted messages per second. Message consumption delay will increase if the consumption rate reaches the limit. |
Log Keyword When Throttling Is Triggered | Rate of message sending reaches limit, please take a control or upgrade the resource specification. | Rate of message receiving reaches limit, please take a control or upgrade the resource specification. |
Retry Mechanism When Throttling Is Triggered | SDK behavior differs by protocol version: The 5.x SDK retries sending with an exponential backoff policy. The maximum number of retries can be customized during Producer initialization, with a default value of 2. Requests that still fail after reaching the maximum retry limit throw an exception. The 4.x SDK throws an exception directly and does not retry. | The SDK's message pull thread automatically retries with backoff. |

import java.io.UnsupportedEncodingException;import java.nio.charset.StandardCharsets;import org.apache.rocketmq.acl.common.AclClientRPCHook;import org.apache.rocketmq.acl.common.SessionCredentials;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageClientIDSetter;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;public class ProducerExample {private static final InternalLogger log = InternalLoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {String nameserver = "Your_Nameserver";String accessKey = "Your_Access_Key";String secretKey = "Your_Secret_Key";String topicName = "Your_Topic_Name";String producerGroupName = "Your_Producer_Group_Name";// Instantiate the message producer ProducerDefaultMQProducer producer = new DefaultMQProducer(producerGroupName, // Producer group namenew AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission, accessKey and secretKey can be obtained on the console cluster permission page);// Set Nameserver address, can be obtained on the console cluster basic information pageproducer.setNamesrvAddr(nameserver);Start the Producer instanceproducer.start();// Create message instance, set topic and message contentMessage message = new Message(topicName, "Your_Biz_Body".getBytes(StandardCharsets.UTF_8));// Maximum send count, set based on business needsfinal int maxAttempts = 3;// retry interval, set based on business needsfinal int retryIntervalMillis = 200;// Send the message.int attempt = 0;do {try {SendResult sendResult = producer.send(message);log.info("Send message 
successfully, {}", sendResult);break;} catch (Throwable t) {attempt++;if (attempt >= maxAttempts) {// Reached the maximum number of timeslog.warn("Failed to send message finally, run out of attempt times, attempt={}, maxAttempts={}, msgId={}",attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);// Log messages that failed to send (or record in other services, such as database etc.)log.warn(message.toString());break;}int waitMillis;if (t instanceof MQBrokerException && ((MQBrokerException) t).getResponseCode() == 215 /* FLOW_CONTROL */) {// Rate limit exceeded, use backoff retrywaitMillis = (int) Math.pow(2, attempt - 1) * retryIntervalMillis; // Retry interval: 200ms, 400ms, ......} else {other exceptionswaitMillis = retryIntervalMillis;}log.warn("Failed to send message, will retry after {}ms, attempt={}, maxAttempts={}, msgId={}",waitMillis, attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);try {Thread.sleep(waitMillis);} catch (InterruptedException ignore) {}}}while (true);producer.shutdown();}}
import java.io.IOException;import java.nio.charset.StandardCharsets;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.message.Message;import org.apache.rocketmq.client.apis.producer.Producer;import org.apache.rocketmq.client.apis.producer.SendReceipt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ProducerExample {private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException, IOException {String nameserver = "Your_Nameserver";String accessKey = "Your_Access_Key";String secretKey = "Your_Secret_Key";String topicName = "Your_Topic_Name";// ACL permission, accessKey and secretKey can be obtained on the console cluster permission pageSessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(nameserver) // Set NameServer address, can be obtained on the console cluster basic information page.setCredentialProvider(sessionCredentialsProvider).build();Start the Producer instanceClientServiceProvider provider = ClientServiceProvider.loadService();Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topicName) // Pre-declare the topic for message sending, recommend setting.setMaxAttempts(3) // Maximum send count, set based on business needs.build();// Create message instance, set topic and message contentbyte[] body = "Your_Biz_Body".getBytes(StandardCharsets.UTF_8);final Message message = provider.newMessageBuilder().setTopic(topicName).setBody(body).build();try {final SendReceipt 
sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.warn("Failed to send message", t);// Log messages that failed to send (or record in other services, such as databases)log.warn(message.toString());}producer.close();}}

Feedback