Message Sender Use Cases
Choosing Between Topics and Tags
A topic is a message category used to distinguish messages of the same business type. A tag is a special property or sub-type of a message. Downstream consumers can perform efficient filtering on the broker side using filter expressions.
Use the following guidelines to decide between using a topic or a tag:
1. Consistency of message type: Normal messages, scheduled messages, transactional messages, and ordered messages cannot be mixed. Different message types must use different topics.
2. Consistency of business scenario: For example, order messages and payment messages should use different topics. If order messages need to be filtered by province or region downstream, the region can be marked as a tag.
3. Consistency of message volume: TDMQ for RocketMQ manages consumption progress per topic. If messages within the same topic vary drastically in volume, consumers interested in a less frequent tag would first receive a large number of irrelevant messages and need to filter them out on the client side. This leads to wasted resources and increased consumption delays.
In summary, we recommend that you use different topics to distinguish messages with no business association and use tags to differentiate key properties within the same topic, enabling efficient filtering of partial messages by different downstream consumers.
Using Keys
It is recommended to set the unique business identifier for each message in the Key field. The server creates an index (hash index) for each message. You can query the message content and its trace using the topic and key, which facilitates troubleshooting.
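As a sketch using the open-source RocketMQ Java client (the topic, tag, and order-ID values below are placeholders, not fixed names), setting the key looks like this:

```java
import org.apache.rocketmq.common.message.Message;

// Build a message whose Key is the unique business ID (here, an order number).
// The broker builds a hash index on the key, so the message can later be
// queried in the console by topic + key.
Message buildOrderMessage(String orderId, byte[] payload) {
    Message msg = new Message("TopicOrder", "TagPaid", payload);
    msg.setKeys(orderId);
    return msg;
}
```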
Choosing an Appropriate Sending Method
TDMQ for RocketMQ supports three message sending methods:
Synchronous sending: suitable for scenarios requiring high reliability, such as payment messages and Short Message Service (SMS) notifications.
Asynchronous sending: suitable for business scenarios sensitive to response time, where the sender cannot tolerate long waits for a broker response.
One-way sending: suitable for scenarios where the sending result is not critical, such as log collection.
The send(msg) method sends messages synchronously and returns only after the broker responds. The call blocks until the result arrives, which increases sending latency. For performance-critical scenarios, use the asynchronous method send(msg, callback) and check the result in the callback. For applications that do not require delivery confirmation, such as log collection, use the sendOneway method.
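The three sending methods can be sketched as follows with the open-source Java client. The group ID, nameserver address, and topic are placeholders, and a running broker is assumed:

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SendModes {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("GID_example");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", "hello".getBytes());

        // 1. Synchronous: blocks until the broker acknowledges.
        SendResult result = producer.send(msg);
        System.out.println(result.getSendStatus());

        // 2. Asynchronous: returns immediately; check the outcome in the callback.
        producer.send(msg, new SendCallback() {
            @Override public void onSuccess(SendResult r) { /* log r.getMsgId() */ }
            @Override public void onException(Throwable e) { /* log and compensate */ }
        });

        // 3. One-way: fire and forget, no delivery confirmation.
        producer.sendOneway(msg);

        producer.shutdown();
    }
}
```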
Message Resending Policy
For applications where message loss is unacceptable, a message resending mechanism is essential. The send method of the producer inherently supports internal retries:
1. By default, it retries up to 2 times for synchronous sending; asynchronous sending does not retry.
2. If sending fails, it switches to the next broker. The total time for this method does not exceed the value set by sendMsgTimeout, which is 3s by default.
3. If a timeout exception occurs when messages are sent to the broker, no retry is performed.
For critical business messages, it is recommended to store failed messages in a database after a sending failure. A timer-based thread can then perform scheduled retries to ensure messages are resent to downstream consumers after the RocketMQ service recovers.
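One minimal sketch of such a compensation mechanism, using an in-memory queue as a stand-in for the database table (all names here are illustrative; a scheduled thread would call retryAll periodically):

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Messages that fail to send are recorded and retried later by a timer
// thread. In production the queue would be a durable database table.
public class FailedMessageStore {
    private final Queue<String> failed = new ConcurrentLinkedQueue<>();

    // Called from the producer's error path (e.g. the SendCallback's onException).
    public void record(String messageKey) {
        failed.add(messageKey);
    }

    // Invoked on a schedule; `resend` attempts delivery and reports success.
    // Successfully resent messages are removed; failures stay for the next round.
    public int retryAll(Predicate<String> resend) {
        int resent = 0;
        for (Iterator<String> it = failed.iterator(); it.hasNext(); ) {
            if (resend.test(it.next())) {
                it.remove();
                resent++;
            }
        }
        return resent;
    }

    public int pending() {
        return failed.size();
    }
}
```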
Printing Logs for Sent Messages
Regardless of whether the sending succeeds or fails, it is recommended to print the result in a log, including properties such as msgid, key, and tag, to facilitate troubleshooting and issue identification.
Message Consumer Use Cases
Using Consumer Groups to Isolate Different Downstream Businesses
Different consumption businesses can use separate consumer groups to independently consume the same topic. Each consumer maintains its own consumer offset. Make sure that all consumers within the same group subscribe to the same topic using the same filtering rules.
Ensuring Idempotency in Message Consumption
TDMQ for RocketMQ guarantees at-least-once message delivery in principle and cannot avoid message duplication. For example, network jitter during message sending can cause retries; consumption exceptions during message delivery can cause retries; and load balancing during consumer restarts can also lead to duplicates. Therefore, if your business is highly sensitive to duplicate consumption, deduplication must be implemented at the business logic level. You can leverage distributed caches or relational databases for deduplication.
To ensure idempotency, first determine the unique key of the message. It is not recommended to rely on msgId. Instead, use the unique business identifier field set in the keys field, such as an order ID. Before consumption, check whether this unique key exists in the distributed cache or relational database. If it does not exist, insert it and proceed with consumption. Otherwise, skip it or perform further deduplication based on business logic.
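A minimal sketch of this check, using an in-memory set as a stand-in for the distributed cache or relational database (names are illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Deduplicate before processing, keyed by the unique business ID carried
// in the message's Keys field (e.g. an order ID). In production the set
// would be Redis or a database table with a unique index.
public class IdempotentConsumer {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Returns true if the message was processed, false if it was a duplicate.
    public boolean consume(String businessKey, Runnable handler) {
        if (!processed.add(businessKey)) {
            return false; // already seen: skip
        }
        handler.run();
        return true;
    }
}
```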
Consumption Failure Retries
Concurrent consumption: A backoff retry mechanism is employed for failed messages. Failed messages are sent back to the system's delay queue, and the delayLevel increases by 1 with each failure. The default maximum number of retries is 16; messages exceeding this limit are routed to the dead letter queue. The retry intervals for the 16 attempts correspond to delay message levels 3 to 18, ranging from 10s to 2h. For details, see Message Retries.
Sequential consumption: To preserve message order, failed messages are not sent back to the server. Instead, they are retried continuously on the client. The default maximum number of retries is Integer.MAX_VALUE; you can set a lower maximum, and messages that exceed it enter the dead letter queue.
You can also configure the maximum number of retries using maxReconsumeTimes.
Increasing Consumption Parallelism
Most message consumption behaviors are I/O-intensive, involving operations such as database access or RPC calls during processing. The consumption speed for such behaviors depends on the throughput of backend databases or external systems. Increasing consumption parallelism can improve overall consumption throughput.
1. Increase the number of consumer nodes: Add more consumer instances within the same consumer group, either by adding machines or by starting multiple processes on existing machines. Note that instances exceeding the number of queues in the subscribed topic receive no messages and are therefore ineffective.
2. To achieve higher concurrency, increase the number of threads of a single consumer node by modifying the Consumer parameters consumeThreadMin and consumeThreadMax.
3. Batch consumption: If the business process supports batch processing, throughput can be significantly improved. For example, in an order deduction application, processing one order takes 1s, while processing 10 orders in one batch may take only 2s. Set the consumer's consumeMessageBatchMaxSize parameter (default 1, meaning one message per invocation) to N, and each batch passed to the listener will contain at most N messages.
4. Skip non-critical messages: When message backlogs occur, if business requirements permit, you can filter out expired messages based on time or discard less important messages based on business rules to improve processing efficiency.
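Assuming the open-source Java push consumer, the tuning knobs from points 2 and 3 above look like this (the group ID and values are illustrative, not recommendations for every workload):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_example");
// Size of a single node's consumption thread pool; raise for I/O-bound handlers.
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// Up to N messages per listener invocation (default 1).
consumer.setConsumeMessageBatchMaxSize(10);
// Cap retries before dead-lettering (see "Consumption Failure Retries").
consumer.setMaxReconsumeTimes(16);
```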
Subscription Relationship Consistency
A consumer group typically represents a group of consumer nodes with consistent consumption business logic. Subscription relationship consistency means that all consumer instances under the same consumer group should have identical subscription relationships. Otherwise, message consumption logic may become disordered, potentially leading to message loss.
To ensure compatibility when subscription relationships change, avoid modifying the subscribed topic. Only apply filter expressions incrementally. If different consumer nodes within the same group deploy changes at different times, inconsistent filtering rules may occur across nodes. When the broker filters messages, different rules may take effect on different consumer nodes. Therefore, once a group's subscription relationship changes, pay attention to compatibility to prevent message loss.
The Tencent Cloud TDMQ for RocketMQ console provides a diagnostic feature for inconsistent subscription relationships on the consumer details page, showing which nodes have inconsistent subscription relationships.
Printing Logs for Consumed Messages
Regardless of whether consumption succeeds or fails, it is recommended to print a log when a message is received, including properties such as msgid, key, and tag, to facilitate troubleshooting and issue identification. After message processing is completed, also print the msgid, key, tag, number of retries, and consumption result.
Other Consumption Recommendations
About Consumers and Subscriptions
The first thing to note is that different consumer groups can independently consume certain topics, and each consumer group has its own consumer offset. Make sure that the subscription information is consistent across all consumers in the same group.
About Ordered Consumption
The consumer locks each message queue to ensure messages are consumed one by one. This may reduce performance, but it is necessary when message order matters. We do not recommend throwing exceptions from the listener. Instead, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT.
About Concurrent Consumption
As the name implies, the consumer will consume messages concurrently. It is recommended to implement concurrent consumption for optimal performance. We do not recommend throwing exceptions. Instead, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER.
About the Consumption Status
For concurrent message listeners, you can return RECONSUME_LATER to notify the consumer that this message cannot be processed now and should be re-consumed later. You can then continue consuming other messages. For ordered message listeners, since message order matters, you cannot skip messages. Instead, you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to instruct the consumer to wait for a while.
Blocking the listener is not recommended, as it blocks the thread pool and may eventually terminate the consumption process.
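A sketch of a concurrent listener that follows this advice, converting handler exceptions into RECONSUME_LATER instead of letting them propagate (the process method stands in for business logic):

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class SafeConcurrentListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext ctx) {
        try {
            for (MessageExt msg : msgs) {
                process(msg); // business handler, assumed
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // Log the failure and ask the broker to redeliver later,
            // rather than throwing from the listener.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    private void process(MessageExt msg) { /* business logic */ }
}
```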
About the Consumer Offset
When creating a consumer group, you need to decide whether to consume historical messages that already exist in the broker. You can use CONSUME_FROM_LAST_OFFSET to ignore historical messages and consume any messages generated afterwards. You can use CONSUME_FROM_FIRST_OFFSET to consume all messages that already exist in the broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages generated after a specified timestamp.
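With the open-source Java client, the offset policy is configured before the consumer starts and only takes effect the first time a new consumer group comes online (the group ID and timestamp below are placeholders):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GID_example");
// Skip historical messages; consume only messages produced afterwards.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// Alternatives:
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// consumer.setConsumeTimestamp("20240601000000"); // yyyyMMddHHmmss, with TIMESTAMP
```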
Troubleshooting Practices
SDK Logs
The communication protocol between the RocketMQ client and server is complex. Processes such as consumption queue assignment and topic addressing are determined by the client SDK implementation. Critical information about these processes is recorded in the SDK logs. Therefore, when production or consumption issues occur, SDK logs are one of the most critical tools for troubleshooting. It is essential to preserve these logs. Typically, SDK logs are not printed in the same file as business logs. The following table lists the default log paths for common SDKs:
| SDK | Protocol | Default Log Path |
| --- | --- | --- |
| Java | remoting | ~/logs/rocketmqlogs/rocketmq_client.log |
| Java | grpc | ~/logs/rocketmq/rocketmq_client.log |
| Go | remoting | /tmp/rocketmq-client.log |
| Go | grpc | ~/logs/rocketmqlogs/rocketmq_client_go.log |