This repository has been archived by the owner on Apr 1, 2024. It is now read-only.
Background
TransactionBuffer provides the ability to check whether a transaction is aborted and to get the max read position of the topic. Together, these achieve read-committed isolation for transactions.
a. The max read position is the safe read position of the topic; all the transactions before the max read position are closed.
b. The transaction buffer maintains a list of aborted transactions, which is used to filter out uncommitted messages.
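As a rough illustration of how a. and b. combine into read-committed isolation, the following Java sketch treats a transactional message as readable only if its position is at or before the max read position and its transaction is not in the aborted list. `Position`, `TxnKey`, and `isVisible` are hypothetical names, and the inclusive bound on the max read position is an assumption.

```java
import java.util.Set;

// Hypothetical sketch of the two read-committed checks; not the broker's implementation.
final class ReadCommittedSketch {

    // A message position in the topic, ordered by ledger ID first, then entry ID.
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // A transaction ID modeled as two longs (hypothetical stand-in for TxnID).
    record TxnKey(long mostSigBits, long leastSigBits) {}

    // Readable only if the message is within the safe read range (assumed inclusive)
    // and its transaction has not been aborted.
    static boolean isVisible(Position msgPos, TxnKey txn,
                             Position maxReadPosition, Set<TxnKey> aborted) {
        return msgPos.compareTo(maxReadPosition) <= 0 && !aborted.contains(txn);
    }
}
```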
To avoid rebuilding the transaction buffer by replaying all the messages in the original topic, the transaction buffer takes snapshots periodically.
The snapshots are persisted to a system topic under the namespace.
Motivation
As described in the background, the TransactionBuffer takes snapshots periodically. A snapshot contains a position (maxReadPosition) and a LinkedMap (aborts) that stores all the aborted transactions of the topic. However, when a topic has long-term data retention and many aborted transactions, a single snapshot gradually becomes a bottleneck: it cannot accommodate enough aborted transactions, and the cost of updating it grows as the snapshot becomes larger.
This proposal introduces a segmented transaction buffer snapshot, which splits the snapshot into multiple parts and persists each part as a separate entry, to improve this situation.
Goal
1. Support a large number of aborted transactions.
2. Improve transaction buffer recovery speed.
3. Solve the write amplification caused by taking snapshots in the system topic.
Approach Overview
We implement multiple snapshot segments through a secondary index design. The index and the snapshot segments are stored in different compacted topics. During recovery, the transaction buffer (TB) locates the snapshot segments through the index. After a snapshot segment is taken, the index in the index topic is updated.
Snapshot segment
A snapshot segment is an immutable segment that contains a fixed number of aborted transactions and a maxReadPosition, which identifies the position in the original topic where the abort marker of the segment's last aborted transaction is persisted. The size of a snapshot segment, and therefore the number of aborted transactions it holds, is configurable.
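A minimal sketch of the sealing rule, assuming a segment is sealed as soon as it holds `capacity` aborted transactions and takes the abort marker position of its last transaction as its maxReadPosition. All class and method names are hypothetical; the proposal only states that the segment size is configurable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of building immutable snapshot segments.
final class SegmentSealingSketch {

    record Position(long ledgerId, long entryId) {}

    // A sealed segment: its aborted txn IDs plus the marker position of its last abort.
    record SealedSegment(List<String> aborts, Position maxReadPosition) {}

    static final class SegmentBuilder {
        private final int capacity;
        private final List<String> pending = new ArrayList<>();

        SegmentBuilder(int capacity) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be > 0");
            }
            this.capacity = capacity;
        }

        // Records an aborted transaction whose abort marker was persisted at markerPos.
        // Returns the sealed segment once capacity is reached, otherwise null.
        SealedSegment addAbort(String txnId, Position markerPos) {
            pending.add(txnId);
            if (pending.size() < capacity) {
                return null;
            }
            SealedSegment sealed = new SealedSegment(List.copyOf(pending), markerPos);
            pending.clear();
            return sealed;
        }

        // Aborted transactions not yet sealed into a segment.
        List<String> unsealed() {
            return List.copyOf(pending);
        }
    }
}
```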
Snapshot topic
A new system topic stores the snapshot segments. The key of each snapshot segment is `multiple-sequenceID-topicName`, where the sequence ID counts how many snapshots have been taken. When the transaction buffer takes a snapshot, it increments the sequence ID and uses it as part of the snapshot key. When the transaction buffer recovers, it uses the max sequence ID found in the indexes as its current sequence ID. The sequence ID guarantees that each snapshot key is unique, and a snapshot can be deleted by writing key = `multiple-sequenceID-topicName` with value = null.
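The key scheme and sequence ID handling could look like the sketch below. The `multiple-sequenceID-topicName` format comes from the text above; the method names and the choice of -1 as the starting value when no index exists are assumptions made for illustration.

```java
import java.util.Collection;

// Hypothetical sketch of snapshot-segment keys and sequence IDs.
final class SnapshotKeySketch {

    // Key format from the proposal: "multiple-" + sequenceID + "-" + topicName.
    static String segmentKey(long sequenceId, String topicName) {
        return "multiple-" + sequenceId + "-" + topicName;
    }

    // On recovery, resume from the largest sequence ID in the indexes.
    // Assumption: -1 when there are no indexes, so the first snapshot gets ID 0.
    static long recoverSequenceId(Collection<Long> indexedSequenceIds) {
        return indexedSequenceIds.stream().mapToLong(Long::longValue).max().orElse(-1L);
    }

    // Each new snapshot takes the next sequence ID, which keeps its key unique.
    static long nextSequenceId(long current) {
        return current + 1;
    }
}
```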
Snapshot segments are deleted as follows: after a transaction marker is written into the original topic, the transaction buffer checks whether the ledger containing a segment's max read position has been deleted from the original topic. If that ledger no longer exists, the segment can be deleted from the snapshot topic and its index updated.
Figure 2: snapshot topic
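The deletion check described above reduces to a membership test against the ledgers that still exist in the original topic. In this sketch, `liveLedgerIds` is a hypothetical stand-in for the topic's current ledger list, and writing the null-value tombstone and updating the index are left to the caller.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of selecting snapshot segments that can be deleted.
final class SegmentCleanupSketch {

    record SegmentIndex(long sequenceId, long maxReadPositionLedgerId) {}

    // Removes from `indexes` and returns every index whose max-read-position ledger
    // no longer exists in the original topic (e.g. after retention deleted it).
    static List<SegmentIndex> collectDeletable(List<SegmentIndex> indexes, Set<Long> liveLedgerIds) {
        List<SegmentIndex> deletable = new ArrayList<>();
        Iterator<SegmentIndex> it = indexes.iterator();
        while (it.hasNext()) {
            SegmentIndex index = it.next();
            if (!liveLedgerIds.contains(index.maxReadPositionLedgerId())) {
                deletable.add(index);
                it.remove();
            }
        }
        return deletable;
    }
}
```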
Snapshot segment index
The snapshot index information is written to a separate topic (`__transaction_buffer_snapshot_index`) so that the transaction buffer can read the index first instead of reading all the snapshot segment data in the snapshot topic.
The temporary snapshot, which has not yet reached the fixed number of aborted transactions for a segment, is stored in the index itself. Writing it to the snapshot topic would also require an index update, which causes unnecessary overhead.
Figure 2: snapshot segment workflow
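The write-saving argument for keeping the temporary snapshot in the index can be made concrete with a small sketch, assuming a sealed segment costs one write to the snapshot topic plus one index update, while an unsealed snapshot costs only the index update. The `Target` enum and `writesFor` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of which topics a snapshot write touches.
final class SnapshotWriteRoutingSketch {

    enum Target { SNAPSHOT_TOPIC, INDEX_TOPIC }

    static List<Target> writesFor(boolean sealed) {
        List<Target> writes = new ArrayList<>();
        if (sealed) {
            writes.add(Target.SNAPSHOT_TOPIC); // persist the immutable segment
        }
        writes.add(Target.INDEX_TOPIC);        // the index entry is always refreshed
        return writes;
    }
}
```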
Index topic
The index topic is a system topic shared by all topics in a namespace, and it stores the indexes of the snapshot segments. The indexes are updated every time a snapshot segment is written into the snapshot topic, or whenever the latest maxReadPosition and aborted transactions need to be stored.
In the following example, the index topic stores the indexes for topic1 and topic2.
Figure 3: snapshot index topic
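Because the index topic is compacted and keyed per topic, readers only need the latest index for each topic. The sketch below models those compaction semantics in memory (the latest value per key wins, a null value deletes the key); it illustrates compacted-topic behavior and is not a Pulsar client API, and the type names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a compacted, namespace-wide index topic.
final class CompactedIndexTopicSketch {

    // A keyed message: key = topic name, value = serialized index (null = tombstone).
    record Message(String key, String value) {}

    static Map<String, String> compact(List<Message> log) {
        Map<String, String> latest = new HashMap<>();
        for (Message m : log) {
            if (m.value() == null) {
                latest.remove(m.key());          // tombstone removes the key
            } else {
                latest.put(m.key(), m.value());  // later value replaces earlier one
            }
        }
        return latest;
    }
}
```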
Workflow Overview
In the following figure, the aborted transactions are divided into 4 segments, each with its own max read position. The latest segment holds only 3 aborted transactions, but its max read position has advanced to (10:600). Because it is not full, the latest segment is written into the index and persisted in the index topic.
To check whether a transactional message was sent by an aborted transaction, the transaction buffer uses the message's position to locate the corresponding segment and checks whether the transaction ID exists in that segment. When the transaction buffer recovers, it reads the index topic to rebuild the index map and uses the indexes to load the snapshot segments into memory. After recovering from the snapshots, it reads the original topic from the max read position of the latest segment up to the LAC (last add confirmed position) to recover the remaining aborted transactions and the max read position.
Figure 3: transaction buffer segmented snapshot workflow
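One way to narrow the aborted-transaction check by position, assuming sealed segments are kept in a sorted map keyed by their maxReadPosition: since a transaction's abort marker is always written after its messages, only segments whose maxReadPosition is at or after the message position, plus the unsealed segment, can contain that transaction. The class below is a hypothetical sketch of this lookup, not the broker's implementation.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of a position-based aborted-transaction lookup.
final class SegmentLookupSketch {

    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        @Override
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Sealed segments keyed by maxReadPosition (position of their last abort marker).
    private final NavigableMap<Position, Set<String>> segments = new TreeMap<>();
    private final Set<String> unsealed;

    SegmentLookupSketch(Map<Position, Set<String>> sealed, Set<String> unsealed) {
        this.segments.putAll(sealed);
        this.unsealed = unsealed;
    }

    // Only segments ending at or after msgPos can hold the txn's abort marker.
    boolean isAborted(Position msgPos, String txnId) {
        for (Set<String> aborts : segments.tailMap(msgPos, true).values()) {
            if (aborts.contains(txnId)) {
                return true;
            }
        }
        return unsealed.contains(txnId);
    }
}
```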
Configuration
| Configuration | Description | Already exists |
|---|---|---|
| `transactionBufferSnapshotSegmentSize` = 256 KB | The size of each snapshot segment. | false |
| `transactionBufferSegmentedSnapshotEnabled` = false | Whether to enable the segmented transaction buffer snapshot, which can handle a large number of aborted transactions. | false |
| `transactionBufferSnapshotMaxTransactionCount` | The transaction buffer updates the snapshot index after the number of transaction operations reaches this value. | true |
| `transactionBufferSnapshotMinTimeInMillis` | The interval at which the transaction buffer updates the snapshot index. | true |
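How `transactionBufferSnapshotMaxTransactionCount` and `transactionBufferSnapshotMinTimeInMillis` might combine is sketched below, assuming the index is updated when either threshold is reached and only when there is at least one pending change. The class and method names are hypothetical.

```java
// Hypothetical sketch of the snapshot index update triggers.
final class IndexUpdatePolicySketch {
    private final long maxTransactionCount;
    private final long minTimeInMillis;

    IndexUpdatePolicySketch(long maxTransactionCount, long minTimeInMillis) {
        this.maxTransactionCount = maxTransactionCount;
        this.minTimeInMillis = minTimeInMillis;
    }

    // Returns true when the snapshot index should be refreshed.
    boolean shouldUpdate(long opsSinceLastUpdate, long millisSinceLastUpdate) {
        if (opsSinceLastUpdate == 0) {
            return false; // assumption: no pending change, nothing to persist
        }
        return opsSinceLastUpdate >= maxTransactionCount
                || millisSinceLastUpdate >= minTimeInMillis;
    }
}
```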
Implementation
The data structures for the transaction buffer snapshot segment index are shown below. They hold the indexes of the snapshot segments and the latest unsealed snapshot segment.
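```java
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.transaction.TxnID;

public class TransactionBufferSnapshotIndexes {

    // Kind of content: segment indexes or the unsealed snapshot.
    public enum Type {
        Indexes, UnsealedSnapshot
    }

    // Index entry for one snapshot segment.
    @Builder
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TransactionBufferSnapshotIndex {
        public long sequenceID;
        public long maxReadPositionLedgerID;
        public long maxReadPositionEntryID;
        public long persistentPositionLedgerID;
        public long persistentPositionEntryID;
    }

    // Snapshot content: the aborted transactions and the max read position.
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TransactionBufferSnapshot {
        private String topicName;
        private long sequenceId;
        private long maxReadPositionLedgerId;
        private long maxReadPositionEntryId;
        private List<TxnID> aborts;
    }
}
```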
Metrics Change
We need to add some metrics to AbortedTxnProcessor to help users tune the configuration.
The following metrics are added:
| Name | Labels | Type | Description |
|---|---|---|---|
| `pulsar_txn_tb_snapshot_segment_op_total` | `op` (add, del, read) | Counter | The count of operations on the transaction buffer snapshot segments. The operation can be add, delete, or read. |
| `pulsar_txn_tb_snapshot_index_op_total` | `op` (add, del, read) | Counter | The count of operations on the transaction buffer snapshot index. The operation can be add, delete, or read. |
| `pulsar_txn_tb_snapshot_index_total` | none | Gauge | The number of snapshot indexes maintained in the transaction buffer. |
| `pulsar_txn_tb_snapshot_index_entry_bytes` | none | Histogram | The size of the snapshot index entries maintained in the transaction buffer. |
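A minimal sketch of how the two `op`-labelled counters could be maintained, with one monotonically increasing count per label value. Plain JDK types stand in for whatever metrics library the broker uses, and every name except the metric names is hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the counters behind pulsar_txn_tb_snapshot_segment_op_total
// and pulsar_txn_tb_snapshot_index_op_total.
final class SnapshotOpMetricsSketch {
    enum Op { ADD, DEL, READ }

    private final Map<Op, LongAdder> segmentOps = new EnumMap<>(Op.class);
    private final Map<Op, LongAdder> indexOps = new EnumMap<>(Op.class);

    SnapshotOpMetricsSketch() {
        for (Op op : Op.values()) {
            segmentOps.put(op, new LongAdder());
            indexOps.put(op, new LongAdder());
        }
    }

    void recordSegmentOp(Op op) { segmentOps.get(op).increment(); }

    void recordIndexOp(Op op) { indexOps.get(op).increment(); }

    long segmentOpTotal(Op op) { return segmentOps.get(op).sum(); }

    long indexOpTotal(Op op) { return indexOps.get(op).sum(); }
}
```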
Compatibility
### Upgrade
We keep the original snapshot topic and implement the new solution on a new snapshot topic and index topic, so the new solution can be tried without affecting the data in the original snapshot topic. After `transactionBufferSegmentedSnapshotEnabled` is enabled for the first time, the transaction buffer reads the index topic first when it recovers. If the index topic contains an index for the topic, the transaction buffer recovers with the new implementation. Otherwise, it recovers from the old snapshot topic, then writes the old snapshot to the new snapshot topic as the first snapshot segment and writes its index to the snapshot index topic.
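The recovery decision described above can be sketched as a small function. The first two cases follow the text; the third case, where neither an index nor a legacy snapshot exists and the buffer simply replays the original topic, is an assumption added for completeness. All names are hypothetical.

```java
// Hypothetical sketch of choosing the recovery path after the upgrade.
final class RecoveryPathSketch {

    enum Path { SEGMENTED, MIGRATE_FROM_LEGACY, REPLAY_ONLY }

    static Path choose(boolean hasIndex, boolean hasLegacySnapshot) {
        if (hasIndex) {
            return Path.SEGMENTED;            // recover with the new implementation
        }
        if (hasLegacySnapshot) {
            return Path.MIGRATE_FROM_LEGACY;  // recover, then write it as the first segment
        }
        return Path.REPLAY_ONLY;              // assumption: no snapshot of either kind
    }
}
```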
1. Enable the segmented transaction buffer snapshot by setting `transactionBufferSegmentedSnapshotEnabled = true`.
2. Configure the snapshot segment size according to your workload. `transactionBufferSnapshotSegmentSize` defaults to 256 KB.
3. Restart the broker cluster.
### Downgrade
`transactionBufferSegmentedSnapshotEnabled` can also be turned off if you no longer want to use the segmented snapshot. The transaction buffer then continues to use the original snapshot topic and the original recovery logic.
1. Disable the segmented transaction buffer snapshot by setting `transactionBufferSegmentedSnapshotEnabled = false`.
2. Restart the broker cluster.
Implementation steps
1. Add configurations.
2. Implement a system topic client for the snapshot topic and the index topic.
3. Implement a new processor to handle transaction buffer aborted transactions.
4. Add transaction buffer snapshot metrics.
Rejected approaches
Optimize the data structure that stores the aborted transactions
The transaction buffer uses a LinkedMap<TxnID, Position> to store aborted transactions, where the position is where the transaction marker is written.
We tried using RoaringBitmap to store TxnIDs, but RoaringBitmap does not compress well when the values are spread over large intervals. In typical business scenarios, there may be only one aborted transaction in every 100,000 transactions, and in this case RoaringBitmap uses more memory.
The test report can be found here.
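The sparsity argument can be checked without the RoaringBitmap library. A 32-bit RoaringBitmap groups values by their high 16 bits, one container per group, so when IDs are more than 65,536 apart every ID lands in its own container and per-container overhead dominates. The sketch below only counts containers for evenly spaced IDs; treating transaction IDs as 32-bit values is a simplifying assumption, since a TxnID is made of two longs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: counts RoaringBitmap-style containers (high 16-bit keys).
final class RoaringSparsitySketch {

    // Number of distinct containers needed for `count` IDs spaced `spacing` apart.
    static int containerCount(long firstId, long spacing, int count) {
        Set<Long> highKeys = new HashSet<>();
        for (int i = 0; i < count; i++) {
            long id = firstId + i * spacing;
            highKeys.add(id >>> 16); // container key = high 16 bits of a 32-bit value
        }
        return highKeys.size();
    }
}
```

With one aborted transaction per 100,000 IDs, each container holds a single value, whereas dense IDs share containers.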
Change to a ManagedLedger implementation
Since the shared system topic has the disadvantages described above, we considered using one ManagedLedger per topic transaction buffer.
In this case, the topic transaction buffer could easily delete useless data, and the problems described above could be easily resolved.
However, this solution increases the metadata used by each topic.
Original Issue: apache#16913