Message Type | Consumption Order | Performance | Scenario |
Normal message | No specific order. | High | Suitable for scenarios requiring high throughput with no specific requirements on production and consumption orders. |
Ordered message | Messages within a specified topic follow the FIFO principle. | Moderate | Suitable for scenarios with moderate throughput requirements but requiring that messages within a specific topic must be produced and consumed in strict accordance with the FIFO principle. |

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {// Instantiate a message producer.DefaultMQProducer producer = new DefaultMQProducer(groupName,// Access control list (ACL) permissions.new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY)), true, null);// Set the NameServer address.producer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");// Start the producer instance.producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}}
SendResult send(Message msg, MessageQueueSelector selector, Object arg) method. MessageQueueSelector is a queue selector, and arg is a Java object that can be passed in as the classification criterion for message sending partitions.MessageQueueSelector is as follows:public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
/*** Description: Consumer of ordered messages.*/public class OrderConsumer {/*** Topic name.*/private static final String TOPIC_NAME = "order_topic";/*** Consumer group name.*/private static final String GROUP_NAME = "group2";public static void main(String[] args) throws Exception {// Create a message consumer.// Instantiate a consumer.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME,new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),new AllocateMessageQueueAveragely(), true, null);// Set the NameServer address.consumer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/** Set whether the consumer starts consuming from the beginning or the end of the queue on its first startup.* If it is not the first startup, consumption continues from the last consumption position.*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// Subscribe to all messages in the topic.consumer.subscribe(TOPIC_NAME, "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {// It can be seen that each queue is consumed by a unique thread, ensuring order within each queue (partition).System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", msgId=" + msg.getMsgId() + ", content:" + new String(msg.getBody()));}try {// Simulating business logic processing...TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}
Feedback