Skip to content

Commit

Permalink
Make upsert inner segment update atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Nov 30, 2021
1 parent eff570b commit 7f1602e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
* <p>For multiple records with the same comparison value (default to timestamp), the manager will preserve the latest
* record based on the sequence number of the segment. If 2 records with the same comparison value are in the same
* segment, the one with larger doc id will be preserved. Note that for tables with sorted column, the records will be
* re-ordered when committing the segment, and we will use the re-ordered doc ids instead of the ingestion order to
* decide which record to preserve.
* re-ordered when committing the segment, and we will use the re-ordered doc ids instead of the ingestion doc ids to
* decide the record to preserve.
*
* <p>There will be short-term inconsistency while the upsert metadata is being updated, but it should be consistent
* after the operation is done:
Expand All @@ -61,6 +61,7 @@
* </li>
* </ul>
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@ThreadSafe
public class PartitionUpsertMetadataManager {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
Expand Down Expand Up @@ -100,20 +101,18 @@ public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIter

while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
_primaryKeyToRecordLocationMap
.compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction), (primaryKey, currentRecordLocation) -> {
_primaryKeyToRecordLocationMap.compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key

// The current record is in the same segment
// Update the record location when there is a tie to keep the newer record. Note that the record info
// iterator
// will return records with incremental doc ids.
// iterator will return records with incremental doc ids.
IndexSegment currentSegment = currentRecordLocation.getSegment();
if (segment == currentSegment) {
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
validDocIds.remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
validDocIds.replace(currentRecordLocation.getDocId(), recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
return currentRecordLocation;
Expand All @@ -122,12 +121,9 @@ public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIter

// The current record is in an old segment being replaced
// This could happen when committing a consuming segment, or reloading a completed segment. In this
// case, we
// want to update the record location when there is a tie because the record locations should point to
// the new
// added segment instead of the old segment being replaced. Also, do not update the valid doc ids for
// the old
// segment because it has not been replaced yet.
// case, we want to update the record location when there is a tie because the record locations should
// point to the newly added segment instead of the old segment being replaced. Also, do not update the valid
// doc ids for the old segment because it has not been replaced yet.
String currentSegmentName = currentSegment.getSegmentName();
if (segmentName.equals(currentSegmentName)) {
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
Expand All @@ -140,14 +136,14 @@ public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIter

// The current record is in a different segment
// Update the record location when getting a newer comparison value, or the value is the same as the
// current
// value, but the segment has a larger sequence number (the segment is newer than the current segment).
// current value, but the segment has a larger sequence number (the segment is newer than the current
// segment).
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) > 0 || (
recordInfo._comparisonValue == currentRecordLocation.getComparisonValue() && LLCSegmentName
.isLowLevelConsumerSegmentName(segmentName) && LLCSegmentName
.isLowLevelConsumerSegmentName(currentSegmentName)
&& LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
.getSequenceNumber(currentSegmentName))) {
recordInfo._comparisonValue == currentRecordLocation.getComparisonValue()
&& LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
&& LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
&& LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
currentSegmentName))) {
assert currentSegment.getValidDocIds() != null;
currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
validDocIds.add(recordInfo._docId);
Expand Down Expand Up @@ -175,8 +171,8 @@ public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, Gene
// For partial-upsert, need to ensure all previous records are loaded before inserting new records.
if (_partialUpsertHandler != null) {
while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
LOGGER
.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
LOGGER.info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}",
_tableNameWithType);
try {
//noinspection BusyWait
Thread.sleep(1000L);
Expand All @@ -186,27 +182,32 @@ public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, Gene
}
}

ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
assert validDocIds != null;
_result = record;
_primaryKeyToRecordLocationMap
.compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction), (primaryKey, currentRecordLocation) -> {
_primaryKeyToRecordLocationMap.compute(hashPrimaryKey(recordInfo._primaryKey, _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key

// Update the record location when the new comparison value is greater than or equal to the current value
// . Update
// the record location when there is a tie to keep the newer record.
// Update the record location when the new comparison value is greater than or equal to the current value.
// Update the record location when there is a tie to keep the newer record.
if (recordInfo._comparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
IndexSegment currentSegment = currentRecordLocation.getSegment();
int currentDocId = currentRecordLocation.getDocId();
if (_partialUpsertHandler != null) {
// Partial upsert
_reuse.clear();
GenericRow previousRecord = currentSegment.getRecord(currentRecordLocation.getDocId(), _reuse);
GenericRow previousRecord = currentSegment.getRecord(currentDocId, _reuse);
_result = _partialUpsertHandler.merge(previousRecord, record);
}
assert currentSegment.getValidDocIds() != null;
currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
assert segment.getValidDocIds() != null;
segment.getValidDocIds().add(recordInfo._docId);
if (segment == currentSegment) {
validDocIds.replace(currentDocId, recordInfo._docId);
} else {
assert currentSegment.getValidDocIds() != null;
currentSegment.getValidDocIds().remove(currentDocId);
validDocIds.add(recordInfo._docId);
}
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
} else {
if (_partialUpsertHandler != null) {
Expand All @@ -219,8 +220,7 @@ public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, Gene
}
} else {
// New primary key
assert segment.getValidDocIds() != null;
segment.getValidDocIds().add(recordInfo._docId);
validDocIds.add(recordInfo._docId);
return new RecordLocation(segment, recordInfo._docId, recordInfo._comparisonValue);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public synchronized void remove(int docId) {
_mutableRoaringBitmap.remove(docId);
}

/**
 * Swaps one doc id for another in the wrapped bitmap as a single synchronized step.
 *
 * <p>The removal and the addition both run while holding this object's monitor, so no other synchronized method of
 * this object can run between the two mutations. The removal is performed first; when {@code oldDocId} equals
 * {@code newDocId} the id is removed and then added back.
 *
 * @param oldDocId doc id to remove from the bitmap
 * @param newDocId doc id to add to the bitmap
 */
public synchronized void replace(int oldDocId, int newDocId) {
_mutableRoaringBitmap.remove(oldDocId);
_mutableRoaringBitmap.add(newDocId);
}

/**
 * Clones the wrapped bitmap while holding this object's monitor and hands the clone to the caller.
 *
 * @return the result of {@code _mutableRoaringBitmap.clone()}
 */
public synchronized MutableRoaringBitmap getMutableRoaringBitmap() {
  MutableRoaringBitmap snapshot = _mutableRoaringBitmap.clone();
  return snapshot;
}
Expand Down

0 comments on commit 7f1602e

Please sign in to comment.