tencent cloud

消息队列 MQTT 版

动态与公告
新功能发布记录
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 MQTT 版
应用场景
技术架构
产品系列
MQTT 协议兼容说明
开源对比
高可用
产品约束与使用配额
基本概念
开服地域
购买指南
计费概述
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
公网接入
VPC 网络接入
用户指南
使用流程指引
配置账号权限
新建集群
管理 Topic
连接集群
查询消息
管理客户端
管理集群
查看监控和配置告警
数据集成
集成数据到云函数 SCF
集成数据到 CKafka
集成数据到 RocketMQ
开发指南
MQTT 5 高级特性
数据面 HTTP 接口说明
配置自定义域名
配置 SQL 过滤
配置点对点订阅
MQTT over QUIC
管理客户端订阅
消息增强规则
实践教程
MQTT 客户端开发注意事项
可观测能力
Topic 与通配符订阅
API 参考
History
Introduction
API Category
Making API Requests
Cluster APIs
Topic APIs
Authorization Policy APIs
User APIs
Client APIs
Message Enhancement Rule APIs
Message APIs
Data Types
Error Codes
SDK 参考
接入点格式
Java SDK
C SDK
Javascript/Node.JS/小程序
Go SDK
iOS SDK
JavaScript SDK
Dart SDK
Python SDK
.NET
安全与合规
权限管理
常见问题
相关协议
隐私协议
数据处理和安全协议
消息队列 MQTT 版服务等级协议
联系我们

配置 SQL 过滤

PDF
聚焦模式
字号
最后更新时间: 2026-01-30 15:23:29

背景

MQTT 标准规范定义了 Topic Filter 的概念,允许订阅者基于 MQTT Topic Name 的层级结构和通配符来选择需要接收的消息。虽然 Topic 和通配符提供了较强的过滤能力,但在灰度发布、A/B 测试、系统升级等场景中,仅依赖 Topic 过滤仍无法满足更灵活的业务需求。

实现原理

MQTT 5.0协议引入了 Subscribe User Property 机制,本产品基于此机制扩展支持了 Subscribe User Property 的过滤语义,从而支持更细粒度的消息过滤能力。当订阅消息时,如果 Subscribe User Properties 中包含 Key 为 $where,Value 为合法的 WHERE 子句,MQTT Server 将在推送消息时依据该 WHERE 子句对消息进行过滤,仅将满足条件的消息投递给订阅者




基本工作流程

1. ​​订阅与声明​​:订阅者发起 Subscribe 请求,在 User Property 中声明过滤条件($where)。
2. ​​条件解析​​:服务端解析并校验 WHERE 子句的有效性。
3. ​​消息匹配​​:当有消息发布时,服务端对匹配主题的消息应用所有订阅者的过滤条件。
4. ​​精准投递​​:仅将满足条件的消息投递给订阅者。

SQL 过滤语法

WHERE子 句支持丰富的操作符和函数,用于构建灵活的过滤条件。

支持的操作符

类型
操作符
示例
说明
​​比较运算符​​
=, !=, >, >=, <, <=
payload.temp > 30
比较数值或字符串
​​逻辑运算符​​
AND, OR, NOT
temp > 25 AND hum < 70
组合多个条件
​​范围判断​​
IN
clientid IN ('client1', 'client2')
判断字段值是否在列表中
​​空值判断​​
IS NULL
payload.location IS NULL
判断字段是否为NULL
​​模式匹配​​
LIKE
topic LIKE 'sensor/%/temp'
简单的通配符匹配
​​条件表达式​​
CASE WHEN...THEN...ELSE...END
CASE WHEN qos > 0 THEN 'important' ELSE 'normal' END
实现条件逻辑

支持的函数

类型
函数示例
说明
​​字符串函数​​
UPPER(), LOWER(), LENGTH()
处理文本数据
​​数学函数​​
ABS()
计算绝对值
​​条件函数​​
COALESCE()
返回参数中第一个非NULL值

注意事项

1. 每个 Subscribe 请求的 User Properties 中,只能有一个 Key 为 $where 的属性。若存在多个 $where→ WHERE 子句的属性对,仅第一个生效
2. ​​对于消息中的用户属性(User Property),若存在多个同名的 Key-Value 对,仅取最后一个出现的 Value 参与过滤表达式的计算。
3. ​​若过滤表达式引用的字段在消息属性中不存在,则该字段的值将被视为NULL
4. 字符串字面量请使用单引号表示,例如:WHERE type = 'string-literal'

示例

package com.tencent.tdmq.mqtt.quickstart.paho.v5.async;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

public class BasicQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
String serverUri = "tcp://mqtt-xxx.mqtt.tdmqcloud.com:1883";
String clientId = "deviceBasic";

String topic = "home/room/1";
String[] topicFilters = new String[] {"home/#"};
int[] qos = new int[] {1};

MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName("YOUR-USERNAME");
options.setPassword("YOUR-PASSWORD".getBytes(StandardCharsets.UTF_8));
options.setCleanStart(true);
options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(1));

client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
System.out.println("Disconnected: " + response.getReasonString());
}

@Override
public void mqttErrorOccurred(MqttException e) {
e.printStackTrace();
}

@Override
public void messageArrived(String s, MqttMessage message) throws Exception {
byte[] payload = message.getPayload();
String content;
if (4 == payload.length) {
ByteBuffer buf = ByteBuffer.wrap(payload);
content = String.valueOf(buf.getInt());
} else {
content = new String(payload, StandardCharsets.UTF_8);
}
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s], properties=%s%n",
topic, message.getQos(), content, message.getProperties());
}

@Override
public void deliveryComplete(IMqttToken token) {

}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
}

@Override
public void authPacketArrived(int i, MqttProperties properties) {
}
});
client.connect(options).waitForCompletion();
try {
// Subscribe
MqttSubscription[] subscriptions = new MqttSubscription[topicFilters.length];
for (int i = 0; i < topicFilters.length; i++) {
subscriptions[i] = new MqttSubscription(topicFilters[i], qos[i]);
}
MqttProperties subscribeProperties = new MqttProperties();
List<UserProperty> userProperties = new ArrayList<>();
UserProperty userProperty = new UserProperty("$where", "where $QoS = 1 AND k1 = 'v1'");
userProperties.add(userProperty);
subscribeProperties.setUserProperties(userProperties);
client.subscribe(subscriptions, null, null, subscribeProperties).waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}


int total = 128;
for (int i = 0; i < total; i++) {
byte[] payload = new byte[4];
ByteBuffer buffer = ByteBuffer.wrap(payload);
buffer.putInt(i);
MqttMessage message = new MqttMessage(payload);
message.setQos(1);
MqttProperties properties = new MqttProperties();
properties.setContentType("application/json");
properties.setResponseTopic("response/topic");
message.setProperties(properties);
System.out.printf("Prepare to publish message %d%n", i);
// P2P topic format: {first-topic}/p2p/{target-client-id}
client.publish(topic, message);
System.out.printf("Published message %d%n", i);
TimeUnit.MILLISECONDS.sleep(100);
}
TimeUnit.MINUTES.sleep(3);
client.disconnect();
}
}


帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈