

// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("sub_topic1")// Declare the exclusive mode to be the consumption mode.subscriptionType(SubscriptionType.Exclusive).subscribe();


// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("sub_topic1")// Declare the shared mode to be the consumption mode.subscriptionType(SubscriptionType.Shared).subscribe();


// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("sub_topic1")// Declare the failover mode to be the consumption mode.subscriptionType(SubscriptionType.Failover).subscribe();


// Construct a producerProducer<byte[]> producer pulsarClient.newProducer().topic(topic).enableBatching(false).create();// Set the key when sending messagesMessageId msgId = producer.newMessage()// Message content.value(value.getBytes(StandardCharsets.UTF_8))// Set the key here. Messages with the same key will only be distributed to the same consumer..key("youKey1").send();
// Construct a producerProducer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(true).batcherBuilder(BatcherBuilder.KEY_BASED).create();// Set the key when sending messagesMessageId msgId = producer.newMessage()// Message content.value(value.getBytes(StandardCharsets.UTF_8))// Set the key here. Messages with the same key will only be distributed to the same consumer..key("youKey1").send();
// Construct a consumer Consumer<byte[]> consumer = pulsarClient.newConsumer() // Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page. .topic("persistent://pulsar-xxx/sdk_java/topic1") // You need to create a subscription on the topic details page in the console and enter the subscription name here .subscriptionName("sub_topic1") // Declare the key_shared mode to be the consumption mode .subscriptionType(SubscriptionType.Key_Shared) .subscribe();

// Construct a producerProducer<byte[]> producer pulsarClient.newProducer().topic(topic).enableBatching(false) // Disable the batch feature.create();// Set the key when sending messagesMessageId msgId = producer.newMessage()// Message content.value(value.getBytes(StandardCharsets.UTF_8))// Set the key here. Messages with the same key will be sent to the same partition..key("youKey1").send();
// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("sub_topic1")// Declare the failover mode to be the consumption mode.subscriptionType(SubscriptionType.Failover).subscribe();
keySharedPolicy when creating the consumer instance.// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("sub_topic1")// Declare the key_shared mode to be the consumption mode.subscriptionType(SubscriptionType.Key_Shared)// Set to require sequence.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false)).subscribe();
Feedback