Comparison Item | Tag-based Message Filtering | SQL-based Message Filtering |
Filtering target | Tag property of messages | K-v property of messages |
Filtering capability | Exact match | SQL syntax matching |
Scenario | Simple filtering scenarios; simple and lightweight computing logic | Complex filtering scenarios; complex computing logic |

<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.10.1</version> <!-- Recommended version --></dependency>
go get -u github.com/apache/pulsar-client-go@master
// Create a producer.Producer<byte[]> producer = pulsarClient.newProducer()// Disable the batch feature..enableBatching(false)// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name..topic("persistent://pulsar-xxx/sdk_java/topic2").create();
producer, err := client.CreateProducer(pulsar.ProducerOptions{DisableBatching: true, // Disable the batch feature.})
TAGS.// Send a message.MessageId msgId = producer.newMessage().property("tag1", "TAGS").value(value.getBytes(StandardCharsets.UTF_8)).send();// Subscription-related parameters that can be used to set the subscription tag (TAG).HashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("topic_sub1")// Declare the consumption mode to be the Shared mode..subscriptionType(SubscriptionType.Shared)// Subscription-related parameters, including tag subscription..subscriptionProperties(subProperties)// Configure consumption from the earliest time. Otherwise, historical messages may not be consumed..subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Send messages.if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS",},}); err != nil {log.Fatal(err)}// Create a consumer.consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1"},})
// Send messages.MessageId msgId = producer.newMessage().property("tag1", "TAGS").property("tag2", "TAGS").value(value.getBytes(StandardCharsets.UTF_8)).send();// Subscription-related parameters that can be used to set the subscription tag (TAG).HashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");subProperties.put("tag2","1");// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("topic_sub1")// Declare the consumption mode to be the Shared mode..subscriptionType(SubscriptionType.Shared)// Subscription-related parameters, including tag subscription..subscriptionProperties(subProperties)// Configure consumption from the earliest time. Otherwise, historical messages may not be consumed..subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Create a producer.if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS","tag2": "TAGS",},}); err != nil {log.Fatal(err)}// Create a consumer.consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1","tag2": "1",},})
// Send messages.MessageId msgId = producer.newMessage().property("tag1", "TAGS").property("tag2", "TAGS").property("xxx", "yyy").value(value.getBytes(StandardCharsets.UTF_8)).send();// Subscription-related parameters that can be used to set the subscription tag (TAG).HashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");subProperties.put("tag2","1");// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("topic_sub1")// Declare the consumption mode to be the Shared mode..subscriptionType(SubscriptionType.Shared)// Subscription-related parameters, including tag subscription..subscriptionProperties(subProperties)// Configure consumption from the earliest time. Otherwise, historical messages may not be consumed..subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Create a producer.if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS","tag2": "TAGS","xxx": "yyy",},}); err != nil {log.Fatal(err)}// Create a consumer.consumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1","tag2": "1",},})
// Construct a producer.Producer<byte[]> producer = pulsarClient.newProducer()// Disable the batch feature..enableBatching(false)// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name..topic("persistent://pulsar-xxx/sdk_java/topic2").create();// Send a message.MessageId msgId = producer.newMessage().property("idc", "idc1") // Specify the property of the message (idc)..property("label", "online") // Specify the property of the message (label)..property("other", "xxx") // Specify other properties of the message..value(value.getBytes(StandardCharsets.UTF_8)).send();
// Subscription-related parameters.HashMap<String, String> subProperties = new HashMap<>();// If the Properties field of the consumer contains TDMQ_PULSAR_SQL92_FILTER_EXPRESSION, SQL92 filtering is enabled, and the value is the filter expression.subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION","idc = 'idc1' AND label IS NOT NULL"); // The expression means that the idc property is set to idc1, and the label property exists.// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("topic_sub1")// Declare the consumption mode to be the Shared mode..subscriptionType(SubscriptionType.Shared)// Parameters related to subscription configuration, containing SQL filter expressions..subscriptionProperties(subProperties).subscribe();
Syntax | Description | Example |
IS NULL | Determines that a property does not exist. | a IS NULL: Property a does not exist. |
IS NOT NULL | Determines that a property exists. | a IS NOT NULL: Property a exists. |
> >= < <= | Used to compare numbers and cannot be used to compare strings. Otherwise, an error is reported when the consumer client starts. Note: Strings that can be converted to numbers are also considered as numbers. | a IS NOT NULL AND a > 100: Property a exists, and its value is greater than 100. a IS NOT NULL AND a > 'abc': Incorrect example. abc is a string and cannot be compared. |
BETWEEN xxx AND xxx | Used to compare numbers and cannot be used to compare strings. Otherwise, an error is reported when the consumer client starts. It is equivalent to >= xxx AND <= xxx, indicating that the property value is between two numbers. | a IS NOT NULL AND (a BETWEEN 10 AND 100): Property a exists, and its value is greater than or equal to 10 and less than or equal to 100. |
NOT BETWEEN xxx AND xxx | Used to compare numbers and cannot be used to compare strings. Otherwise, an error is reported when the consumer client starts. It is equivalent to < xxx OR > xxx, indicating that the property value is outside the range of two values. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): Property a exists, and its value is less than 10 or greater than 100. |
IN (xxx, xxx) | Indicates that the property value is within a specific set that contains only strings. | a IS NOT NULL AND (a IN ('abc', 'def')): Property a exists, and its value is abc or def. |
= <> | Equality and inequality, used to compare numbers and strings. | a IS NOT NULL AND (a = 'abc' OR a<>'def'): Property a exists, and its value is abc or not def. |
AND OR | Logical AND and OR, which can be used to combine any simple logical conditions, with each logical condition enclosed in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL): Property a exists, and its value is greater than 100, or property b does not exist. |
Feedback