Message Metadata Composition
The message data of each partition topic in Apache Pulsar is stored as ledgers on the bookie storage nodes of the BookKeeper cluster. Each ledger contains a set of entries, and bookie nodes write, look up, and read data only at entry granularity.
Note
When messages are produced in batches, one entry may contain multiple messages. Therefore, entries and messages are not necessarily in a one-to-one correspondence.
Ledgers and entries each have their own metadata. Ledger metadata is stored on ZooKeeper, while each entry carries its metadata alongside the message data and is stored on the bookie storage nodes.
| Object | Metadata | Description | Storage Location |
| --- | --- | --- | --- |
| ledger | ensemble size (E) | Number of bookie nodes selected by each ledger. | Metadata is stored on ZooKeeper. |
| ledger | write quorum size (Qw) | Number of bookie nodes to which each entry sends write requests. | |
| ledger | ack quorum size (Qa) | Number of write acknowledgments that must be received before a write is considered successful. | |
| ledger | ensembles | List of ensembles in use, stored as `<entry id, ensembles>` tuples. The key (entry id) is the start entry ID covered by an ensemble list; the value (ensembles) is the list of bookie node IP addresses selected by the ledger and contains E addresses. A ledger may accumulate multiple ensemble lists, but only one is in use at a time. | |
| entry | Ledger ID | ID of the ledger to which the entry belongs. | Data is stored on the bookie storage nodes. |
| entry | Entry ID | ID of the current entry. | |
| entry | Last Add Confirmed (LAC) | Latest confirmed entry ID at the time the entry was created. | |
| entry | Digest | CRC checksum of the entry. | |
When a ledger is created, a number of bookie nodes equal to the ensemble size is selected from the list of writable candidate nodes in the BookKeeper cluster. If there are not enough candidate nodes, a BKNotEnoughBookiesException is thrown. After the candidate nodes are selected, they are recorded as `<entry id, ensembles>` tuples in the ensembles field of the ledger metadata.
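The selection step above can be sketched as follows. This is a minimal illustration, not BookKeeper's actual placement policy (which is pluggable, e.g. rack-aware); the function and variable names are hypothetical.

```python
import random

class BKNotEnoughBookiesException(Exception):
    """Raised when fewer writable bookies exist than the ensemble size requires."""

def select_ensemble(writable_bookies, ensemble_size):
    # A ledger needs E distinct bookie nodes; fail fast when the cluster
    # cannot provide them, mirroring BookKeeper's BKNotEnoughBookiesException.
    if len(writable_bookies) < ensemble_size:
        raise BKNotEnoughBookiesException(
            f"need {ensemble_size} bookies, only {len(writable_bookies)} writable")
    return random.sample(writable_bookies, ensemble_size)

# The chosen ensemble is recorded in the ledger metadata keyed by the first
# entry ID it covers, i.e. the <entry id, ensembles> tuple from the table above.
ensembles = {0: select_ensemble(["bookie1", "bookie2", "bookie3", "bookie4"], 4)}
```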
Message Replica Mechanisms
Message writing process
When the client writes a message, each entry sends a write request to Qw bookie nodes in the ensemble list used by the ledger. Once Qa write acknowledgments are received, the message is considered successfully written to storage. Two markers track progress: LastAddPushed (LAP), the latest entry pushed to the bookies, and LastAddConfirmed (LAC), the latest entry for which enough storage acknowledgments have been received.
Each entry that is pushed carries an LAC value in its metadata: the latest acknowledged entry ID known at the moment the entry request was created. Entries up to and including the LAC are visible to reading clients.
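The LAP/LAC bookkeeping described above can be sketched like this. It is a simplified single-writer model with hypothetical names, not the real client implementation; its point is that the LAC only advances over a contiguous prefix of entries that have each collected Qa acknowledgments.

```python
class LedgerWriter:
    """Minimal sketch of LAP/LAC tracking on the writing client."""

    def __init__(self, ack_quorum):
        self.qa = ack_quorum
        self.last_add_pushed = -1     # LAP: highest entry ID sent to bookies
        self.last_add_confirmed = -1  # LAC: highest entry ID with Qa acks
        self.acks = {}                # entry ID -> acknowledgments received

    def push(self, entry_id):
        # Each pushed entry carries the LAC known at creation time, which is
        # how readers learn which entries are safely stored and visible.
        self.last_add_pushed = entry_id
        self.acks[entry_id] = 0
        return {"entry_id": entry_id, "lac": self.last_add_confirmed}

    def on_ack(self, entry_id):
        self.acks[entry_id] += 1
        # Advance the LAC only while the next entry in sequence has also
        # collected Qa acknowledgments (no gaps are allowed).
        while self.acks.get(self.last_add_confirmed + 1, 0) >= self.qa:
            self.last_add_confirmed += 1
```

For example, with Qa = 2, an entry that receives two acknowledgments out of order (entry 2 before entry 1) does not move the LAC until entry 1 is also confirmed.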
In addition, through the fencing mechanism, Apache Pulsar prevents multiple clients from performing write operations to the same ledger at the same time. The fencing mechanism mainly applies to the scenario where the ownership of a topic/partition changes from one broker to another broker.
Message replica distribution
When each entry is written, the group of Qw bookie nodes it uses is calculated from the entry ID of the current message and the start entry ID (the key) of the ensemble list in use. The broker then sends write requests to those bookie nodes. Once Qa write acknowledgments are received, the message is considered successfully stored, so at least Qa replicas are guaranteed.
As shown in the preceding figure, the ledger selects four bookie nodes (bookie1 to bookie4, E = 4). Each entry is written to three of them (Qw = 3), and the write succeeds once two acknowledgments are received (Qa = 2). The ensemble selected by the ledger starts from entry 1, so entry 1 is written to bookie1, bookie2, and bookie3; entry 2 to bookie2, bookie3, and bookie4; and entry 3, per the calculation, to bookie3, bookie4, and bookie1.
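The round-robin striping in that example can be reproduced with a short calculation. This is a sketch of the placement arithmetic only (the function name is hypothetical): the entry's offset from the ensemble's start entry picks the first bookie, and the next Qw − 1 bookies follow, wrapping around the ensemble.

```python
def write_set(entry_id, start_entry_id, ensemble, write_quorum):
    """Bookies that receive the write request for a given entry."""
    offset = (entry_id - start_entry_id) % len(ensemble)
    return [ensemble[(offset + i) % len(ensemble)] for i in range(write_quorum)]

# E = 4 bookies; the ensemble list starts at entry 1; Qw = 3.
ensemble = ["bookie1", "bookie2", "bookie3", "bookie4"]
for eid in (1, 2, 3):
    print(eid, write_set(eid, 1, ensemble, 3))
```

This reproduces the distribution from the figure: entry 1 lands on bookie1–3, entry 2 on bookie2–4, and entry 3 wraps around to bookie3, bookie4, bookie1.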
Message Recovery Mechanisms
By default, the recovery service is automatically enabled on each bookie node in a BookKeeper cluster of Apache Pulsar. The service runs the following components:
1. auditorElector: auditor election
2. replicationWorker: replication task
3. deathWatcher: crash monitoring
Through the ephemeral node mechanism of ZooKeeper, the bookie nodes in a BookKeeper cluster elect a primary bookie node, which performs the following tasks:
1. Monitoring bookie node changes
2. Marking the ledgers on failed bookie nodes as Underreplicated in ZooKeeper
3. Checking the number of replicas of all ledgers (the default cycle is one week)
4. Checking the number of replicas of entries (not enabled by default)
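Task 2 above, flagging the ledgers affected by a failed bookie, can be sketched as a scan over ledger metadata. This is an illustration with hypothetical names; the real auditor records each under-replicated ledger as a node under ZooKeeper rather than returning a list.

```python
def ledgers_to_mark_underreplicated(ledger_metadata, failed_bookies):
    """Return IDs of ledgers whose ensembles reference any failed bookie.

    ledger_metadata maps ledger ID -> {start entry ID: [bookie addresses]},
    mirroring the ensembles field in the ledger metadata.
    """
    failed = set(failed_bookies)
    underreplicated = []
    for ledger_id, ensembles in ledger_metadata.items():
        bookies = {b for ens in ensembles.values() for b in ens}
        if bookies & failed:  # at least one replica set touched the failed node
            underreplicated.append(ledger_id)
    return underreplicated

meta = {
    1: {0: ["bookie1", "bookie2", "bookie3"]},
    2: {0: ["bookie2", "bookie3", "bookie4"]},
}
print(ledgers_to_mark_underreplicated(meta, ["bookie4"]))  # [2]
```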
The data in a ledger is restored fragment by fragment. Each fragment corresponds to one ensemble list in the ledger, so a ledger with multiple ensemble lists yields multiple fragments to process.
During recovery, the first step is to determine which storage nodes in the current ledger's fragments must be replaced with new candidate nodes. By default, if a bookie node in a fragment holds no entry data (judged by whether the fragment's start and end entries exist on it), that node needs to be replaced and the fragment requires data recovery.
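The default per-fragment check described above can be sketched as follows. The helper names (`bookies_to_replace`, `has_entry`) are hypothetical, and the exact probe semantics in BookKeeper may differ; the sketch only captures the idea that a bookie missing both boundary entries of a fragment is treated as having lost the fragment's data.

```python
def bookies_to_replace(fragment, has_entry):
    """Bookies in a fragment's ensemble that appear to have lost its data.

    fragment: {"start_entry": int, "end_entry": int, "ensemble": [bookies]}
    has_entry(bookie, entry_id) -> bool probes whether the entry exists there.
    """
    start, end = fragment["start_entry"], fragment["end_entry"]
    return [b for b in fragment["ensemble"]
            if not has_entry(b, start) and not has_entry(b, end)]

# Example: bookie2 lost its disk and holds none of this fragment's entries.
stored = {"bookie1": {0, 9}, "bookie2": set(), "bookie3": {0, 9}}
fragment = {"start_entry": 0, "end_entry": 9,
            "ensemble": ["bookie1", "bookie2", "bookie3"]}
print(bookies_to_replace(fragment, lambda b, e: e in stored[b]))  # ['bookie2']
```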
After the fragment's data is restored onto the new bookie node, the ensemble list for that fragment is updated in the ledger metadata.
When bookie node failures reduce the number of data replicas, this process gradually restores the replica count back to Qw (the write quorum configured in the background, which is 3 in Apache Pulsar by default).