Release Notes
Cluster Version Updates
Product Announcements
//Obtain the serviceURL access address, token, full topic name, and subscription name from the configuration file (all can be copied from the console).@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic}")private String topic;//Declare one client object and one producer object.private PulsarClient pulsarClient;private Producer<String> producer;//Create the client and producer objects in the initialization program.public void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();}
producer to send messages.//Directly reference the producer in the business logic of actual message production. Ensure that the schema type specified by the producer through the paradigm matches the input object.public void onProduce(Producer<String> producer){//Add business logic.String msg = "my-message";//Simulate the actual business logic to obtain messages.try {//TDMQ for Apache Pulsar enables schema verification by default. The message object should match the schema type declared by the producer.MessageId messageId = producer.newMessage().key("msgKey").value(msg).send();System.out.println("delivered msg " + msgId + ", value:" + value);} catch (PulsarClientException e) {System.out.println("delivered msg failed, value:" + value);e.printStackTrace();}}public void onProduceAsync(Producer<String> producer){//Add business logic.String msg = "my-asnyc-message";//Simulate the actual business logic to obtain messages.//Send messages asynchronously without thread blocking. This increases the sending rate.CompletableFuture<MessageId> messageIdFuture = producer.newMessage().key("msgKey").value(msg).sendAsync();//Use the asynchronous callback to check whether the message is sent successfully.messageIdFuture.whenComplete(((messageId, throwable) -> {if( null != throwable ) {System.out.println("delivery failed, value: " + msg );//Add the delayed retry logic here.} else {System.out.println("delivered msg " + messageId + ", value:" + msg);}}));}
close method to disable it to prevent it from occupying resources. Similarly, when a client instance remains idle for a long time, call the close method to disable it to prevent the connection pool from being exhausted.public void destroy(){if (producer != null) {producer.close();}if (pulsarClient != null) {pulsarClient.close();}}
producers in a client to write data simultaneously, see the following code.//Obtain the serviceURL access address, token, and full topic name from the configuration file (all can be copied from the console).@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic1}")private String topic1;@Value("${tdmq.topic2}")private String topic2;@Value("${tdmq.topic3}")private String topic3;//Declare one client object and multiple producer objects.private PulsarClient pulsarClient;private Producer<String> producer1;private Producer<String> producer2;private Producer<String> producer3;private Producer<String> producer4;//Create a client and multiple producer objects in the initialization program.public void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();producer1 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create();//producer2 subscribes to topic2.producer2 = pulsarClient.newProducer(Schema.STRING).topic(topic2).create();//producer3 subscribes to topic3.producer3 = pulsarClient.newProducer(Schema.STRING).topic(topic3).create();//producer4 can subscribe to the same topic as producer1.producer4 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create();}
import org.apache.pulsar.client.api.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;@Servicepublic class ConsumerService implements Runnable {//Obtain the serviceURL access address, token, full topic name, and subscription name from the configuration file (all can be copied from the console).@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic}")private String topic;@Value("${tdmq.subscription}")private String subscription;private volatile boolean start = false;private PulsarClient pulsarClient;private Consumer<String> consumer;private static final int corePoolSize = 10;private static final int maximumPoolSize = 10;private ExecutorService executor;private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);@PostConstructpublic void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)//.subscriptionType(SubscriptionType.Shared).subscriptionName(subscription).subscribe();executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),new ThreadPoolExecutor.AbortPolicy());start = true;}@PreDestroypublic void destroy() throws Exception {start = false;if (consumer != null) {consumer.close();}if (pulsarClient != null) {pulsarClient.close();}if (executor != null) {executor.shutdownNow();}}@Overridepublic void run() {logger.info("tdmq consumer started...");for (int i = 0; i < maximumPoolSize; i++) {executor.submit(() -> {while (start) {try {Message<String> message = consumer.receive();if (message == null) {continue;}onConsumer(message);} catch (Exception e) {logger.warn("tdmq consumer business error", e);}}});}logger.info("tdmq consumer stopped...");}/*** Write the consumption business logic here.** @param message* @return true indicates that the message is acknowledged, and false indicates the message is unacknowledged.* @throws An exception is displayed, indicating that the message is unacknowledged.*/private void onConsumer(Message<String> message) {//Business logic, delay operations.try {System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());Thread.sleep(1000);//Simulate business logic processing.consumer.acknowledge(message);logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());} catch (Exception exception) {consumer.negativeAcknowledge(message);logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());}}}
Was this page helpful?
You can also Contact sales or Submit a Ticket for help.
Help us improve! Rate your documentation experience in 5 mins.
Feedback