




<!-- in your <properties> block -->
<pulsar.version>2.7.2</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>
// One TDMQ for Apache Pulsar client corresponds to one client connection.
// In principle, one process has one client. Avoid repeated creation, which consumes resources.
// For use cases about clients and producers/consumers, see the official documentation at https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1.
PulsarClient client = PulsarClient.builder()
        // Replace it with the cluster access address on the Cluster page.
        .serviceUrl("http://pulsar-..tencenttdmq.com:8080")
        // Replace it with the role token on the Role Management page.
        .authentication(AuthenticationFactory.token("eyJr"))
        .build();
System.out.println(">> pulsar client created.");

Consumer<byte[]> consumer = client.newConsumer()
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page.
        .topic("persistent://pulsar-****/namespace/topicName")
        // Create a subscription on the topic details page in the console and specify the subscription name here.
        .subscriptionName("subscriptionName")
        // Declare the consumption mode to be the Exclusive mode.
        .subscriptionType(SubscriptionType.Exclusive)
        // Configure consumption from the earliest position. Otherwise, historical messages may not be consumed.
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
System.out.println(">> pulsar consumer created.");
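
The topic path format described in the comments above (persistent://cluster (tenant) ID/namespace/topic name) is easy to mistype when assembled by hand. The following is a minimal sketch of a helper that builds the path from its three parts. TopicPaths and its persistent method are hypothetical names for illustration and are not part of the Pulsar client API. The rule that a part must be non-empty and contain no slash is an assumption made for this sketch, not a documented TDMQ constraint.

```java
import java.util.Objects;

/** Hypothetical helper, not part of the Pulsar client API: assembles a full topic path. */
final class TopicPaths {
    private TopicPaths() {}

    /** Builds persistent://clusterId/namespace/topic from its three parts. */
    static String persistent(String clusterId, String namespace, String topic) {
        return "persistent://" + part("clusterId", clusterId)
                + "/" + part("namespace", namespace)
                + "/" + part("topic", topic);
    }

    /** Assumed rule for this sketch: each part is non-empty and contains no '/'. */
    private static String part(String name, String value) {
        Objects.requireNonNull(value, name);
        String trimmed = value.trim();
        if (trimmed.isEmpty() || trimmed.contains("/")) {
            throw new IllegalArgumentException(
                    name + " must be non-empty and must not contain '/': " + value);
        }
        return trimmed;
    }

    public static void main(String[] args) {
        // Placeholder values; copy the real ones from the Topic page in the console.
        System.out.println(persistent("pulsar-xxxx", "namespace", "topicName"));
    }
}
```

The resulting string could then be passed to .topic(...) when creating the consumer or producer, so both sides are guaranteed to use the same path.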

Producer<byte[]> producer = client.newProducer()
        // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name.
        .topic("persistent://pulsar-****/namespace/topicName")
        .create();
System.out.println(">> pulsar producer created.");
for (int i = 0; i < 5; i++) {
    String value = "my-sync-message-" + i;
    // Send the message synchronously; send() returns the ID of the delivered message.
    MessageId msgId = producer.newMessage().value(value.getBytes()).send();
    System.out.println("deliver msg " + msgId + ",value:" + value);
}
// Close the producer.
producer.close();
for (int i = 0; i < 5; i++) {
    // Receive a message corresponding to the current offset.
    Message<byte[]> msg = consumer.receive();
    MessageId msgId = msg.getMessageId();
    String value = new String(msg.getValue());
    System.out.println("receive msg " + msgId + ",value:" + value);
    // After the message is received, it must be acknowledged. Otherwise, the offset
    // remains at the current message, and other messages cannot be consumed.
    consumer.acknowledge(msg);
}
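
The send and receive loops above convert between String and byte[] with value.getBytes() and new String(...), which use the JVM's default charset. If the producer and consumer run on hosts with different defaults, non-ASCII payloads may decode incorrectly. The following sketch shows one way to pin both sides to UTF-8. PayloadCodec is a hypothetical name introduced for illustration, and UTF-8 is an assumed choice, not something the demo prescribes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: encodes and decodes message payloads with an explicit charset. */
final class PayloadCodec {
    private PayloadCodec() {}

    /** Producer side: turn the message text into bytes using UTF-8. */
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: turn the received bytes back into text using UTF-8. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The check mark (U+2713) is outside ASCII, so it exercises the charset handling.
        String original = "my-sync-message-0 \u2713";
        byte[] payload = encode(original);
        System.out.println(decode(payload).equals(original)); // prints "true"
    }
}
```

In the demo, encode(value) would take the place of value.getBytes() in the send loop, and decode(msg.getValue()) would take the place of new String(msg.getValue()) in the receive loop.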
In the directory where pom.xml is located, run the mvn clean package command. Alternatively, use your IDE's built-in packaging feature. Either approach packages the entire project and generates an executable JAR file in the target directory.

Run the generated tdmq-demo-cloud-1.0.0.jar file (for example, with the java -jar command) to start the demo, then view the running logs.


