消息类型 | 消费顺序 | 性能 | 适用场景 |
普通消息 | 无顺序 | 高 | 适用于对吞吐量要求高,且对生产和消费顺序无要求 |
顺序消息 | 指定的 Topic 内的消息遵循先入先出(FIFO)规则 | 一般 | 吞吐量要求一般,但是要求特定的 Topic 严格地按照 FIFO 原则进行消息发布和消费的场景 |

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}}
SendResult send(Message msg, MessageQueueSelector selector, Object arg) 方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。MessageQueueSelector 的接口如下:public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
/*** Description: 顺序消费者*/public class OrderConsumer {/*** topic名称*/private static final String TOPIC_NAME = "order_topic";/*** 消费者组名称*/private static final String GROUP_NAME = "group2";public static void main(String[] args) throws Exception {// 创建消息消费者// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP_NAME,new AclClientRPCHook(new SessionCredentials("accessKey", "secretKey")),new AllocateMessageQueueAveragely(), true, null);// 设置NameServer的地址consumer.setNamesrvAddr("rmq-xxx.rocketmq.xxxtencenttdmq.com:8080");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);/** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅topic中的所有的信息consumer.subscribe(TOPIC_NAME, "*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", msgId=" + msg.getMsgId() + ", content:" + new String(msg.getBody()));}try {// 模拟业务逻辑处理中...TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}}
文档反馈