tencent cloud

消息队列 Pulsar 版

动态与公告
新功能发布记录
集群版本更新记录
产品公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 Pulsar 版
产品优势
应用场景
技术原理
产品系列
开源 Pulsar 版本支持说明
与开源 Pulsar 对比
高可用
配额与限制
基础概念
产品计费
计费概述
价格说明
计费示例
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
使用 SDK 收发普通消息
使用 SDK 收发高级特性消息
用户指南
使用流程指引
配置账号权限
新建集群
配置命名空间
配置 Topic
连接集群
管理集群
查询消息及轨迹
跨地域复制
查看监控和配置告警
实践教程
客户端使用实践
异常消费者隔离
限流机制说明
交易对账
消息幂等性
消息压缩
迁移指南
单写多读集群迁移方案
虚拟集群平滑迁移至专业集群
API 参考
API 概览
SDK 参考
SDK 概述
SDK 配置参数推荐
TCP 协议(Pulsar 社区版)
安全与合规
权限管理
删除保护
云 API 审计
常见问题
监控相关
客户端相关
服务协议
服务等级协议
TDMQ 政策
联系我们
词汇表

消息过滤

PDF
聚焦模式
字号
最后更新时间: 2025-12-24 15:24:31
本文主要介绍 TDMQ Pulsar 版中消息过滤的功能、应用场景和使用方式。

功能介绍

消费者订阅了某个主题后,TDMQ Pulsar 版会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在服务端进行过滤,只获取到需要关注的消息,避免接收到大量无效的消息。因此,也可以简化业务逻辑的架构设计。
TDMQ Pulsar 专业版支持两种类型的过滤方式:
标签过滤,生产消息时打上一个或多个固定 Tag,消费者通过指定 Tag 订阅消息。
SQL 过滤,生产消息时打上一个或多个 k-v 的属性,消费者可以通过更为灵活的 SQL 92 语法来订阅消息。
对比项
标签过滤
SQL 过滤
过滤目标
消息的 Tag 标签属性
消息的 k-v 属性
过滤能力
精准匹配
SQL 语法匹配
适用场景
简单过滤场景、计算逻辑简单轻量
复杂过滤场景、计算逻辑较复杂

应用场景

通常,一个 Topic 中存放的是相同业务属性的消息,例如交易流水 Topic 包含了下单流水、支付流水、发货流水等,业务若只想消费者其中一种类别的流水,可在客户端进行过滤,但这种过滤方式会带来带宽的资源浪费。
针对上述场景,TDMQ Pulsar 提供 Broker 端过滤的方式,用户可在生产消息时设置一个或者多个 Tag 或属性,消费时通过相应的规则进行订阅。


使用说明

消息过滤目前是通过 Properties 的方式传入的,可以通过如下方式获取:
Java
Go
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.10.1</version> <!-- 推荐版本 -->
</dependency>
推荐使用最新版本。
go get -u github.com/apache/pulsar-client-go@master

标签过滤

Tag 消息不支持 Batch 功能,Batch 功能默认是开启的。如果要使用 Tag 消息,需要在 Producer 侧禁用掉 batch,具体如下:
Java
Go
// 构建生产者
Producer<byte[]> producer = pulsarClient.newProducer()
// 禁用掉batch功能
.enableBatching(false)
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
.topic("persistent://pulsar-xxx/sdk_java/topic2").create();
producer, err := client.CreateProducer(pulsar.ProducerOptions{
DisableBatching: true, // 禁用掉batch功能
})
Tag 消息的过滤只针对已设置 Tag 的消息,消费者订阅 Topic 时若未设置 Tag,Topic 中的所有消息都将被投递到消费端进行消费。
如果要开启 Tag 消息,需要发送消息的时候,在 ProducerMessage 中设置 Properties 字段;同时在创建 Consumer 的时候需要在 ConsumerOptions 中指定 SubscriptionProperties 字段。
在 ProducerMessage 中设置 Properties 字段时,其中 key 为 tag 的名字,value 为固定值:TAGS
在 ConsumerOptions 中指定 SubscriptionProperties 字段时,其中 key 为要订阅的 tag 的名字,value 为 tag 的版本信息,为保留字段,目前没有实质含义,用来做后续功能的扩展,具体如下:
指定单个 tag
Java
Go
// 发送消息
MessageId msgId = producer.newMessage()
.property("tag1", "TAGS")
.value(value.getBytes(StandardCharsets.UTF_8))
.send();

// 订阅相关参数,可用来设置订阅标签(TAG)
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1","1");
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxxx/sdk_java/topic2")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("topic_sub1")
// 声明消费模式为共享模式
.subscriptionType(SubscriptionType.Shared)
// 订阅相关参数,tag订阅等。。
.subscriptionProperties(subProperties)
// 配置从最早开始消费,否则可能会消费不到历史消息
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// 发送消息
if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Properties: map[string]string{
"tag1": "TAGS",
},
}); err != nil {
log.Fatal(err)
}

// 创建 consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
SubscriptionProperties: map[string]string{"tag1": "1"},
})
指定多个 tag
Java
Go
// 发送消息
MessageId msgId = producer.newMessage()
.property("tag1", "TAGS")
.property("tag2", "TAGS")
.value(value.getBytes(StandardCharsets.UTF_8))
.send();

// 订阅相关参数,可用来设置订阅标签(TAG)
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1","1");
subProperties.put("tag2","1");
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxxx/sdk_java/topic2")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("topic_sub1")
// 声明消费模式为共享模式
.subscriptionType(SubscriptionType.Shared)
// 订阅相关参数,tag订阅等。。
.subscriptionProperties(subProperties)
// 配置从最早开始消费,否则可能会消费不到历史消息
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// 创建 producer
if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Properties: map[string]string{
"tag1": "TAGS",
"tag2": "TAGS",
},
}); err != nil {
log.Fatal(err)
}

// 创建 consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
SubscriptionProperties: map[string]string{
"tag1": "1",
"tag2": "1",
},
})
tag 与 properties 混合
Java
Go
// 发送消息
MessageId msgId = producer.newMessage()
.property("tag1", "TAGS")
.property("tag2", "TAGS")
.property("xxx", "yyy")
.value(value.getBytes(StandardCharsets.UTF_8))
.send();

// 订阅相关参数,可用来设置订阅标签(TAG)
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1","1");
subProperties.put("tag2","1");
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxxx/sdk_java/topic2")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("topic_sub1")
// 声明消费模式为共享模式
.subscriptionType(SubscriptionType.Shared)
// 订阅相关参数,tag订阅等。。
.subscriptionProperties(subProperties)
// 配置从最早开始消费,否则可能会消费不到历史消息
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// 创建 producer
if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Properties: map[string]string{
"tag1": "TAGS",
"tag2": "TAGS",
"xxx": "yyy",
},
}); err != nil {
log.Fatal(err)
}

// 创建 consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
SubscriptionProperties: map[string]string{
"tag1": "1",
"tag2": "1",
},
})
注意:
1. 单个消费者可以使用多个 Tag,多个 Tag 之间的关系是「或」。
2. 多个消费者需要使用相同 Tag。若一个订阅中有不同消费者使用不同 Tag ,则会出现过滤规则覆盖的情况,进而影响业务逻辑。

SQL 过滤

生产者示例

生产者可以在 property 中添加多个属性。
// 构建生产者
Producer<byte[]> producer = pulsarClient.newProducer()
// 禁用掉batch功能
.enableBatching(false)
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
.topic("persistent://pulsar-xxx/sdk_java/topic2").create();

// 发送消息
MessageId msgId = producer.newMessage()
.property("idc", "idc1") // 指定消息的属性(idc)
.property("label", "online") // 指定消息的属性(label)
.property("other", "xxx") // 指定消息的其他属性
.value(value.getBytes(StandardCharsets.UTF_8))
.send();

消费者示例

消费者的 properties 中必须包含 TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,认为开启了 SQL92 过滤,value 即为 SQL 92 过滤表达式。
// 订阅相关参数
HashMap<String, String> subProperties = new HashMap<>();
// 消费者的properties中包含TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,认为开启了SQL92过滤,value即为过滤表达式。
subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION","idc = 'idc1' AND label IS NOT NULL"); // 表达式说明:idc属性等于idc1 并且 label属性存在

// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxxx/sdk_java/topic2")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("topic_sub1")
// 声明消费模式为共享模式
.subscriptionType(SubscriptionType.Shared)
// 订阅配置相关参数,里面携带SQL过滤表达式
.subscriptionProperties(subProperties)
.subscribe();

注意事项

1. 如果 subscriptionProperties 中包含了 TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,即认为此订阅使用 SQL 过滤。如果同时subscriptionProperties 中还存在其他 properties,其他 properties 忽略,即不再使用标签过滤。
2. SQL 过滤是根据消息中的属性属性(即发送消息时候指定的 property)进行过滤的,和标签(Tag)过滤是相互独立的。即一旦使用了 SQL 过滤,完全按照消费属性来过滤,不再区分是否是 TAGS 标签的属性。对于 property 中 key=tag1,value=TAGS 的属性,Pulsar 将认为这是一个普通的property,key=tag1,value=TAGS 字符串的属性,不会特殊对待。
3. 消息过滤不支持 Batch 功能,Batch 功能默认是开启的,需要在创建 Producer 的时候禁用掉 batch。如果是批量的消息,服务端不会执行消息过滤,会直接投递给消费者。
4. SQL 语句中支持的属性(property)的命名格式只能由下面的字符组成:字母、数字、下划线。
5. SQL 语句中的判断条件最大不能超过 50 个,建议不超过5个。换句话说,就是 SQL 表达式语句中的 AND 和 OR 的数量要小于 50 个。BETWEEN xxx AND xxx 和 NOT BETWEEN xxx AND xxx 中包含的 AND 也会计算在内。
6. 如果新传入的 SQL 表达式规则不正确,导致服务端无法正确解析对应的 SQL 语句,则服务端会继续保持和之前一致的过滤方式。
7. 由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:
异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为 null。
数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。
8. 多个消费者需要使用相同的过滤规则。若一个订阅中有不同消费者使用不同规则 ,则会出现过滤规则覆盖的情况,进而影响业务逻辑。
注意:
SQL 过滤需要在 2024 年 9 月 10 日以后创建的专业版,才能支持该功能。存量集群若有需求,请 联系我们 升级。

实践教程

1. 消息中的 property 数量不宜过多,单个 property 的 key 和 value 值也不宜过大。建议 property 数量小于 10 个,整体字符串长度小于 512 byte。
2. 使用 SQL 过滤的时候,过滤条件不易过多,建议过滤条件控制在 5 个以内。作为过滤条件的 value 值不宜过长,建议控制在 64 byte 以内。

SQL 过滤语法规则

语法
说明
示例
IS NULL
判断属性不存在
a IS NULL :属性a不存在。
IS NOT NULL
判断属性存在
a IS NOT NULL:属性a存在。
> >= < <=
用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。
a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。
a IS NOT NULL AND a > 'abc':错误示例,abc为字符串,不能用于比较大小。
BETWEEN xxx AND xxx
用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。
a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
NOT BETWEEN xxx AND xxx
用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。
a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
IN (xxx, xxx)
表示属性的值在某个集合内。集合的元素只能是字符串。
a IS NOT NULL AND (a IN ('abc', 'def')):属性a存在且属性a的值为abc或def。
= <>
等于和不等于。可用于比较数字和字符串。
a IS NOT NULL AND (a = 'abc' OR a<>'def'):属性a存在且属性a的值为abc或a的值不为def。
AND OR
逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。
a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈