This document describes how to use open-source SDK to send and receive messages by using the SDK for Java as an example and helps you better understand the message sending and receiving processes.
pom.xml
file. This document uses a Maven project as an example.<dependency>
<groupid>org.apache.pulsar</groupid>
<artifactid>pulsar-client</artifactid>
<version>2.7.2</version>
</dependency>
Note:
- SDK 2.7.2 or later is recommended.
- SDK 2.7.4 or later is recommended if you use the batch message sending and receiving feature (
BatchReceive
) of the client.
Create a Pulsar client.
PulsarClient pulsarClient = PulsarClient.builder()
// Service access address
.serviceUrl(SERVICE_URL)
// Role token
.authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
Parameter | Description |
---|---|
SERVICE_URL | Cluster access address, which can be viewed and copied on the Cluster page in the console.![]() |
AUTHENTICATION | Role token, which can be copied in the **Token** column on the Role Management page.![]() |
Create a producer.
// Create a producer of the `byte[]` type
Producer<byte[]> producer = pulsarClient.newProducer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
.topic("persistent://pulsar-xxx/sdk_java/topic1").create();
Note:You need to enter the complete path of the topic name, i.e.,
persistent://clusterid/namespace/Topic
, where theclusterid/namespace/topic
part can be copied directly from the Topic page in the console.
Send the message.
// Send the message
MessageId msgId = producer.newMessage()
// Message content
.value("this is a new message.".getBytes(StandardCharsets.UTF_8))
// Business key
.key("youKey")
// Business parameter
.property("mykey", "myvalue").send();
Release the resources.
// Disable the producer
producer.close();
// Disable the client
pulsarClient.close();
Create a consumer.
// Create a consumer of the `byte[]` type (default type)
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the exclusive mode as the consumption mode
.subscriptionType(SubscriptionType.Exclusive)
// Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
// Subscription
.subscribe();
Note:
- You need to enter the complete path of the topic name, i.e.,
persistent://clusterid/namespace/Topic
, where theclusterid/namespace/topic
part can be copied directly from the Topic page in the console.- You need to enter the subscription name in the
subscriptionName
parameter, which can be viewed on the Consumption Management page.
Consume the message.
// Receive a message corresponding to the current offset
Message<byte[]> msg = consumer.receive();
MessageId msgId = msg.getMessageId();
String value = new String(msg.getValue());
System.out.println("receive msg " + msgId + ",value:" + value);
// Messages must be acknowledged after being received; otherwise, the offset will be held in the position of the current message, causing message heap.
consumer.acknowledge(msg);
Use the listener for consumption.
// Message listener
MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
// Return `ack` as the acknowledgement
consumer.acknowledge(msg);
} catch (Exception e){
// Return `nack` if the consumption fails
consumer.negativeAcknowledge(msg);
}
};
pulsarClient.newConsumer()
// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
.topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1")
// You need to create a subscription on the topic details page in the console and enter the subscription name here
.subscriptionName("sub_topic1")
// Declare the exclusive mode as the consumption mode
.subscriptionType(SubscriptionType.Exclusive)
// Set the listener
.messageListener(myMessageListener)
// Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Log in to the TDMQ for Pulsar console, click Topic > Topic Name to enter the Consumption Management page, and click the triangle below a subscription name to view the production and consumption records.
Note:The above is a brief introduction to the way of publishing and subscribing to messages. For more operations, see Demo or Pulsar Java client.
Was this page helpful?