
Description | Traffic Throttling for Message Sending | Traffic Throttling for Message Consumption |
Traffic Throttling Scenario | All sending clients connected to this cluster can send a maximum total of 500 converted messages per second. When the sending rate reaches the limit, any excess sending requests will fail. | All consumption clients connected to this cluster can consume a maximum total of 500 converted messages per second. When the consumption rate reaches the limit, the consumption delay for messages will increase. |
SDK Log Keywords When Traffic Throttling Is Triggered | Rate of message sending reaches limit, please take a control or upgrade the resource specification. | Rate of message receiving reaches limit, please take a control or upgrade the resource specification. |
SDK Retry Mechanism When Traffic Throttling Is Triggered | SDKs of different protocols handle this differently. The 5.x SDK retries message sending based on an exponential backoff policy; the maximum number of retries can be customized during producer initialization and defaults to 2. If a sending request still fails after the maximum number of retries is reached, an exception is thrown. The 4.x SDK throws an exception directly and does not perform any retry. | The SDK's message pulling thread automatically backs off and retries. |

import java.io.UnsupportedEncodingException;import java.nio.charset.StandardCharsets;import org.apache.rocketmq.acl.common.AclClientRPCHook;import org.apache.rocketmq.acl.common.SessionCredentials;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageClientIDSetter;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;public class ProducerExample {private static final InternalLogger log = InternalLoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {String nameserver = "Your_Nameserver";String accessKey = "Your_Access_Key";String secretKey = "Your_Secret_Key";String topicName = "Your_Topic_Name";String producerGroupName = "Your_Producer_Group_Name";// Instantiate a message producer.DefaultMQProducer producer = new DefaultMQProducer(producerGroupName, // Producer group name.new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // Access control list (ACL) permissions. You can obtain the accessKey and secretKey from the Cluster Permissions page in the console.);// Set the NameServer address. You can obtain the address from the cluster details page in the console.producer.setNamesrvAddr(nameserver);// Start the producer instance.producer.start();// Create a message instance and set the topic and message content.Message message = new Message(topicName, "Your_Biz_Body".getBytes(StandardCharsets.UTF_8));// Maximum number of message sending attempts. Configure the limit based on your business scenario.final int maxAttempts = 3;// Retry interval. 
Configure the interval based on your business scenario.final int retryIntervalMillis = 200;// Send messages.int attempt = 0;do {try {SendResult sendResult = producer.send(message);log.info("Send message successfully, {}", sendResult);break;} catch (Throwable t) {attempt++;if (attempt >= maxAttempts) {// The maximum number of attempts is reached.log.warn("Failed to send message finally, run out of attempt times, attempt={}, maxAttempts={}, msgId={}",attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);// Log the failed messages (or log them to other business systems, such as databases).log.warn(message.toString());break;}int waitMillis;if (t instanceof MQBrokerException && ((MQBrokerException) t).getResponseCode() == 215 /* FLOW_CONTROL */) {// Traffic throttling exception. Use backoff retry.waitMillis = (int) Math.pow(2, attempt - 1) * retryIntervalMillis; // Retry interval: 200 ms, 400 ms, ......} else {// Other exceptions.waitMillis = retryIntervalMillis;}log.warn("Failed to send message, will retry after {}ms, attempt={}, maxAttempts={}, msgId={}",waitMillis, attempt, maxAttempts, MessageClientIDSetter.getUniqID(message), t);try {Thread.sleep(waitMillis);} catch (InterruptedException ignore) {}}}while (true);producer.shutdown();}}
import java.io.IOException;import java.nio.charset.StandardCharsets;import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientException;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.message.Message;import org.apache.rocketmq.client.apis.producer.Producer;import org.apache.rocketmq.client.apis.producer.SendReceipt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ProducerExample {private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException, IOException {String nameserver = "Your_Nameserver";String accessKey = "Your_Access_Key";String secretKey = "Your_Secret_Key";String topicName = "Your_Topic_Name";// ACL permissions. You can obtain the accessKey and secretKey from the Cluster Permissions page in the console.SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(nameserver) // Set the NameServer address. You can obtain the address from the cluster details page in the console..setCredentialProvider(sessionCredentialsProvider).build();// Start the producer instance.ClientServiceProvider provider = ClientServiceProvider.loadService();Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topicName) // Pre-declare the topic for message sending. It is recommended to set this parameter..setMaxAttempts(3) // Maximum number of message sending attempts. 
Configure the limit based on your business scenario..build();// Create a message instance and set the topic and message content.byte[] body = "Your_Biz_Body".getBytes(StandardCharsets.UTF_8);final Message message = provider.newMessageBuilder().setTopic(topicName).setBody(body).build();try {final SendReceipt sendReceipt = producer.send(message);log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.warn("Failed to send message", t);// Log the failed messages (or log them to other business systems, such as databases).log.warn(message.toString());}producer.close();}}

Feedback