tencent cloud

文档反馈

消息过滤

最后更新时间:2024-01-17 16:53:14
    本文主要介绍 TDMQ RocketMQ 版中消息过滤的功能、应用场景和使用方式。

    功能介绍

    消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
    消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。

    应用场景

    通常,一个 Topic 中存放的是相同业务属性的消息,例如交易流水 Topic 包含了下单流水、支付流水、发货流水等,业务若只想消费者其中一种类别的流水,可在客户端进行过滤,但这种过滤方式会带来带宽的资源浪费。
    针对上述场景,TDMQ 提供 Broker 端过滤的方式,用户可在生产消息时设置一个或者多个 Tag 标签,消费时指定 Tag 订阅。
    
    
    

    使用方式

    TAG 过滤

    发送消息

    说明:
    发送消息时,每条消息必须指明 Tag。
    String tag = "yourMessageTagA";
    final Message message = provider.newMessageBuilder()
    // Set topic for the current message.
    .setTopic(topic)
    // Message secondary classifier of message besides topic.
    .setTag(tag)
    // Key(s) of the message, another way to mark message besides message id.
    .setKeys("yourMessageKey-1c151062f96e")
    .setBody(body)
    .build();

    订阅消息

    订阅所有 Tag:消费者如需订阅某 Topic 下所有类型的消息,Tag 用星号(*)表示。
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // In most case, you don't need to create too many consumers, singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    // Handle the received message and return consume result.
    log.info("Consume message={}", messageView);
    return ConsumeResult.SUCCESS;
    })
    .build();
    订阅单个 Tag:消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag。
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String tag = "TAGA";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // In most case, you don't need to create too many consumers, singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    // Handle the received message and return consume result.
    log.info("Consume message={}", messageView);
    return ConsumeResult.SUCCESS;
    })
    .build();
    订阅多个 Tag:消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用两个竖线||分隔。
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String tag = "TAGA || TAGB";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    // In most case, you don't need to create too many consumers, singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    // Handle the received message and return consume result.
    log.info("Consume message={}", messageView);
    return ConsumeResult.SUCCESS;
    })
    .build();

    SQL 过滤

    发送消息

    发送代码和简单的消息没有区别。主要是在构造消息体的时候,带上自定义属性,允许多个。
    final Message message = provider.newMessageBuilder()
    // Set topic for the current message.
    .setTopic(topic)
    // Message secondary classifier of message besides topic.
    // Key(s) of the message, another way to mark message besides message id.
    .setKeys("yourMessageKey-1c151062f96e")
    .setBody(body)
    //一些用于sql过滤的信息
    .addProperty("key1", "value1")
    .build();

    订阅消息

    对于消费消息,订阅时需带上相应的 SQL 表达式,其余与普通的消费消息流程无区别。
    String consumerGroup = "yourConsumerGroup";
    String topic = "yourTopic";
    String sql = "key1 IS NOT NULL AND key1='value1'";
    //sql表达式
    FilterExpression filterExpression = new FilterExpression(sql, FilterExpressionType.SQL92);
    //如果是订阅所有
    //FilterExpression filterExpression = FilterExpression.SUB_ALL;
    // In most case, you don't need to create too many consumers, singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // Set the consumer group name.
    .setConsumerGroup(consumerGroup)
    // Set the subscription for the consumer.
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
    // Handle the received message and return consume result.
    log.info("Consume message={}", messageView);
    return ConsumeResult.SUCCESS;
    })
    .build();
    说明:
    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持