pom.xml
file. This document uses a Maven project as an example.
<!-- in your <dependencies> block -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.6.1</version>
</dependency>
// Instantiate the message producerDefaultMQProducer producer = new DefaultMQProducer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission);// Set the NameServer addressproducer.setNamesrvAddr(nameserver);// Start the producer instancesproducer.start();
Parameter | Description |
namespace | Namespace name, which can be copied under the Namespace tab in the console. Its format is cluster ID + | + namespace. |
groupName | Producer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey |
for (int i = 0; i < 10; i++) {// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Send the messageSendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | A parameter used to set the message tag. |
// Disable retry upon sending failuresproducer.setRetryTimesWhenSendAsyncFailed(0);// Set the number of messages to be sentint messageCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// Logic for message sending successescountDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// Logic for message sending failurescountDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | A parameter used to set the message tag. |
for (int i = 0; i < 10; i++) {// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Send one-way messagesproducer.sendOneway(msg);}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | A parameter used to set the message tag. |
// Instantiate the consumerDefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL permission// Set the NameServer addresspushConsumer.setNamesrvAddr(nameserver);
Parameter | Description |
namespace | Namespace name, which can be copied under the Namespace tab in the console. Its format is cluster ID + | + namespace. |
groupName | Consumer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey |
// Instantiate the consumerDefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// Set the NameServer addresspullConsumer.setNamesrvAddr(nameserver);// Specify the first offset as the start offset for consumptionpullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
Parameter | Description |
namespace | Namespace name, which can be copied under the Namespace tab in the console. Its format is cluster ID + | + namespace. |
groupName | Consumer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey |
// Subscribe to a topicpushConsumer.subscribe(topic_name, "*");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
// Subscribe to a topicpullConsumer.subscribe(topic_name, "*");// Start the consumer instancepullConsumer.start();try {System.out.printf("Consumer Started.%n");while (true) {// Pull the messageList<MessageExt> messageExts = pullConsumer.poll();System.out.printf("%s%n", messageExts);}} finally {pullConsumer.shutdown();
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
Was this page helpful?