SDK for Java

Last updated: 2022-06-29 11:39:32

    Scenarios

    This document uses the SDK for Java as an example to describe how to send and receive messages with the open-source SDK, and helps you better understand the message sending and receiving process.

    Prerequisites

    Directions

    1. Add the following dependency to the pom.xml file of your Java project. This document uses a Maven project as an example.
      <dependency>
          <groupId>org.apache.pulsar</groupId>
          <artifactId>pulsar-client</artifactId>
          <version>2.7.2</version>
      </dependency>
    Note:

    • SDK 2.7.2 or later is recommended.
    • SDK 2.7.4 or later is recommended if you use the batch message sending and receiving feature (BatchReceive) of the client.
    2. Create a Pulsar client.

      PulsarClient pulsarClient = PulsarClient.builder()
      // Service access address
      .serviceUrl(SERVICE_URL)
      // Role token
      .authentication(AuthenticationFactory.token(AUTHENTICATION)).build();
      Parameter        Description
      SERVICE_URL      Cluster access address, which can be viewed and copied on the Cluster page in the console.
      AUTHENTICATION   Role token, which can be copied in the **Token** column on the Role Management page.
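
    Because the role token is a credential, it is often kept out of source code. The following is a minimal sketch, not part of the original sample, that reads both parameters from a map such as System.getenv(). The class name PulsarSettings and the variable names PULSAR_SERVICE_URL and PULSAR_TOKEN are hypothetical, and the scheme check assumes the access address uses one of the URL schemes the Pulsar Java client accepts.

```java
import java.util.Map;

// Hypothetical helper: holds the two values passed to PulsarClient.builder().
final class PulsarSettings {
    final String serviceUrl;
    final String token;

    private PulsarSettings(String serviceUrl, String token) {
        this.serviceUrl = serviceUrl;
        this.token = token;
    }

    // Reads the settings from a map such as System.getenv().
    // The key names are assumptions made for this sketch.
    static PulsarSettings from(Map<String, String> env) {
        String url = require(env, "PULSAR_SERVICE_URL");
        String token = require(env, "PULSAR_TOKEN");
        // Assumed set of schemes; adjust if your access address differs.
        boolean knownScheme = url.startsWith("pulsar://")
                || url.startsWith("pulsar+ssl://")
                || url.startsWith("http://")
                || url.startsWith("https://");
        if (!knownScheme) {
            throw new IllegalArgumentException("Unexpected service URL scheme: " + url);
        }
        return new PulsarSettings(url, token);
    }

    // Returns the trimmed value, or fails fast if it is missing or blank.
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing setting: " + key);
        }
        return value.trim();
    }
}
```

    With this sketch, the result of PulsarSettings.from(System.getenv()) can supply serviceUrl in place of SERVICE_URL and token in place of AUTHENTICATION in the client builder.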
    3. Create a producer.

      // Create a producer of the `byte[]` type
      Producer<byte[]> producer = pulsarClient.newProducer()
      // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`
      .topic("persistent://pulsar-xxx/sdk_java/topic1").create();
    Note:

    You need to enter the complete path of the topic name, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
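
    The path format described in this note can be assembled and checked before it is passed to .topic(...). The following is a minimal sketch under that assumption; the class TopicNames and its methods are hypothetical helpers, not part of the Pulsar client.

```java
// Hypothetical helper for building the full topic path
// persistent://clusterid/namespace/topic described above.
final class TopicNames {
    private static final String PREFIX = "persistent://";

    // Joins the three parts into the full topic path.
    static String full(String clusterId, String namespace, String topic) {
        return PREFIX + part(clusterId, "clusterId")
                + "/" + part(namespace, "namespace")
                + "/" + part(topic, "topic");
    }

    // Accepts the clusterid/namespace/topic string copied from the console,
    // with or without the persistent:// prefix.
    static String fromConsolePath(String path) {
        String trimmed = path == null ? "" : path.trim();
        if (trimmed.startsWith(PREFIX)) {
            trimmed = trimmed.substring(PREFIX.length());
        }
        String[] parts = trimmed.split("/", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected clusterid/namespace/topic but got: " + path);
        }
        return full(parts[0], parts[1], parts[2]);
    }

    // Rejects empty parts and parts that contain a path separator.
    private static String part(String value, String name) {
        if (value == null || value.trim().isEmpty() || value.contains("/")) {
            throw new IllegalArgumentException(
                    name + " must be non-empty and must not contain '/'");
        }
        return value.trim();
    }
}
```

    For example, TopicNames.fromConsolePath("pulsar-xxx/sdk_java/topic1") yields the same string that the producer sample passes to .topic(...).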

    4. Send the message.

      // Send the message
      MessageId msgId = producer.newMessage()
      // Message content
      .value("this is a new message.".getBytes(StandardCharsets.UTF_8))
      // Business key
      .key("youKey")
      // Business parameter
      .property("mykey", "myvalue").send();
    5. Release the resources.

      // Close the producer
      producer.close();
      // Close the client
      pulsarClient.close();
    6. Create a consumer.

      // Create a consumer of the `byte[]` type (default type)
      Consumer<byte[]> consumer = pulsarClient.newConsumer()
      // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
      .topic("persistent://pulsar-xxx/sdk_java/topic1")
      // You need to create a subscription on the topic details page in the console and enter the subscription name here
      .subscriptionName("sub_topic1")
      // Declare the exclusive mode as the consumption mode
      .subscriptionType(SubscriptionType.Exclusive)
      // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
      .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
      // Subscription
      .subscribe();
    Note:

    • You need to enter the complete path of the topic name, i.e., persistent://clusterid/namespace/Topic, where the clusterid/namespace/topic part can be copied directly from the Topic page in the console.
    • You need to enter the subscription name in the subscriptionName parameter, which can be viewed on the Consumption Management page.
    7. Consume the message.

      // Receive the message at the current offset (blocks until a message is available)
      Message<byte[]> msg = consumer.receive();
      MessageId msgId = msg.getMessageId();
      // Decode with an explicit charset to match the UTF-8 encoding used by the producer
      String value = new String(msg.getValue(), StandardCharsets.UTF_8);
      System.out.println("receive msg " + msgId + ",value:" + value);
      // Acknowledge each message after it is received; otherwise, the offset stays at the current message and a message backlog builds up.
      consumer.acknowledge(msg);
    8. Use the listener for consumption.

      // Message listener
      MessageListener<byte[]> myMessageListener = (consumer, msg) -> {
          try {
              // Decode with an explicit charset to match the UTF-8 encoding used by the producer
              System.out.println("Message received: " + new String(msg.getData(), StandardCharsets.UTF_8));
              // Return `ack` as the acknowledgement
              consumer.acknowledge(msg);
          } catch (Exception e) {
              // Return `nack` if the consumption fails
              consumer.negativeAcknowledge(msg);
          }
      };
      pulsarClient.newConsumer()
      // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page.
      .topic("persistent://pulsar-mmqwr5xx9n7g/sdk_java/topic1")
      // You need to create a subscription on the topic details page in the console and enter the subscription name here
      .subscriptionName("sub_topic1")
      // Declare the exclusive mode as the consumption mode
      .subscriptionType(SubscriptionType.Exclusive)
      // Set the listener
      .messageListener(myMessageListener)
      // Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed
      .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
      .subscribe();
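
    The listener above acknowledges a message when processing succeeds and negatively acknowledges it when processing fails. One way to keep that decision testable without a broker is to isolate it from the Pulsar calls. The following is a minimal sketch of that idea; ListenerLogic, Outcome, and PayloadHandler are hypothetical names introduced here, not Pulsar types.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that isolates the ack/nack decision of a listener.
final class ListenerLogic {
    enum Outcome { ACK, NACK }

    // Business logic applied to the decoded message text.
    interface PayloadHandler {
        void handle(String text) throws Exception;
    }

    // Decodes the payload as UTF-8 and runs the handler:
    // ACK if it returns normally, NACK if it throws.
    static Outcome process(byte[] payload, PayloadHandler handler) {
        try {
            handler.handle(new String(payload, StandardCharsets.UTF_8));
            return Outcome.ACK;
        } catch (Exception e) {
            return Outcome.NACK;
        }
    }
}
```

    Inside a message listener, the result of ListenerLogic.process(msg.getData(), handler) would then decide whether to call consumer.acknowledge(msg) or consumer.negativeAcknowledge(msg).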
    9. Log in to the TDMQ for Pulsar console, click Topic > Topic Name to enter the Consumption Management page, and click the triangle below a subscription name to view the production and consumption records.

    Note:

    The above is a brief introduction to the way of publishing and subscribing to messages. For more operations, see Demo or Pulsar Java client.
