<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.0.8</version></dependency>
package com.tencent.cloud.tdmq.pulsar;import org.apache.pulsar.client.api.AuthenticationFactory;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;public class Constant {/*** Service access address on the Cluster page.*/private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";/*** Role token authorized by the namespace to be used on the Role Management page.*/private static final String AUTHENTICATION = "eyJrZXlJZC......";/*** Initialize the TDMQ for Apache Pulsar client.** @return TDMQ for Apache Pulsar client.*/public static PulsarClient initPulsarClient() throws PulsarClientException {// One TDMQ for Apache Pulsar client corresponds to one client connection.// In principle, one process has one client. Avoid repeated creation, which consumes resources.// For use cases about clients and producers/consumers, see the official documentation at https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1.PulsarClient pulsarClient = PulsarClient.builder()// Service access address..serviceUrl(SERVICE_URL)// Authorized role token..authentication(AuthenticationFactory.token(AUTHENTICATION)).build();System.out.println(">> pulsar client created.");return pulsarClient;}}
Parameter | Description |
SERVICE_URL | Address used to access a cluster. You can view and copy the address from the Cluster page in the console. |
AUTHENTICATION | Role token. You can copy the role token from the Role Management page. |
package com.tencent.cloud.tdmq.pulsar.simple;import com.tencent.cloud.tdmq.pulsar.Constant;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.Producer;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import java.nio.charset.StandardCharsets;/*** Send messages synchronously.*/public class SimpleProducer {public static void main(String[] args) throws PulsarClientException, InterruptedException {// Initialize the TDMQ for Apache Pulsar client.PulsarClient pulsarClient = Constant.initPulsarClient();// Construct a producer.Producer<byte[]> producer = pulsarClient.newProducer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name..topic("persistent://pulsar-xxx/sdk_java/topic1").create();System.out.println(">> pulsar producer created.");for (int i = 0; i < 10; i++) {String value = "my-sync-message-" + i;// Send messages.MessageId msgId = producer.newMessage().key("key" + i).value(value.getBytes(StandardCharsets.UTF_8)).send();System.out.println("deliver msg " + msgId + ",value:" + value);Thread.sleep(500);}// Disable the producer.producer.close();// Disable the client.pulsarClient.close();}}
persistent://clusterid/namespace/Topic, and the clusterid/namespace/topic part can be copied from the Topic page in the console.package com.tencent.cloud.tdmq.pulsar.simple;import com.tencent.cloud.tdmq.pulsar.Constant;import org.apache.pulsar.client.api.Consumer;import org.apache.pulsar.client.api.Message;import org.apache.pulsar.client.api.MessageId;import org.apache.pulsar.client.api.PulsarClient;import org.apache.pulsar.client.api.PulsarClientException;import org.apache.pulsar.client.api.SubscriptionInitialPosition;import org.apache.pulsar.client.api.SubscriptionType;/*** Consumer.*/public class SimpleConsumer {public static void main(String[] args) throws PulsarClientException {// Initialize the TDMQ for Apache Pulsar client.PulsarClient pulsarClient = Constant.initPulsarClient();// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("sub1_topic1")// Declare the consumption mode to be the Exclusive mode..subscriptionType(SubscriptionType.Exclusive)// Configure consumption from the earliest time. Otherwise, historical messages may not be consumed..subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();System.out.println(">> pulsar consumer created.");for (int i = 0; i < 10; i++) {// Receive a message corresponding to the current offset.Message<byte[]> msg = consumer.receive();MessageId msgId = msg.getMessageId();String value = new String(msg.getValue());System.out.println("receive msg " + msgId + ",value:" + value);// After the message is received, it must be acknowledged. Otherwise, the offset remains at the current message, resulting in message backlogs.consumer.acknowledge(msg);}// Disable the consumer.consumer.close();// Disable the client.pulsarClient.close();}}
persistent://clusterId/namespace/Topic. The clusterid/namespace/topic part can be copied from the Topic page in the console.

Feedback