Shared subscription is a load-balancing consumption method. Because messages are distributed among the clients in the same shared subscription group in a round-robin manner, it is also referred to as consumer load balancing.
As shown in the figure below, the publisher publishes 4 messages: M1, M2, M3, and M4. Clients Client-0 and Client-1 consume the subscribed messages from the topic through a shared subscription. Unlike the default MQTT Pub/Sub model, Client-0 only consumes M1 and M3, while Client-1 only consumes M2 and M4.
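The distribution described above can be illustrated with a small simulation. This is only a sketch of round-robin dispatch on the client list, not the server's actual implementation; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinDispatchSketch {

    /** Assigns each message to exactly one client, taking the clients in turn. */
    static Map<String, List<String>> dispatch(List<String> clients, List<String> messages) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("clients must not be empty");
        }
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String client : clients) {
            received.put(client, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // The i-th message goes to client (i mod number of clients).
            String target = clients.get(i % clients.size());
            received.get(target).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = dispatch(
                List.of("Client-0", "Client-1"),
                List.of("M1", "M2", "M3", "M4"));
        // prints {Client-0=[M1, M3], Client-1=[M2, M4]}
        System.out.println(result);
    }
}
```

In the default Pub/Sub model, by contrast, both clients would each receive all four messages.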
In the standard MQTT protocol, shared subscription was introduced in MQTT 5.0. TDMQ for MQTT extends this feature on the server side: in addition to MQTT 5.0 clients, MQTT 3.1 and 3.1.1 clients can also subscribe directly by following the shared subscription usage guidelines.
Restrictions and Limitations
A single cluster supports a maximum of 20 shared subscription groups.
Each group allows up to 10 subscription expressions.
The TPS limit for subscription and unsubscription requests per shared subscription group is 10.
A single shared subscription group can accommodate up to 1,024 clients.
When these limits are reached, client subscription requests (SUBSCRIBE) trigger an error. For Platinum Edition clusters requiring quota adjustments, submit a ticket to contact us.
Prerequisites
Using Shared Subscription to Subscribe to Messages
Step 1: Creating a Shared Subscription Group
2. In the left sidebar, choose Resource > Cluster, select a region, and click the ID of the target cluster to go to the cluster basic information page.
3. Select the Shared Subscription Group tab, click Create to create a shared subscription group, and fill in the following fields in the pop-up window as required:
Parameter | Description | Example |
Shared subscription group | Set the shared subscription group name according to the following rules: It cannot be empty, must be 1–64 characters long, and can contain only letters and digits. | order_processing_group |
Load balancing policy | ○ Random (default): The load is randomly distributed across subscribing clients. ○ Hash partitioning: To some extent, it maintains message order during load distribution. When selecting this mode, you need to specify the time for load balancing to take effect. That is, after a new consumer client joins, it will be added to the load balancing policy only after the specified delay. (By default, Topic-hash is used, which prioritizes maintaining message order under the same topic in shared subscriptions. If you require ClientID-hash, submit a ticket to contact us.) | Random |
Description | Enter the description for the shared subscription group (optional). | Multi-client load balancing for order processing |
4. Click Submit to complete the creation.
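To make the Topic-hash idea in the Hash partitioning policy more concrete, the sketch below picks a client by hashing the topic name. The hash function used by the service is not described here, so `String.hashCode()` and the class and method names are assumptions for illustration only.

```java
import java.util.List;

public class TopicHashSketch {

    /**
     * Picks a client by hashing the topic name. While the client list is
     * unchanged, the same topic always maps to the same client, which keeps
     * messages of one topic in order on a single consumer.
     */
    static String pickClient(String topic, List<String> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("clients must not be empty");
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        int index = Math.floorMod(topic.hashCode(), clients.size());
        return clients.get(index);
    }

    public static void main(String[] args) {
        List<String> clients = List.of("Client-0", "Client-1");
        // Both calls return the same client because the topic is the same.
        System.out.println(pickClient("home/kitchen/temp", clients));
        System.out.println(pickClient("home/kitchen/temp", clients));
    }
}
```

Under this scheme, changing the client list can remap topics to different clients, which is the kind of reshuffling that the configured delay postpones when a new consumer joins.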
Step 2: Configuring Shared Subscription on the Client
Usage Method
To subscribe to messages using Shared Subscription, configure the subscription topic filter as follows:
$share/{ShareName}/{TopicFilter}
Field | Description |
$share | Tag specified by the protocol to indicate Shared Subscription, which is a fixed string. |
{ShareName} | Name of a shared subscription group created in the console, which must not contain "/", "+", or "#". |
{TopicFilter} | Topic filter used by clients for normal subscriptions, which follows the same rules and semantics as an MQTT topic filter. |
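The format above can be assembled and checked on the client side before subscribing. The helper below is a minimal sketch; its class and method names are hypothetical, and it only enforces the ShareName character rule stated in the table.

```java
public class SharedTopicFilterSketch {

    /** Builds "$share/{ShareName}/{TopicFilter}" after basic validation. */
    static String build(String shareName, String topicFilter) {
        if (shareName == null || shareName.isEmpty()) {
            throw new IllegalArgumentException("ShareName must not be empty");
        }
        // ShareName must not contain "/", "+", or "#".
        if (shareName.contains("/") || shareName.contains("+") || shareName.contains("#")) {
            throw new IllegalArgumentException("ShareName must not contain '/', '+', or '#'");
        }
        if (topicFilter == null || topicFilter.isEmpty()) {
            throw new IllegalArgumentException("TopicFilter must not be empty");
        }
        return "$share/" + shareName + "/" + topicFilter;
    }

    public static void main(String[] args) {
        // prints $share/Group0/home/#
        System.out.println(build("Group0", "home/#"));
    }
}
```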
Sample Code
package org.apache.rocketmq.mqtt.example.quickstart;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SharedSubscriptionQuickStart {
    public static void main(String[] args) throws MqttException, InterruptedException {
        // Replace with the access point of your cluster.
        String serverUri = "tcp://127.0.0.1:1883";
        // Every client in the shared subscription group needs its own client ID.
        String clientId = "shared-sub-0";

        try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
            // Maximum time (ms) to wait for a blocking action such as connect or subscribe.
            client.setTimeToWait(3000);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("YOUR-USERNAME");
            options.setPassword("YOUR-PASSWORD".toCharArray());
            // With a clean session, the subscription is discarded when the connection drops,
            // so resubscribe after a reconnect if the client must keep consuming.
            options.setCleanSession(true);
            options.setAutomaticReconnect(true);

            // Register the callback before connecting so that no message is missed.
            client.setCallback(new MqttCallback() {
                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),
                        new String(message.getPayload(), StandardCharsets.UTF_8));
                }

                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("connectionLost: " + cause.getMessage());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("deliveryComplete: " + token.isComplete());
                }
            });

            client.connect(options);

            // Shared subscription: $share/{ShareName}/{TopicFilter}
            String topic = "$share/Group0/home/#";
            client.subscribe(topic, 1);

            // Keep the client alive for an hour to receive messages, then disconnect.
            TimeUnit.HOURS.sleep(1);
            client.disconnect();
        }
    }
}
Shared Subscription Offline Message Retention Policy
If an active session in the shared subscription group {ShareName} subscribes to {TopicFilter}, offline messages matching {TopicFilter} are retained. When the subscriber reconnects, it resumes consuming messages from where it left off.
Session Validity Period
MQTT 3.1 and 3.1.1 define the session lifecycle through clean-session. When clean-session is set to true, the session lifecycle aligns with the transport layer lifecycle. When clean-session is set to false, the session lifecycle is independent of the transport layer lifecycle. To avoid wasting resources, a session is allowed to remain active for a maximum of 3 days after the transport layer disconnects.
According to the MQTT 5.0 protocol, the following equivalent semantics apply:
MQTT 3.1/3.1.1 | MQTT 5.0 clean-start | MQTT 5.0 session-expiry-interval (seconds) |
clean-session = true | clean-start = true | session-expiry-interval = 0 |
clean-session = false | clean-start = false | session-expiry-interval = 259200 |
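The mapping in the table above can be written as a small lookup. This is a sketch with hypothetical class and field names; the only values taken from the text are the 3-day limit (259200 seconds) and the two rows of the table.

```java
public class SessionMappingSketch {

    /** Maximum session lifetime after disconnect: 3 days, in seconds. */
    static final long MAX_SESSION_EXPIRY_SECONDS = 3L * 24 * 60 * 60;

    /** MQTT 5.0 session settings equivalent to an MQTT 3.1/3.1.1 clean-session flag. */
    static final class Mqtt5Session {
        final boolean cleanStart;
        final long sessionExpiryIntervalSeconds;

        Mqtt5Session(boolean cleanStart, long sessionExpiryIntervalSeconds) {
            this.cleanStart = cleanStart;
            this.sessionExpiryIntervalSeconds = sessionExpiryIntervalSeconds;
        }
    }

    /** Translates clean-session into the equivalent clean-start and session-expiry-interval. */
    static Mqtt5Session fromCleanSession(boolean cleanSession) {
        return cleanSession
                ? new Mqtt5Session(true, 0)
                : new Mqtt5Session(false, MAX_SESSION_EXPIRY_SECONDS);
    }

    public static void main(String[] args) {
        Mqtt5Session persistent = fromCleanSession(false);
        // prints false 259200
        System.out.println(persistent.cleanStart + " " + persistent.sessionExpiryIntervalSeconds);
    }
}
```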
Viewing a Shared Subscription Group
2. In the left sidebar, choose Resource > Cluster, select a region, and click the ID of the target cluster to go to the cluster basic information page.
3. Click the Shared Subscription Management tab to view and manage the list of shared subscription groups (ShareName) in the current cluster.
Note:
For your convenience, the list page automatically displays information about existing shared subscription groups in the current cluster. If a client uses $share/{ShareName}/{TopicFilter} for a shared subscription, the ShareName automatically appears in the shared subscription group list, with the default load balancing policy set to Random.
4. Click the specific subscription group name to view its details, as shown below. The details page shows information about the current shared subscription, the subscription expressions (filters) under the selected shared subscription group (ShareName), and the corresponding clients and their statuses for each filter when expanded.
Note:
When the load balancing policy is set to Hash Partitioning to maintain the message order in shared subscriptions, the displayed online status of clients may lag because of the load-balancing delay. Even if a client session no longer exists, the client is still displayed as "Online" until the load-balancing delay has elapsed. Once the time since disconnection exceeds the load-balancing delay, the status changes to "Offline".
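The rule in this note can be expressed as a short function. This is a sketch of the described display behavior for Hash Partitioning groups only, not the console's actual implementation; the class, method, and parameter names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

public class DisplayedStatusSketch {

    /**
     * Returns the status shown in the console for a client in a Hash Partitioning group.
     * A disconnected client is still shown as "Online" until the time since
     * disconnection exceeds the load-balancing delay.
     */
    static String displayedStatus(boolean connected, Instant disconnectedAt,
                                  Instant now, Duration loadBalancingDelay) {
        if (connected) {
            return "Online";
        }
        Duration offlineFor = Duration.between(disconnectedAt, now);
        return offlineFor.compareTo(loadBalancingDelay) > 0 ? "Offline" : "Online";
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Duration delay = Duration.ofSeconds(30);
        // Disconnected 10 seconds ago with a 30-second delay: prints Online
        System.out.println(displayedStatus(false, now.minusSeconds(10), now, delay));
        // Disconnected 60 seconds ago with a 30-second delay: prints Offline
        System.out.println(displayedStatus(false, now.minusSeconds(60), now, delay));
    }
}
```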
Editing a Shared Subscription
To avoid issues such as message duplication or loss of order during policy transitions, once a shared subscription group is created, its name and load balancing policy cannot be changed. To change the load balancing policy for a shared subscription group, pause client operations, delete the existing group, and create a new shared subscription group with a different load balancing policy.
2. In the left sidebar, choose Resource > Cluster, select a region, and click the ID of the target cluster to go to the cluster basic information page.
3. Click the Shared Subscription Management tab to view and manage the list of shared subscription groups (ShareName) in the current cluster.
4. In the Action column of the shared subscription group list, click Edit to modify the shared subscription group information.
If the load balancing policy is set to "Random", you can only modify the description.
If the load balancing policy is set to "Hash Partitioning", you can modify the description and effective time.
Deleting a Shared Subscription Group
Before deleting a shared subscription group, carefully evaluate its impact on online services. TDMQ for MQTT checks whether active subscriptions exist in the shared subscription group. To avoid unexpected service interruptions, make sure that all relevant client sessions are disconnected before performing the deletion.
2. In the left sidebar, choose Resource > Cluster, select a region, and click the ID of the target cluster to go to the cluster basic information page.
3. Click the Shared Subscription Management tab to view and manage the list of shared subscription groups (ShareName) in the current cluster.
4. In the Action column of the shared subscription group list, click Delete, and then confirm the deletion in the pop-up window.