新功能发布记录
集群版本更新记录
产品公告
//从配置文件中获取 serviceURL 接入地址、Token 密钥、Topic 全名和 Subscription 名称(均可从控制台复制)@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic}")private String topic;//声明1个 Client 对象、producer 对象private PulsarClient pulsarClient;private Producer<String> producer;//在一段初始化程序中创建好客户端和生产者对象public void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();}
Producer 完成消息的发送。//在实际生产消息的业务逻辑中直接引用,注意 Producer 通过范式声明的 Schema 类型要和传入对象匹配public void onProduce(Producer<String> producer){//添加业务逻辑String msg = "my-message";//模拟从业务逻辑拿到消息try {//TDMQ Pulsar 版默认开启 Schema 校验, 消息对象一定需要和 producer 声明的 Schema 类型匹配MessageId messageId = producer.newMessage().key("msgKey").value(msg).send();System.out.println("delivered msg " + msgId + ", value:" + value);} catch (PulsarClientException e) {System.out.println("delivered msg failed, value:" + value);e.printStackTrace();}}public void onProduceAsync(Producer<String> producer){//添加业务逻辑String msg = "my-asnyc-message";//模拟从业务逻辑拿到消息//异步发送消息,无线程阻塞,提升发送速率CompletableFuture<MessageId> messageIdFuture = producer.newMessage().key("msgKey").value(msg).sendAsync();//通过异步回调得知消息发送成功与否messageIdFuture.whenComplete(((messageId, throwable) -> {if( null != throwable ) {System.out.println("delivery failed, value: " + msg );//此处可以添加延时重试的逻辑} else {System.out.println("delivered msg " + messageId + ", value:" + msg);}}));}
close 方法关闭,以避免占用资源;当一个客户端实例长时间不使用时,同样需要调用 close 方法关闭,以避免连接池被占满。public void destroy(){if (producer != null) {producer.close();}if (pulsarClient != null) {pulsarClient.close();}}
client 创建多个 producer 同时写入数据时,可以参考如下代码。//从配置文件中获取 serviceURL 接入地址、Token 密钥和Topic 全名(均可从控制台复制)@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic1}")private String topic1;@Value("${tdmq.topic2}")private String topic2;@Value("${tdmq.topic3}")private String topic3;//声明1个 Client 对象和多个 Producer 对象private PulsarClient pulsarClient;private Producer<String> producer1;private Producer<String> producer2;private Producer<String> producer3;private Producer<String> producer4;//在一段初始化程序中创建好客户端和多个生产者对象public void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();producer1 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create();//producer2订阅topic2producer2 = pulsarClient.newProducer(Schema.STRING).topic(topic2).create();//producer3订阅topic3producer3 = pulsarClient.newProducer(Schema.STRING).topic(topic3).create();//producer4可与producer1订阅相同主题producer4 = pulsarClient.newProducer(Schema.STRING).topic(topic1).create();}
import org.apache.pulsar.client.api.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;@Servicepublic class ConsumerService implements Runnable {//从配置文件中获取 serviceURL 接入地址、Token 密钥、Topic 全名和 Subscription 名称(均可从控制台复制)@Value("${tdmq.serviceUrl}")private String serviceUrl;@Value("${tdmq.token}")private String token;@Value("${tdmq.topic}")private String topic;@Value("${tdmq.subscription}")private String subscription;private volatile boolean start = false;private PulsarClient pulsarClient;private Consumer<String> consumer;private static final int corePoolSize = 10;private static final int maximumPoolSize = 10;private ExecutorService executor;private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);@PostConstructpublic void init() throws Exception {pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(AuthenticationFactory.token(token)).build();consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)//.subscriptionType(SubscriptionType.Shared).subscriptionName(subscription).subscribe();executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),new ThreadPoolExecutor.AbortPolicy());start = true;}@PreDestroypublic void destroy() throws Exception {start = false;if (consumer != null) {consumer.close();}if (pulsarClient != null) {pulsarClient.close();}if (executor != null) {executor.shutdownNow();}}@Overridepublic void run() {logger.info("tdmq consumer started...");for (int i = 0; i < maximumPoolSize; i++) {executor.submit(() -> {while (start) {try {Message<String> message = consumer.receive();if (message == null) {continue;}onConsumer(message);} catch (Exception e) {logger.warn("tdmq consumer business error", e);}}});}logger.info("tdmq consumer stopped...");}/*** 这里写消费业务逻辑** @param message* @return return true: 消息ack return false: 消息nack* @throws Exception 消息nack*/private void onConsumer(Message<String> message) {//业务逻辑,延时类操作try {System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());Thread.sleep(1000);//模拟业务逻辑处理consumer.acknowledge(message);logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());} catch (Exception exception) {consumer.negativeAcknowledge(message);logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());}}}
文档反馈