tencent cloud

消息队列 Pulsar 版

动态与公告
新功能发布记录
集群版本更新记录
产品公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 Pulsar 版
产品优势
应用场景
技术原理
产品系列
开源 Pulsar 版本支持说明
与开源 Pulsar 对比
高可用
配额与限制
基础概念
产品计费
计费概述
价格说明
计费示例
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
使用 SDK 收发普通消息
使用 SDK 收发高级特性消息
用户指南
使用流程指引
配置账号权限
新建集群
配置命名空间
配置 Topic
连接集群
管理集群
查询消息及轨迹
跨地域复制
查看监控和配置告警
实践教程
客户端使用实践
异常消费者隔离
限流机制说明
交易对账
消息幂等性
消息压缩
迁移指南
单写多读集群迁移方案
虚拟集群平滑迁移至专业集群
API 参考
API 概览
SDK 参考
SDK 概述
SDK 配置参数推荐
TCP 协议(Pulsar 社区版)
安全与合规
权限管理
删除保护
云 API 审计
常见问题
监控相关
客户端相关
服务协议
服务等级协议
TDMQ 政策
联系我们
词汇表

Java SDK

PDF
聚焦模式
字号
最后更新时间: 2025-12-24 15:26:38

操作场景

本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

步骤1:安装 Java 依赖库

Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>3.0.8</version>
</dependency>
说明:
建议使用 3.0.8 及以上版本。
如果在客户端中使用批量收发消息功能(BatchReceive),则应使用 3.0.8 及以上版本的 SDK。

步骤2:修改配置参数

修改 Constant.java 参数。
package com.tencent.cloud.tdmq.pulsar;

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class Constant {
/**
* 服务接入地址,位于【集群管理】页面接入地址
*/
private static final String SERVICE_URL = "http://pulsar-xxx.tdmq-pulsar.ap-sh.public.tencenttdmq.com:8080";

/**
* 要使用的命名空间授权的角密钥,位于【角色管理】页面
*/
private static final String AUTHENTICATION = "eyJrZXlJZC......";


/**
* 初始化pulsar客户端
*
* @return pulsar客户端
*/
public static PulsarClient initPulsarClient() throws PulsarClientException {
// 一个Pulsar client对应一个客户端链接
// 原则上一个进程一个client,尽量避免重复创建,消耗资源
// 关于客户端和生产消费者的实践教程,可以参考官方文档 https://www.tencentcloud.com/document/product/1179/58090?from_cn_redirect=1
PulsarClient pulsarClient = PulsarClient.builder()
// 服务接入地址
.serviceUrl(SERVICE_URL)
// 授权角色密钥
.authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
System.out.println(">> pulsar client created.");
return pulsarClient;
}
}

参数
说明
SERVICE_URL
集群接入地址,可以在控制台集群管理页面查看并复制。

AUTHENTICATION
角色的密钥,角色密钥可以在角色管理中复制。


步骤3:生产消息

创建并编译运行 SimpleProducer.java。
package com.tencent.cloud.tdmq.pulsar.simple;

import com.tencent.cloud.tdmq.pulsar.Constant;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

import java.nio.charset.StandardCharsets;

/**
* 同步发送消息
*/
public class SimpleProducer {

public static void main(String[] args) throws PulsarClientException, InterruptedException {

// 初始化pulsar客户端
PulsarClient pulsarClient = Constant.initPulsarClient();
// 构建生产者
Producer<byte[]> producer = pulsarClient.newProducer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
.topic("persistent://pulsar-xxx/sdk_java/topic1").create();
System.out.println(">> pulsar producer created.");
for (int i = 0; i < 10; i++) {
String value = "my-sync-message-" + i;
// 发送消息
MessageId msgId = producer.newMessage().key("key" + i).value(value.getBytes(StandardCharsets.UTF_8)).send();
System.out.println("deliver msg " + msgId + ",value:" + value);

Thread.sleep(500);
}
// 关闭生产者
producer.close();
// 关闭客户端
pulsarClient.close();
}
}

.topic:填写创建好的 topic 名称,需要填入完整路径,即 persistent://clusterid/namespace/Topicclusterid/namespace/topic 的部分可以从控制台上 Topic 管理页面直接复制。


步骤4:消费消息

创建并编译运行 SimpleConsumer.java。
package com.tencent.cloud.tdmq.pulsar.simple;

import com.tencent.cloud.tdmq.pulsar.Constant;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

/**
* 消费者
*/
public class SimpleConsumer {


public static void main(String[] args) throws PulsarClientException {
// 初始化pulsar客户端
PulsarClient pulsarClient = Constant.initPulsarClient();
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub1_topic1")
// 声明消费模式为exclusive(独占)模式
.subscriptionType(SubscriptionType.Exclusive)
// 配置从最早开始消费,否则可能会消费不到历史消息
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
System.out.println(">> pulsar consumer created.");
for (int i = 0; i < 10; i++) {
// 接收当前offset对应的一条消息
Message<byte[]> msg = consumer.receive();
MessageId msgId = msg.getMessageId();
String value = new String(msg.getValue());
System.out.println("receive msg " + msgId + ",value:" + value);
// 接收到之后必须要ack,否则offset会一直停留在当前消息,导致消息积压
consumer.acknowledge(msg);
}
// 关闭消费者
consumer.close();
// 关闭客户端
pulsarClient.close();
}
}

.topic:Topic 名称需要填入完整路径,即persistent://clusterId/namespace/Topicclusterid/namespace/topic 的部分可以从控制台上 Topic 管理 页面直接复制。



.subscriptionName:需要写入订阅名,可在消费者界面查看。

步骤5:查看消费情况

进入 消息查询 页面,可查看消息详情。
说明:
消息轨迹的查询只支持单条消息,如果用户在 Producer 侧开启了 Batch 功能,则在消息查询中,同一个 Batch 的消息只可以查询到 Batch 中的第一条消息。

消息轨迹如下:



说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoPulsar 官方文档

SDK 版本相关

社区 issue 及优化点

Java SDK 2.7.2 及以下版本,强烈建议升级到 3.0.8 版本及更高版本,较高版本客户端已修复下列严重问题:
1. 修复 broker 重启导致 Consumer 消费暴涨问题(相关参考文档)。
2. 修复消费者因批量消息确认中的竞争条件而丢失消息确认问题(相关参考文档)。
3. 修复由于 IO 线程竞争条件导致生产者/消费者停止重新连接或发布/订阅的问题(相关参考文档)。
4. 修复重试队列和死信队列创建命名不规范问题(相关参考文档)。
5. 修复未 unack 后批量消息全部重推问题(相关参考文档)。
完整的社区 issue 参见:相关参考文档

低版本风险隐患

Java SDK 2.7.x 及以下(包含 2.7.x)版本,对于极端场景异常处理覆盖不够全面,当 broker 升级重启或网络故障等场景下,有极小概率客户端和服务端重连过程出现异常,导致发送超时或者停止消费等问题。
这里强烈建议您先将客户端升级到 3.0.8 新版本后,再进行 broker 集群版本的更新。
除此之外,由于 2.7.x 和 2.9.x 以上版本死信队列和重试队列默认格式有变化,最好在业务代码中指定老的重试队列和死信队列的名称从而兼容,否则会出现无法消费到老重试队列/死信队列消息的情况,具体信息可参考 消息重试与死信机制

低版本隐患处理手段

新版本在 broker 升级过程中能够正常重连,基本做到业务无感知。但如果您的客户端 SDK 确实无法升级到新版本,建议您在 broker 集群升级后,关注客户端的日志输出及控制台的生产消费相关指标。
如果出现生产消费卡住的情况,请及时重启客户端,通常客户端在重启后即可恢复正常,但可能会出现少量重复生产/消费消息的情况。如果重启客户端后还是无法改善,请及时 提交工单 进一步定位处理。

帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈