| Message Type | Consumption Sequence | Performance | Use Cases |
| --- | --- | --- | --- |
| General message | No guaranteed order | High | Scenarios that demand high throughput and have no requirement on production or consumption order. |
| Sequential message | Messages in a specified Topic follow the FIFO rule | Average | Scenarios with moderate throughput demands that require all messages in a specified Topic to be published and consumed in strict FIFO order. |

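    import java.io.UnsupportedEncodingException;
    import java.util.List;

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;

    public class Producer {
        public static void main(String[] args) throws UnsupportedEncodingException {
            try {
                DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
                producer.start();

                String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
                for (int i = 0; i < 100; i++) {
                    // Messages with the same orderId must land in the same queue.
                    int orderId = i % 10;
                    Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // arg is the orderId passed as the last argument of send().
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);

                    System.out.printf("%s%n", sendResult);
                }

                producer.shutdown();
            } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }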
The example calls the SendResult send(Message msg, MessageQueueSelector selector, Object arg) method. MessageQueueSelector is the queue selector, and arg is a Java object passed in as the sharding key that decides which queue (partition) a message is sent to. Messages sent with the same arg value are routed to the same queue, so they keep their relative order.

The MessageQueueSelector API is as follows:

    public interface MessageQueueSelector {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
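The selection rule used by the selector can be tried out without a broker. The following is a minimal sketch with no RocketMQ dependency; the class QueueSelectionSketch, the helper selectIndex, and the queue count of 4 are hypothetical names and values chosen for illustration. It uses Math.floorMod instead of the % operator so that a negative key still yields a valid index, which is a small variation on the producer example rather than what that example does.

```java
public class QueueSelectionSketch {

    // Hypothetical helper: maps a sharding key (for example an order ID)
    // to a queue index in the range [0, queueCount).
    // Math.floorMod never returns a negative value for a positive divisor,
    // unlike the % operator when the key is negative.
    static int selectIndex(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic

        // Same loop shape as the producer example: orderId cycles through 0..9.
        for (int i = 0; i < 12; i++) {
            int orderId = i % 10;
            System.out.println("orderId=" + orderId
                + " -> queue " + selectIndex(orderId, queueCount));
        }
    }
}
```

Because the index depends only on the key and the queue count, every message carrying the same orderId goes to the same queue as long as the number of queues does not change.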
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;

    /**
     * Description: Sequential consumer
     */
    public class OrderConsumer {
        /** Topic name */
        private static final String TOPIC_NAME = "order_topic";
        /** Consumer group name */
        private static final String GROUP_NAME = "group2";

        public static void main(String[] args) throws Exception {
            // Create the message consumer.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME,
                new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),
                new AllocateMessageQueueAveragely(), true, null);
            // Set the NameServer address.
            consumer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");
            // Set whether the consumer starts from the head or the tail of the queue
            // on first startup. On later startups, consumption resumes from the last position.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Subscribe to all messages in the topic.
            consumer.subscribe(TOPIC_NAME, "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    for (MessageExt msg : msgs) {
                        // Each queue is consumed by one thread at a time, so messages
                        // are ordered per queue (partition).
                        System.out.println("consumeThread=" + Thread.currentThread().getName()
                            + ", queueId=" + msg.getQueueId() + ", msgId=" + msg.getMsgId()
                            + ", content:" + new String(msg.getBody()));
                    }
                    try {
                        // Simulate business logic processing.
                        TimeUnit.SECONDS.sleep(1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
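The per-queue ordering mentioned in the consumer's comments can be illustrated with plain in-memory queues. This is a simplified sketch, not the RocketMQ client: the class PerQueueOrderSketch, the method route, and the "orderId:step" message format are all hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class PerQueueOrderSketch {

    // Routes each "orderId:step" message into one of queueCount FIFO queues,
    // using the order ID as the sharding key.
    static List<Queue<String>> route(List<String> messages, int queueCount) {
        List<Queue<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
        for (String message : messages) {
            int orderId = Integer.parseInt(message.substring(0, message.indexOf(':')));
            queues.get(Math.floorMod(orderId, queueCount)).add(message);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Steps of two orders arrive interleaved.
        List<String> sent = Arrays.asList("1:create", "2:create", "1:pay", "2:pay", "1:ship");
        List<Queue<String>> queues = route(sent, 2);

        // Draining each queue from its head yields each order's steps in send order.
        for (int q = 0; q < queues.size(); q++) {
            System.out.println("queue " + q + " -> " + queues.get(q));
        }
    }
}
```

In this sketch the steps of one order stay in send order inside their queue, while no order is implied between messages that sit in different queues.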