tencent cloud

消息队列 Pulsar 版

动态与公告
新功能发布记录
集群版本更新记录
产品公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 Pulsar 版
产品优势
应用场景
技术原理
产品系列
开源 Pulsar 版本支持说明
与开源 Pulsar 对比
高可用
配额与限制
基础概念
产品计费
计费概述
价格说明
计费示例
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
使用 SDK 收发普通消息
使用 SDK 收发高级特性消息
用户指南
使用流程指引
配置账号权限
新建集群
配置命名空间
配置 Topic
连接集群
管理集群
查询消息及轨迹
跨地域复制
查看监控和配置告警
实践教程
客户端使用实践
异常消费者隔离
限流机制说明
交易对账
消息幂等性
消息压缩
迁移指南
单写多读集群迁移方案
虚拟集群平滑迁移至专业集群
API 参考
API 概览
SDK 参考
SDK 概述
SDK 配置参数推荐
TCP 协议(Pulsar 社区版)
安全与合规
权限管理
删除保护
云 API 审计
常见问题
监控相关
客户端相关
服务协议
服务等级协议
TDMQ 政策
联系我们
词汇表

订阅模式

PDF
聚焦模式
字号
最后更新时间: 2025-12-24 15:24:30
为了适用不同场景的需求,Pulsar 支持四种订阅模式:Exclusive、Shared、Failover、Key_Shared。

订阅模式



独占模式(Exclusive)

Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。
Exclusive 订阅模式下,同一个 Subscription 里只有一个 Consumer 能消费 Topic,如果多个 Consumer 订阅则会报错,适用于全局有序消费的场景。

Exclusive 模型图


// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为exclusive(独占)模式
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
启动多个消费者将收到错误信息如下图所示:



共享模式(Shared)

消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

Shared 模型图


// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为 Shared(共享)模式
.subscriptionType(SubscriptionType.Shared)
.subscribe();
多个 Shared 模式消费者如下图所示:



灾备模式(Failover)

当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。

Failover 模型图


// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为灾备模式
.subscriptionType(SubscriptionType.Failover)
.subscribe();
多个 Failover 模式消费者如下图所示:



KEY 共享模式(Key_Shared)

当存在多个 Consumer 时,将根据消息的 Key 进行分发,Key 相同的消息只会被分发到同一个消费者。

Key_Shared 模型图


注意:
Key_Shared 本身在使用上存在一定的限制条件,由于其工程实现复杂度较高,在社区版本迭代中,不断有对 Key_Shared 的功能进行改进以及优化,整体稳定性相较 Exclusive,Failover 和 Shared 这三种订阅类型偏弱。如果上述三种订阅类型能满足业务需要,可以优先选用上述三种订阅类型。
专业集群可以保证相同 KEY 的消息按顺序投递;虚拟集群无法保障消息投递顺序。

Key_Shared 使用建议

什么时候才考虑用 Key_Shared 订阅模式

如是普通的生产消费场景,建议直接选用 Shared 模式即可。
若需要让相同 Key 的消息分给同一个消费者,这个时候 Shared 订阅模式无法满足用户需求。有两种方式可以选择:
选择 Key_Shared 订阅模式。
通过多分区主题 + Failover 订阅模式实现。

什么场景下适合用 Key_Shared 订阅

Key 数量多且每个 Key 的消息分布相对均匀
消费处理速度快,无消息堆积的情况
如果在生产过程中不能保证上面的两个条件同时满足,建议用 【多分区主题 + Failover 订阅】

代码示例

Key_Shared 订阅示例

默认情况下,Pulsar 在生产消息时是开启 Batch 功能的,Pulsar 的 Batch 消息解析是在 Consumer 侧处理的。所以在 Broker 侧一个 Batch 消息是被当作一条 Entry 处理的,所以对于 Key_shared 的基于消息 Key 有序订阅类型来说,是没办法处理这种 Case 的,因为不同 Key 的消息有可能被打包到同一个 Batch 中。针对这种情况在创建 Producer 时有如下两种规避方式:
1. 禁用 Batch。
// 构建生产者
Producer<byte[]> producer pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
// 发送消息时设置key
MessageId msgId = producer.newMessage()
// 消息内容
.value(value.getBytes(StandardCharsets.UTF_8))
// 在此处设置key,key相同的消息只会被分发到同一个消费者。
.key("youKey1")
.send();
2. 使用 key_based batch 类型。
// 构建生产者
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
// 发送消息时设置key
MessageId msgId = producer.newMessage()
// 消息内容
.value(value.getBytes(StandardCharsets.UTF_8))
// 在此处设置key,key相同的消息只会被分发到同一个消费者。
.key("youKey1")
.send();
消费者代码示例:
// 构建消费者 Consumer<byte[]> consumer = pulsarClient.newConsumer() // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 .topic("persistent://pulsar-xxx/sdk_java/topic1") // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名 .subscriptionName("sub_topic1") // 声明消费模式为 Key_Shared(Key 共享)模式 .subscriptionType(SubscriptionType.Key_Shared) .subscribe();
多个 Key_Shared 模式消费者。



多分区主题 + Failover 订阅示例

注意事项:
在该模式下,每个分区同时只会分配给一个消费者实例。若消费者数量多于分区数量,超出数量的消费者无法参与消息,可以通过扩容分区数量不小于消费者数量解决。
在设计 Key 的时候尽量保证 Key 分布均匀。
Failover 模式下不支持延时消息。
1. 生产者代码示例
// 构建生产者
Producer<byte[]> producer pulsarClient.newProducer()
.topic(topic)
.enableBatching(false) // 禁用batch
.create();
// 发送消息时设置key
MessageId msgId = producer.newMessage()
// 消息内容
.value(value.getBytes(StandardCharsets.UTF_8))
// 在此处设置key,key相同的消息会发送到同一个分区中
.key("youKey1")
.send();
2. 消费者代码示例
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为Failover模式
.subscriptionType(SubscriptionType.Failover)
.subscribe();

开启保序

TDMQ Pulsar 2.9.2 版本集群可以支持 KEY 的顺序投递。如需开启,需要在创建消费者实例时指定 keySharedPolicy。
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为 Key_Shared(Key 共享)模式
.subscriptionType(SubscriptionType.Key_Shared)
// 设置为不允许乱序
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
.subscribe();
注意:
集群 2.7.2 版本不支持开启保序,可能造成消息推送堵塞,无法消费;
开启了保序,消费者重启后可能会出现消费速率下降、消息堆积的情况,这是因为在保序模式下新消费者上线后,需要等待消费者上线之前的全部消息都被消费完成后(全部确认后),才能继续消费后面的消息。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈