tencent cloud

TDMQ for Apache Pulsar

Release Notes and Announcements
Release Notes
Cluster Version Updates
Product Announcements
Product Introduction
Introduction and Selection of the TDMQ Product Series
What Is TDMQ for Apache Pulsar
Strengths
Scenarios
How It Works
Product Series
Version Support Instructions for Open-Source Apache Pulsar
Comparison with Open-Source Apache Pulsar
High Availability
Quotas and Limits
Basic Concepts
Billing
Billing Overview
Pricing
Billing Examples
Renewal
Viewing Consumption Details
Overdue Payments
Refund
Getting Started
Getting Started Guide
Preparations
Using the SDK to Send and Receive General Messages
Using the SDK to Send and Receive Advanced Feature Messages
User Guide
Usage Process Guide
Configuring the Account Permission
Creating a Cluster
Configuring the Namespace
Configuring the Topic
Connecting to a Cluster
Managing the Cluster
Querying Messages and Traces
Cross-Region Replication
Viewing Monitoring Data and Configuring Alarm Rules
Use Cases
Client Usage
Abnormal Consumer Isolation
Traffic Throttling Mechanisms
Transaction Reconciliation
Message Idempotence
Message Compression
Migration Guide
Single-Write Multiple-Read Cluster Migration Solutions
Hitless Migration from Virtual Cluster to Pro Cluster
SDK Reference
API Overview
SDK Reference
SDK Overview
Recommended SDK Configuration Parameters
TCP Protocol (Apache Pulsar)
Security and Compliance
Permission Management
Deletion Protection
CloudAudit
FAQs
Monitoring
Clients
Agreements
Service Level Agreement
TDMQ Policy
Contact Us
Glossary
DocumentationTDMQ for Apache PulsarUse CasesClient UsageClients, Connections, and Producers/Consumers

Clients, Connections, and Producers/Consumers

PDF
Focus Mode
Font Size
Last updated: 2025-12-24 15:03:03
This document mainly introduces the relationship between TDMQ for Apache Pulsar clients and connections and between clients and producers/consumers. It also describes how to use the clients appropriately so that developers can use the TDMQ for Apache Pulsar service efficiently and stably.
Core principles:
One PulsarClient is provided for each process.
Producers and consumers are thread-safe. For the same topic, they can be reused, and it is recommended that they be reused.

Relationship Between Clients and Connections

The TDMQ for Apache Pulsar client (PulsarClient) is a fundamental unit for applications to connect to TDMQ for Apache Pulsar. One PulsarClient corresponds to one TCP connection. Generally, one application or process on the user side corresponds to one PulsarClient. The number of clients equals the number of application nodes. For application nodes that do not use TDMQ for Apache Pulsar services for a long time, clients should be reclaimed to save resources. (TDMQ for Apache Pulsar supports a maximum of 200 client connections for each topic.)
Note:
If a large number of topics exist on the business side and multiple clients need to be created, you can reuse client objects in the following manner:
1. Reuse the same client object for multiple producers or consumers of the same topic.
2. If the number of clients is still insufficient, try to reuse the same client object for multiple topics.
3. This limit applies only to shared clusters. For pro clusters, the default number of client connections per topic is still 200. However, this limit can be adjusted based on actual user requirements.

Relationship Between Clients and Producers/Consumers

Multiple producers and consumers can be created under one client to increase the production and consumption speeds. Generally, multithreading is used to create multiple producer or consumer objects under one client for production and consumption, and data of different producers and consumers is isolated.
The relationship between producers and topics is many-to-one, which indicates that:
A producer can send messages only to one topic.
Multiple producers can send messages to the same topic simultaneously.
The limits on producers/consumers in TDMQ for Apache Pulsar are as follows:
The maximum number of producers per topic is 1,000.
The maximum number of consumers per topic is 2,000.

Use Cases

The number of producers/consumers does not necessarily depend on the business object. They are reusable resources and are uniquely identified by name.

Producers

If 1,000 business objects produce messages simultaneously, it is not necessary to create 1,000 producers. If the messages are delivered to the same topic, each application node can initially use one producer (singleton pattern) for production. Generally, one producer can fully utilize the hardware configuration of an application node.
The following sample code shows Java message production.
//Obtain the serviceURL access address, token, full topic name, and subscription name from the configuration file (all can be copied from the console).
@Value("${tdmq.serviceUrl}")
private String serviceUrl;
@Value("${tdmq.token}")
private String token;
@Value("${tdmq.topic}")
private String topic;

//Declare one client object and one producer object.
private PulsarClient pulsarClient;
private Producer<String> producer;

//Create the client and producer objects in the initialization program.
public void init() throws Exception {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
}
In the business logic of actual message production, directly reference the producer to send messages.
//Directly reference the producer in the business logic of actual message production. Ensure that the schema type specified by the producer through the paradigm matches the input object.
public void onProduce(Producer<String> producer){
//Add business logic.
String msg = "my-message";//Simulate the actual business logic to obtain messages.
try {
//TDMQ for Apache Pulsar enables schema verification by default. The message object should match the schema type declared by the producer.
MessageId messageId = producer.newMessage()
.key("msgKey")
.value(msg)
.send();
System.out.println("delivered msg " + msgId + ", value:" + value);
} catch (PulsarClientException e) {
System.out.println("delivered msg failed, value:" + value);
e.printStackTrace();
}
}

public void onProduceAsync(Producer<String> producer){
//Add business logic.
String msg = "my-asnyc-message";//Simulate the actual business logic to obtain messages.
//Send messages asynchronously without thread blocking. This increases the sending rate.
CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
.key("msgKey")
.value(msg)
.sendAsync();
//Use the asynchronous callback to check whether the message is sent successfully.
messageIdFuture.whenComplete(((messageId, throwable) -> {
if( null != throwable ) {
System.out.println("delivery failed, value: " + msg );
//Add the delayed retry logic here.
} else {
System.out.println("delivered msg " + messageId + ", value:" + msg);
}
}));
}
When a producer is not used for a long time, call the close method to disable it to prevent it from occupying resources. Similarly, when a client instance remains idle for a long time, call the close method to disable it to prevent the connection pool from being exhausted.
public void destroy(){
if (producer != null) {
producer.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
}
When you need to create multiple producers in a client to write data simultaneously, see the following code.
//Obtain the serviceURL access address, token, and full topic name from the configuration file (all can be copied from the console).
@Value("${tdmq.serviceUrl}")
private String serviceUrl;
@Value("${tdmq.token}")
private String token;
@Value("${tdmq.topic1}")
private String topic1;
@Value("${tdmq.topic2}")
private String topic2;
@Value("${tdmq.topic3}")
private String topic3;
//Declare one client object and multiple producer objects.
private PulsarClient pulsarClient;
private Producer<String> producer1;
private Producer<String> producer2;
private Producer<String> producer3;
private Producer<String> producer4;
//Create a client and multiple producer objects in the initialization program.
public void init() throws Exception {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic1)
.create();
//producer2 subscribes to topic2.
producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(topic2)
.create();
//producer3 subscribes to topic3.
producer3 = pulsarClient.newProducer(Schema.STRING)
.topic(topic3)
.create();

//producer4 can subscribe to the same topic as producer1.
producer4 = pulsarClient.newProducer(Schema.STRING)
.topic(topic1)
.create();
}

Consumers

It is recommended that consumers be used in a singleton pattern like producers. A single consumer node only needs one client instance and one consumer instance. Generally, the performance bottleneck of the consumer of a message queue lies in the process of processing messages by consumers based on their business logic, rather than receiving messages. Therefore, when poor consumption performance occurs, check the network bandwidth consumption of the consumer. If it does not reach an obvious upper limit, analyze the time for processing messages based on logs and message trace information.
Note
When the Shared or Key-Shared mode is used, the number of consumers is not necessarily less than or equal to the number of partitions. The server has a module responsible for distributing messages to all consumers in a specific way (round-robin scheduling in Shared mode and round-robin scheduling within a key in Key-Shared mode by default).
When the Shared mode is used, and the production side suspends production, the final portion of messages may not be evenly distributed to consumers.
When multi-threaded consumption is used, the message order cannot be guaranteed even if the same consumer object is reused.
The following Java sample code shows how to use a thread pool for multi-threaded consumption based on the SpringBoot framework.
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Service
public class ConsumerService implements Runnable {

//Obtain the serviceURL access address, token, full topic name, and subscription name from the configuration file (all can be copied from the console).
@Value("${tdmq.serviceUrl}")
private String serviceUrl;
@Value("${tdmq.token}")
private String token;
@Value("${tdmq.topic}")
private String topic;
@Value("${tdmq.subscription}")
private String subscription;

private volatile boolean start = false;
private PulsarClient pulsarClient;
private Consumer<String> consumer;
private static final int corePoolSize = 10;
private static final int maximumPoolSize = 10;

private ExecutorService executor;
private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

@PostConstruct
public void init() throws Exception {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
//.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subscription)
.subscribe();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy());
start = true;
}

@PreDestroy
public void destroy() throws Exception {
start = false;
if (consumer != null) {
consumer.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
if (executor != null) {
executor.shutdownNow();
}
}

@Override
public void run() {
logger.info("tdmq consumer started...");
for (int i = 0; i < maximumPoolSize; i++) {
executor.submit(() -> {
while (start) {
try {
Message<String> message = consumer.receive();
if (message == null) {
continue;
}
onConsumer(message);
} catch (Exception e) {
logger.warn("tdmq consumer business error", e);
}
}
});
}
logger.info("tdmq consumer stopped...");
}

/**
* Write the consumption business logic here.
*
* @param message
* @return true indicates that the message is acknowledged, and false indicates the message is unacknowledged.
* @throws An exception is displayed, indicating that the message is unacknowledged.
*/
private void onConsumer(Message<String> message) {
//Business logic, delay operations.
try {
System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());
Thread.sleep(1000);//Simulate business logic processing.
consumer.acknowledge(message);
logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());
} catch (Exception exception) {
consumer.negativeAcknowledge(message);
logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());
}
}
}


Help and Support

Was this page helpful?

Help us improve! Rate your documentation experience in 5 mins.

Feedback