新功能发布记录
集群版本更新记录
产品公告





<!-- in your <properties> block --><pulsar.version>2.7.2</pulsar.version><!-- in your <dependencies> block --><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>${pulsar.version}</version></dependency>
// 一个Pulsar client对应一个客户端链接// 原则上一个进程一个client,尽量避免重复创建,消耗资源// 关于客户端和生产消费者的实践教程,可以参考官方文档 https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1PulsarClient client = PulsarClient.builder()//替换成集群接入地址,位于【集群管理】页面接入地址.serviceUrl("http://pulsar-..tencenttdmq.com:8080")//替换成角色密钥,位于【角色管理】页面.authentication(AuthenticationFactory.token("eyJr")).build();System.out.println(">> pulsar client created.");
Consumer<byte[]> consumer = client.newConsumer()//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-****/namespace/topicName")//需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("subscriptionName")//声明消费模式为exclusive(独占)模式.subscriptionType(SubscriptionType.Exclusive)//配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();System.out.println(">> pulsar consumer created.");

Producer<byte[]> producer = client.newProducer()//topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称.topic("persistent://pulsar-****/namespace/topicName").create();System.out.println(">> pulsar producer created.");
for (int i = 0; i < 5; i++) {String value = "my-sync-message-" + i;//发送消息MessageId msgId = producer.newMessage().value(value.getBytes()).send();System.out.println("deliver msg " + msgId + ",value:" + value);}//关闭生产者producer.close();
for (int i = 0; i < 5; i++) {//接收当前offset对应的一条消息Message<byte[]> msg = consumer.receive();MessageId msgId = msg.getMessageId();String value = new String(msg.getValue());System.out.println("receive msg " + msgId + ",value:" + value);//接收到之后必须要ack,否则offset会一直停留在当前消息,无法继续消费consumer.acknowledge(msg);}
pom.xml 所在目录执行命令 mvn clean package,或者通过 IDE 自带的功能打包整个工程,在 target 目录下生成一个可运行的 jar 文件。

tdmq-demo-cloud-1.0.0.jar,运行 Demo,可查看运行日志。



文档反馈