Message Tag Filtering

Last updated: 2022-05-23 18:40:10

    This document describes the feature, use cases, and usage of message tag filtering in TDMQ for Pulsar.

    Overview

    A message tag is used to categorize messages under a topic. When a producer in TDMQ for Pulsar sends messages with specified tags, the consumer needs to subscribe to those messages by tag.

    If a consumer configures no tags when subscribing to a topic, all messages in the topic will be delivered to the consumer for consumption.

    Use Cases

    Generally, messages with the same business attributes are stored in the same topic. For example, an order transaction topic may contain messages for order placement, payment, and delivery transactions. If your business needs to consume only one type of transaction message, you can filter messages on the client, but this wastes bandwidth because every message in the topic is still delivered to the consumer.

    To solve this problem, TDMQ for Pulsar supports filtering on the broker. You can set one or more tags during message production and subscribe to specified tags during consumption.

    Limits

    Tagged messages are passed through message Properties. The Java examples below use the Pulsar client, which can be obtained with the following Maven dependency:

    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.x.x</version> <!-- Replace with the specific client version you use -->
    </dependency>

    Usage limits of tagged messages

    • Tagged messages don't support batch operations. The batch operation feature is enabled by default. To use tagged messages, you need to disable it in the producer as follows:
      // Construct a producer
      Producer<byte[]> producer = pulsarClient.newProducer()
      // Disable batch operation
      .enableBatching(false)
      // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
      .topic("persistent://pulsar-xxx/sdk_java/topic2").create();
    • Tagged message filtering takes effect for only messages with tags. Messages without tags won't be filtered; that is, they will be pushed to all subscribers.
    • To use tagged messages, set the Properties field in ProducerMessage when sending messages, and set the SubscriptionProperties field in ConsumerOptions when creating consumers.
    • When you set the Properties field in ProducerMessage, the key is the tag name, and the value is fixed to TAGS.
    • When you set the SubscriptionProperties field in ConsumerOptions, the key is the name of the tag to subscribe to, and the value is the tag version, which is reserved for future feature extension and currently has no meaning. You can configure them as follows:

    Specify one tag

    // Send the message
    MessageId msgId = producer.newMessage()
    .property("tag1", "TAGS")
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();
    // Subscription parameters, which can be used to set subscription tags
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1","1");
    subProperties.put("tag2","1");
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of persistent://cluster (tenant) ID/namespace/topic name, which can be copied from Topic Management
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("topic_sub1")
    // Declare the shared mode as the consumption mode
    .subscriptionType(SubscriptionType.Shared)
    // Subscription parameters for tag subscription
    .subscriptionProperties(subProperties)
    // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

    Specify multiple tags

    // Send the message
    MessageId msgId = producer.newMessage()
    .property("tag1", "TAGS")
    .property("tag2", "TAGS")
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();

    // Subscription parameters, which can be used to set subscription tags
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1","1");
    subProperties.put("tag2","1");
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of persistent://cluster (tenant) ID/namespace/topic name, which can be copied from Topic Management
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("topic_sub1")
    // Declare the shared mode as the consumption mode
    .subscriptionType(SubscriptionType.Shared)
    // Subscription parameters for tag subscription
    .subscriptionProperties(subProperties)
    // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
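
    Because every tag is a separate property entry, building the maps for several tags can be factored into small helpers. The following is a minimal sketch using only the Java standard library. The class name `TagProperties` and its method names are hypothetical and are not part of the Pulsar client. The subscription value "1" simply mirrors the placeholder tag version used in the examples in this document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Pulsar client API.
class TagProperties {
    // The value of a tag property on a message is fixed to "TAGS".
    static final String TAG_MARKER = "TAGS";
    // Placeholder tag version, as used in this document's examples.
    static final String TAG_VERSION = "1";

    // Builds message properties in which every given name is a tag.
    static Map<String, String> forMessage(String... tags) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String tag : tags) {
            props.put(tag, TAG_MARKER);
        }
        return props;
    }

    // Builds subscription properties that subscribe to every given tag.
    static Map<String, String> forSubscription(String... tags) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String tag : tags) {
            props.put(tag, TAG_VERSION);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forMessage("tag1", "tag2"));      // prints {tag1=TAGS, tag2=TAGS}
        System.out.println(forSubscription("tag1", "tag2")); // prints {tag1=1, tag2=1}
    }
}
```

    The resulting maps could then be passed to the message builder's `properties(...)` method and to `subscriptionProperties(...)` on the consumer builder, in place of the repeated `property(...)` and `put(...)` calls.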

    Mix tags and properties

    // Send the message
    MessageId msgId = producer.newMessage()
    .property("tag1", "TAGS")
    .property("tag2", "TAGS")
    .property("xxx", "yyy")
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();

    // Subscription parameters, which can be used to set subscription tags
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1","1");
    subProperties.put("tag2","1");
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // Complete path of the topic in the format of persistent://cluster (tenant) ID/namespace/topic name, which can be copied from Topic Management
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // You need to create a subscription on the topic details page in the console and enter the subscription name here
    .subscriptionName("topic_sub1")
    // Declare the shared mode as the consumption mode
    .subscriptionType(SubscriptionType.Shared)
    // Subscription parameters for tag subscription
    .subscriptionProperties(subProperties)
    // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
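
    The delivery rules described in this document can be summarized in a small local model. This is only a sketch for reasoning about the rules. It does not call the broker and is not how TDMQ for Pulsar implements filtering. The class `TagFilterModel` is hypothetical. It also makes one assumption that this document does not state: a tagged message is delivered when at least one of its tags is subscribed to.

```java
import java.util.Map;

// Hypothetical local model of the filtering rules, not broker code.
class TagFilterModel {
    // A message property counts as a tag only if its value is "TAGS".
    static final String TAG_MARKER = "TAGS";

    // Returns true if this model would deliver the message to the subscription.
    static boolean delivered(Map<String, String> messageProperties,
                             Map<String, String> subscriptionProperties) {
        // A consumer that configures no tags receives all messages.
        if (subscriptionProperties.isEmpty()) {
            return true;
        }
        boolean hasTag = false;
        for (Map.Entry<String, String> entry : messageProperties.entrySet()) {
            if (!TAG_MARKER.equals(entry.getValue())) {
                continue; // ordinary property such as "xxx" -> "yyy"
            }
            hasTag = true;
            // ASSUMPTION: one matching tag is enough for delivery.
            if (subscriptionProperties.containsKey(entry.getKey())) {
                return true;
            }
        }
        // Messages without tags are not filtered and go to all subscribers.
        return !hasTag;
    }

    public static void main(String[] args) {
        Map<String, String> subscription = Map.of("tag1", "1", "tag2", "1");
        // Tagged message with an ordinary property mixed in.
        System.out.println(delivered(Map.of("tag1", "TAGS", "xxx", "yyy"), subscription)); // prints true
        // Tagged message whose only tag is not subscribed to.
        System.out.println(delivered(Map.of("tag3", "TAGS"), subscription)); // prints false
        // Message without any tag.
        System.out.println(delivered(Map.of("xxx", "yyy"), subscription)); // prints true
    }
}
```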

    Note:

    Once the SubscriptionProperties field is set in the consumer, the tags processed by the subscription cannot be modified. To modify tags, unsubscribe the current subscription first and create a new subscription.
