From 77d8c7861ba14d43b68c9c828631f4a389a768f3 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 10:20:22 +0000
Subject: [PATCH 01/17] Add StateStoreUpdatesWrapper.addFile...
---
.../testutils/StateStoreUpdatesWrapper.java | 96 +++++++++++++++++++
.../TransactionLogFileReferenceStoreTest.java | 31 +++---
2 files changed, 112 insertions(+), 15 deletions(-)
create mode 100644 java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
new file mode 100644
index 0000000000..b622c5c67a
--- /dev/null
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.core.statestore.testutils;
+
+import sleeper.core.statestore.AllReferencesToAFile;
+import sleeper.core.statestore.FileReference;
+import sleeper.core.statestore.StateStore;
+import sleeper.core.statestore.StateStoreException;
+import sleeper.core.statestore.exception.FileAlreadyExistsException;
+import sleeper.core.statestore.transactionlog.AddTransactionRequest;
+import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
+import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
+
+import java.util.List;
+
+/**
+ * Wraps a state store and exposes methods for shortcuts during tests.
+ */
+public class StateStoreUpdatesWrapper {
+
+ private final StateStore stateStore;
+
+ private StateStoreUpdatesWrapper(StateStore stateStore) {
+ this.stateStore = stateStore;
+ }
+
+ /**
+ * Wraps the given state store. Has a short name for ease of use in tests when it's statically imported.
+ *
+ * @param stateStore the state store
+ * @return the wrapper
+ */
+ public static StateStoreUpdatesWrapper update(StateStore stateStore) {
+ return new StateStoreUpdatesWrapper(stateStore);
+ }
+
+ /**
+ * Adds a file to the table, with one reference.
+ *
+ * @param fileReference the reference to be added
+ * @throws FileAlreadyExistsException if the file already exists
+ * @throws StateStoreException if the update fails for another reason
+ */
+ public void addFile(FileReference fileReference) throws StateStoreException {
+ addFiles(List.of(fileReference));
+ }
+
+ /**
+ * Adds files to the Sleeper table, with any number of references. Each reference to be added should be for a file
+ * which does not yet exist in the table.
+ *
+ * When adding multiple references for a file, a file must never be referenced in two partitions where one is a
+ * descendent of another. This means each record in a file must only be covered by one reference. A partition covers
+ * a range of records. A partition which is the child of another covers a sub-range within the parent partition.
+ *
+ * @param fileReferences The file references to be added
+ * @throws FileAlreadyExistsException if a file already exists
+ * @throws StateStoreException if the update fails for another reason
+ */
+ public void addFiles(List fileReferences) throws StateStoreException {
+ addFilesWithReferences(AllReferencesToAFile.newFilesWithReferences(fileReferences));
+ }
+
+ /**
+ * Adds files to the Sleeper table, with any number of references. Each new file should be specified once, with all
+ * its references.
+ *
+ * A file must never be referenced in two partitions where one is a descendent of another. This means each record in
+ * a file must only be covered by one reference. A partition covers a range of records. A partition which is the
+ * child of another covers a sub-range within the parent partition.
+ *
+ * @param files The files to be added
+ * @throws FileAlreadyExistsException if a file already exists
+ * @throws StateStoreException if the update fails for another reason
+ */
+ public void addFilesWithReferences(List files) throws StateStoreException {
+ AddFilesTransaction transaction = new AddFilesTransaction(files);
+ stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction)
+ .beforeApplyListener(StateListenerBeforeApply.withFilesState(state -> transaction.validateFiles(state)))
+ .build());
+ }
+
+}
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index 8394763698..9a88822184 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -63,6 +63,7 @@
import static sleeper.core.statestore.FilesReportTestHelper.readyForGCFilesReport;
import static sleeper.core.statestore.ReplaceFileReferencesRequest.replaceJobFileReferences;
import static sleeper.core.statestore.SplitFileReferenceRequest.splitFileToChildPartitions;
+import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;
public class TransactionLogFileReferenceStoreTest extends InMemoryTransactionLogStateStoreTestBase {
@@ -85,8 +86,8 @@ public void shouldAddAndReadActiveFiles() {
// When
store.fixFileUpdateTime(fixedUpdateTime);
- store.addFile(file1);
- store.addFiles(List.of(file2, file3));
+ update(store).addFile(file1);
+ update(store).addFiles(List.of(file2, file3));
// Then
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(file1, file2, file3);
@@ -105,7 +106,7 @@ void shouldSetLastUpdateTimeForFile() {
// When
store.fixFileUpdateTime(updateTime);
- store.addFile(file);
+ update(store).addFile(file);
// Then
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(withLastUpdate(updateTime, file));
@@ -120,7 +121,7 @@ void shouldAddFileSplitOverTwoPartitions() {
FileReference leftFile = splitFile(rootFile, "L");
FileReference rightFile = splitFile(rootFile, "R");
store.fixFileUpdateTime(updateTime);
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
// When / Then
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(
@@ -137,7 +138,7 @@ void shouldAddFileWithReferencesSplitOverTwoPartitions() {
FileReference leftFile = splitFile(rootFile, "L");
FileReference rightFile = splitFile(rootFile, "R");
store.fixFileUpdateTime(updateTime);
- store.addFilesWithReferences(List.of(fileWithReferences(List.of(leftFile, rightFile))));
+ update(store).addFilesWithReferences(List.of(fileWithReferences(List.of(leftFile, rightFile))));
// When / Then
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(
@@ -160,7 +161,7 @@ void shouldAddTwoFilesWithReferences() {
FileReference rightFile1 = splitFile(file1, "R");
FileReference file2 = factory.rootFile("file2", 100L);
store.fixFileUpdateTime(updateTime);
- store.addFilesWithReferences(List.of(
+ update(store).addFilesWithReferences(List.of(
fileWithReferences(List.of(leftFile1, rightFile1)),
fileWithReferences(List.of(file2))));
@@ -181,7 +182,7 @@ void shouldAddFileWithNoReferencesForGC() {
// Given
Instant updateTime = Instant.parse("2023-12-01T10:45:00Z");
store.fixFileUpdateTime(updateTime);
- store.addFilesWithReferences(List.of(fileWithNoReferences("test-file")));
+ update(store).addFilesWithReferences(List.of(fileWithNoReferences("test-file")));
// When / Then
assertThat(store.getFileReferences()).isEmpty();
@@ -198,7 +199,7 @@ void shouldFailToAddSameFileTwice() {
Instant updateTime = Instant.parse("2023-12-01T10:45:00Z");
FileReference file = factory.rootFile("file1", 100L);
store.fixFileUpdateTime(updateTime);
- store.addFile(file);
+ update(store).addFile(file);
// When / Then
assertThatThrownBy(() -> store.addFile(file))
@@ -216,7 +217,7 @@ void shouldFailToAddAnotherReferenceForSameFile() {
FileReference file = factory.rootFile("file1", 100L);
FileReference leftFile = splitFile(file, "L");
FileReference rightFile = splitFile(file, "R");
- store.addFile(leftFile);
+ update(store).addFile(leftFile);
// When / Then
assertThatThrownBy(() -> store.addFile(rightFile))
@@ -236,7 +237,7 @@ void shouldSplitOneFileInRootPartition() {
// Given
splitPartition("root", "L", "R", 5);
FileReference file = factory.rootFile("file", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When
SplitFileReferences.from(store).split();
@@ -255,7 +256,7 @@ void shouldSplitTwoFilesInOnePartition() {
splitPartition("root", "L", "R", 5);
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
// When
SplitFileReferences.from(store).split();
@@ -281,7 +282,7 @@ void shouldSplitOneFileFromTwoOriginalPartitions() {
FileReference file = factory.rootFile("file", 100L);
FileReference leftFile = splitFile(file, "L");
FileReference rightFile = splitFile(file, "R");
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
// When
SplitFileReferences.from(store).split();
@@ -306,7 +307,7 @@ void shouldSplitFilesInDifferentPartitions() {
splitPartition("R", "RL", "RR", 7);
FileReference file1 = factory.partitionFile("L", "file1", 100L);
FileReference file2 = factory.partitionFile("R", "file2", 200L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
// When
SplitFileReferences.from(store).split();
@@ -330,7 +331,7 @@ void shouldOnlyPerformOneLevelOfSplits() {
splitPartition("L", "LL", "LR", 2L);
splitPartition("R", "RL", "RR", 7L);
FileReference file = factory.rootFile("file.parquet", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When
SplitFileReferences.from(store).split();
@@ -350,7 +351,7 @@ void shouldNotSplitOneFileInLeafPartition() {
// Given
splitPartition("root", "L", "R", 5L);
FileReference file = factory.partitionFile("L", "already-split.parquet", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When
SplitFileReferences.from(store).split();
From 77b22b0eeaff1f0b8ce384ee7f345e9017fd18d2 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 10:23:16 +0000
Subject: [PATCH 02/17] Use wrapper to add files in
TransactionLogFileReferenceStoreTest
---
.../TransactionLogFileReferenceStoreTest.java | 92 +++++++++----------
1 file changed, 46 insertions(+), 46 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index 9a88822184..2453fd9b61 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -419,7 +419,7 @@ void shouldFailToSplitFileWhenTheOriginalFileWasSplitIncorrectlyToMultipleLevels
FileReference file = factory.rootFile("file", 100L);
FileReference leftFile = splitFile(file, "L");
FileReference nestedFile = splitFile(leftFile, "LL");
- store.addFile(file);
+ update(store).addFile(file);
// Ideally this would fail as this produces duplicate references to the same records,
// but not all state stores may be able to implement that
@@ -440,7 +440,7 @@ void shouldThrowExceptionWhenSplittingFileHasBeenAssignedToTheJob() {
// Given
splitPartition("root", "L", "R", 5);
FileReference file = factory.rootFile("file", 100L);
- store.addFile(file);
+ update(store).addFile(file);
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file"))));
@@ -463,7 +463,7 @@ class CreateCompactionJobs {
public void shouldMarkFileWithJobId() {
// Given
FileReference file = factory.rootFile("file", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When
store.assignJobIds(List.of(
@@ -481,7 +481,7 @@ public void shouldMarkOneHalfOfSplitFileWithJobId() {
FileReference file = factory.rootFile("file", 100L);
FileReference left = splitFile(file, "L");
FileReference right = splitFile(file, "R");
- store.addFiles(List.of(left, right));
+ update(store).addFiles(List.of(left, right));
// When
store.assignJobIds(List.of(
@@ -497,7 +497,7 @@ public void shouldMarkMultipleFilesWithJobIds() {
// Given
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
// When
store.assignJobIds(List.of(
@@ -515,7 +515,7 @@ public void shouldMarkMultipleFilesWithJobIds() {
public void shouldNotMarkFileWithJobIdWhenOneIsAlreadySet() {
// Given
FileReference file = factory.rootFile("file", 100L);
- store.addFile(file);
+ update(store).addFile(file);
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file"))));
@@ -533,7 +533,7 @@ public void shouldNotUpdateOtherFilesIfOneFileAlreadyHasJobId() {
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
FileReference file3 = factory.rootFile("file3", 100L);
- store.addFiles(List.of(file1, file2, file3));
+ update(store).addFiles(List.of(file1, file2, file3));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file2"))));
@@ -550,7 +550,7 @@ public void shouldNotUpdateOtherFilesIfOneFileAlreadyHasJobId() {
public void shouldNotMarkFileWithJobIdWhenFileDoesNotExist() {
// Given
FileReference file = factory.rootFile("existingFile", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When / Then
assertThatThrownBy(() -> store.assignJobIds(List.of(
@@ -576,7 +576,7 @@ public void shouldNotMarkFileWithJobIdWhenReferenceDoesNotExistInPartition() {
splitPartition("root", "L", "R", 5);
FileReference file = factory.rootFile("file", 100L);
FileReference existingReference = splitFile(file, "L");
- store.addFile(existingReference);
+ update(store).addFile(existingReference);
// When / Then
assertThatThrownBy(() -> store.assignJobIds(List.of(
@@ -596,7 +596,7 @@ void shouldFilesNotYetAssigned() {
// Given
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
// When / Then
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -609,7 +609,7 @@ void shouldCheckAllFilesAssigned() {
// Given
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1", "file2"))));
// When / Then
@@ -623,7 +623,7 @@ void shouldCheckSomeFilesAssigned() {
// Given
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1"))));
// When / Then
@@ -642,7 +642,7 @@ void shouldCheckFilesAssignedOnOnePartition() {
FileReference file1R = splitFile(file1, "R");
FileReference file2L = splitFile(file2, "L");
FileReference file2R = splitFile(file2, "R");
- store.addFiles(List.of(file1L, file1R, file2L, file2R));
+ update(store).addFiles(List.of(file1L, file1R, file2L, file2R));
store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "L", List.of("file1", "file2"))));
// When / Then
@@ -666,7 +666,7 @@ void shouldFailIfFileDoesNotExist() {
void shouldFailIfFileDoesNotExistOnPartition() {
// Given
splitPartition("root", "L", "R", 5);
- store.addFile(factory.partitionFile("L", "file", 100L));
+ update(store).addFile(factory.partitionFile("L", "file", 100L));
// When / Then
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -677,7 +677,7 @@ void shouldFailIfFileDoesNotExistOnPartition() {
@Test
void shouldFailIfFileAssignedToOtherJob() {
// Given
- store.addFile(factory.rootFile("file", 100L));
+ update(store).addFile(factory.rootFile("file", 100L));
store.assignJobIds(List.of(assignJobOnPartitionToFiles("A", "root", List.of("file"))));
// When / Then
@@ -689,7 +689,7 @@ void shouldFailIfFileAssignedToOtherJob() {
@Test
void shouldFailIfOneFileDoesNotExist() {
// Given
- store.addFile(factory.rootFile("file1", 100L));
+ update(store).addFile(factory.rootFile("file1", 100L));
// When / Then
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -707,7 +707,7 @@ public void shouldSetFileReadyForGC() {
// Given
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
- store.addFile(oldFile);
+ update(store).addFile(oldFile);
// When
store.assignJobIds(List.of(
@@ -732,7 +732,7 @@ void shouldApplyMultipleCompactions() {
FileReference newFile1 = factory.rootFile("newFile1", 100L);
FileReference oldFile2 = factory.rootFile("oldFile2", 100L);
FileReference newFile2 = factory.rootFile("newFile2", 100L);
- store.addFiles(List.of(oldFile1, oldFile2));
+ update(store).addFiles(List.of(oldFile1, oldFile2));
// When
store.assignJobIds(List.of(
@@ -757,7 +757,7 @@ void shouldFailToSetReadyForGCWhenAlreadyReadyForGC() {
// Given
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
- store.addFile(oldFile);
+ update(store).addFile(oldFile);
// When
store.assignJobIds(List.of(
@@ -784,7 +784,7 @@ void shouldFailWhenFilesToMarkAsReadyForGCAreNotAssignedToJob() {
// Given
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
- store.addFile(oldFile);
+ update(store).addFile(oldFile);
// When / Then
assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
@@ -812,7 +812,7 @@ public void shouldFailToSetFilesReadyForGCWhenOneDoesNotExist() {
// Given
FileReference oldFile1 = factory.rootFile("oldFile1", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
- store.addFile(oldFile1);
+ update(store).addFile(oldFile1);
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile1"))));
@@ -832,7 +832,7 @@ public void shouldFailToSetFileReadyForGCWhenReferenceDoesNotExistInPartition()
splitPartition("root", "L", "R", 5);
FileReference file = factory.rootFile("file", 100L);
FileReference existingReference = splitFile(file, "L");
- store.addFile(existingReference);
+ update(store).addFile(existingReference);
// When / Then
assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
@@ -847,7 +847,7 @@ public void shouldFailToSetFileReadyForGCWhenReferenceDoesNotExistInPartition()
void shouldFailWhenFileToBeMarkedReadyForGCHasSameFileNameAsNewFile() {
// Given
FileReference file = factory.rootFile("file1", 100L);
- store.addFile(file);
+ update(store).addFile(file);
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file1"))));
@@ -868,7 +868,7 @@ public void shouldFailWhenOutputFileAlreadyExists() {
FileReference file = factory.rootFile("oldFile", 100L);
FileReference existingReference = splitFile(file, "L");
FileReference newReference = factory.partitionFile("L", "newFile", 100L);
- store.addFiles(List.of(existingReference, newReference));
+ update(store).addFiles(List.of(existingReference, newReference));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("oldFile"))));
@@ -893,7 +893,7 @@ public void shouldFindFileWithNoReferencesWhichWasUpdatedLongEnoughAgo() {
Instant updateTime = Instant.parse("2023-10-04T14:08:00Z");
Instant latestTimeForGc = Instant.parse("2023-10-04T14:09:00Z");
store.fixFileUpdateTime(updateTime);
- store.addFilesWithReferences(List.of(fileWithNoReferences("readyForGc")));
+ update(store).addFilesWithReferences(List.of(fileWithNoReferences("readyForGc")));
// When / Then
assertThat(store.getReadyForGCFilenamesBefore(latestTimeForGc))
@@ -906,7 +906,7 @@ public void shouldNotFindFileWhichWasMarkedReadyForGCTooRecently() {
Instant updateTime = Instant.parse("2023-10-04T14:08:00Z");
Instant latestTimeForGc = Instant.parse("2023-10-04T14:07:00Z");
store.fixFileUpdateTime(updateTime);
- store.addFilesWithReferences(List.of(fileWithNoReferences("readyForGc")));
+ update(store).addFilesWithReferences(List.of(fileWithNoReferences("readyForGc")));
// When / Then
assertThat(store.getReadyForGCFilenamesBefore(latestTimeForGc))
@@ -924,7 +924,7 @@ public void shouldNotFindFileWhichHasTwoReferencesAndOnlyOneWasMarkedAsReadyForG
FileReference rightFile = splitFile(rootFile, "R");
FileReference compactionOutputFile = factory.partitionFile("L", "compactedFile", 100L);
store.fixFileUpdateTime(updateTime);
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("splitFile"))));
store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
@@ -947,7 +947,7 @@ public void shouldFindFileWhichHasTwoReferencesAndBothWereMarkedAsReadyForGC() {
FileReference leftOutputFile = factory.partitionFile("L", "leftOutput", 100L);
FileReference rightOutputFile = factory.partitionFile("R", "rightOutput", 100L);
store.fixFileUpdateTime(updateTime);
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("readyForGc")),
assignJobOnPartitionToFiles("job2", "R", List.of("readyForGc"))));
@@ -978,7 +978,7 @@ public void shouldNotFindSplitFileWhenOnlyFirstReadyForGCUpdateIsOldEnough() {
// And ingest and compactions happened at the expected times
store.fixFileUpdateTime(ingestTime);
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("readyForGc")),
assignJobOnPartitionToFiles("job2", "R", List.of("readyForGc"))));
@@ -1004,7 +1004,7 @@ public void shouldDeleteGarbageCollectedFile() {
// Given
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
- store.addFile(oldFile);
+ update(store).addFile(oldFile);
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile"))));
store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
@@ -1028,7 +1028,7 @@ void shouldDeleteGarbageCollectedFileSplitAcrossTwoPartitions() {
FileReference rightOutputFile = factory.partitionFile("R", "rightOutput", 100L);
// And the file was ingested as two references, then compacted into each partition
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("file")),
assignJobOnPartitionToFiles("job2", "R", List.of("file"))));
@@ -1049,7 +1049,7 @@ void shouldDeleteGarbageCollectedFileSplitAcrossTwoPartitions() {
public void shouldFailToDeleteActiveFile() {
// Given
FileReference file = factory.rootFile("test", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When / Then
assertThatThrownBy(() -> store.deleteGarbageCollectedFileReferenceCounts(List.of("test")))
@@ -1071,7 +1071,7 @@ public void shouldFailToDeleteActiveFileWhenOneOfTwoSplitRecordsIsReadyForGC() {
FileReference leftFile = splitFile(rootFile, "L");
FileReference rightFile = splitFile(rootFile, "R");
FileReference leftOutputFile = factory.partitionFile("L", "leftOutput", 100L);
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("file"))));
store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
@@ -1088,7 +1088,7 @@ public void shouldDeleteGarbageCollectedFileWhileIteratingThroughReadyForGCFiles
FileReference oldFile1 = factory.rootFile("oldFile1", 100L);
FileReference oldFile2 = factory.rootFile("oldFile2", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
- store.addFiles(List.of(oldFile1, oldFile2));
+ update(store).addFiles(List.of(oldFile1, oldFile2));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile1", "oldFile2"))));
store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
@@ -1108,7 +1108,7 @@ public void shouldDeleteGarbageCollectedFileWhileIteratingThroughReadyForGCFiles
public void shouldFailToDeleteActiveFileWhenAlsoDeletingReadyForGCFile() {
// Given
FileReference activeFile = factory.rootFile("activeFile", 100L);
- store.addFilesWithReferences(List.of(
+ update(store).addFilesWithReferences(List.of(
fileWithNoReferences("gcFile"),
fileWithReferences(List.of(activeFile))));
@@ -1130,7 +1130,7 @@ class ReportFileStatus {
void shouldReportOneActiveFile() {
// Given
FileReference file = factory.rootFile("test", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When
AllReferencesToAllFiles report = store.getAllFilesWithMaxUnreferenced(5);
@@ -1142,7 +1142,7 @@ void shouldReportOneActiveFile() {
@Test
void shouldReportOneReadyForGCFile() {
// Given
- store.addFilesWithReferences(List.of(fileWithNoReferences("test")));
+ update(store).addFilesWithReferences(List.of(fileWithNoReferences("test")));
// When
AllReferencesToAllFiles report = store.getAllFilesWithMaxUnreferenced(5);
@@ -1156,7 +1156,7 @@ void shouldReportTwoActiveFiles() {
// Given
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
- store.addFiles(List.of(file1, file2));
+ update(store).addFiles(List.of(file1, file2));
// When
AllReferencesToAllFiles report = store.getAllFilesWithMaxUnreferenced(5);
@@ -1172,7 +1172,7 @@ void shouldReportFileSplitOverTwoPartitions() {
FileReference rootFile = factory.rootFile("file", 100L);
FileReference leftFile = splitFile(rootFile, "L");
FileReference rightFile = splitFile(rootFile, "R");
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
// When
AllReferencesToAllFiles report = store.getAllFilesWithMaxUnreferenced(5);
@@ -1189,7 +1189,7 @@ void shouldReportFileSplitOverTwoPartitionsWithOneSideCompacted() {
FileReference leftFile = splitFile(rootFile, "L");
FileReference rightFile = splitFile(rootFile, "R");
FileReference outputFile = factory.partitionFile("L", 50L);
- store.addFiles(List.of(leftFile, rightFile));
+ update(store).addFiles(List.of(leftFile, rightFile));
store.assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("file"))));
store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
@@ -1205,7 +1205,7 @@ void shouldReportFileSplitOverTwoPartitionsWithOneSideCompacted() {
@Test
void shouldReportReadyForGCFilesWithLimit() {
// Given
- store.addFilesWithReferences(List.of(
+ update(store).addFilesWithReferences(List.of(
fileWithNoReferences("test1"),
fileWithNoReferences("test2"),
fileWithNoReferences("test3")));
@@ -1220,7 +1220,7 @@ void shouldReportReadyForGCFilesWithLimit() {
@Test
void shouldReportReadyForGCFilesMeetingLimit() {
// Given
- store.addFilesWithReferences(List.of(
+ update(store).addFilesWithReferences(List.of(
fileWithNoReferences("test1"),
fileWithNoReferences("test2")));
@@ -1246,7 +1246,7 @@ public void shouldReturnMultipleFilesOnEachPartition() {
FileReference leftFile2 = factory.partitionFile("L", "leftFile2", 10);
FileReference rightFile1 = factory.partitionFile("R", "rightFile1", 10);
FileReference rightFile2 = factory.partitionFile("R", "rightFile2", 10);
- store.addFiles(List.of(rootFile1, rootFile2, leftFile1, leftFile2, rightFile1, rightFile2));
+ update(store).addFiles(List.of(rootFile1, rootFile2, leftFile1, leftFile2, rightFile1, rightFile2));
// When / Then
assertThat(store.getPartitionToReferencedFilesMap())
@@ -1264,7 +1264,7 @@ public void shouldNotReturnPartitionsWithNoFiles() {
// Given
splitPartition("root", "L", "R", 5);
FileReference file = factory.partitionFile("L", "file", 100);
- store.addFile(file);
+ update(store).addFile(file);
// When / Then
assertThat(store.getPartitionToReferencedFilesMap())
@@ -1279,7 +1279,7 @@ class ClearFiles {
void shouldDeleteReferencedFileOnClear() {
// Given
FileReference file = factory.rootFile("file", 100L);
- store.addFile(file);
+ update(store).addFile(file);
// When
store.clearSleeperTable();
@@ -1296,7 +1296,7 @@ void shouldDeleteReferencedFileOnClear() {
@Test
void shouldDeleteUnreferencedFileOnClear() {
// Given
- store.addFilesWithReferences(List.of(AllReferencesToAFile.builder()
+ update(store).addFilesWithReferences(List.of(AllReferencesToAFile.builder()
.filename("file")
.references(List.of())
.build()));
From 652b8d22790e8ae729bb13ca41d1dd4f5f9889e1 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 10:27:16 +0000
Subject: [PATCH 03/17] Use wrapper to split files in
TransactionLogFileReferenceStoreTest
---
.../testutils/StateStoreUpdatesWrapper.java | 37 +++++++++++++++++++
.../TransactionLogFileReferenceStoreTest.java | 10 ++---
2 files changed, 42 insertions(+), 5 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index b622c5c67a..4eba799e0a 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -17,12 +17,15 @@
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.FileReference;
+import sleeper.core.statestore.SplitFileReferenceRequest;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
+import sleeper.core.statestore.exception.SplitRequestsFailedException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction;
import java.util.List;
@@ -93,4 +96,38 @@ public void addFilesWithReferences(List files) throws Stat
.build());
}
+ /**
+ * Performs atomic updates to split file references. This is used to push file references down the partition tree,
+ * eg. where records are ingested to a non-leaf partition, or when a partition is split. A file referenced in a
+ * larger, non-leaf partition may be split between smaller partitions which cover non-overlapping sub-ranges of the
+ * original partition. This includes these records in compactions of the descendent partitions.
+ *
+ * The aim is to combine all records into a small number of files for each leaf partition, where the leaves of the
+ * partition tree should represent a separation of the data into manageable chunks. Compaction operates on file
+ * references to pull records from multiple files into one, when they are referenced in the same partition. This
+ * reduces the number of files in the system, and improves statistics and indexing within each partition. This
+ * should result in faster queries, and more accurate partitioning when a partition is split.
+ *
+ * Each {@link SplitFileReferenceRequest} will remove one file reference, and create new references to the same file
+ * in descendent partitions. The reference counts will be tracked accordingly.
+ *
+ * The ranges covered by the partitions of the new references must not overlap, so there
+ * must never be two references to the same file where one partition is a descendent of the other.
+ *
+ * Note that it is possible that the necessary updates may not fit in a single transaction. Each
+ * {@link SplitFileReferenceRequest} is guaranteed to be done atomically in one transaction, but it is possible that
+ * some may succeed and some may fail. If a single {@link SplitFileReferenceRequest} adds too many references to
+ * apply in one transaction, this will also fail.
+ *
+ * @param splitRequests A list of {@link SplitFileReferenceRequest}s to apply
+ * @throws SplitRequestsFailedException if any of the requests fail, even if some succeeded
+ */
+ public void splitFileReferences(List splitRequests) throws SplitRequestsFailedException {
+ try {
+ stateStore.addTransaction(AddTransactionRequest.withTransaction(new SplitFileReferencesTransaction(splitRequests)).build());
+ } catch (StateStoreException e) {
+ throw new SplitRequestsFailedException(List.of(), splitRequests, e);
+ }
+ }
+
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index 2453fd9b61..55fb1788f6 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -384,7 +384,7 @@ void shouldFailToSplitFileWhichDoesNotExist() {
FileReference file = factory.rootFile("file", 100L);
// When / Then
- assertThatThrownBy(() -> store.splitFileReferences(List.of(
+ assertThatThrownBy(() -> update(store).splitFileReferences(List.of(
splitFileToChildPartitions(file, "L", "R"))))
.isInstanceOf(SplitRequestsFailedException.class)
.hasCauseInstanceOf(FileNotFoundException.class);
@@ -399,10 +399,10 @@ void shouldFailToSplitFileWhenReferenceDoesNotExistInPartition() {
splitPartition("root", "L", "R", 5);
FileReference file = factory.rootFile("file", 100L);
FileReference existingReference = splitFile(file, "L");
- store.addFile(existingReference);
+ update(store).addFile(existingReference);
// When / Then
- assertThatThrownBy(() -> store.splitFileReferences(List.of(
+ assertThatThrownBy(() -> update(store).splitFileReferences(List.of(
splitFileToChildPartitions(file, "L", "R"))))
.isInstanceOf(SplitRequestsFailedException.class)
.hasCauseInstanceOf(FileReferenceNotFoundException.class);
@@ -423,7 +423,7 @@ void shouldFailToSplitFileWhenTheOriginalFileWasSplitIncorrectlyToMultipleLevels
// Ideally this would fail as this produces duplicate references to the same records,
// but not all state stores may be able to implement that
- store.splitFileReferences(List.of(new SplitFileReferenceRequest(file, List.of(leftFile, nestedFile))));
+ update(store).splitFileReferences(List.of(new SplitFileReferenceRequest(file, List.of(leftFile, nestedFile))));
// When / Then
assertThatThrownBy(() -> SplitFileReferences.from(store).split())
@@ -445,7 +445,7 @@ void shouldThrowExceptionWhenSplittingFileHasBeenAssignedToTheJob() {
assignJobOnPartitionToFiles("job1", "root", List.of("file"))));
// When / Then
- assertThatThrownBy(() -> store.splitFileReferences(List.of(splitFileToChildPartitions(file, "L", "R"))))
+ assertThatThrownBy(() -> update(store).splitFileReferences(List.of(splitFileToChildPartitions(file, "L", "R"))))
.isInstanceOf(SplitRequestsFailedException.class)
.hasCauseInstanceOf(FileReferenceAssignedToJobException.class);
assertThat(store.getFileReferences())
From c1d8b35b63fadc409e18aeb75385f50def66c3fc Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 10:32:32 +0000
Subject: [PATCH 04/17] Introduce StateStoreUpdatesWrapper.assignJobIds
---
.../impl/AssignJobIdsTransaction.java | 5 ++++
.../testutils/StateStoreUpdatesWrapper.java | 29 ++++++++++++++++++-
.../TransactionLogFileReferenceStoreTest.java | 22 +++++++-------
3 files changed, 44 insertions(+), 12 deletions(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java
index ebf915d788..d0959dd037 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java
@@ -15,6 +15,9 @@
*/
package sleeper.core.statestore.transactionlog.transaction.impl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStoreException;
@@ -35,6 +38,7 @@
* A transaction to assign files to jobs.
*/
public class AssignJobIdsTransaction implements FileReferenceTransaction {
+ public static final Logger LOGGER = LoggerFactory.getLogger(AssignJobIdsTransaction.class);
private final List requests;
@@ -54,6 +58,7 @@ public static Optional ignoringEmptyRequests(List !request.getFilenames().isEmpty())
.collect(toUnmodifiableList());
if (filtered.isEmpty()) {
+ LOGGER.warn("Attempted to create transaction with no file assignments, received requests: {}", requests);
return Optional.empty();
} else {
return Optional.of(new AssignJobIdsTransaction(filtered));
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index 4eba799e0a..8d38481c9a 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -15,16 +15,24 @@
*/
package sleeper.core.statestore.testutils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import sleeper.core.statestore.AllReferencesToAFile;
+import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.SplitFileReferenceRequest;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
+import sleeper.core.statestore.exception.FileReferenceAssignedToJobException;
+import sleeper.core.statestore.exception.FileReferenceNotFoundException;
import sleeper.core.statestore.exception.SplitRequestsFailedException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
+import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction;
import java.util.List;
@@ -33,6 +41,7 @@
* Wraps a state store and exposes methods for shortcuts during tests.
*/
public class StateStoreUpdatesWrapper {
+ public static final Logger LOGGER = LoggerFactory.getLogger(StateStoreUpdatesWrapper.class);
private final StateStore stateStore;
@@ -124,10 +133,28 @@ public void addFilesWithReferences(List files) throws Stat
*/
public void splitFileReferences(List splitRequests) throws SplitRequestsFailedException {
try {
- stateStore.addTransaction(AddTransactionRequest.withTransaction(new SplitFileReferencesTransaction(splitRequests)).build());
+ addTransaction(new SplitFileReferencesTransaction(splitRequests));
} catch (StateStoreException e) {
throw new SplitRequestsFailedException(List.of(), splitRequests, e);
}
}
+ /**
+ * Atomically updates the job field of file references, as long as the job field is currently unset. This will be
+ * used for compaction job input files.
+ *
+ * @param requests A list of {@link AssignJobIdRequest}s which should each be applied
+ * atomically
+ * @throws FileReferenceNotFoundException if a reference does not exist
+ * @throws FileReferenceAssignedToJobException if a reference is already assigned to a job
+ * @throws StateStoreException if the update fails for another reason
+ */
+ public void assignJobIds(List requests) throws StateStoreException {
+ AssignJobIdsTransaction.ignoringEmptyRequests(requests).ifPresent(this::addTransaction);
+ }
+
+ private void addTransaction(StateStoreTransaction> transaction) {
+ stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction).build());
+ }
+
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index 55fb1788f6..7eae506c5e 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -441,7 +441,7 @@ void shouldThrowExceptionWhenSplittingFileHasBeenAssignedToTheJob() {
splitPartition("root", "L", "R", 5);
FileReference file = factory.rootFile("file", 100L);
update(store).addFile(file);
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file"))));
// When / Then
@@ -466,7 +466,7 @@ public void shouldMarkFileWithJobId() {
update(store).addFile(file);
// When
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job", "root", List.of("file"))));
// Then
@@ -484,7 +484,7 @@ public void shouldMarkOneHalfOfSplitFileWithJobId() {
update(store).addFiles(List.of(left, right));
// When
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job", "L", List.of("file"))));
// Then
@@ -500,7 +500,7 @@ public void shouldMarkMultipleFilesWithJobIds() {
update(store).addFiles(List.of(file1, file2));
// When
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file1")),
assignJobOnPartitionToFiles("job2", "root", List.of("file2"))));
@@ -516,11 +516,11 @@ public void shouldNotMarkFileWithJobIdWhenOneIsAlreadySet() {
// Given
FileReference file = factory.rootFile("file", 100L);
update(store).addFile(file);
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file"))));
// When / Then
- assertThatThrownBy(() -> store.assignJobIds(List.of(
+ assertThatThrownBy(() -> update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job2", "root", List.of("file")))))
.isInstanceOf(FileReferenceAssignedToJobException.class);
assertThat(store.getFileReferences()).containsExactly(withJobId("job1", file));
@@ -534,11 +534,11 @@ public void shouldNotUpdateOtherFilesIfOneFileAlreadyHasJobId() {
FileReference file2 = factory.rootFile("file2", 100L);
FileReference file3 = factory.rootFile("file3", 100L);
update(store).addFiles(List.of(file1, file2, file3));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file2"))));
// When / Then
- assertThatThrownBy(() -> store.assignJobIds(List.of(
+ assertThatThrownBy(() -> update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job2", "root", List.of("file1", "file2", "file3")))))
.isInstanceOf(FileReferenceAssignedToJobException.class);
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(
@@ -553,7 +553,7 @@ public void shouldNotMarkFileWithJobIdWhenFileDoesNotExist() {
update(store).addFile(file);
// When / Then
- assertThatThrownBy(() -> store.assignJobIds(List.of(
+ assertThatThrownBy(() -> update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("requestedFile")))))
.isInstanceOf(FileReferenceNotFoundException.class);
assertThat(store.getFileReferences()).containsExactly(file);
@@ -563,7 +563,7 @@ public void shouldNotMarkFileWithJobIdWhenFileDoesNotExist() {
@Test
public void shouldNotMarkFileWithJobIdWhenFileDoesNotExistAndStoreIsEmpty() {
// When / Then
- assertThatThrownBy(() -> store.assignJobIds(List.of(
+ assertThatThrownBy(() -> update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file")))))
.isInstanceOf(FileReferenceNotFoundException.class);
assertThat(store.getFileReferences()).isEmpty();
@@ -579,7 +579,7 @@ public void shouldNotMarkFileWithJobIdWhenReferenceDoesNotExistInPartition() {
update(store).addFile(existingReference);
// When / Then
- assertThatThrownBy(() -> store.assignJobIds(List.of(
+ assertThatThrownBy(() -> update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file")))))
.isInstanceOf(FileReferenceNotFoundException.class);
assertThat(store.getFileReferences()).containsExactly(existingReference);
From 0c48ca6ff4896cbbf11f065e108aa260436d6f7e Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Wed, 19 Feb 2025 11:22:48 +0000
Subject: [PATCH 05/17] Refactor atomicallyReplaceFileReferencesWithNewOnes
into wrapper class and update references
---
.../ReplaceFileReferencesTransaction.java | 22 ++++++++++
.../testutils/StateStoreUpdatesWrapper.java | 26 ++++++++++++
.../TransactionLogFileReferenceStoreTest.java | 40 +++++++++----------
3 files changed, 68 insertions(+), 20 deletions(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
index 17b7907b5b..36337ab149 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
@@ -20,6 +20,11 @@
import sleeper.core.statestore.ReplaceFileReferencesRequest;
import sleeper.core.statestore.StateStoreException;
+import sleeper.core.statestore.exception.FileAlreadyExistsException;
+import sleeper.core.statestore.exception.FileNotFoundException;
+import sleeper.core.statestore.exception.FileReferenceNotAssignedToJobException;
+import sleeper.core.statestore.exception.FileReferenceNotFoundException;
+import sleeper.core.statestore.exception.NewReferenceSameAsOldReferenceException;
import sleeper.core.statestore.transactionlog.state.StateStoreFile;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
@@ -135,4 +140,21 @@ public boolean equals(Object obj) {
public String toString() {
return "ReplaceFileReferencesTransaction{jobs=" + jobs + "}";
}
+
+ /**
+ * Validates the transaction against the current state.
+ *
+ * @param stateStoreFiles the state
+ * @throws FileNotFoundException if an input file does not exist
+ * @throws FileReferenceNotFoundException if an input file is not referenced on the same partition
+ * @throws FileReferenceNotAssignedToJobException if an input file is not assigned to the job on this partition
+ * @throws FileAlreadyExistsException if the new file already exists in the state store
+ * @throws NewReferenceSameAsOldReferenceException if any of the input files are the same as the output file
+ */
+ public void validateStateChange(StateStoreFiles stateStoreFiles) {
+ for (ReplaceFileReferencesRequest job : jobs) {
+ job.validateNewReference();
+ job.validateStateChange(stateStoreFiles);
+ }
+ }
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index 8d38481c9a..2f2ea2c60c 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -21,18 +21,21 @@
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
+import sleeper.core.statestore.ReplaceFileReferencesRequest;
import sleeper.core.statestore.SplitFileReferenceRequest;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
import sleeper.core.statestore.exception.FileReferenceAssignedToJobException;
import sleeper.core.statestore.exception.FileReferenceNotFoundException;
+import sleeper.core.statestore.exception.ReplaceRequestsFailedException;
import sleeper.core.statestore.exception.SplitRequestsFailedException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction;
import java.util.List;
@@ -153,6 +156,29 @@ public void assignJobIds(List requests) throws StateStoreExc
AssignJobIdsTransaction.ignoringEmptyRequests(requests).ifPresent(this::addTransaction);
}
+ /**
+ * Atomically applies the results of jobs. Removes file references for a job's input files, and adds a reference to
+ * an output file. This will be used for compaction.
+ *
+ * This will validate that the input files were assigned to the job.
+ *
+ * This will decrement the number of references for each of the input files. If no other references exist for those
+ * files, they will become available for garbage collection.
+ *
+ * @param requests requests for jobs to each have their results atomically applied
+ * @throws ReplaceRequestsFailedException if any of the updates fail
+ */
+ public void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws ReplaceRequestsFailedException {
+ try {
+ ReplaceFileReferencesTransaction transaction = new ReplaceFileReferencesTransaction(requests);
+ stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction)
+ .beforeApplyListener(StateListenerBeforeApply.withFilesState(state -> transaction.validateStateChange(state)))
+ .build());
+ } catch (StateStoreException e) {
+ throw new ReplaceRequestsFailedException(requests, e);
+ }
+ }
+
private void addTransaction(StateStoreTransaction> transaction) {
stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction).build());
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index 7eae506c5e..e210c8b472 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -610,7 +610,7 @@ void shouldCheckAllFilesAssigned() {
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
update(store).addFiles(List.of(file1, file2));
- store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1", "file2"))));
+ update(store).assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1", "file2"))));
// When / Then
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -624,7 +624,7 @@ void shouldCheckSomeFilesAssigned() {
FileReference file1 = factory.rootFile("file1", 100L);
FileReference file2 = factory.rootFile("file2", 100L);
update(store).addFiles(List.of(file1, file2));
- store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1"))));
+ update(store).assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "root", List.of("file1"))));
// When / Then
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -643,7 +643,7 @@ void shouldCheckFilesAssignedOnOnePartition() {
FileReference file2L = splitFile(file2, "L");
FileReference file2R = splitFile(file2, "R");
update(store).addFiles(List.of(file1L, file1R, file2L, file2R));
- store.assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "L", List.of("file1", "file2"))));
+ update(store).assignJobIds(List.of(assignJobOnPartitionToFiles("test-job", "L", List.of("file1", "file2"))));
// When / Then
assertThat(store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -678,7 +678,7 @@ void shouldFailIfFileDoesNotExistOnPartition() {
void shouldFailIfFileAssignedToOtherJob() {
// Given
update(store).addFile(factory.rootFile("file", 100L));
- store.assignJobIds(List.of(assignJobOnPartitionToFiles("A", "root", List.of("file"))));
+ update(store).assignJobIds(List.of(assignJobOnPartitionToFiles("A", "root", List.of("file"))));
// When / Then
assertThatThrownBy(() -> store.isAssigned(List.of(CheckFileAssignmentsRequest.isJobAssignedToFilesOnPartition(
@@ -710,9 +710,9 @@ public void shouldSetFileReadyForGC() {
update(store).addFile(oldFile);
// When
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("oldFile"), newFile)));
// Then
@@ -735,10 +735,10 @@ void shouldApplyMultipleCompactions() {
update(store).addFiles(List.of(oldFile1, oldFile2));
// When
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile1")),
assignJobOnPartitionToFiles("job2", "root", List.of("oldFile2"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("oldFile1"), newFile1),
replaceJobFileReferences("job2", List.of("oldFile2"), newFile2)));
@@ -760,13 +760,13 @@ void shouldFailToSetReadyForGCWhenAlreadyReadyForGC() {
update(store).addFile(oldFile);
// When
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("oldFile"), newFile)));
// Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("oldFile"), newFile))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(FileReferenceNotFoundException.class);
@@ -787,7 +787,7 @@ void shouldFailWhenFilesToMarkAsReadyForGCAreNotAssignedToJob() {
update(store).addFile(oldFile);
// When / Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("oldFile"), newFile))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(FileReferenceNotAssignedToJobException.class);
@@ -799,7 +799,7 @@ public void shouldFailToSetFileReadyForGCWhichDoesNotExist() {
FileReference newFile = factory.rootFile("newFile", 100L);
// When / Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("oldFile"), newFile))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(FileNotFoundException.class);
@@ -813,11 +813,11 @@ public void shouldFailToSetFilesReadyForGCWhenOneDoesNotExist() {
FileReference oldFile1 = factory.rootFile("oldFile1", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
update(store).addFile(oldFile1);
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile1"))));
// When / Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("oldFile1", "oldFile2"), newFile))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(FileNotFoundException.class);
@@ -835,7 +835,7 @@ public void shouldFailToSetFileReadyForGCWhenReferenceDoesNotExistInPartition()
update(store).addFile(existingReference);
// When / Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("file"), factory.rootFile("file2", 100L)))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(FileReferenceNotFoundException.class);
@@ -848,11 +848,11 @@ void shouldFailWhenFileToBeMarkedReadyForGCHasSameFileNameAsNewFile() {
// Given
FileReference file = factory.rootFile("file1", 100L);
update(store).addFile(file);
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("file1"))));
// When / Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("file1"), file))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(NewReferenceSameAsOldReferenceException.class);
@@ -869,11 +869,11 @@ public void shouldFailWhenOutputFileAlreadyExists() {
FileReference existingReference = splitFile(file, "L");
FileReference newReference = factory.partitionFile("L", "newFile", 100L);
update(store).addFiles(List.of(existingReference, newReference));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("oldFile"))));
// When / Then
- assertThatThrownBy(() -> store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ assertThatThrownBy(() -> update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("oldFile"), newReference))))
.isInstanceOf(ReplaceRequestsFailedException.class)
.hasCauseInstanceOf(FileAlreadyExistsException.class);
From 34c80f3b543313a62c755b162a7e46d3da93b479 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 11:32:59 +0000
Subject: [PATCH 06/17] Add
StateStoreUpdatesWrapper.deleteGarbageCollectedFileReferenceCounts
---
.../testutils/StateStoreUpdatesWrapper.java | 23 +++++++++
.../TransactionLogFileReferenceStoreTest.java | 48 +++++++++----------
2 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index 2f2ea2c60c..fc8e85d027 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -26,6 +26,8 @@
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
+import sleeper.core.statestore.exception.FileHasReferencesException;
+import sleeper.core.statestore.exception.FileNotFoundException;
import sleeper.core.statestore.exception.FileReferenceAssignedToJobException;
import sleeper.core.statestore.exception.FileReferenceNotFoundException;
import sleeper.core.statestore.exception.ReplaceRequestsFailedException;
@@ -35,6 +37,7 @@
import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.DeleteFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction;
@@ -179,6 +182,26 @@ public void atomicallyReplaceFileReferencesWithNewOnes(List
+ * If there are any remaining internal references for the files on partitions, this should fail, as it should not be
+ * possible to reach that state.
+ *
+ * If the reference count is non-zero for any other reason, it may be that the count was incremented after the file
+ * was ready for garbage collection. This should fail in that case as well, as we would like this to not be
+ * possible.
+ *
+ * @param filenames The names of files that were deleted.
+ * @throws FileNotFoundException if a file does not exist
+ * @throws FileHasReferencesException if a file still has references
+ * @throws StateStoreException if the update fails for another reason
+ */
+ public void deleteGarbageCollectedFileReferenceCounts(List filenames) throws StateStoreException {
+ addTransaction(new DeleteFilesTransaction(filenames));
+ }
+
private void addTransaction(StateStoreTransaction> transaction) {
stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction).build());
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index e210c8b472..0ee67c91d2 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -925,9 +925,9 @@ public void shouldNotFindFileWhichHasTwoReferencesAndOnlyOneWasMarkedAsReadyForG
FileReference compactionOutputFile = factory.partitionFile("L", "compactedFile", 100L);
store.fixFileUpdateTime(updateTime);
update(store).addFiles(List.of(leftFile, rightFile));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("splitFile"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("splitFile"), compactionOutputFile)));
// When / Then
@@ -948,10 +948,10 @@ public void shouldFindFileWhichHasTwoReferencesAndBothWereMarkedAsReadyForGC() {
FileReference rightOutputFile = factory.partitionFile("R", "rightOutput", 100L);
store.fixFileUpdateTime(updateTime);
update(store).addFiles(List.of(leftFile, rightFile));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("readyForGc")),
assignJobOnPartitionToFiles("job2", "R", List.of("readyForGc"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("readyForGc"), leftOutputFile),
replaceJobFileReferences("job2", List.of("readyForGc"), rightOutputFile)));
@@ -979,14 +979,14 @@ public void shouldNotFindSplitFileWhenOnlyFirstReadyForGCUpdateIsOldEnough() {
// And ingest and compactions happened at the expected times
store.fixFileUpdateTime(ingestTime);
update(store).addFiles(List.of(leftFile, rightFile));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("readyForGc")),
assignJobOnPartitionToFiles("job2", "R", List.of("readyForGc"))));
store.fixFileUpdateTime(firstCompactionTime);
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("readyForGc"), leftOutputFile)));
store.fixFileUpdateTime(secondCompactionTime);
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job2", List.of("readyForGc"), rightOutputFile)));
// When / Then
@@ -1005,13 +1005,13 @@ public void shouldDeleteGarbageCollectedFile() {
FileReference oldFile = factory.rootFile("oldFile", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
update(store).addFile(oldFile);
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("oldFile"), newFile)));
// When
- store.deleteGarbageCollectedFileReferenceCounts(List.of("oldFile"));
+ update(store).deleteGarbageCollectedFileReferenceCounts(List.of("oldFile"));
// Then
assertThat(store.getReadyForGCFilenamesBefore(AFTER_DEFAULT_UPDATE_TIME)).isEmpty();
@@ -1029,15 +1029,15 @@ void shouldDeleteGarbageCollectedFileSplitAcrossTwoPartitions() {
// And the file was ingested as two references, then compacted into each partition
update(store).addFiles(List.of(leftFile, rightFile));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("file")),
assignJobOnPartitionToFiles("job2", "R", List.of("file"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(
replaceJobFileReferences("job1", List.of("file"), leftOutputFile),
replaceJobFileReferences("job2", List.of("file"), rightOutputFile)));
// When
- store.deleteGarbageCollectedFileReferenceCounts(List.of("file"));
+ update(store).deleteGarbageCollectedFileReferenceCounts(List.of("file"));
// Then
assertThat(store.getReadyForGCFilenamesBefore(AFTER_DEFAULT_UPDATE_TIME)).isEmpty();
@@ -1052,14 +1052,14 @@ public void shouldFailToDeleteActiveFile() {
update(store).addFile(file);
// When / Then
- assertThatThrownBy(() -> store.deleteGarbageCollectedFileReferenceCounts(List.of("test")))
+ assertThatThrownBy(() -> update(store).deleteGarbageCollectedFileReferenceCounts(List.of("test")))
.isInstanceOf(FileHasReferencesException.class);
}
@Test
public void shouldFailToDeleteFileWhichWasNotAdded() {
// When / Then
- assertThatThrownBy(() -> store.deleteGarbageCollectedFileReferenceCounts(List.of("test")))
+ assertThatThrownBy(() -> update(store).deleteGarbageCollectedFileReferenceCounts(List.of("test")))
.isInstanceOf(FileNotFoundException.class);
}
@@ -1072,13 +1072,13 @@ public void shouldFailToDeleteActiveFileWhenOneOfTwoSplitRecordsIsReadyForGC() {
FileReference rightFile = splitFile(rootFile, "R");
FileReference leftOutputFile = factory.partitionFile("L", "leftOutput", 100L);
update(store).addFiles(List.of(leftFile, rightFile));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("file"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("file"), leftOutputFile)));
// When / Then
- assertThatThrownBy(() -> store.deleteGarbageCollectedFileReferenceCounts(List.of("file")))
+ assertThatThrownBy(() -> update(store).deleteGarbageCollectedFileReferenceCounts(List.of("file")))
.isInstanceOf(FileHasReferencesException.class);
}
@@ -1089,14 +1089,14 @@ public void shouldDeleteGarbageCollectedFileWhileIteratingThroughReadyForGCFiles
FileReference oldFile2 = factory.rootFile("oldFile2", 100L);
FileReference newFile = factory.rootFile("newFile", 100L);
update(store).addFiles(List.of(oldFile1, oldFile2));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "root", List.of("oldFile1", "oldFile2"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("oldFile1", "oldFile2"), newFile)));
// When
Iterator iterator = store.getReadyForGCFilenamesBefore(Instant.ofEpochMilli(Long.MAX_VALUE)).iterator();
- store.deleteGarbageCollectedFileReferenceCounts(List.of(iterator.next()));
+ update(store).deleteGarbageCollectedFileReferenceCounts(List.of(iterator.next()));
// Then
assertThat(store.getReadyForGCFilenamesBefore(AFTER_DEFAULT_UPDATE_TIME))
@@ -1113,7 +1113,7 @@ public void shouldFailToDeleteActiveFileWhenAlsoDeletingReadyForGCFile() {
fileWithReferences(List.of(activeFile))));
// When / Then
- assertThatThrownBy(() -> store.deleteGarbageCollectedFileReferenceCounts(List.of("gcFile", "activeFile")))
+ assertThatThrownBy(() -> update(store).deleteGarbageCollectedFileReferenceCounts(List.of("gcFile", "activeFile")))
.isInstanceOf(FileHasReferencesException.class);
assertThat(store.getFileReferences())
.containsExactly(activeFile);
@@ -1190,9 +1190,9 @@ void shouldReportFileSplitOverTwoPartitionsWithOneSideCompacted() {
FileReference rightFile = splitFile(rootFile, "R");
FileReference outputFile = factory.partitionFile("L", 50L);
update(store).addFiles(List.of(leftFile, rightFile));
- store.assignJobIds(List.of(
+ update(store).assignJobIds(List.of(
assignJobOnPartitionToFiles("job1", "L", List.of("file"))));
- store.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
+ update(store).atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
"job1", List.of("file"), outputFile)));
// When
From 859ed6eb1e574c2cf44f58902146cbbfd685c557 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 11:42:57 +0000
Subject: [PATCH 07/17] Add StateStoreUpdatesWrapper.clearSleeperTable
---
.../testutils/StateStoreUpdatesWrapper.java | 29 +++++++++++++++
.../TransactionLogFileReferenceStoreTest.java | 4 +--
.../TransactionLogPartitionStoreTest.java | 35 +++++++++++++++++++
3 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index fc8e85d027..53cfecd4a0 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -37,7 +37,9 @@
import sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.ClearFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.DeleteFilesTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction;
@@ -202,6 +204,33 @@ public void deleteGarbageCollectedFileReferenceCounts(List filenames) th
addTransaction(new DeleteFilesTransaction(filenames));
}
+ /**
+ * Clears all file data and partition data from the state store. Note that this does not delete any of the actual
+ * files, and after calling this method the store must be initialised before the Sleeper table can be used again.
+ *
+ * @throws StateStoreException if the update fails
+ */
+ public void clearSleeperTable() throws StateStoreException {
+ clearFileData();
+ clearPartitionData();
+ }
+
+ /**
+ * Clears all file data from the file reference store. Note that this does not delete any of the actual files.
+ */
+ public void clearFileData() throws StateStoreException {
+ addTransaction(new ClearFilesTransaction());
+ }
+
+ /**
+ * Clears all partition data from the store. Note that this will invalidate any file references held in the store,
+ * so this should only be used when no files are present. The store must be initialised before the Sleeper table can
+ * be used again. Any file references will need to be added again.
+ */
+ public void clearPartitionData() throws StateStoreException {
+ addTransaction(new InitialisePartitionsTransaction(List.of()));
+ }
+
private void addTransaction(StateStoreTransaction> transaction) {
stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction).build());
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index 0ee67c91d2..ea9a0af057 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -1282,7 +1282,7 @@ void shouldDeleteReferencedFileOnClear() {
update(store).addFile(file);
// When
- store.clearSleeperTable();
+ update(store).clearSleeperTable();
store.initialise();
// Then
@@ -1302,7 +1302,7 @@ void shouldDeleteUnreferencedFileOnClear() {
.build()));
// When
- store.clearSleeperTable();
+ update(store).clearSleeperTable();
store.initialise();
// Then
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
index 2749496b33..633f8bd817 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
@@ -33,6 +33,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
+import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;
public class TransactionLogPartitionStoreTest extends InMemoryTransactionLogStateStoreTestBase {
@@ -353,4 +354,38 @@ public void shouldFailSplittingAPartitionWhenNewPartitionIsNotALeaf() {
.isInstanceOf(StateStoreException.class);
}
}
+
+ @Nested
+ @DisplayName("Clear partitions")
+ class ClearPartitions {
+ @Test
+ void shouldDeleteSinglePartitionOnClear() {
+ // Given
+ Schema schema = schemaWithKey("key", new IntType());
+ initialiseWithSchema(schema);
+
+ // When
+ update(store).clearSleeperTable();
+
+ // Then
+ assertThat(store.getAllPartitions()).isEmpty();
+ assertThat(store.getLeafPartitions()).isEmpty();
+ }
+
+ @Test
+ void shouldDeletePartitionTreeOnClear() {
+ // Given
+ Schema schema = schemaWithKey("key", new IntType());
+ initialiseWithPartitions(new PartitionsBuilder(schema)
+ .rootFirst("root")
+ .splitToNewChildren("root", "L", "R", 123));
+
+ // When
+ update(store).clearSleeperTable();
+
+ // Then
+ assertThat(store.getAllPartitions()).isEmpty();
+ assertThat(store.getLeafPartitions()).isEmpty();
+ }
+ }
}
From abfcce748ad373bc9acafc68e40f424dc0b1516b Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 11:45:47 +0000
Subject: [PATCH 08/17] Remove remaining update method references from
TransactionLogFileReferenceStoreTest
---
.../TransactionLogFileReferenceStoreTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
index ea9a0af057..73fc1cf0fd 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStoreTest.java
@@ -202,10 +202,10 @@ void shouldFailToAddSameFileTwice() {
update(store).addFile(file);
// When / Then
- assertThatThrownBy(() -> store.addFile(file))
+ assertThatThrownBy(() -> update(store).addFile(file))
.isInstanceOf(FileAlreadyExistsException.class);
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(withLastUpdate(updateTime, file));
- assertThatThrownBy(() -> store.addFilesWithReferences(List.of(fileWithReferences(file))))
+ assertThatThrownBy(() -> update(store).addFilesWithReferences(List.of(fileWithReferences(file))))
.isInstanceOf(FileAlreadyExistsException.class);
assertThat(store.getFileReferences()).containsExactlyInAnyOrder(withLastUpdate(updateTime, file));
}
@@ -220,10 +220,10 @@ void shouldFailToAddAnotherReferenceForSameFile() {
update(store).addFile(leftFile);
// When / Then
- assertThatThrownBy(() -> store.addFile(rightFile))
+ assertThatThrownBy(() -> update(store).addFile(rightFile))
.isInstanceOf(FileAlreadyExistsException.class);
assertThat(store.getFileReferences()).containsExactly(leftFile);
- assertThatThrownBy(() -> store.addFilesWithReferences(List.of(fileWithReferences(rightFile))))
+ assertThatThrownBy(() -> update(store).addFilesWithReferences(List.of(fileWithReferences(rightFile))))
.isInstanceOf(FileAlreadyExistsException.class);
assertThat(store.getFileReferences()).containsExactly(leftFile);
}
From 9564a9d9ab21dfbf933bfee47be5d8e217222395 Mon Sep 17 00:00:00 2001
From: patchwork01 <110390516+patchwork01@users.noreply.github.com>
Date: Wed, 19 Feb 2025 12:00:48 +0000
Subject: [PATCH 09/17] Introduce StateStoreUpdatesWrapper.initialise
---
.../TransactionLogPartitionStore.java | 4 +---
.../impl/InitialisePartitionsTransaction.java | 13 ++++++++++
...emoryTransactionLogStateStoreTestBase.java | 5 ++--
.../testutils/StateStoreUpdatesWrapper.java | 24 +++++++++++++++++++
4 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java
index 778b783310..8cf56a3514 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java
@@ -16,7 +16,6 @@
package sleeper.core.statestore.transactionlog;
import sleeper.core.partition.Partition;
-import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.PartitionStore;
import sleeper.core.statestore.StateStoreException;
@@ -29,7 +28,6 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
@@ -71,7 +69,7 @@ public List getLeafPartitions() throws StateStoreException {
@Override
public void initialise() throws StateStoreException {
- initialise(new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct());
+ head.addTransaction(clock.instant(), InitialisePartitionsTransaction.singlePartition(schema));
}
@Override
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
index 17b288b4c4..b9f1f94115 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
@@ -16,10 +16,13 @@
package sleeper.core.statestore.transactionlog.transaction.impl;
import sleeper.core.partition.Partition;
+import sleeper.core.partition.PartitionsFromSplitPoints;
+import sleeper.core.schema.Schema;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;
import java.time.Instant;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -35,6 +38,16 @@ public InitialisePartitionsTransaction(List partitions) {
this.partitions = partitions;
}
+ /**
+ * Creates a transaction to initialise the table with a single root partition.
+ *
+ * @param schema the table schema
+ * @return the transaction
+ */
+ public static InitialisePartitionsTransaction singlePartition(Schema schema) {
+ return new InitialisePartitionsTransaction(new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct());
+ }
+
@Override
public void validate(StateStorePartitions stateStorePartitions) {
}
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStoreTestBase.java b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStoreTestBase.java
index 79ad8dc8ec..8530811915 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStoreTestBase.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStoreTestBase.java
@@ -26,6 +26,7 @@
import java.util.List;
import static sleeper.core.statestore.FileReferenceTestData.DEFAULT_UPDATE_TIME;
+import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;
import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName;
public class InMemoryTransactionLogStateStoreTestBase {
@@ -43,12 +44,12 @@ public class InMemoryTransactionLogStateStoreTestBase {
protected void initialiseWithSchema(Schema schema) {
createStore(new PartitionsBuilder(schema).singlePartition("root"));
- store.initialise();
+ update(store).initialise(schema);
}
protected void initialiseWithPartitions(PartitionsBuilder partitions) {
createStore(partitions);
- store.initialise(partitions.buildList());
+ update(store).initialise(partitions.buildList());
}
private void createStore(PartitionsBuilder partitions) {
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index 53cfecd4a0..77c9feb54c 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -18,6 +18,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import sleeper.core.partition.Partition;
+import sleeper.core.schema.Schema;
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
@@ -67,6 +69,28 @@ public static StateStoreUpdatesWrapper update(StateStore stateStore) {
return new StateStoreUpdatesWrapper(stateStore);
}
+ /**
+ * Initialises the store with a single partition covering all keys. This is the root partition which may be split
+ * in the future.
+ *
+ * @param schema the Sleeper table schema
+ * @throws StateStoreException if the update fails
+ */
+ public void initialise(Schema schema) throws StateStoreException {
+ addTransaction(InitialisePartitionsTransaction.singlePartition(schema));
+ }
+
+ /**
+ * Initialises the store with the given partitions. These should build into a complete partition tree, where all
+ * partitions are linked to a single root.
+ *
+ * @param partitions the partitions
+ * @throws StateStoreException if the update fails
+ */
+ public void initialise(List partitions) throws StateStoreException {
+ addTransaction(new InitialisePartitionsTransaction(partitions));
+ }
+
/**
* Adds a file to the table, with one reference.
*
From 1ee06a0a279a43f2e82ce09e133f68c7a388600e Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Wed, 19 Feb 2025 14:03:16 +0000
Subject: [PATCH 10/17] Move method to correct location in file
---
.../ReplaceFileReferencesTransaction.java | 34 +++++++++----------
1 file changed, 17 insertions(+), 17 deletions(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
index 36337ab149..873e0d9e48 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
@@ -119,6 +119,23 @@ public void reportJobsAllFailed(CompactionJobTracker tracker, TableStatus sleepe
}
}
+ /**
+ * Validates the transaction against the current state.
+ *
+ * @param stateStoreFiles the state
+ * @throws FileNotFoundException if an input file does not exist
+ * @throws FileReferenceNotFoundException if an input file is not referenced on the same partition
+ * @throws FileReferenceNotAssignedToJobException if an input file is not assigned to the job on this partition
+ * @throws FileAlreadyExistsException if the new file already exists in the state store
+ * @throws NewReferenceSameAsOldReferenceException if any of the input files are the same as the output file
+ */
+ public void validateStateChange(StateStoreFiles stateStoreFiles) {
+ for (ReplaceFileReferencesRequest job : jobs) {
+ job.validateNewReference();
+ job.validateStateChange(stateStoreFiles);
+ }
+ }
+
@Override
public int hashCode() {
return Objects.hash(jobs);
@@ -140,21 +157,4 @@ public boolean equals(Object obj) {
public String toString() {
return "ReplaceFileReferencesTransaction{jobs=" + jobs + "}";
}
-
- /**
- * Validates the transaction against the current state.
- *
- * @param stateStoreFiles the state
- * @throws FileNotFoundException if an input file does not exist
- * @throws FileReferenceNotFoundException if an input file is not referenced on the same partition
- * @throws FileReferenceNotAssignedToJobException if an input file is not assigned to the job on this partition
- * @throws FileAlreadyExistsException if the new file already exists in the state store
- * @throws NewReferenceSameAsOldReferenceException if any of the input files are the same as the output file
- */
- public void validateStateChange(StateStoreFiles stateStoreFiles) {
- for (ReplaceFileReferencesRequest job : jobs) {
- job.validateNewReference();
- job.validateStateChange(stateStoreFiles);
- }
- }
}
From 5de500d164ab6b0a5a987f59395feaea85886607 Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 10:14:35 +0000
Subject: [PATCH 11/17] Default method for new check of state before
---
.../transaction/StateStoreTransaction.java | 9 +++++++++
.../impl/InitialisePartitionsTransaction.java | 7 +++++++
2 files changed, 16 insertions(+)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java
index 0eda0ae51e..86aacab2dc 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java
@@ -15,6 +15,7 @@
*/
package sleeper.core.statestore.transactionlog.transaction;
+import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import java.time.Instant;
@@ -41,4 +42,12 @@ public interface StateStoreTransaction {
* @param updateTime the time that the update was added to the state store
*/
void apply(T state, Instant updateTime);
+
+ /**
+ * Checks the present state of the state store.
+ *
+ * @param stateStore the state store
+ */
+ default void checkBefore(StateStore stateStore) throws StateStoreException {
+ }
}
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
index b9f1f94115..d9459c3923 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
@@ -18,6 +18,8 @@
import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.schema.Schema;
+import sleeper.core.statestore.StateStore;
+import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;
@@ -79,4 +81,9 @@ public boolean equals(Object obj) {
public String toString() {
return "InitialisePartitionsTransaction{partitions=" + partitions + "}";
}
+
+ @Override
+ public void checkBefore(StateStore stateStore) throws StateStoreException {
+ }
+
}
From 695bda6c01882d571789a4030f474c31cb1cd2f6 Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 11:18:11 +0000
Subject: [PATCH 12/17] Relocate check before method
---
.../transaction/impl/InitialisePartitionsTransaction.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
index d9459c3923..6a81760701 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
@@ -60,6 +60,10 @@ public void apply(StateStorePartitions stateStorePartitions, Instant updateTime)
partitions.forEach(stateStorePartitions::put);
}
+ @Override
+ public void checkBefore(StateStore stateStore) throws StateStoreException {
+ }
+
@Override
public int hashCode() {
return Objects.hash(partitions);
@@ -82,8 +86,4 @@ public String toString() {
return "InitialisePartitionsTransaction{partitions=" + partitions + "}";
}
- @Override
- public void checkBefore(StateStore stateStore) throws StateStoreException {
- }
-
}
From 2f2b034ce9b57ad053c677991fa87c0e51b9e754 Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 11:23:00 +0000
Subject: [PATCH 13/17] Implement functionality for checkBefore
---
.../java/sleeper/core/statestore/DelegatingStateStore.java | 2 ++
.../transaction/impl/InitialisePartitionsTransaction.java | 3 +++
2 files changed, 5 insertions(+)
diff --git a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java
index b0326522a8..2efebae000 100644
--- a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java
+++ b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java
@@ -190,11 +190,13 @@ public void fixPartitionUpdateTime(Instant now) {
@Override
public void addFilesTransaction(AddTransactionRequest request) {
+ request.getTransaction().checkBefore(this);
fileReferenceStore.addFilesTransaction(request);
}
@Override
public void addPartitionsTransaction(AddTransactionRequest request) {
+ request.getTransaction().checkBefore(this);
partitionStore.addPartitionsTransaction(request);
}
}
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
index 6a81760701..c37e9162e1 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java
@@ -62,6 +62,9 @@ public void apply(StateStorePartitions stateStorePartitions, Instant updateTime)
@Override
public void checkBefore(StateStore stateStore) throws StateStoreException {
+ if (!stateStore.hasNoFiles()) {
+ throw new StateStoreException("Cannot initialise state store when files are present");
+ }
}
@Override
From fc039e32383e78322b24c61e5c037dd6638613c4 Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 11:23:49 +0000
Subject: [PATCH 14/17] Update method call for wrapper
---
.../transactionlog/TransactionLogPartitionStoreTest.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
index 633f8bd817..2394b60312 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
@@ -194,7 +194,7 @@ void shouldReinitialisePartitionsWhenNoFilesArePresent() {
initialiseWithPartitions(partitionsBefore);
// When
- store.initialise(partitionsAfter.buildList());
+ update(store).initialise(partitionsAfter.buildList());
// Then
assertThat(store.getAllPartitions())
@@ -213,10 +213,10 @@ void shouldNotReinitialisePartitionsWhenAFileIsPresent() {
.splitToNewChildren("root", "after1", "after2", 10L);
initialiseWithPartitions(partitionsBefore);
- store.addFile(factory.partitionFile("before2", 100L));
+ update(store).addFile(factory.partitionFile("before2", 100L));
// When / Then
- assertThatThrownBy(() -> store.initialise(partitionsAfter.buildList()))
+ assertThatThrownBy(() -> update(store).initialise(partitionsAfter.buildList()))
.isInstanceOf(StateStoreException.class);
assertThat(store.getAllPartitions())
.containsExactlyInAnyOrderElementsOf(partitionsBefore.buildList());
From d5c96831215f7c194bab10118304a1bd588d6b38 Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 11:31:35 +0000
Subject: [PATCH 15/17] Update wrapper for split partitions nested class
---
.../sleeper/core/partition/PartitionsBuilder.java | 4 +++-
.../testutils/StateStoreUpdatesWrapper.java | 14 ++++++++++++++
.../TransactionLogPartitionStoreTest.java | 12 ++++++------
3 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/java/core/src/test/java/sleeper/core/partition/PartitionsBuilder.java b/java/core/src/test/java/sleeper/core/partition/PartitionsBuilder.java
index ff62e6a908..f2bccbd45f 100644
--- a/java/core/src/test/java/sleeper/core/partition/PartitionsBuilder.java
+++ b/java/core/src/test/java/sleeper/core/partition/PartitionsBuilder.java
@@ -25,6 +25,8 @@
import java.util.Optional;
import java.util.stream.Collectors;
+import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;
+
/**
* A convenience class for specifying a partition tree. This includes methods to define a tree to be readable in a test,
* including shorthand which would not be possible with {@link PartitionFactory}.
@@ -122,7 +124,7 @@ public void applySplit(StateStore stateStore, String partitionId) {
Partition toSplit = partitionById(partitionId);
Partition left = partitionById(toSplit.getChildPartitionIds().get(0));
Partition right = partitionById(toSplit.getChildPartitionIds().get(1));
- stateStore.atomicallyUpdatePartitionAndCreateNewOnes(toSplit, left, right);
+ update(stateStore).atomicallyUpdatePartitionAndCreateNewOnes(toSplit, left, right);
}
/**
diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
index 77c9feb54c..d9d5707040 100644
--- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
+++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java
@@ -44,6 +44,7 @@
import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction;
+import sleeper.core.statestore.transactionlog.transaction.impl.SplitPartitionTransaction;
import java.util.List;
@@ -91,6 +92,19 @@ public void initialise(List partitions) throws StateStoreException {
addTransaction(new InitialisePartitionsTransaction(partitions));
}
+ /**
+ * Atomically splits a partition to create child partitions. Updates the existing partition to record it as split,
+ * and creates new leaf partitions.
+ *
+ * @param splitPartition The {@link Partition} to be updated (must refer to the new leaves as children).
+ * @param newPartition1 The first new {@link Partition} (must be a leaf partition).
+ * @param newPartition2 The second new {@link Partition} (must be a leaf partition).
+ * @throws StateStoreException if split is not valid or update fails
+ */
+ public void atomicallyUpdatePartitionAndCreateNewOnes(Partition splitPartition, Partition newPartition1, Partition newPartition2) throws StateStoreException {
+ addTransaction(new SplitPartitionTransaction(splitPartition, List.of(newPartition1, newPartition2)));
+ }
+
/**
* Adds a file to the table, with one reference.
*
diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
index 2394b60312..fcfd12f158 100644
--- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
+++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStoreTest.java
@@ -238,7 +238,7 @@ public void shouldSplitAPartition() {
.buildTree();
// When
- store.atomicallyUpdatePartitionAndCreateNewOnes(
+ update(store).atomicallyUpdatePartitionAndCreateNewOnes(
tree.getPartition("root"),
tree.getPartition("leftChild"),
tree.getPartition("rightChild"));
@@ -280,13 +280,13 @@ public void shouldFailSplittingAPartitionWhichHasAlreadyBeenSplit() {
.rootFirst("root")
.splitToNewChildren("root", "leftChild", "rightChild", 0L)
.buildTree();
- store.atomicallyUpdatePartitionAndCreateNewOnes(
+ update(store).atomicallyUpdatePartitionAndCreateNewOnes(
tree.getPartition("root"),
tree.getPartition("leftChild"),
tree.getPartition("rightChild"));
// When / Then
- assertThatThrownBy(() -> store.atomicallyUpdatePartitionAndCreateNewOnes(
+ assertThatThrownBy(() -> update(store).atomicallyUpdatePartitionAndCreateNewOnes(
tree.getPartition("root"),
tree.getPartition("leftChild"),
tree.getPartition("rightChild")))
@@ -305,7 +305,7 @@ public void shouldFailSplittingAPartitionWithWrongChildren() {
.buildTree();
// When / Then
- assertThatThrownBy(() -> store.atomicallyUpdatePartitionAndCreateNewOnes(
+ assertThatThrownBy(() -> update(store).atomicallyUpdatePartitionAndCreateNewOnes(
tree.getPartition("root"),
tree.getPartition("LL"),
tree.getPartition("LR")))
@@ -328,7 +328,7 @@ public void shouldFailSplittingAPartitionWithChildrenOfWrongParent() {
.buildTree();
// When / Then
- assertThatThrownBy(() -> store.atomicallyUpdatePartitionAndCreateNewOnes(
+ assertThatThrownBy(() -> update(store).atomicallyUpdatePartitionAndCreateNewOnes(
parentTree.getPartition("root"),
childrenTree.getPartition("child1"),
childrenTree.getPartition("child2")))
@@ -347,7 +347,7 @@ public void shouldFailSplittingAPartitionWhenNewPartitionIsNotALeaf() {
.buildTree();
// When / Then
- assertThatThrownBy(() -> store.atomicallyUpdatePartitionAndCreateNewOnes(
+ assertThatThrownBy(() -> update(store).atomicallyUpdatePartitionAndCreateNewOnes(
tree.getPartition("root"),
tree.getPartition("L"), // Not a leaf
tree.getPartition("R")))
From c89e4f5f461eb8c7e7b0cd350719e2a2276e0847 Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 11:52:39 +0000
Subject: [PATCH 16/17] Update javadoc
---
.../transactionlog/transaction/StateStoreTransaction.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java
index 86aacab2dc..449145b2ad 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/StateStoreTransaction.java
@@ -44,9 +44,13 @@ public interface StateStoreTransaction {
void apply(T state, Instant updateTime);
/**
- * Checks the present state of the state store.
+ * Validates against the state store before attempting to add this transaction. Should not be relied upon
+ * but can be used as an extra check against state that this transaction does not operate on. Note that
+ * any checks from this are before the state is updated as part of the transaction. This cannot validate
+ * against the state the transaction is applied to as it may be updated by another process in the meantime.
*
* @param stateStore the state store
+ * @see #validate
*/
default void checkBefore(StateStore stateStore) throws StateStoreException {
}
From 4d7b1a7d17aaa18aff04cffb0d39692f7de5caae Mon Sep 17 00:00:00 2001
From: rtjd6554
Date: Thu, 20 Feb 2025 11:56:16 +0000
Subject: [PATCH 17/17] Update javadoc
---
.../transaction/impl/ReplaceFileReferencesTransaction.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
index 873e0d9e48..0f6d80cc10 100644
--- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
+++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java
@@ -120,7 +120,8 @@ public void reportJobsAllFailed(CompactionJobTracker tracker, TableStatus sleepe
}
/**
- * Validates the transaction against the current state.
+ * Validates the transaction against the current state. Used during a synchronous commit to report on any
+ * failures.
*
* @param stateStoreFiles the state
* @throws FileNotFoundException if an input file does not exist