Sender Practice
Choosing between Topic and Tags
A Topic distinguishes different types of business messages. A Tag is a sub-type or secondary attribute of a message within a Topic; downstream consumers can use filter expressions to filter on Tags efficiently on the Broker side.
Choose between Topic and Tag based on the following factors:
1. Message type consistency: regular messages, scheduled messages, transaction messages, and sequential messages currently cannot be mixed in one Topic. Messages of different types must use different Topics.
2. Business scenario consistency: for example, order messages and payment messages should use different Topics. If downstream consumers of order messages need to filter by region, you can set the region (province) as a Tag.
3. Message volume consistency: since RocketMQ maintains consumption progress per Topic, if the volumes of different message types under the same Topic differ greatly, consumers of a small-volume Tag must filter through a massive number of irrelevant messages before consuming their own, wasting resources and increasing latency.
In summary, we recommend using Topics to separate messages with no business relationship, and using Tags to mark key attributes within a Topic so that different downstream consumers can filter messages efficiently.
Keys Usage
It is recommended to set each message's unique business identifier in the keys field. The server creates a hash index for each message key, so you can query the message content and the message trace by Topic and Key when troubleshooting.
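As a sketch with the open-source Java client (requires the rocketmq-client dependency and a reachable broker; the Topic, Tag, and key values below are made up for illustration, and producer is assumed to be an already-started DefaultMQProducer):

```java
// Illustrative names only; not a runnable standalone program.
Message msg = new Message(
        "TopicOrder",                           // topic
        "TagPay",                               // tag
        "ORDER_20240315_0001",                  // keys: unique business identifier
        "order payload".getBytes(StandardCharsets.UTF_8));
SendResult result = producer.send(msg);
// result.getMsgId() together with the key above can later be used to look up
// the message content and its trace when troubleshooting.
```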
Choosing Appropriate Sending Method
RocketMQ supports three sending methods:
Synchronous sending: suitable for scenarios with relatively high reliability requirements, such as payment messages and SMS notifications.
Asynchronous sending: applicable to time-sensitive business scenarios where the sender cannot tolerate long waits for the Broker's response.
One-way sending: suitable for scenarios where the sending result is not a major concern, such as log sending.
send(msg) sends synchronously and returns a result only after the Broker acknowledges, so the caller blocks and the sending duration grows. If performance matters, use the asynchronous send(msg, callback) and check the sending result inside the callback. For applications that do not need to confirm whether a message was sent successfully, such as log collection, use the sendOneway method directly.
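The three methods map onto the Java client API roughly as follows (a sketch; producer and msg are assumed to already exist, and a running broker plus the rocketmq-client dependency are required):

```java
// 1. Synchronous: blocks until the Broker acknowledges.
SendResult result = producer.send(msg);

// 2. Asynchronous: returns immediately; the result arrives in the callback.
producer.send(msg, new SendCallback() {
    @Override public void onSuccess(SendResult sendResult) { /* log sendResult */ }
    @Override public void onException(Throwable e) { /* log and compensate */ }
});

// 3. One-way: fire-and-forget, no result at all (e.g. log collection).
producer.sendOneway(msg);
```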
Sending Retry Policy
For applications where messages cannot be lost, there must be a message resend mechanism. The Producer's send method itself supports internal retry.
1. By default, the producer retries up to 2 times (2 for synchronous sending, 0 for asynchronous sending).
2. If a send fails, the producer switches to the next Broker and retries there. The total time spent, including retries, does not exceed the sendMsgTimeout setting (default: 3s).
3. If sending to the Broker results in a timeout exception, the producer does not retry.
For key business operations where a send may fail, it is advisable to persist the failed message in a database and use a scheduled (timer) thread to retry sending periodically, so that the messages reach downstream consumers once the RocketMQ service recovers.
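The store-and-retry fallback can be sketched as follows. This is a self-contained illustration: a real system would persist the rows in a database and drive retries from a scheduled job, and the class and method names here are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

class FailedMessageStore {
    // In-memory stand-in for a DB table of failed sends.
    private final Queue<String> pending = new ArrayDeque<>();

    void saveFailed(String messageBody) {
        pending.add(messageBody);
    }

    // Called periodically by a timer thread; resendFn returns true when
    // the resend to RocketMQ succeeds. Returns how many were resent.
    int retryAll(Predicate<String> resendFn) {
        int resent = 0;
        for (int i = pending.size(); i > 0; i--) {
            String body = pending.poll();
            if (resendFn.test(body)) resent++;
            else pending.add(body); // keep for the next retry round
        }
        return resent;
    }

    int pendingCount() {
        return pending.size();
    }
}
```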
Printing Message Sending Log
Whether a message is sent successfully or not, it is recommended to log attributes such as the SendResult, msgId, key, and tag for troubleshooting.
Consumer Practice
Using Consumption Groups to Isolate Downstream Businesses
Different businesses should use separate consumption groups to consume the same Topic independently; each group maintains its own consumption offsets. Ensure that every consumer within the same group subscribes to the same Topics with the same filter rules.
Idempotent Processing
RocketMQ guarantees at-least-once delivery and cannot rule out message duplication. For example, network jitter during sending triggers resends, consumption exceptions trigger redelivery, and rebalancing during consumer restarts can also deliver a message twice. If your business is sensitive to duplicate consumption, you must deduplicate at the business level, for example with a distributed cache or a relational database.
For idempotent processing, first determine the message's unique key. Do not rely on the msgId; use the unique business identifier set in the keys field, such as an order ID. Before consuming, check whether the unique key already exists in the distributed cache or relational database: if not, insert it and consume the message; otherwise, skip it or deduplicate further according to business logic.
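The check-then-insert pattern can be sketched with an atomic put-if-absent. This is a self-contained illustration using an in-memory map as a stand-in for a distributed cache (e.g. a Redis SETNX) or a DB unique-key insert; the class and method names are invented for the example.

```java
import java.util.concurrent.ConcurrentHashMap;

class DedupGuard {
    // In-memory stand-in for a distributed cache or DB unique index.
    private final ConcurrentHashMap<String, Boolean> consumed = new ConcurrentHashMap<>();

    // Returns true exactly once per business key: process the message only
    // when this returns true; otherwise skip it as a duplicate delivery.
    boolean tryMarkConsumed(String businessKey) {
        return consumed.putIfAbsent(businessKey, Boolean.TRUE) == null;
    }
}
```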
Retry on Consumption Failure
Concurrent consumption: failed messages use a backoff retry mechanism and are sent back to the system's delayed queue; each consumption failure increases the delayLevel by 1. The default maximum number of retries is 16, and messages exceeding it enter the dead-letter queue. The 16 retry intervals correspond to delayed message levels 3 to 18, ranging from 10s up to 2h. For details, see Message Retry.
Sequential consumption: to preserve ordering, failed messages are not sent back to the server; they are retried locally and continuously, with a default retry count of Integer.MAX_VALUE. The maximum number of retries is customizable, and messages exceeding it enter the dead-letter queue.
The maximum number of retries can also be configured through maxReconsumeTimes.
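The backoff intervals for concurrent-consumption retries can be sketched as a lookup over RocketMQ's built-in delay levels. This is a self-contained illustration; the level table matches the broker's default messageDelayLevel configuration, and the class and method names are invented for the example.

```java
class RetryDelays {
    // RocketMQ's 18 built-in delay levels. A message that fails its Nth
    // concurrent consumption is redelivered at level N + 2, so the 16
    // default retries map to levels 3..18 (10s up to 2h).
    static final String[] LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m", "6m",
        "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    // Delay applied before the Nth retry (1-based).
    static String delayBeforeRetry(int retryCount) {
        return LEVELS[retryCount + 1]; // level (retryCount + 2), zero-indexed
    }
}
```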
Enhancing Consumption Parallelism
Most message consumption behavior is IO-intensive, often involving database operations or RPC calls, so consumption speed depends on the throughput of backend databases or external systems. Increasing consumption parallelism can improve overall consumption throughput.
1. Increase the number of consumer nodes. Within the same ConsumerGroup, raise parallelism by adding instance nodes (note that Consumer instances beyond the number of subscribed queues take no effect). You can add machines or start multiple processes on existing machines.
2. Increase the number of threads for each consumer node by changing the Consumer parameters consumeThreadMin and consumeThreadMax to achieve higher concurrency.
4. Batch consumption. If the business supports batch processing, throughput can rise significantly. For example, in an order-deduction application, processing one order may take 1s while processing 10 orders in a batch may take only 2s. The consumer's consumeMessageBatchMaxSize parameter defaults to 1 (one message per callback); setting it to N lets each callback handle up to N messages.
4. Skip non-critical messages. In case of message backlog, if business data requirements are not high, you can skip expired messages through time filtering or drop unimportant messages based on business selection to enhance message processing efficiency.
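For items 2 and 3 above, the knobs on the Java push consumer look roughly like this (a sketch requiring the rocketmq-client dependency; the group name and values are illustrative, not recommendations for every workload):

```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_ORDER");
consumer.setConsumeThreadMin(20);           // item 2: per-node thread pool floor
consumer.setConsumeThreadMax(64);           // item 2: per-node thread pool ceiling
consumer.setConsumeMessageBatchMaxSize(10); // item 3: up to 10 messages per callback
```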
Subscription Relationship Consistency
A Consumer Group typically consists of multiple nodes running the same consumption logic. Subscription consistency means that all consumers in the same Consumer Group have identical subscriptions (the same Topics and the same filter rules). Otherwise, consumption may become chaotic and messages may even be lost.
Keep subscription changes backward compatible. Avoid changing the subscribed Topics; make only incremental changes to filter expressions. While a new version is being rolled out across consumer nodes in the same group, nodes may temporarily carry inconsistent filter rules, and the Broker may apply different rules when filtering for different nodes. Therefore, whenever a group's subscription changes, check its compatibility carefully to avoid missing messages.
The Tencent Cloud RocketMQ console provides diagnostics for inconsistent subscriptions on the consumer detail page, where you can see which nodes are affected.
Printing Consumption Message Log
Whether consumption succeeds or fails, it is recommended to print a log as soon as a message is received, including attributes such as msgId, key, and tag, for troubleshooting. After the message has been processed, also log the msgId, key, tag, retry count, and consumption result.
Additional Consumption Recommendations
About Consumers and Subscriptions
The first thing to note is that different consumer groups can consume the same Topic independently, and each consumer group has its own consumption offsets. Make sure every consumer within the same group keeps its subscription information consistent.
About Ordered Messages
The consumer locks each message queue to ensure messages are consumed one by one, in order. This causes some performance loss, but it is useful when you care about message order. Do not throw exceptions from the listener; return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
About Concurrent Consumption
As the name suggests, the consumer consumes messages concurrently; we recommend it for good performance. Do not throw exceptions from the listener; return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
About Consumption Status
For a concurrent message listener, you can return RECONSUME_LATER to tell the consumer that this message cannot be consumed right now and should be reconsumed later; the consumer then continues with other messages. For an ordered message listener, the message cannot be skipped because order matters, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to pause for a moment.
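A minimal concurrent-listener sketch with the Java client (requires the rocketmq-client dependency; consumer is assumed to be an initialized DefaultMQPushConsumer, and the ordered case is analogous with ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT):

```java
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        // process msgs ...
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // do not rethrow: request redelivery instead
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});
```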
About Blocking
It is not recommended to block the listener because it may block the thread pool and eventually terminate the consumption process.
About Consumer Offset
When creating a new consumer group, you need to decide whether it should consume the historical messages already stored on the Broker. CONSUME_FROM_LAST_OFFSET ignores historical messages and consumes only messages produced afterward. CONSUME_FROM_FIRST_OFFSET consumes every message still present on the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume only messages produced after a specified timestamp.
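With the Java client this is set via ConsumeFromWhere (a sketch; consumer is assumed to be an initialized DefaultMQPushConsumer, and the timestamp string follows the client's yyyyMMddHHmmss convention):

```java
// Skip history; consume only messages produced from now on.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

// Or replay everything still on the Broker:
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// Or start from a point in time:
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// consumer.setConsumeTimestamp("20240101000000");
```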
Troubleshooting Practice
SDK Log
The communication protocol between RocketMQ client and server is complex. For example, the allocation of consumption queues and topic addressing are determined by the client SDK implementation. Important information about these processes is saved in SDK logs. Therefore, when production or consumption issues occur, SDK logs are one of the most important means to troubleshoot problems. Be sure to save these logs. Normally, SDK logs are not printed in the same file as business logs. We provide the default path for commonly used SDK logs as follows:
Language | Protocol | Default log path |
Java | remoting | ~/logs/rocketmqlogs/rocketmq_client.log |
Java | gRPC | ~/logs/rocketmq/rocketmq_client.log |
Go | remoting | /tmp/rocketmq-client.log |
Go | gRPC | ~/logs/rocketmqlogs/rocketmq_client_go.log |