In Pulsar, the message data in each partitioned topic is stored in the form of ledgers on bookie storage nodes in the BookKeeper cluster. Each ledger contains a set of entries, and bookies write, find, and get data only by entry.
With batched message production, one entry may contain multiple messages, so entries do not necessarily correspond to messages one to one.
Ledgers and entries each have their own metadata:
|Type||Parameter||Description||Data Storage Location|
|Ledger||ensemble size (E)||Number of bookie nodes selected by each ledger||Metadata is stored in ZooKeeper|
|Ledger||write quorum size (Qw)||Number of bookies to which each entry needs to send write requests||Metadata is stored in ZooKeeper|
|Ledger||ack quorum size (Qa)||Number of write acks that should be received before a write can be considered successful||Metadata is stored in ZooKeeper|
|Ledger||Ensembles (E)||The ensemble list used, in the format of <entry id, ensembles> tuples||Metadata is stored in ZooKeeper|
|Entry||Ledger ID||ID of the ledger of the entry||Data is stored on bookie nodes|
|Entry||Entry ID||ID of the current entry||Data is stored on bookie nodes|
|Entry||Last Add Confirmed||Entry ID of the known latest write ack when the current entry is created||Data is stored on bookie nodes|
During the creation of each ledger, E bookie nodes are selected from the list of existing candidate bookie nodes in writable status in the BookKeeper cluster. If there are not enough candidate nodes, the BKNotEnoughBookiesException exception is thrown. After the candidate nodes are selected, this information is combined to form the <entry id, ensembles> tuple and stored in ensembles of the ledger's metadata.
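The ledger creation step can be sketched roughly as follows. This is a minimal illustration, not BookKeeper's implementation: `select_ensemble`, `new_ledger_metadata`, and `NotEnoughBookiesError` are hypothetical names, random selection stands in for whatever placement policy the cluster actually uses, and keying the first ensemble list at entry ID 0 is an assumption.

```python
import random


class NotEnoughBookiesError(Exception):
    """Hypothetical stand-in for BKNotEnoughBookiesException."""


def select_ensemble(writable_bookies, ensemble_size, rng=random):
    """Pick `ensemble_size` distinct bookies from the writable candidates."""
    candidates = list(writable_bookies)
    if len(candidates) < ensemble_size:
        raise NotEnoughBookiesError(
            f"need {ensemble_size} bookies, only {len(candidates)} writable"
        )
    # Assumption: plain random choice; a real placement policy may differ.
    return rng.sample(candidates, ensemble_size)


def new_ledger_metadata(writable_bookies, e, qw, qa):
    """Build ledger metadata holding E, Qw, Qa and the ensembles map."""
    # The ensembles map is keyed by the first entry ID each list applies to
    # (assumed to be 0 for a freshly created ledger).
    return {
        "E": e,
        "Qw": qw,
        "Qa": qa,
        "ensembles": {0: select_ensemble(writable_bookies, e)},
    }
```

Calling `new_ledger_metadata(["b1", "b2"], 3, 2, 2)` raises the error, since only two writable candidates exist for an ensemble of three.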
Message write process
When the client writes a message, a write request for each entry is sent to Qw bookie nodes in the ensemble list currently used by the ledger. After Qa write acks are received, the message is considered written and stored successfully. LAP (lastAddPushed) marks the position currently pushed, and LAC (LastAddConfirmed) marks the position up to which storage acks have been received.
The LAC value carried in each pushed entry is the position of the last ack received at the moment the entry's send request is created. Messages at the LAC position and before it are visible to the read client.
In addition, Pulsar uses a fencing mechanism to prevent multiple clients from writing to the same ledger at the same time. This mainly applies to scenarios where the ownership of a topic/partition is transferred from one broker to another.
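The relationship between LAP, LAC, and the ack quorum can be illustrated with a small sketch. `LedgerWriter` is a hypothetical class, not a BookKeeper API. It assumes that positions start at -1 before anything is written and that LAC only advances over a contiguous run of entries that have each reached Qa acks.

```python
class LedgerWriter:
    """Toy model of LAP/LAC bookkeeping on the writing client."""

    def __init__(self, ack_quorum):
        self.ack_quorum = ack_quorum  # Qa
        self.lap = -1                 # lastAddPushed
        self.lac = -1                 # LastAddConfirmed
        self.acks = {}                # entry id -> acks received so far

    def push(self):
        """Create the next entry; it carries the LAC known at this moment."""
        self.lap += 1
        self.acks[self.lap] = 0
        return {"entry_id": self.lap, "lac": self.lac}

    def on_ack(self, entry_id):
        """Record one write ack from a bookie and advance LAC if possible."""
        if entry_id not in self.acks:
            return  # unknown or already confirmed entry; ignore the ack
        self.acks[entry_id] += 1
        # Assumption: LAC moves forward only over contiguous confirmed entries.
        while self.acks.get(self.lac + 1, 0) >= self.ack_quorum:
            self.lac += 1
            del self.acks[self.lac]

    def readable(self, entry_id):
        """Entries at or before LAC are visible to readers."""
        return entry_id <= self.lac
```

In this model, an entry that collects Qa acks before its predecessor does not become readable until the predecessor is also confirmed.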
Message replica distribution
When each entry is written, the set of Qw bookie nodes in the ensemble list to which the entry is written is determined based on the entry ID of the current message and the entry ID at the start of the currently used ensemble list (the key value). The broker then sends a write request to these bookie nodes. After Qa write acks are received, the message is considered written and stored successfully. At this point, at least Qa replicas of the message are guaranteed.
As shown above, the ledger selects 4 bookie nodes (bookies 1–4), a message is written to 3 nodes each time, and after 2 write acks are received, the message will be considered stored successfully. The ensemble selected by the current ledger uses bookies 1, 2, and 3 to write entry 1 and uses bookies 2, 3, and 4 to write entry 2, and entry 3 will be written to bookies 3, 4, and 1 according to the calculation result.
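The node selection in this example can be reproduced with a short round-robin sketch. The function names are hypothetical, and the offset formula (entry ID minus the ensemble list's starting entry ID, modulo E) is an assumption chosen to match the example; the real implementation may index entries differently.

```python
import bisect


def ensemble_for(entry_id, ensembles):
    """Return (start_entry_id, bookies) for the ensemble list covering entry_id."""
    keys = sorted(ensembles)
    idx = bisect.bisect_right(keys, entry_id) - 1
    if idx < 0:
        raise KeyError(f"no ensemble list covers entry {entry_id}")
    return keys[idx], ensembles[keys[idx]]


def write_set(entry_id, ensembles, write_quorum):
    """Pick the Qw bookies for one entry by striping across the ensemble."""
    start_key, bookies = ensemble_for(entry_id, ensembles)
    # Assumed formula: offset relative to the list's starting entry ID.
    offset = (entry_id - start_key) % len(bookies)
    return [bookies[(offset + i) % len(bookies)] for i in range(write_quorum)]


# E = 4, Qw = 3; entries are numbered from 1 to match the example.
ensembles = {1: ["bookie1", "bookie2", "bookie3", "bookie4"]}
for entry_id in (1, 2, 3):
    print(entry_id, write_set(entry_id, ensembles, 3))
```

With these inputs, entries 1, 2, and 3 map to bookies 1-2-3, 2-3-4, and 3-4-1 respectively, as in the example.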
By default, each bookie in a Pulsar BookKeeper cluster is started with the recovery service enabled automatically, which performs the following tasks:
Bookie nodes in a BookKeeper cluster will elect a master bookie through ZooKeeper's ephemeral node mechanism, which mainly performs the following tasks:
The data in a ledger is recovered by fragment (each fragment corresponds to one ensemble list in the ledger; if the ledger has multiple ensemble lists, multiple fragments need to be processed).
To perform recovery, the first step is to determine which storage nodes in which fragments of the current ledger need to be replaced with new candidate nodes for data recovery. If there is no entry data on an associated bookie node in a fragment (determined based on whether the entries at the start and end exist by default), the bookie node needs to be replaced, and the fragment requires data recovery.
After the data in the fragment is recovered onto the new bookie node, the original ensemble list of the current fragment is updated in the ledger's metadata.
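The fragment check and ensemble update described above can be sketched as follows. All names here are hypothetical, and the model is deliberately simplified: it assumes every bookie in a fragment's ensemble should hold the fragment's first and last entries (that is, Qw equals E), and it leaves out the actual copying of entry data.

```python
def bookies_to_replace(fragment, has_entry):
    """List bookies missing the fragment's first or last entry."""
    first, last = fragment["first_entry"], fragment["last_entry"]
    return [
        bookie
        for bookie in fragment["ensemble"]
        if not (has_entry(bookie, first) and has_entry(bookie, last))
    ]


def recover_fragment(fragment, has_entry, candidates):
    """Return the fragment's updated ensemble list after replacing bad bookies."""
    spare = [c for c in candidates if c not in fragment["ensemble"]]
    new_ensemble = list(fragment["ensemble"])
    for bad in bookies_to_replace(fragment, has_entry):
        if not spare:
            raise RuntimeError("no candidate bookie available for recovery")
        replacement = spare.pop(0)
        # Copying the fragment's entries to `replacement` would happen here.
        new_ensemble[new_ensemble.index(bad)] = replacement
    return new_ensemble
```

The returned list is what would be written back to the ledger's metadata in place of the fragment's original ensemble list.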
In scenarios where the number of data replicas is reduced by failures of bookie nodes, after this process is completed, the number will be gradually restored to Qw (i.e., the number of replicas specified on the backend, which is 3 in TDMQ by default).