Message Filtering
Last updated: 2025-12-24 15:03:03
This document describes the message filtering feature of TDMQ for Apache Pulsar, including its application scenarios and how to use it.

Feature Introduction

After a consumer subscribes to a topic, TDMQ for Apache Pulsar delivers all messages in the topic to the consumer. If the consumer only needs to focus on specific messages, filter conditions can be set on the server to obtain only relevant messages. This prevents receiving a large number of irrelevant messages and, therefore, simplifies the architectural design of business logic.
TDMQ for Apache Pulsar Pro Edition supports two types of filtering methods:
Tag-based message filtering: When messages are produced, one or more fixed tags are attached, and consumers subscribe to messages by specifying tags.
SQL-based message filtering: When messages are produced, one or more k-v properties are attached, and consumers can subscribe to messages through the flexible SQL 92 syntax.
| Comparison Item | Tag-based Message Filtering | SQL-based Message Filtering |
| --- | --- | --- |
| Filtering target | Tag property of messages | K-v property of messages |
| Filtering capability | Exact match | SQL syntax matching |
| Scenario | Simple filtering scenarios; simple and lightweight computing logic | Complex filtering scenarios; complex computing logic |

Scenarios

Usually, a topic stores messages with the same business properties. For example, a transaction statement topic contains order statements, payment statements, and delivery statements. To consume only one type of statement, the business side can filter messages on the client. However, client-side filtering wastes bandwidth, because every message in the topic is still delivered to the client before it is discarded.
For this scenario, TDMQ for Apache Pulsar provides broker-side filtering. Users can set one or more tags or properties when producing messages and subscribe to messages by specifying filtering rules when consuming them.


Usage Instructions

Filter conditions are passed through message properties and subscription properties. Obtain the client SDK as follows:
Java
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.10.1</version> <!-- Recommended version -->
</dependency>
Go
It is recommended that the latest version be used.
go get -u github.com/apache/pulsar-client-go@master

Tag-based Message Filtering

Message tagging does not support the batch feature, which is enabled by default. To use message tagging, disable the batch feature on the producer side as follows:
Java
// Create a producer.
Producer<byte[]> producer = pulsarClient.newProducer()
        // Disable the batch feature.
        .enableBatching(false)
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name.
        .topic("persistent://pulsar-xxx/sdk_java/topic2")
        .create();
Go
// Create a producer.
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           "topic-1", // Placeholder topic name, matching the consumer examples.
    DisableBatching: true,      // Disable the batch feature.
})
Tag-based message filtering applies only to messages with tags set. If no tag is set when a consumer subscribes to a topic, all messages in the topic will be delivered to the consumer for consumption.
To enable message tagging, set the Properties field in ProducerMessage when a message is sent. In addition, you need to specify the SubscriptionProperties field in ConsumerOptions when a consumer is created.
When the Properties field is set in ProducerMessage, the key is the tag name, and the value is the fixed string TAGS.
When the SubscriptionProperties field is specified in ConsumerOptions, the key is the name of the tag to subscribe to, and the value is the version information of the tag. The value is a reserved field for future feature expansion and currently has no substantive meaning. The details are as follows:
Specifying a single tag
Java
// Send a message.
MessageId msgId = producer.newMessage()
        .property("tag1", "TAGS")
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();

// Subscription-related parameters that can be used to set the subscription tag (TAG).
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1", "1");
// Construct a consumer.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page.
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        // You need to create a subscription on the topic details page in the console. Specify the subscription name here.
        .subscriptionName("topic_sub1")
        // Declare the consumption mode to be the Shared mode.
        .subscriptionType(SubscriptionType.Shared)
        // Subscription-related parameters, including tag subscription.
        .subscriptionProperties(subProperties)
        // Configure consumption from the earliest time. Otherwise, historical messages may not be consumed.
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
Go
// Send messages.
// The returned message ID is not used here, so it is discarded with _.
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte(fmt.Sprintf("hello-%d", i)),
    Properties: map[string]string{
        "tag1": "TAGS",
    },
}); err != nil {
    log.Fatal(err)
}

// Create a consumer.
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:                  "topic-1",
    SubscriptionName:       "my-sub",
    SubscriptionProperties: map[string]string{"tag1": "1"},
})
Specifying multiple tags
Java
// Send messages.
MessageId msgId = producer.newMessage()
        .property("tag1", "TAGS")
        .property("tag2", "TAGS")
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();

// Subscription-related parameters that can be used to set the subscription tag (TAG).
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1", "1");
subProperties.put("tag2", "1");
// Construct a consumer.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page.
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        // You need to create a subscription on the topic details page in the console. Specify the subscription name here.
        .subscriptionName("topic_sub1")
        // Declare the consumption mode to be the Shared mode.
        .subscriptionType(SubscriptionType.Shared)
        // Subscription-related parameters, including tag subscription.
        .subscriptionProperties(subProperties)
        // Configure consumption from the earliest time. Otherwise, historical messages may not be consumed.
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
Go
// Send messages.
// The returned message ID is not used here, so it is discarded with _.
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte(fmt.Sprintf("hello-%d", i)),
    Properties: map[string]string{
        "tag1": "TAGS",
        "tag2": "TAGS",
    },
}); err != nil {
    log.Fatal(err)
}

// Create a consumer.
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "topic-1",
    SubscriptionName: "my-sub",
    SubscriptionProperties: map[string]string{
        "tag1": "1",
        "tag2": "1",
    },
})
Mixing tags and properties
Java
// Send messages.
MessageId msgId = producer.newMessage()
        .property("tag1", "TAGS")
        .property("tag2", "TAGS")
        // Ordinary property, not a tag.
        .property("xxx", "yyy")
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();

// Subscription-related parameters that can be used to set the subscription tag (TAG).
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1", "1");
subProperties.put("tag2", "1");
// Construct a consumer.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page.
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        // You need to create a subscription on the topic details page in the console. Specify the subscription name here.
        .subscriptionName("topic_sub1")
        // Declare the consumption mode to be the Shared mode.
        .subscriptionType(SubscriptionType.Shared)
        // Subscription-related parameters, including tag subscription.
        .subscriptionProperties(subProperties)
        // Configure consumption from the earliest time. Otherwise, historical messages may not be consumed.
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
Go
// Send messages.
// The returned message ID is not used here, so it is discarded with _.
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte(fmt.Sprintf("hello-%d", i)),
    Properties: map[string]string{
        "tag1": "TAGS",
        "tag2": "TAGS",
        "xxx":  "yyy", // Ordinary property, not a tag.
    },
}); err != nil {
    log.Fatal(err)
}

// Create a consumer.
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "topic-1",
    SubscriptionName: "my-sub",
    SubscriptionProperties: map[string]string{
        "tag1": "1",
        "tag2": "1",
    },
})
Note:
1. A consumer can use multiple tags, and the relationship between multiple tags is OR.
2. All consumers within a subscription should use the same tags. If different consumers within a subscription use different tags, the filtering rules may overwrite each other, affecting the business logic.
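
The OR relationship between tags can be illustrated with a small local model. The sketch below is not the broker's implementation: the class and method names are hypothetical, it runs without any Pulsar dependency, and it assumes that a property counts as a tag only when its value is the fixed string TAGS. It also treats a message with no matching tag as filtered out, which is an assumption about behavior the text above does not spell out for untagged messages.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical local model of tag matching; not part of any Pulsar SDK.
public class TagFilterSketch {
    // Fixed property value that marks a message property as a tag.
    static final String TAG_MARKER = "TAGS";

    /**
     * Returns true if the message would be delivered to a subscription
     * that subscribes to the given tags. Multiple tags are combined with OR.
     */
    public static boolean matches(Map<String, String> messageProperties,
                                  Set<String> subscribedTags) {
        // No tag set on the subscription: all messages are delivered.
        if (subscribedTags.isEmpty()) {
            return true;
        }
        for (Map.Entry<String, String> entry : messageProperties.entrySet()) {
            boolean isTag = TAG_MARKER.equals(entry.getValue());
            // One matching tag is enough (OR semantics).
            if (isTag && subscribedTags.contains(entry.getKey())) {
                return true;
            }
        }
        // Assumption: nothing matched, so the message is filtered out.
        return false;
    }

    public static void main(String[] args) {
        Set<String> subscribed = Set.of("tag1", "tag2");
        // Carries tag2 plus an ordinary property; tag2 alone is enough to match.
        System.out.println(matches(Map.of("tag2", "TAGS", "xxx", "yyy"), subscribed));
        // Carries only tag3, which the subscription does not list.
        System.out.println(matches(Map.of("tag3", "TAGS"), subscribed));
    }
}
```

In this model the ordinary property xxx never influences the result, which mirrors the mixed tags-and-properties example above.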

SQL-based Message Filtering

Producer Examples

Producers can add multiple properties in the property field.
// Construct a producer.
Producer<byte[]> producer = pulsarClient.newProducer()
        // Disable the batch feature.
        .enableBatching(false)
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name.
        .topic("persistent://pulsar-xxx/sdk_java/topic2")
        .create();

// Send a message.
MessageId msgId = producer.newMessage()
        .property("idc", "idc1")     // Specify the property of the message (idc).
        .property("label", "online") // Specify the property of the message (label).
        .property("other", "xxx")    // Specify other properties of the message.
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();

Consumer Examples

To enable SQL92 filtering, the subscriptionProperties of the consumer must include the key TDMQ_PULSAR_SQL92_FILTER_EXPRESSION, whose value is the SQL92 filter expression.
// Subscription-related parameters.
HashMap<String, String> subProperties = new HashMap<>();
// If the subscription properties contain TDMQ_PULSAR_SQL92_FILTER_EXPRESSION, SQL92 filtering is enabled, and the value is the filter expression.
// This expression matches messages whose idc property equals idc1 and whose label property exists.
subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION", "idc = 'idc1' AND label IS NOT NULL");

// Construct a consumer.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page.
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        // You need to create a subscription on the topic details page in the console. Specify the subscription name here.
        .subscriptionName("topic_sub1")
        // Declare the consumption mode to be the Shared mode.
        .subscriptionType(SubscriptionType.Shared)
        // Parameters related to subscription configuration, containing SQL filter expressions.
        .subscriptionProperties(subProperties)
        .subscribe();

Must-Knows

1. If subscriptionProperties contains TDMQ_PULSAR_SQL92_FILTER_EXPRESSION, the subscription uses SQL-based message filtering. If other properties also exist in subscriptionProperties, they are ignored. That is, tag-based message filtering is no longer used.
2. SQL-based message filtering is performed on the Properties field of a message (that is, the properties specified when the message is sent) and is independent of tag-based message filtering. Once SQL-based message filtering is used, filtering is based on the message properties, regardless of whether a property is a tag. TDMQ for Apache Pulsar treats a property with the key tag1 and the value TAGS as an ordinary property, with no special handling.
3. Message filtering does not support the batch feature. The batch feature is enabled by default and needs to be disabled when a producer is created. For batched messages, the server does not perform message filtering and delivers the messages to the consumers directly.
4. Properties supported in SQL statements can only contain letters, digits, and underscores (_).
5. The number of conditions in an SQL statement cannot exceed 50, and it is recommended that it does not exceed 5. In other words, the number of AND and OR operators in an SQL expression should be less than 50. AND operators included in BETWEEN xxx AND xxx and NOT BETWEEN xxx AND xxx are also counted.
6. If a newly submitted SQL expression is invalid and the server cannot parse it, the server keeps using the previous filtering method.
7. For SQL-based property filtering, the producer defines the message properties and the consumer sets the SQL-based filter conditions independently, so the result of evaluating a filter condition cannot be guaranteed in advance. The server handles the following cases as described:
   Exception handling: If evaluating the filter condition expression raises an exception, for example when a numeric value is compared with a non-numeric value, the message is filtered out by default and is not delivered to consumers.
   Handling of null values: If the filter condition expression evaluates to null or to a value that is not a boolean (true/false), the message is filtered out by default and is not delivered to consumers. For example, if a property is not defined when a message is sent but is used directly in the filter condition at subscription, the expression evaluates to null.
   Handling of inconsistent value types: If a custom property of a message is of a floating-point type but is used as an integer in the filter condition, the message is filtered out by default and is not delivered to consumers.
8. Multiple consumers need to use the same filtering rule. If different consumers within the same subscription use different rules, filtering rules may be overwritten, affecting the business logic.
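
The handling rules in item 7 can be illustrated with a small local model of a single condition of the form `key > threshold`. This is a hedged sketch, not the server's evaluator: the class and method names are hypothetical, and the choice of `Long.parseLong` to stand in for integer evaluation is an assumption made for illustration. The point is only that an undefined property, a non-numeric value, and a floating-point value used as an integer all lead to the message being filtered out.

```java
import java.util.Map;

// Hypothetical local model of how one numeric condition is resolved; not the server's code.
public class FilterOutcomeSketch {
    /**
     * Evaluates "key > threshold" against the message properties.
     * Returns null when the property is not defined (the null case).
     * Throws NumberFormatException when the value is not an integer,
     * for example "abc" or "150.5".
     */
    public static Boolean greaterThan(Map<String, String> properties, String key, long threshold) {
        String raw = properties.get(key);
        if (raw == null) {
            return null; // property was never set by the producer
        }
        return Long.parseLong(raw) > threshold;
    }

    /** Returns true only when the condition evaluates to boolean true. */
    public static boolean deliver(Map<String, String> properties, String key, long threshold) {
        try {
            Boolean result = greaterThan(properties, key, threshold);
            // null (unknown) and false both mean the message is filtered out.
            return Boolean.TRUE.equals(result);
        } catch (NumberFormatException e) {
            // Evaluation failed: the message is filtered out by default.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver(Map.of("a", "150"), "a", 100));   // numeric string, condition holds
        System.out.println(deliver(Map.of(), "a", 100));             // property missing
        System.out.println(deliver(Map.of("a", "abc"), "a", 100));   // non-numeric value
        System.out.println(deliver(Map.of("a", "150.5"), "a", 100)); // floating point used as integer
    }
}
```

Only the first call in `main` results in delivery; the other three cases are dropped, which is why guarding numeric conditions with `IS NOT NULL` and keeping property types consistent between producer and consumer matters.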
Note:
SQL-based message filtering is supported only in clusters of Pro Edition created after September 10, 2024. If existing clusters require this feature, contact us for an upgrade.
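
Items 4 and 5 above lend themselves to a client-side check before an expression is submitted. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, keywords are assumed to be case-insensitive, and string literals are assumed to be single-quoted without escaped quotes. It does not parse SQL; it only validates property names and counts AND and OR keywords, including the AND inside BETWEEN.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client-side pre-check; the server performs its own validation.
public class SqlFilterPrecheck {
    // Property names may contain only letters, digits, and underscores.
    private static final Pattern PROPERTY_NAME = Pattern.compile("[A-Za-z0-9_]+");
    // Single-quoted string literal (assumption: no escaped quotes inside).
    private static final Pattern STRING_LITERAL = Pattern.compile("'[^']*'");
    // AND / OR as whole words, in any letter case.
    private static final Pattern AND_OR = Pattern.compile("\\b(AND|OR)\\b", Pattern.CASE_INSENSITIVE);

    public static boolean isValidPropertyName(String name) {
        return name != null && PROPERTY_NAME.matcher(name).matches();
    }

    /** Counts AND and OR keywords, ignoring any that appear inside string literals. */
    public static int countAndOr(String expression) {
        // Blank out literals first so that a value such as 'x and y' is not counted.
        String withoutLiterals = STRING_LITERAL.matcher(expression).replaceAll("''");
        Matcher matcher = AND_OR.matcher(withoutLiterals);
        int count = 0;
        while (matcher.find()) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        String expression = "a IS NOT NULL AND (a BETWEEN 10 AND 100)";
        // The AND that belongs to BETWEEN is counted as well.
        System.out.println(countAndOr(expression)); // prints 2
        System.out.println(isValidPropertyName("my-prop")); // prints false
    }
}
```

A caller could compare the count against the limit of 50 and the recommended value of 5 before creating the consumer.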

Usage Recommendations

1. The number of properties in a message should not be excessive, and the key or value of a property should not be too large. It is recommended that the number of properties be less than 10, and the total string length be less than 512 bytes.
2. When SQL-based message filtering is used, it is recommended that the number of filter conditions be less than 5 and the value of a filter condition be within 64 bytes.
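
The first recommendation can be checked on the producer side before a message is sent. The sketch below is illustrative only: the class and method names are hypothetical, and it assumes that "total string length" means the combined UTF-8 byte length of all keys and values, which the text above does not define precisely.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical producer-side check of the recommended property limits.
public class PropertyBudgetSketch {
    static final int RECOMMENDED_MAX_PROPERTIES = 10;   // number of properties should be less than 10
    static final int RECOMMENDED_MAX_TOTAL_BYTES = 512; // total length should be less than 512 bytes

    /** Sums the UTF-8 byte length of every key and value (assumed definition of total length). */
    public static int totalBytes(Map<String, String> properties) {
        int total = 0;
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            total += entry.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += entry.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    public static boolean withinRecommendation(Map<String, String> properties) {
        return properties.size() < RECOMMENDED_MAX_PROPERTIES
                && totalBytes(properties) < RECOMMENDED_MAX_TOTAL_BYTES;
    }

    public static void main(String[] args) {
        Map<String, String> properties = Map.of("idc", "idc1", "label", "online");
        System.out.println(totalBytes(properties));           // prints 18
        System.out.println(withinRecommendation(properties)); // prints true
    }
}
```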

Syntax Rules of SQL-based Message Filtering

| Syntax | Description | Example |
| --- | --- | --- |
| IS NULL | Determines that a property does not exist. | a IS NULL: Property a does not exist. |
| IS NOT NULL | Determines that a property exists. | a IS NOT NULL: Property a exists. |
| > >= < <= | Used to compare numbers and cannot be used to compare strings. Otherwise, an error is reported when the consumer client starts. Note: Strings that can be converted to numbers are also considered as numbers. | a IS NOT NULL AND a > 100: Property a exists, and its value is greater than 100. a IS NOT NULL AND a > 'abc': Incorrect example. abc is a string and cannot be compared. |
| BETWEEN xxx AND xxx | Used to compare numbers and cannot be used to compare strings. Otherwise, an error is reported when the consumer client starts. It is equivalent to >= xxx AND <= xxx, indicating that the property value is between two numbers. | a IS NOT NULL AND (a BETWEEN 10 AND 100): Property a exists, and its value is greater than or equal to 10 and less than or equal to 100. |
| NOT BETWEEN xxx AND xxx | Used to compare numbers and cannot be used to compare strings. Otherwise, an error is reported when the consumer client starts. It is equivalent to < xxx OR > xxx, indicating that the property value is outside the range of two values. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): Property a exists, and its value is less than 10 or greater than 100. |
| IN (xxx, xxx) | Indicates that the property value is within a specific set that contains only strings. | a IS NOT NULL AND (a IN ('abc', 'def')): Property a exists, and its value is abc or def. |
| = <> | Equality and inequality, used to compare numbers and strings. | a IS NOT NULL AND (a = 'abc' OR a <> 'def'): Property a exists, and its value is abc or is not def. |
| AND OR | Logical AND and OR, which can be used to combine any simple logical conditions, with each logical condition enclosed in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL): Property a exists, and its value is greater than 100, or property b does not exist. |
