This document mainly introduces the concept, technical principles, scenarios, and usage method of transactional messages in TDMQ for RocketMQ.
Relevant Concepts
Transactional messages are an advanced feature provided by RocketMQ. By binding the actions of a two-phase commit with local transactions, they ensure eventual consistency between message production and local transactions in distributed scenarios. Compared with normal messages, transactional messages primarily extend the mechanisms for secondary confirmation and compensation through local transaction status verification.
1. Half message:
After a producer sends a transactional message to the RocketMQ server, the message is persisted and marked in a temporarily undeliverable status. It only becomes eligible for delivery (visible to consumers) after the local transaction is executed and confirmed. A message in this status is called a half message.
2. Two-phase commit:
A key mechanism to achieve eventual consistency for transactions. Phase 1: Sending the half message. Phase 2: The producer executes the local transaction and, based on the result, sends a confirmation to the RocketMQ server to either commit (allow delivery) or rollback (discard the message). This determines the final disposition of the half message.
3. Operation (OP) message:
OP messages are used to tag the message status. If a half message has no corresponding OP message, it indicates that the second-phase confirmation status is unknown, triggering the RocketMQ server to actively verify the local transaction status. The content of an OP message is the storage offset of its corresponding half message.
4. Related topics:
Real topic: the actual business topic specified by the producer when sending messages.
Half topic: a system topic named RMQ_SYS_TRANS_HALF_TOPIC, used to store half messages.
OP topic: a system topic named MQ_SYS_TRANS_OP_HALF_TOPIC, used to store OP messages. Once a half message receives its secondary status confirmation (either commit or rollback), a corresponding OP message is written to this topic.
Scenarios
For example, in a typical distributed scenario such as cross-bank fund transfer, it is imperative to strictly guarantee the eventual consistency between the deduction from the payer's account and the deposit into the payee's account. This can be achieved using TDMQ for RocketMQ's transactional messages, which involves the following three stages:
1. Stage 1: Sending transactional messages (preparing for fund transfer)
After User 1 initiates a transfer, Bank System A where it is located sends a transactional message to the corresponding business topic on the RocketMQ server. The message content indicates "Transfer USD 1,000 from User 1 to User 2's account". At this point, this message is invisible to System B of the receiving bank. This prevents System B from executing the deposit operation prematurely, before the local transaction on the payer's side is confirmed, ensuring the safety of the fund flow.
2. Stage 2: Executing local transactions (debit operation on the payer's account)
After the transactional message is successfully sent, System A proceeds to execute the local transaction by debiting User 1's account balance. If the deduction is successful, it submits a secondary confirmation (commit) to the RocketMQ server, and the message is subsequently delivered downstream. Conversely, if it submits a rollback, the transaction ends, and the balances of both accounts remain unchanged.
3. Stage 3: Downstream service consumption (credit operation on the payee's account)
The payee's Bank System B has previously subscribed to the transfer topic. Upon receiving the message, it executes the operation to increase User 2's account balance. If consumption fails due to network anomalies or account status issues, RocketMQ automatically triggers its retry mechanisms. If the message still fails after multiple retries, it will be moved to the dead letter queue. Manual intervention will then be required for reconciliation, ensuring the funds are ultimately deposited accurately through a compensation process.
Through these three stages, the RocketMQ transactional message mechanism successfully implements eventual consistency for distributed transactions in the cross-bank transfer scenario. Similarly, in scenarios such as e-commerce order payment and inventory deduction, financial transaction reconciliation, and multi-system data synchronization within enterprises, RocketMQ transactional messages can reliably ensure the eventual consistency of cross-service operations with their robust mechanisms.
Implementation Process
1. The producer sends a transactional message to the RocketMQ server.
2. After storing this message, the server returns a success response. At this point, the message is invisible to downstream consumers and remains in the half message status.
3. Upon receiving the successful response for the half message, the producer proceeds to execute the local transaction (such as updating the business database).
4. Based on the execution result of the local transaction, the producer submits the final status (secondary confirmation) to the RocketMQ server.
5. If the confirmation result is commit, the server proceeds to deliver the transactional message to consumers. If the confirmation result is rollback, the server discards the message and stops its delivery.
6. If the confirmation result is unknown or no confirmation result is received within a certain period, a proactive check of the transaction status will be triggered.
7. If the producer has not submitted a final status or the secondary confirmation result is unknown, the RocketMQ server proactively initiates a transaction result query request to the producer service.
8. After receiving the request, the producer submits the secondary confirmation result, and the logic returns to Step 5. If the producer service is temporarily unavailable, the RocketMQ server proceeds to initiate check requests at specified intervals until the maximum number of checks is exceeded, at which point the message is rolled back.
In this way, eventual consistency of the transaction status is achieved regardless of whether the local transaction succeeds. The following sequence diagram demonstrates the steps above:
Usage Examples
This example uses a TDMQ for RocketMQ 5.x cluster to demonstrate how to use transactional messages and their effect.
2. Taking Java as an example, introduce the dependencies corresponding to version 5.x.
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.6</version>
</dependency>
3. Start the producer.
public class ProducerTransactionMessageDemo {
private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageDemo.class);
private static boolean executeLocalTransaction() {
return true;
}
private static boolean checkTransactionStatus(String orderId) {
return true;
}
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String accessKey = "your-ak";
String secretKey = "your-sk";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "https://your-endpoints";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "tran_topic";
TransactionChecker checker = messageView -> {
log.info("Receive transactional result check request, message={}", messageView);
String orderId = messageView.getProperties().get("orderId");
boolean isSuccess = checkTransactionStatus(orderId);
return isSuccess ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
};
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.setTransactionChecker(checker)
.build();
final Transaction transaction = producer.beginTransaction();
byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "tagA";
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("your-key-565ef26f5727")
.addProperty("orderId", "0001")
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message, transaction);
log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
return;
}
boolean localTxSuccess = executeLocalTransaction();
if (localTxSuccess) {
transaction.commit();
} else {
transaction.rollback();
}
}
}
4. After running the code, you can see a message that has been delivered and is waiting for consumption on the Message Query page in the console.
5. Start a consumer and subscribe to this topic. After the message is successfully consumed, view the message trace in the Tencent Cloud console.
6. Modify the code to simulate a failed local transaction execution, causing the transactional message in a half message status to roll back.
private static boolean executeLocalTransaction() {
return false;
}
private static boolean checkTransactionStatus(String orderId) {
return false;
}
7. At this point, you can find that the message has been sent successfully, but it is not visible on the Message Query page in the console, and this message cannot be consumed even if you start the consumer.
Must-Knows
When you use transactional messages, take note of the following points:
1. The topic type must be TRANSACTION. Otherwise, an error will occur during message production. Key error message: current message type not match with topic accept message types.
2. Transactional messages do not support delays. If a delay property is set, it will be cleared before the message is sent.
3. If the local transaction executes slowly, the server should return unknown during the transaction checkback. Furthermore, if it is confirmed that the local transaction will take a long time, adjust the first transaction checkback time to avoid generating a large number of transactions with unknown results.