

// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("sub_topic1")// Declare the consumption mode to be the Exclusive mode..subscriptionType(SubscriptionType.Exclusive).subscribe();


// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("sub_topic1")// Declare the consumption mode to be the Shared mode..subscriptionType(SubscriptionType.Shared).subscribe();


// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("sub_topic1")// Declare the consumption mode to be the Failover mode..subscriptionType(SubscriptionType.Failover).subscribe();


// Construct a producer.Producer<byte[]> producer pulsarClient.newProducer().topic(topic).enableBatching(false).create();// Set the key when messages are sent.MessageId msgId = producer.newMessage()// The message content..value(value.getBytes(StandardCharsets.UTF_8))// Set the key here. Messages with the same key are distributed only to the same consumer..key("youKey1").send();
// Construct a producer.Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(true).batcherBuilder(BatcherBuilder.KEY_BASED).create();// Set the key when messages are sent.MessageId msgId = producer.newMessage()// The message content..value(value.getBytes(StandardCharsets.UTF_8))// Set the key here. Messages with the same key are distributed only to the same consumer..key("youKey1").send();
// Construct a consumer. Consumer<byte[]> consumer = pulsarClient.newConsumer() // Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page. .topic("persistent://pulsar-xxx/sdk_java/topic1") // Create a subscription on the topic details page in the console. Specify the subscription name here. .subscriptionName("sub_topic1") // Declare the consumption mode to be the Key_Shared mode. .subscriptionType(SubscriptionType.Key_Shared) .subscribe();

// Construct a producer.Producer<byte[]> producer pulsarClient.newProducer().topic(topic).enableBatching(false) // Disable the batch feature..create();// Set the key when messages are sent.MessageId msgId = producer.newMessage()// The message content..value(value.getBytes(StandardCharsets.UTF_8))// Set the key here. Messages with the same key are sent to the same partition..key("youKey1").send();
// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("sub_topic1")// Declare the consumption mode to be the Failover mode..subscriptionType(SubscriptionType.Failover).subscribe();
// Construct a consumer.Consumer<byte[]> consumer = pulsarClient.newConsumer()// Full topic path, in the format of persistent://cluster (tenant) ID/namespace/topic name, copied from the Topic page..topic("persistent://pulsar-xxx/sdk_java/topic1")// You need to create a subscription on the topic details page in the console. Specify the subscription name here..subscriptionName("sub_topic1")// Declare the consumption mode to be the Key_Shared mode..subscriptionType(SubscriptionType.Key_Shared)// Disallow out-of-order delivery..keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false)).subscribe();
Feedback