Release Notes
Announcements
int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));// Send messages.SendResult sendResult = producer.send(msg);System.out.println("sendResult = " + sendResult);}
// Subscribe to a topic. The following example subscribes to all tags.pushConsumer.subscribe(topic_name, "*");//Subscribe to the specified tag.//pushConsumer.subscribe(TOPIC_NAME, "Tag1");//Subscribe to multiple tags.//pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");// Register a callback implementation class to handle messages pulled from the broker.pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logic.System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as successfully consumed and return an appropriate status.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instance.pushConsumer.start();
Parameter | Description |
topic_name | Topic name. You can copy the name from the Topic Management page in the console. 4.x virtual/exclusive cluster: Concatenate the namespace name in the format of full namespace name%topic name, such as MQ_INSTxxx_aaa%TopicTest.4.x general cluster/5.x cluster: The namespace name does not need to be concatenated. Enter the topic name. |
"*" | If the subscription expression is null or uses the * wildcard, it indicates subscribing to all messages. It also supports the format "tag1 || tag2 || tag3" to subscribe to multiple types of tags. |
int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message msg = new Message(TOPIC_NAME,"Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));msg.putUserProperty("key1","value1");// Send messages.SendResult sendResult = producer.send(msg);System.out.println("sendResult = " + sendResult);}
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));// Subscribe to a topic. The following shows the SQL expression for subscribing to a single key (most commonly used).//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));//Subscribe to multiple properties.//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));// Register a callback implementation class to handle messages pulled from the broker.pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logic.System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as successfully consumed and return an appropriate status.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instance.pushConsumer.start();
Was this page helpful?
You can also Contact sales or Submit a Ticket for help.
Help us improve! Rate your documentation experience in 5 mins.
Feedback