<!-- in your <dependencies> block -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.4</version>
</dependency>
public class TransactionListenerImpl implements TransactionListener {

    // Called back after the half message is sent successfully, to execute the local transaction.
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Execute the database transaction here. Return COMMIT_MESSAGE on success and
        // ROLLBACK_MESSAGE on failure. Return UNKNOW if the result is not yet known,
        // in which case the broker rechecks the transaction later.
        return LocalTransactionState.UNKNOW;
    }

    // Called back when the broker rechecks the local transaction.
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Query the local database here to decide whether to commit or roll back the message.
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
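The recheck callback can only return a meaningful state if the outcome of the local transaction was recorded somewhere it can look up. The following is a minimal sketch of that idea, assuming an in-memory map keyed by a transaction ID string. LocalTxRegistry and its State enum are hypothetical names that only mirror the LocalTransactionState values used above; they are not part of the RocketMQ client, and a real service would more likely persist the outcome in the same database as the business data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the RocketMQ client): remembers the outcome of
// each local transaction so that a later recheck can report it.
final class LocalTxRegistry {

    // Mirrors the names of the LocalTransactionState values for illustration only.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    // Record the outcome once the local transaction has finished.
    void record(String transactionId, boolean succeeded) {
        states.put(transactionId, succeeded ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE);
    }

    // Look up the outcome during a recheck; an unrecorded transaction stays UNKNOW.
    State check(String transactionId) {
        return states.getOrDefault(transactionId, State.UNKNOW);
    }
}
```

In this sketch, executeLocalTransaction would call record(...) after the database work completes, and checkLocalTransaction would map the result of check(...) to the corresponding LocalTransactionState.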
// You must implement a TransactionListener instance.
TransactionListener transactionListener = new TransactionListenerImpl();

// Instantiate the transactional message producer.
TransactionMQProducer producer = new TransactionMQProducer(
        "transaction_group",
        // Access control list (ACL) permissions.
        new AclClientRPCHook(new SessionCredentials(ClientCreater.ACCESS_KEY, ClientCreater.SECRET_KEY)));

// Set the NameServer address.
producer.setNamesrvAddr(ClientCreater.NAMESERVER);
producer.setTransactionListener(transactionListener);
producer.start();
Parameter | Description |
groupName | Producer group name. It is recommended that the topic name be used. |
accessKey | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console. |
secretKey | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console. |
nameserver | Cluster access address. You can obtain the access address from the Access Information module on the cluster basic information page in the console. |
for (int i = 0; i < 3; i++) {
    // Message construction example.
    Message msg = new Message(TOPIC_NAME, "your tag", "KEY" + i,
            ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);
}
// Instantiate a consumer.
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
        groupName,
        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL permissions.

// Set the NameServer address.
pushConsumer.setNamesrvAddr(nameserver);
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic.
    System.out.printf("%s Receive transaction messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as successfully consumed.
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
Parameter | Description |
groupName | Group name. You can copy the name from the Group Management page in the console. For 4.x virtual clusters and exclusive clusters, concatenate the namespace name in the format full namespace name%group name, such as MQ_INSTxxx_aaa%GroupTest. For 4.x general clusters and 5.x clusters, do not concatenate the namespace name; enter the group name only. |
nameserver | Cluster access address. You can obtain the access address from the Access Information module on the cluster basic information page in the console. |
secretKey | Role name. You can copy the role name from the SecretKey column on the Cluster Permissions page in the console. |
accessKey | Role token. You can copy the token from the AccessKey column on the Cluster Permissions page in the console. |
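The groupName rule in the table above can be expressed as a small helper. This is only a sketch: GroupNames and fullGroupName are hypothetical names introduced for illustration, not part of the RocketMQ client, and passing a null or empty namespace is an assumed way of representing a 4.x general cluster or a 5.x cluster.

```java
// Hypothetical helper (not part of the RocketMQ client) that builds the groupName value.
final class GroupNames {

    private GroupNames() {}

    // Returns "namespace%group" when a namespace is given (4.x virtual or exclusive cluster),
    // or the bare group name when it is null or empty (4.x general cluster or 5.x cluster).
    static String fullGroupName(String namespace, String group) {
        if (group == null || group.isEmpty()) {
            throw new IllegalArgumentException("group name must not be empty");
        }
        if (namespace == null || namespace.isEmpty()) {
            return group;
        }
        return namespace + "%" + group;
    }

    public static void main(String[] args) {
        // Uses the example values from the table above.
        System.out.println(fullGroupName("MQ_INSTxxx_aaa", "GroupTest"));
        System.out.println(fullGroupName(null, "GroupTest"));
    }
}
```

The returned string is what would be passed as groupName when instantiating the consumer.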
// Subscribe to a topic.
pushConsumer.subscribe(topic_name, "*");

// Register a callback implementation class to handle messages pulled from the broker.
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic.
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as successfully consumed and return an appropriate status.
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// Start the consumer instance.
pushConsumer.start();