diff --git a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java index 3e6109e7b1..73b941cdc2 100644 --- a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java +++ b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverIT.java @@ -87,6 +87,7 @@ import static sleeper.core.properties.table.TableProperty.TABLE_NAME; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.core.tracker.ingest.job.IngestJobStatusTestData.ingestFinishedStatus; import static sleeper.core.tracker.job.run.JobRunSummaryTestHelper.summary; import static sleeper.core.tracker.job.run.JobRunTestData.jobRunOnTask; @@ -500,7 +501,7 @@ private static void writeRecordsToFile(List records, String file) throws private StateStore createTable(InstanceProperties instanceProperties, TableProperties tableProperties, List splitPoints) { tablePropertiesStore(instanceProperties).save(tableProperties); StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf).getStateStore(tableProperties); - stateStore.initialise(new PartitionsFromSplitPoints(getSchema(), splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct()); return stateStore; } diff --git a/java/clients/src/test/java/sleeper/clients/admin/FilesStatusReportScreenTest.java b/java/clients/src/test/java/sleeper/clients/admin/FilesStatusReportScreenTest.java index 6fc0a6b710..3789641225 100644 --- a/java/clients/src/test/java/sleeper/clients/admin/FilesStatusReportScreenTest.java +++ b/java/clients/src/test/java/sleeper/clients/admin/FilesStatusReportScreenTest.java @@ -59,7 +59,7 @@ class FilesStatusReportScreenTest extends AdminClientMockStoreBase { @BeforeEach void setUp() { setInstanceProperties(instanceProperties, tableProperties); - stateStore.initialise(PartitionsBuilderSplitsFirst.leavesWithSplits( + update(stateStore).initialise(PartitionsBuilderSplitsFirst.leavesWithSplits( schema, List.of("A", "B"), List.of("aaa")) .parentJoining("parent", "A", "B").buildList()); update(stateStore).addFiles(FileReferenceFactory.from(stateStore).singleFileInEachLeafPartitionWithRecords(5).toList()); diff --git a/java/clients/src/test/java/sleeper/clients/admin/PartitionsStatusReportScreenTest.java b/java/clients/src/test/java/sleeper/clients/admin/PartitionsStatusReportScreenTest.java index e779a04e18..a1e9ed739f 100644 --- a/java/clients/src/test/java/sleeper/clients/admin/PartitionsStatusReportScreenTest.java +++ b/java/clients/src/test/java/sleeper/clients/admin/PartitionsStatusReportScreenTest.java @@ -64,7 +64,7 @@ void setUp() { @Test void shouldRunPartitionStatusReport() throws Exception { // Given - stateStore.initialise(PartitionsBuilderSplitsFirst.leavesWithSplits( + update(stateStore).initialise(PartitionsBuilderSplitsFirst.leavesWithSplits( schema, List.of("A", "B"), List.of("aaa")) .parentJoining("parent", "A", "B").buildList()); update(stateStore).addFiles(FileReferenceFactory.from(stateStore).singleFileInEachLeafPartitionWithRecords(5).toList()); diff --git a/java/clients/src/test/java/sleeper/clients/status/partitions/ExportSplitPointsTest.java b/java/clients/src/test/java/sleeper/clients/status/partitions/ExportSplitPointsTest.java index b89c0ce7ee..23eceab158 100644 --- a/java/clients/src/test/java/sleeper/clients/status/partitions/ExportSplitPointsTest.java +++ b/java/clients/src/test/java/sleeper/clients/status/partitions/ExportSplitPointsTest.java @@ -37,6 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class ExportSplitPointsTest { InstanceProperties instanceProperties = createTestInstanceProperties(); @@ -62,7 +63,7 @@ public void shouldExportCorrectSplitPointsIntType() { List splitPoints = new ArrayList<>(); splitPoints.add(-10); splitPoints.add(1000); - stateStore.initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); ExportSplitPoints exportSplitPoints = new ExportSplitPoints(stateStore, schema); // When @@ -80,7 +81,7 @@ public void shouldExportCorrectSplitPointsLongType() { List splitPoints = new ArrayList<>(); splitPoints.add(-10L); splitPoints.add(1000L); - stateStore.initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); ExportSplitPoints exportSplitPoints = new ExportSplitPoints(stateStore, schema); // When @@ -98,7 +99,7 @@ public void shouldExportCorrectSplitPointsStringType() { List splitPoints = new ArrayList<>(); splitPoints.add("A"); splitPoints.add("T"); - stateStore.initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); ExportSplitPoints exportSplitPoints = new ExportSplitPoints(stateStore, schema); // When @@ -116,7 +117,7 @@ public void shouldExportCorrectSplitPointsByteArrayType() { List splitPoints = new ArrayList<>(); splitPoints.add(new byte[]{10}); splitPoints.add(new byte[]{100}); - stateStore.initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(schema, splitPoints).construct()); ExportSplitPoints exportSplitPoints = new ExportSplitPoints(stateStore, schema); // When diff --git a/java/clients/src/test/java/sleeper/clients/status/report/filestatus/FilesStatusReportTest.java b/java/clients/src/test/java/sleeper/clients/status/report/filestatus/FilesStatusReportTest.java index 34539320f3..b3295f473c 100644 --- a/java/clients/src/test/java/sleeper/clients/status/report/filestatus/FilesStatusReportTest.java +++ b/java/clients/src/test/java/sleeper/clients/status/report/filestatus/FilesStatusReportTest.java @@ -46,7 +46,7 @@ public void shouldReportFilesStatusGivenOneActiveFilePerLeafPartition() throws E .splitToNewChildren("5", "6", "C", "bbb") .splitToNewChildren("6", "A", "B", "aaa") .buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); FileReferenceFactory fileReferenceFactory = FileReferenceFactory.from(partitions); update(stateStore).addFiles(List.of( fileReferenceFactory.partitionFile("A", 50000001), @@ -73,7 +73,7 @@ public void shouldReportFilesStatusGivenActiveFileInLeafAndMiddlePartition() thr .splitToNewChildren("A", "B", "C", "mmm") .splitToNewChildren("B", "D", "E", "ggg") .buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); FileReferenceFactory fileReferenceFactory = FileReferenceFactory.from(partitions); update(stateStore).addFiles(List.of( fileReferenceFactory.partitionFile("D", 50000001), @@ -93,7 +93,7 @@ public void shouldReportFilesStatusGivenFilesWithNoReferencesBelowMaxCount() thr .rootFirst("A") .splitToNewChildren("A", "B", "C", "mmm") .buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); FileReferenceFactory fileReferenceFactory = FileReferenceFactory.from(partitions); update(stateStore).addFiles(List.of( fileReferenceFactory.partitionFile("B", "file1.parquet", 100), @@ -117,7 +117,7 @@ public void shouldReportFilesStatusGivenFilesWithNoReferencesAboveMaxCount() thr .rootFirst("A") .splitToNewChildren("A", "B", "C", "mmm") .buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); FileReferenceFactory fileReferenceFactory = FileReferenceFactory.from(partitions); update(stateStore).addFiles(List.of( fileReferenceFactory.partitionFile("B", "file1.parquet", 100), @@ -146,7 +146,7 @@ public void shouldReportFilesStatusWhenSomeFilesHaveBeenSplit() throws Exception .rootFirst("A") .splitToNewChildren("A", "B", "C", "mmm") .buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); FileReferenceFactory fileReferenceFactory = FileReferenceFactory.fromUpdatedAt(partitions, lastStateStoreUpdate); FileReference rootFile = fileReferenceFactory.partitionFile("A", "not-split.parquet", 1000); FileReference pendingSplit = fileReferenceFactory.partitionFile("B", "pending-split.parquet", 2000); @@ -165,11 +165,11 @@ public void shouldReportFilesStatusWhenSomeFilesHaveBeenSplit() throws Exception public void shouldReportFilesStatusWhenPartitionsNoLongerExist() throws Exception { // Given PartitionTree partitions = new PartitionsBuilder(schema).rootFirst("A").buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); FileReferenceFactory fileReferenceFactory = FileReferenceFactory.fromUpdatedAt(partitions, lastStateStoreUpdate); FileReference file1 = fileReferenceFactory.rootFile("file1.parquet", 1000L); FileReference file2 = fileReferenceFactory.rootFile("file2.parquet", 2000L); - stateStore.initialise(new PartitionsBuilder(schema).rootFirst("B").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).rootFirst("B").buildList()); update(stateStore).addFiles(List.of(file1, file2)); // When / Then diff --git a/java/clients/src/test/java/sleeper/clients/status/report/partitions/PartitionsStatusReportIT.java b/java/clients/src/test/java/sleeper/clients/status/report/partitions/PartitionsStatusReportIT.java index e797c7065b..c4420da530 100644 --- a/java/clients/src/test/java/sleeper/clients/status/report/partitions/PartitionsStatusReportIT.java +++ b/java/clients/src/test/java/sleeper/clients/status/report/partitions/PartitionsStatusReportIT.java @@ -61,7 +61,7 @@ void shouldGetReportWhenTwoLeafPartitionsBothNeedSplitting() throws Exception { .splitToNewChildren("parent", "A", "B", "aaa") .buildTree(); StateStore stateStore = stateStore(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); update(stateStore).addFiles(FileReferenceFactory.from(tree).singleFileInEachLeafPartitionWithRecords(100).toList()); // When / Then diff --git a/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java b/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java index 036e9b1898..52490a2c49 100644 --- a/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java +++ b/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java @@ -89,7 +89,7 @@ void shouldDeleteOnlyTable() throws Exception { // Given TableProperties table = createTable(uniqueIdAndName("test-table-1", "table-1")); StateStore stateStoreBefore = createStateStore(table); - stateStoreBefore.initialise(new PartitionsBuilder(schema) + update(stateStoreBefore).initialise(new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "L", "R", 50L) .buildList()); @@ -154,7 +154,7 @@ void shouldDeleteTableWhenSnapshotIsPresent() throws Exception { // Given TableProperties table = createTable(uniqueIdAndName("test-table-1", "table-1")); StateStore stateStore = createStateStore(table); - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "L", "R", 50L) .buildList()); diff --git a/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java b/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java index 5375d4f4da..e956834198 100644 --- a/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java +++ b/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java @@ -167,7 +167,7 @@ private TableProperties addTable(InstanceProperties instanceProperties, Schema s TableProperties tableProperties = createTestTableProperties(instanceProperties, schema); S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient) .createTable(tableProperties); - stateStoreProvider.getStateStore(tableProperties) + update(stateStoreProvider.getStateStore(tableProperties)) .initialise(partitions.getAllPartitions()); return tableProperties; } diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerEmptyOutputIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerEmptyOutputIT.java index d6003ce4f7..431354fa7b 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerEmptyOutputIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerEmptyOutputIT.java @@ -35,6 +35,7 @@ import static sleeper.compaction.job.execution.testutils.CompactionRunnerTestData.writeRootFile; import static sleeper.compaction.job.execution.testutils.CompactionRunnerTestUtils.assignJobIdToInputFiles; import static sleeper.compaction.job.execution.testutils.CompactionRunnerTestUtils.createSchemaWithTypesForKeyAndTwoValues; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; class JavaCompactionRunnerEmptyOutputIT extends CompactionRunnerTestBase { @@ -43,7 +44,7 @@ void shouldMergeFilesWhenSomeAreEmpty() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); List data = keyAndTwoValuesSortedEvenLongs(); FileReference file1 = ingestRecordsGetFile(data); @@ -67,7 +68,7 @@ void shouldMergeFilesWhenAllAreEmpty() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); FileReference file1 = writeRootFile(schema, stateStore, dataFolderName + "/file1.parquet", List.of()); FileReference file2 = writeRootFile(schema, stateStore, dataFolderName + "/file2.parquet", List.of()); @@ -90,7 +91,7 @@ void shouldWriteSketchWhenWritingEmptyFile() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); FileReference file1 = writeRootFile(schema, stateStore, dataFolderName + "/file1.parquet", List.of()); FileReference file2 = writeRootFile(schema, stateStore, dataFolderName + "/file2.parquet", List.of()); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIT.java index ba1b25e9b8..04b66e4168 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIT.java @@ -37,6 +37,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static sleeper.compaction.job.execution.testutils.CompactionRunnerTestUtils.assignJobIdToInputFiles; import static sleeper.compaction.job.execution.testutils.CompactionRunnerTestUtils.createSchemaWithTypesForKeyAndTwoValues; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; class JavaCompactionRunnerIT extends CompactionRunnerTestBase { @@ -45,7 +46,7 @@ void shouldMergeFilesWithLongKey() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); List data1 = CompactionRunnerTestData.keyAndTwoValuesSortedEvenLongs(); List data2 = CompactionRunnerTestData.keyAndTwoValuesSortedOddLongs(); @@ -71,7 +72,7 @@ void shouldWriteSketchWhenMergingFiles() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); List data1 = CompactionRunnerTestData.keyAndTwoValuesSortedEvenLongs(); List data2 = CompactionRunnerTestData.keyAndTwoValuesSortedOddLongs(); @@ -120,7 +121,7 @@ void shouldMergeFilesWithStringKey() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new StringType(), new StringType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); List data1 = CompactionRunnerTestData.keyAndTwoValuesSortedEvenStrings(); List data2 = CompactionRunnerTestData.keyAndTwoValuesSortedOddStrings(); @@ -173,7 +174,7 @@ void shouldMergeFilesWithByteArrayKey() throws Exception { // Given Schema schema = createSchemaWithTypesForKeyAndTwoValues(new ByteArrayType(), new ByteArrayType(), new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); List data1 = CompactionRunnerTestData.keyAndTwoValuesSortedEvenByteArrays(); List data2 = CompactionRunnerTestData.keyAndTwoValuesSortedOddByteArrays(); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIteratorIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIteratorIT.java index 2c6c846f58..dc7e9f5568 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIteratorIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerIteratorIT.java @@ -35,6 +35,7 @@ import static sleeper.compaction.job.execution.testutils.CompactionRunnerTestUtils.assignJobIdToInputFiles; import static sleeper.core.properties.table.TableProperty.ITERATOR_CLASS_NAME; import static sleeper.core.properties.table.TableProperty.ITERATOR_CONFIG; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; class JavaCompactionRunnerIteratorIT extends CompactionRunnerTestBase { @@ -43,7 +44,7 @@ void shouldApplyIteratorDuringCompaction() throws Exception { // Given Schema schema = CompactionRunnerTestUtils.createSchemaWithKeyTimestampValue(); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); List data1 = CompactionRunnerTestData.specifiedFromEvens((even, record) -> { record.put("key", (long) even); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerLocalStackIT.java index c087b3d3ae..8ea49678a9 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/JavaCompactionRunnerLocalStackIT.java @@ -48,6 +48,7 @@ import static sleeper.core.properties.instance.CommonProperty.ID; import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_INGEST_PARTITION_FILE_WRITER_TYPE; import static sleeper.core.properties.table.TableProperty.GARBAGE_COLLECTOR_DELAY_BEFORE_DELETION; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class JavaCompactionRunnerLocalStackIT extends CompactionRunnerTestBase { @@ -74,7 +75,7 @@ public void shouldRunCompactionJob() throws Exception { tableProperties.setSchema(schema); createStateStore(); PartitionTree tree = new PartitionsBuilder(schema).singlePartition("root").buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); List data1 = CompactionRunnerTestData.keyAndTwoValuesSortedEvenLongs(); List data2 = CompactionRunnerTestData.keyAndTwoValuesSortedOddLongs(); diff --git a/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerIT.java b/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerIT.java index 97d3982c1c..5fcf81461e 100644 --- a/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerIT.java +++ b/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerIT.java @@ -64,6 +64,7 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema; import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class RustCompactionRunnerIT { @@ -89,7 +90,7 @@ void shouldMergeFilesWithStringKey() throws Exception { // Given Schema schema = schemaWithKey("key", new StringType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record1 = new Record(Map.of("key", "record-1")); Record record2 = new Record(Map.of("key", "record-2")); String file1 = writeFileForPartition("root", List.of(record1)); @@ -112,7 +113,7 @@ void shouldMergeFilesWithLongKey() throws Exception { // Given Schema schema = schemaWithKey("key", new LongType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record1 = new Record(Map.of("key", 1L)); Record record2 = new Record(Map.of("key", 2L)); String file1 = writeFileForPartition("root", List.of(record1)); @@ -136,7 +137,7 @@ void shouldMergeFilesWithIntKey() throws Exception { // Given Schema schema = schemaWithKey("key", new IntType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record1 = new Record(Map.of("key", 1)); Record record2 = new Record(Map.of("key", 2)); String file1 = writeFileForPartition("root", List.of(record1)); @@ -160,7 +161,7 @@ void shouldMergeFilesWithByteArrayKey() throws Exception { // Given Schema schema = schemaWithKey("key", new ByteArrayType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record1 = new Record(Map.of("key", new byte[]{1, 2})); Record record2 = new Record(Map.of("key", new byte[]{3, 4})); String file1 = writeFileForPartition("root", List.of(record1)); @@ -189,7 +190,7 @@ void shouldMergeEmptyAndNonEmptyFile() throws Exception { // Given Schema schema = schemaWithKey("key", new StringType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record = new Record(Map.of("key", "test-value")); String emptyFile = writeFileForPartition("root", List.of()); String nonEmptyFile = writeFileForPartition("root", List.of(record)); @@ -210,7 +211,7 @@ void shouldMergeTwoEmptyFiles() throws Exception { // Given Schema schema = schemaWithKey("key", new StringType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); String file1 = writeFileForPartition("root", List.of()); String file2 = writeFileForPartition("root", List.of()); CompactionJob job = createCompactionForPartition("test-job", "root", List.of(file1, file2)); @@ -234,7 +235,7 @@ void shouldWriteSketchWhenMergingFiles() throws Exception { // Given Schema schema = schemaWithKey("key", new StringType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record1 = new Record(Map.of("key", "record-1")); Record record2 = new Record(Map.of("key", "record-2")); String file1 = writeFileForPartition("root", List.of(record1)); diff --git a/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerLocalStackIT.java b/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerLocalStackIT.java index 7c7b574351..b8ecaa11c9 100644 --- a/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerLocalStackIT.java +++ b/java/compaction/compaction-rust/src/test/java/sleeper/compaction/rust/RustCompactionRunnerLocalStackIT.java @@ -59,6 +59,7 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema; import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class RustCompactionRunnerLocalStackIT extends LocalStackTestBase { @@ -78,7 +79,7 @@ void shouldRunCompactionJob() throws Exception { // Given Schema schema = schemaWithKey("key", new StringType()); tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); Record record1 = new Record(Map.of("key", "record-1")); Record record2 = new Record(Map.of("key", "record-2")); String file1 = writeFileForPartition("root", List.of(record1)); diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStore.java b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStore.java index 14da6b49bf..d3120f6411 100644 --- a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStore.java +++ b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogStateStore.java @@ -68,7 +68,7 @@ public static TransactionLogStateStore createAndInitialise(TableProperties table */ public static TransactionLogStateStore createAndInitialiseWithPartitions(List partitions, TableProperties tableProperties, InMemoryTransactionLogs transactionLogs) { TransactionLogStateStore stateStore = create(tableProperties, transactionLogs); - stateStore.initialise(partitions); + update(stateStore).initialise(partitions); return stateStore; } diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreSnapshotsTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreSnapshotsTest.java index fa8a113555..51a1bf80eb 100644 --- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreSnapshotsTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreSnapshotsTest.java @@ -74,7 +74,7 @@ void shouldLoadSnapshotOnFirstQueryForPartitions() throws Exception { // When createSnapshotWithFreshStateAtTransactionNumber(1, stateStore -> { - stateStore.initialise(partitions.buildList()); + update(stateStore).initialise(partitions.buildList()); }); // Then @@ -133,11 +133,11 @@ void shouldNotLoadPartitionsSnapshotWhenOnlyOneTransactionAheadAfterLoadingLog() .timeBetweenSnapshotChecks(Duration.ZERO)); List logPartitions = new PartitionsBuilder(schema).rootFirst("A").buildList(); List snapshotPartitions = new PartitionsBuilder(schema).rootFirst("B").buildList(); - stateStore.initialise(logPartitions); + update(stateStore).initialise(logPartitions); // When createSnapshotWithFreshStateAtTransactionNumber(2, snapshotStateStore -> { - snapshotStateStore.initialise(snapshotPartitions); + update(snapshotStateStore).initialise(snapshotPartitions); }); // Then @@ -152,11 +152,11 @@ void shouldLoadPartitionsSnapshotWhenMoreThanConfiguredTransactionsAheadAfterLoa .timeBetweenSnapshotChecks(Duration.ZERO)); List logPartitions = new PartitionsBuilder(schema).rootFirst("A").buildList(); List snapshotPartitions = new PartitionsBuilder(schema).rootFirst("B").buildList(); - stateStore.initialise(logPartitions); + update(stateStore).initialise(logPartitions); // When createSnapshotWithFreshStateAtTransactionNumber(3, snapshotStateStore -> { - snapshotStateStore.initialise(snapshotPartitions); + update(snapshotStateStore).initialise(snapshotPartitions); }); // Then diff --git a/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java b/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java index b5fca90b7e..21701bd8cb 100644 --- a/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java +++ b/java/garbage-collector/src/test/java/sleeper/garbagecollector/GarbageCollectorIT.java @@ -449,7 +449,7 @@ private void writeFile(String filename) throws Exception { private TableProperties createTable() { TableProperties tableProperties = createTestTableProperties(instanceProperties, TEST_SCHEMA); tables.add(tableProperties); - stateStoreProvider.getStateStore(tableProperties).initialise(partitions.getAllPartitions()); + update(stateStoreProvider.getStateStore(tableProperties)).initialise(partitions.getAllPartitions()); return tableProperties; } diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorCommonIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorCommonIT.java index 1aba2f479a..d3631dd7d9 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorCommonIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorCommonIT.java @@ -71,6 +71,7 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema; import static sleeper.core.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.ingest.runner.testutils.RecordGenerator.genericKey1D; import static sleeper.ingest.runner.testutils.ResultVerifier.readMergedRecordsFromPartitionDataFiles; import static sleeper.ingest.runner.testutils.ResultVerifier.readRecordsFromPartitionDataFile; @@ -122,7 +123,7 @@ public void shouldWriteRecordsCorrectly(TestIngestType ingestType) throws Except PartitionTree tree = new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -161,7 +162,7 @@ public void shouldWriteRecordsSplitByPartitionIntKey(TestIngestType ingestType) .rootFirst("root") .splitToNewChildren("root", "left", "right", 2) .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -206,7 +207,7 @@ public void shouldWriteRecordsSplitByPartitionLongKey(TestIngestType ingestType) .rootFirst("root") .splitToNewChildren("root", "left", "right", 2L) .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -252,7 +253,7 @@ public void shouldWriteRecordsSplitByPartitionStringKey(TestIngestType ingestTyp .rootFirst("root") .splitToNewChildren("root", "left", "right", "000000102") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -300,7 +301,7 @@ public void shouldWriteRecordsSplitByPartitionByteArrayKey(TestIngestType ingest .rootFirst("root") .splitToNewChildren("root", "left", "right", new byte[]{64, 64}) .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -355,7 +356,7 @@ public void shouldWriteRecordsSplitByPartitionStringKeyLongSortKey(TestIngestTyp .rootFirst("root") .splitToNewChildrenOnDimension("root", "left", "right", 0, "000000102") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -405,7 +406,7 @@ public void shouldWriteRecordsSplitByPartition2DimensionalByteArrayKey(TestInges .rootFirst("root") .splitToNewChildrenOnDimension("root", "left", "right", 0, new byte[]{10}) .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -459,7 +460,7 @@ public void shouldWriteRecordsSplitByPartition2DimensionalIntLongKeyWhenSplitOnD .rootFirst("root") .splitToNewChildrenOnDimension("root", "left", "right", 1, 10L) .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -507,7 +508,7 @@ public void shouldWriteRecordsSplitByPartition2DimensionalLongStringKeyWhenSplit .rootFirst("root") .splitToNewChildrenOnDimension("root", "left", "right", 1, "2") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -560,7 +561,7 @@ public void shouldWriteRecordsSplitByPartitionWhenThereIsOnlyDataInOnePartition( .rootFirst("root") .splitToNewChildren("root", "left", "right", 2L) .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -602,7 +603,7 @@ public void shouldWriteDuplicateRecords( PartitionTree tree = new PartitionsBuilder(duplicatedRecordListAndSchema.sleeperSchema) .rootFirst("root") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() @@ -639,7 +640,7 @@ public void shouldWriteNoRecordsSuccessfully(TestIngestType ingestType) throws E PartitionTree tree = new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of()) .build(); @@ -675,7 +676,7 @@ public void shouldApplyIterator(TestIngestType ingestType) throws Exception { PartitionTree tree = new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); Instant stateStoreUpdateTime = Instant.parse("2023-08-08T11:20:00Z"); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorFileWritingStrategyIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorFileWritingStrategyIT.java index 80867c0c38..4e5a09f5bf 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorFileWritingStrategyIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorFileWritingStrategyIT.java @@ -65,6 +65,7 @@ import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema; import static sleeper.core.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF; import static sleeper.core.properties.validation.IngestFileWritingStrategy.ONE_REFERENCE_PER_LEAF; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.ingest.runner.testutils.IngestCoordinatorTestHelper.accurateFileReferenceBuilder; import static sleeper.ingest.runner.testutils.IngestCoordinatorTestHelper.accurateSplitFileReference; import static sleeper.ingest.runner.testutils.RecordGenerator.genericKey1D; @@ -110,7 +111,7 @@ public void shouldWriteOneFileToRootPartition() throws Exception { setSchema(recordListAndSchema.sleeperSchema); PartitionTree tree = new PartitionsBuilder(recordListAndSchema.sleeperSchema) .singlePartition("root").buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("rootFile")) @@ -143,7 +144,7 @@ public void shouldWriteOneFileToOneLeafPartition() throws Exception { .rootFirst("root") .splitToNewChildren("root", "L", "R", "000000050") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("lFile")) @@ -178,7 +179,7 @@ public void shouldWriteOneFileInEachLeafPartition() throws Exception { .splitToNewChildren("L", "LL", "LR", "000000020") .splitToNewChildren("R", "RL", "RR", "000000080") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("llFile", "lrFile", "rlFile", "rrFile")) @@ -218,7 +219,7 @@ public void shouldWriteRecordsWhenThereAreMoreRecordsThanCanFitInLocalStore() th .rootFirst("root") .splitToNewChildren("root", "L", "R", "000000010") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("leftFile1", "rightFile1", "leftFile2", "rightFile2")) @@ -273,7 +274,7 @@ public void shouldWriteOneFileToRootPartition() throws Exception { setSchema(recordListAndSchema.sleeperSchema); PartitionTree tree = new PartitionsBuilder(recordListAndSchema.sleeperSchema) .singlePartition("root").buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("rootFile")) @@ -307,7 +308,7 @@ public void shouldWriteOneFileWithReferenceInOneLeafPartition() throws Exception .rootFirst("root") .splitToNewChildren("root", "L", "R", "000000050") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("rootFile")) @@ -343,7 +344,7 @@ public void shouldWriteOneFileWithReferencesInLeafPartitions() throws Exception .splitToNewChildren("L", "LL", "LR", "000000020") .splitToNewChildren("R", "RL", "RR", "000000080") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("rootFile")) @@ -381,7 +382,7 @@ public void shouldWriteRecordsWhenThereAreMoreRecordsThanCanFitInLocalStore() th .rootFirst("root") .splitToNewChildren("root", "L", "R", "000000010") .buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); IngestCoordinatorTestParameters parameters = createTestParameterBuilder() .fileNames(List.of("rootFile1", "rootFile2")) diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrayListIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrayListIT.java index edf00e1171..16b0716be7 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrayListIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrayListIT.java @@ -56,6 +56,7 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTablePropertiesWithNoSchema; import static sleeper.core.properties.validation.IngestFileWritingStrategy.ONE_FILE_PER_LEAF; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.ingest.runner.testutils.IngestCoordinatorTestHelper.parquetConfiguration; import static sleeper.ingest.runner.testutils.IngestCoordinatorTestHelper.standardIngestCoordinatorBuilder; import static sleeper.ingest.runner.testutils.ResultVerifier.readMergedRecordsFromPartitionDataFiles; @@ -85,7 +86,7 @@ public void before() { new TransactionLogStateStoreCreator(instanceProperties, dynamoClient).create(); tableProperties.setEnum(INGEST_FILE_WRITING_STRATEGY, ONE_FILE_PER_LEAF); stateStore = createStateStore(recordListAndSchema.sleeperSchema); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); stateStore.fixFileUpdateTime(stateStoreUpdateTime); } diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowIT.java index ec42cee99d..9b10bf3f68 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowIT.java @@ -41,6 +41,7 @@ import static sleeper.core.properties.instance.ArrowIngestProperty.ARROW_INGEST_BATCH_BUFFER_BYTES; import static sleeper.core.properties.instance.ArrowIngestProperty.ARROW_INGEST_MAX_LOCAL_STORE_BYTES; import static sleeper.core.properties.instance.ArrowIngestProperty.ARROW_INGEST_WORKING_BUFFER_BYTES; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; class IngestCoordinatorUsingDirectWriteBackedByArrowIT extends DirectWriteBackedByArrowTestBase { @Test @@ -49,7 +50,7 @@ void shouldWriteRecordsWhenThereAreMoreRecordsInAPartitionThanCanFitInMemory() t new LongType(), LongStream.range(-10000, 10000).boxed().collect(Collectors.toList())); tableProperties.setSchema(recordListAndSchema.sleeperSchema); - stateStore.initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) + update(stateStore).initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .splitToNewChildren("root", "left", "right", 0L) .buildList()); @@ -95,7 +96,7 @@ void shouldWriteRecordsWhenThereAreMoreRecordsThanCanFitInLocalFile() throws Exc new LongType(), LongStream.range(-10000, 10000).boxed().collect(Collectors.toList())); tableProperties.setSchema(recordListAndSchema.sleeperSchema); - stateStore.initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) + update(stateStore).initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .splitToNewChildren("root", "left", "right", 0L) .buildList()); @@ -146,7 +147,7 @@ void shouldErrorWhenBatchBufferAndWorkingBufferAreSmall() throws Exception { new LongType(), LongStream.range(-10000, 10000).boxed().collect(Collectors.toList())); tableProperties.setSchema(recordListAndSchema.sleeperSchema); - stateStore.initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) + update(stateStore).initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .splitToNewChildren("root", "left", "right", 0L) .buildList()); diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowRecordWriterAcceptingRecordListIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowRecordWriterAcceptingRecordListIT.java index 52b9c18a95..20c870e636 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowRecordWriterAcceptingRecordListIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/IngestCoordinatorUsingDirectWriteBackedByArrowRecordWriterAcceptingRecordListIT.java @@ -46,6 +46,7 @@ import static sleeper.core.properties.instance.ArrowIngestProperty.ARROW_INGEST_BATCH_BUFFER_BYTES; import static sleeper.core.properties.instance.ArrowIngestProperty.ARROW_INGEST_MAX_LOCAL_STORE_BYTES; import static sleeper.core.properties.instance.ArrowIngestProperty.ARROW_INGEST_WORKING_BUFFER_BYTES; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; class IngestCoordinatorUsingDirectWriteBackedByArrowRecordWriterAcceptingRecordListIT extends DirectWriteBackedByArrowTestBase { @@ -55,7 +56,7 @@ void shouldWriteRecordsWhenThereAreMoreRecordsInAPartitionThanCanFitInMemory() t new LongType(), LongStream.range(-10000, 10000).boxed().collect(Collectors.toList())); tableProperties.setSchema(recordListAndSchema.sleeperSchema); - stateStore.initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) + update(stateStore).initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .splitToNewChildren("root", "left", "right", 0L) .buildList()); @@ -100,7 +101,7 @@ void shouldWriteRecordsWhenThereAreMoreRecordsThanCanFitInLocalFile() throws Exc new LongType(), LongStream.range(-10000, 10000).boxed().collect(Collectors.toList())); tableProperties.setSchema(recordListAndSchema.sleeperSchema); - stateStore.initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) + update(stateStore).initialise(new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .splitToNewChildren("root", "left", "right", 0L) .buildList()); @@ -150,7 +151,7 @@ void shouldErrorWhenBatchBufferAndWorkingBufferAreSmall() throws Exception { new LongType(), LongStream.range(-10000, 10000).boxed().collect(Collectors.toList())); tableProperties.setSchema(recordListAndSchema.sleeperSchema); - stateStore.initialise( + update(stateStore).initialise( new PartitionsBuilder(recordListAndSchema.sleeperSchema) .rootFirst("root") .splitToNewChildren("root", "left", "right", 0L) diff --git a/java/query/query-core/src/test/java/sleeper/query/core/recordretrieval/QueryExecutorTest.java b/java/query/query-core/src/test/java/sleeper/query/core/recordretrieval/QueryExecutorTest.java index e783b3e3d7..81e95c8e9d 100644 --- a/java/query/query-core/src/test/java/sleeper/query/core/recordretrieval/QueryExecutorTest.java +++ b/java/query/query-core/src/test/java/sleeper/query/core/recordretrieval/QueryExecutorTest.java @@ -116,7 +116,7 @@ void shouldNotFindRecordOutsideSubRangeInSinglePartition() throws Exception { @Test void shouldNotFindRecordOutsidePartitionRangeWhenFileContainsAnInactiveRecord() throws Exception { // Given - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "L", "R", 5L) .buildList()); @@ -161,7 +161,7 @@ class RequestValueFields { @BeforeEach void setUp() throws Exception { tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); } @Test @@ -219,7 +219,7 @@ class ApplyIterators { @BeforeEach void setUp() throws Exception { tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); addRootFile("file.parquet", List.of( new Record(Map.of("key", "A", "value", 2L)), new Record(Map.of("key", "A", "value", 2L)), diff --git a/java/query/query-lambda/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java b/java/query/query-lambda/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java index 8922eb5a55..5d124ba111 100644 --- a/java/query/query-lambda/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java +++ b/java/query/query-lambda/src/test/java/sleeper/query/lambda/SqsQueryProcessorLambdaIT.java @@ -107,6 +107,7 @@ import static sleeper.core.properties.table.TableProperty.TABLE_NAME; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.query.core.tracker.QueryState.COMPLETED; import static sleeper.query.core.tracker.QueryState.IN_PROGRESS; import static sleeper.query.core.tracker.QueryState.QUEUED; @@ -776,7 +777,7 @@ private TableProperties createTimeSeriesTable(List splitPoints) { StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf) .getStateStore(tableProperties); try { - stateStore.initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct()); } catch (StateStoreException e) { throw new RuntimeException(e); } diff --git a/java/query/query-lambda/src/test/java/sleeper/query/lambda/WarmQueryExecutorLambdaIT.java b/java/query/query-lambda/src/test/java/sleeper/query/lambda/WarmQueryExecutorLambdaIT.java index 7c1471ef2d..580672eb67 100644 --- a/java/query/query-lambda/src/test/java/sleeper/query/lambda/WarmQueryExecutorLambdaIT.java +++ b/java/query/query-lambda/src/test/java/sleeper/query/lambda/WarmQueryExecutorLambdaIT.java @@ -25,7 +25,6 @@ import sleeper.configuration.properties.S3InstanceProperties; import sleeper.configuration.properties.S3TableProperties; import sleeper.configuration.table.index.DynamoDBTableIndexCreator; -import sleeper.core.partition.PartitionsFromSplitPoints; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; import sleeper.core.range.Range; @@ -46,7 +45,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -65,6 +63,7 @@ import static sleeper.core.properties.table.TableProperty.TABLE_NAME; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.query.runner.output.NoResultsOutput.NO_RESULTS_OUTPUT; public class WarmQueryExecutorLambdaIT extends LocalStackTestBase { @@ -152,6 +151,6 @@ private void createTable(TableProperties tableProperties) { S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient).save(tableProperties); StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient, hadoopConf) .getStateStore(tableProperties); - stateStore.initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), new ArrayList<>()).construct()); + update(stateStore).initialise(tableProperties.getSchema()); } } diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/find/FindPartitionsToSplitTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/find/FindPartitionsToSplitTest.java index 4ac8bca95f..aa1865d758 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/find/FindPartitionsToSplitTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/find/FindPartitionsToSplitTest.java @@ -205,7 +205,7 @@ private List findPartitionsToSplit() throws Excepti private void setPartitions(Consumer config) throws Exception { PartitionsBuilder builder = new PartitionsBuilder(tableProperties.getSchema()); config.accept(builder); - stateStore.initialise(builder.buildList()); + update(stateStore).initialise(builder.buildList()); fileReferenceFactory = fileReferenceFactory(); } diff --git a/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/status/PartitionsStatusTest.java b/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/status/PartitionsStatusTest.java index 092ef0f926..5d460d9eb7 100644 --- a/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/status/PartitionsStatusTest.java +++ b/java/splitter/splitter-core/src/test/java/sleeper/splitter/core/status/PartitionsStatusTest.java @@ -58,7 +58,7 @@ class CountRecords { @Test void shouldCountRecordsFromReferencesOnLeaves() throws Exception { // Given - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("parent") .splitToNewChildren("parent", "A", "B", "aaa") .buildList()); @@ -83,7 +83,7 @@ void shouldCountRecordsFromReferencesOnLeaves() throws Exception { @Test void shouldCountRecordsFromReferenceOnLeafAndRoot() throws Exception { // Given - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("parent") .splitToNewChildren("parent", "A", "B", "aaa") .buildList()); @@ -105,7 +105,7 @@ void shouldCountRecordsFromReferenceOnLeafAndRoot() throws Exception { @Test void shouldCountRecordsFromNestedTree() throws Exception { // Given - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "L", "R", "aaa") .splitToNewChildren("R", "RL", "RR", "bbb") @@ -136,7 +136,7 @@ class CountPartitions { @Test void shouldCountLeafPartitions() { // Given - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("parent") .splitToNewChildren("parent", "A", "B", "aaa") .buildList()); @@ -152,7 +152,7 @@ void shouldCountLeafPartitions() { void shouldFindNoSplittingPartitionsWhenThresholdNotExceeded() { // Given tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("parent") .splitToNewChildren("parent", "A", "B", "aaa") .buildList()); @@ -169,7 +169,7 @@ void shouldFindNoSplittingPartitionsWhenThresholdNotExceeded() { void shouldFindSplittingPartitionsWhenThresholdExceeded() { // Given tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("parent") .splitToNewChildren("parent", "A", "B", "aaa") .buildList()); @@ -186,7 +186,7 @@ void shouldFindSplittingPartitionsWhenThresholdExceeded() { void shouldExcludeNonLeafPartitionsInNeedsSplittingCount() { // Given tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "L", "R", "abc") .buildList()); @@ -203,7 +203,7 @@ void shouldExcludeNonLeafPartitionsInNeedsSplittingCount() { @Test void shouldOrderPartitionsByTreeLeavesFirst() { // Given - stateStore.initialise(new PartitionsBuilder(schema) + update(stateStore).initialise(new PartitionsBuilder(schema) .rootFirst("root") .splitToNewChildren("root", "some-middle", "other-middle", "aaa") .splitToNewChildren("some-middle", "some-leaf", "other-leaf", "bbb") diff --git a/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/SplitPartitionLambdaIT.java b/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/SplitPartitionLambdaIT.java index 26bd85b4a9..02bd7c1465 100644 --- a/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/SplitPartitionLambdaIT.java +++ b/java/splitter/splitter-lambda/src/test/java/sleeper/splitter/lambda/SplitPartitionLambdaIT.java @@ -63,6 +63,7 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class SplitPartitionLambdaIT extends LocalStackTestBase { @@ -148,7 +149,7 @@ private List receiveCommitMessages() { private TableProperties createTable(Schema schema, PartitionTree partitionTree) { TableProperties tableProperties = createTestTableProperties(instanceProperties, schema); S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient).createTable(tableProperties); - stateStoreProvider().getStateStore(tableProperties) + update(stateStoreProvider().getStateStore(tableProperties)) .initialise(partitionTree.getAllPartitions()); return tableProperties; } diff --git a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java index d606a81188..d41dae4d14 100644 --- a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java +++ b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java @@ -732,7 +732,7 @@ private TableProperties createTable() { private void createTable(TableProperties tableProperties) { propertiesByTableId.put(tableProperties.get(TABLE_ID), tableProperties); - stateStore(tableProperties).initialise(partitions.getAllPartitions()); + update(stateStore(tableProperties)).initialise(partitions.getAllPartitions()); } private StateStore stateStore(TableProperties tableProperties) { diff --git a/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java b/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java index 5f972579a5..9bc6da3ea1 100644 --- a/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java +++ b/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java @@ -56,6 +56,7 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class StateStoreCommitterLambdaTest { private static final Instant DEFAULT_FILE_UPDATE_TIME = FilesReportTestHelper.DEFAULT_UPDATE_TIME; @@ -69,7 +70,7 @@ public class StateStoreCommitterLambdaTest { @BeforeEach void setUp() { - stateStore().initialise(partitions.getAllPartitions()); + update(stateStore()).initialise(partitions.getAllPartitions()); } @Test diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStoreIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStoreIT.java index 90b9c29c2b..fe1db9752b 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStoreIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStoreIT.java @@ -138,7 +138,7 @@ void shouldStoreFileUpdateTimeInLogEntry() throws Exception { StateStore stateStore = createStateStore(tableProperties); stateStore.fixFileUpdateTime(updateTime); PartitionTree partitions = new PartitionsBuilder(schema).singlePartition("root").buildTree(); - stateStore.initialise(partitions.getAllPartitions()); + update(stateStore).initialise(partitions.getAllPartitions()); update(stateStore).addFile(FileReferenceFactory.from(partitions).rootFile(100)); // When @@ -233,7 +233,7 @@ void shouldStoreTransactionInS3() throws Exception { Instant updateTime = Instant.parse("2024-04-09T14:19:01Z"); StateStore stateStore = createStateStore(tableProperties); stateStore.fixPartitionUpdateTime(updateTime); - stateStore.initialise(partitions); + update(stateStore).initialise(partitions); // Then the transaction is held in S3 String file = singleFileInDataBucket(); @@ -253,7 +253,7 @@ void shouldDeleteTransactionStoredInS3() throws Exception { Instant updateTime = Instant.parse("2024-04-09T14:19:01Z"); StateStore stateStore = createStateStore(tableProperties); stateStore.fixPartitionUpdateTime(updateTime); - stateStore.initialise(PartitionsBuilderSplitsFirst + update(stateStore).initialise(PartitionsBuilderSplitsFirst .leavesWithSplits(schema, leafIds, splitPoints) .anyTreeJoiningAllLeaves().buildList()); diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java index 55f365ee18..569ce65084 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java @@ -94,7 +94,7 @@ void shouldInitialiseTableWithManyPartitionsCreatingTransactionTooLargeToFitInAD .anyTreeJoiningAllLeaves().buildTree(); // When - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); // Then assertThat(new HashSet<>(stateStore.getAllPartitions())).isEqualTo(new HashSet<>(tree.getAllPartitions())); @@ -114,7 +114,7 @@ void shouldReadTransactionTooLargeToFitInADynamoDBItemWithFreshStateStoreInstanc .anyTreeJoiningAllLeaves().buildTree(); // When - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); // Then assertThat(createStateStore().getAllPartitions()).containsExactlyElementsOf(tree.getAllPartitions()); @@ -124,7 +124,7 @@ void shouldReadTransactionTooLargeToFitInADynamoDBItemWithFreshStateStoreInstanc void shouldAddATransactionAlreadyHeldInS3() { // Given PartitionTree tree = new PartitionsBuilder(schema).singlePartition("root").buildTree(); - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); FileReference file = fileFactory(tree).rootFile("test.parquet", 100); FileReferenceTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file))); String key = TransactionBodyStore.createObjectKey(tableProperties); @@ -157,7 +157,7 @@ void shouldLoadLatestSnapshotsWhenCreatingStateStore() throws Exception { factory.partitionFile("L", "file2.parquet", 25L), factory.partitionFile("R", "file3.parquet", 50L)); createSnapshotWithFreshState(stateStore -> { - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); update(stateStore).addFiles(files); }); @@ -185,7 +185,7 @@ void shouldNotLoadLatestSnapshotsByClassname() throws Exception { factory.partitionFile("L", "file2.parquet", 25L), factory.partitionFile("R", "file3.parquet", 50L)); createSnapshotWithFreshState(stateStore -> { - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); update(stateStore).addFiles(files); }); @@ -207,7 +207,7 @@ void shouldExcludePreviousTransactionsWhenLoadingLatestSnapshots() throws Except .buildTree(); FileReferenceFactory factory1 = fileFactory(tree1); FileReference file1 = factory1.rootFile("file1.parquet", 123L); - stateStore.initialise(tree1.getAllPartitions()); + update(stateStore).initialise(tree1.getAllPartitions()); update(stateStore).addFile(file1); PartitionTree tree2 = new PartitionsBuilder(schema) @@ -217,7 +217,7 @@ void shouldExcludePreviousTransactionsWhenLoadingLatestSnapshots() throws Except FileReferenceFactory factory2 = fileFactory(tree2); FileReference file2 = factory2.rootFile("file2.parquet", 456L); createSnapshotWithFreshState(stateStore2 -> { - stateStore2.initialise(tree2.getAllPartitions()); + update(stateStore2).initialise(tree2.getAllPartitions()); update(stateStore2).addFile(file2); }); @@ -239,7 +239,7 @@ void shouldLoadLatestPartitionsSnapshotIfNoFilesSnapshotIsPresent() throws Excep .splitToNewChildren("root", "L", "R", 123L) .buildTree(); createSnapshotWithFreshState(stateStore -> { - stateStore.initialise(tree.getAllPartitions()); + update(stateStore).initialise(tree.getAllPartitions()); }); // When diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreMultipleTablesIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreMultipleTablesIT.java index bc39d92ced..492f9395c4 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreMultipleTablesIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreMultipleTablesIT.java @@ -69,8 +69,8 @@ void shouldCreatePartitionsForTwoTables() throws Exception { PartitionTree tree2 = new PartitionsBuilder(schema).singlePartition("partition2").buildTree(); // When - stateStore1.initialise(tree1.getAllPartitions()); - stateStore2.initialise(tree2.getAllPartitions()); + update(stateStore1).initialise(tree1.getAllPartitions()); + update(stateStore2).initialise(tree2.getAllPartitions()); // Then assertThat(stateStore1.getAllPartitions()).containsExactly(tree1.getRootPartition()); @@ -104,8 +104,8 @@ void shouldClearPartitionsAndFilesForOneTableWhenTwoTablesArePresent() throws Ex StateStore stateStore2 = getTableStateStore(); PartitionTree tree1 = new PartitionsBuilder(schema).singlePartition("partition1").buildTree(); PartitionTree tree2 = new PartitionsBuilder(schema).singlePartition("partition2").buildTree(); - stateStore1.initialise(tree1.getAllPartitions()); - stateStore2.initialise(tree2.getAllPartitions()); + update(stateStore1).initialise(tree1.getAllPartitions()); + update(stateStore2).initialise(tree2.getAllPartitions()); FileReference file1 = FileReferenceFactory.from(tree1).rootFile("file1.parquet", 12); FileReference file2 = FileReferenceFactory.from(tree2).rootFile("file2.parquet", 34); update(stateStore1).addFile(file1); diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreSnapshotsIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreSnapshotsIT.java index 68c48e69d1..6888797b32 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreSnapshotsIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreSnapshotsIT.java @@ -70,7 +70,7 @@ void shouldLoadPartitionsFromSnapshotWhenNotInLogOnFirstLoad() throws Exception // When createSnapshotWithFreshStateAtTransactionNumber(1, stateStore -> { - stateStore.initialise(partitions.buildList()); + update(stateStore).initialise(partitions.buildList()); }); // Then @@ -121,11 +121,11 @@ void shouldNotLoadPartitionsSnapshotWhenOnlyOneTransactionAheadAfterLoadingLog() .minTransactionsAheadToLoadSnapshot(2)); List logPartitions = new PartitionsBuilder(schema).rootFirst("A").buildList(); List snapshotPartitions = new PartitionsBuilder(schema).rootFirst("B").buildList(); - stateStore.initialise(logPartitions); + update(stateStore).initialise(logPartitions); // When createSnapshotWithFreshStateAtTransactionNumber(2, snapshotStateStore -> { - snapshotStateStore.initialise(snapshotPartitions); + update(snapshotStateStore).initialise(snapshotPartitions); }); // Then @@ -139,11 +139,11 @@ void shouldLoadPartitionsSnapshotWhenMoreThanConfiguredTransactionsAheadAfterLoa .minTransactionsAheadToLoadSnapshot(2)); List logPartitions = new PartitionsBuilder(schema).rootFirst("A").buildList(); List snapshotPartitions = new PartitionsBuilder(schema).rootFirst("B").buildList(); - stateStore.initialise(logPartitions); + update(stateStore).initialise(logPartitions); // When createSnapshotWithFreshStateAtTransactionNumber(3, snapshotStateStore -> { - snapshotStateStore.initialise(snapshotPartitions); + update(snapshotStateStore).initialise(snapshotPartitions); }); // Then diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotCreatorIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotCreatorIT.java index 4358d6fdcd..6febe0a1ea 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotCreatorIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotCreatorIT.java @@ -49,7 +49,7 @@ void shouldCreateSnapshotsForOneTable() throws Exception { FileReference file = FileReferenceFactory.fromUpdatedAt(partitions, DEFAULT_UPDATE_TIME).rootFile(123); TableProperties table = createTable("test-table-id-1", "test-table-1"); StateStore inMemoryStateStore = createStateStoreWithInMemoryTransactionLog(table); - inMemoryStateStore.initialise(partitions.getAllPartitions()); + update(inMemoryStateStore).initialise(partitions.getAllPartitions()); update(inMemoryStateStore).addFile(file); // When we create a snapshot from the in-memory transactions @@ -84,7 +84,7 @@ void shouldCreateSnapshotsForMultipleTables() throws Exception { FileReference file1 = FileReferenceFactory.fromUpdatedAt(partitions1, DEFAULT_UPDATE_TIME) .rootFile("file1.parquet", 123L); StateStore inMemoryStateStore1 = createStateStoreWithInMemoryTransactionLog(table1); - inMemoryStateStore1.initialise(partitions1.getAllPartitions()); + update(inMemoryStateStore1).initialise(partitions1.getAllPartitions()); update(inMemoryStateStore1).addFile(file1); TableProperties table2 = createTable("test-table-id-2", "test-table-2"); @@ -95,7 +95,7 @@ void shouldCreateSnapshotsForMultipleTables() throws Exception { FileReference file2 = FileReferenceFactory.fromUpdatedAt(partitions2, DEFAULT_UPDATE_TIME) .rootFile("file2.parquet", 123L); StateStore inMemoryStateStore2 = createStateStoreWithInMemoryTransactionLog(table2); - inMemoryStateStore2.initialise(partitions2.getAllPartitions()); + update(inMemoryStateStore2).initialise(partitions2.getAllPartitions()); update(inMemoryStateStore2).addFile(file2); // When @@ -275,7 +275,7 @@ void shouldNotCreateSnapshotIfLoadingPreviousPartitionSnapshotFails() throws Exc TableProperties table = createTable("test-table-id-1", "test-table-1"); PartitionsBuilder partitions = new PartitionsBuilder(schema).singlePartition("root"); StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table); - stateStore.initialise(partitions.buildList()); + update(stateStore).initialise(partitions.buildList()); createSnapshots(table); // And we add a transaction that would trigger a new snapshot creation partitions.splitToNewChildren("root", "L", "R", 123L) diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotDeleterIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotDeleterIT.java index 072b076734..9652f3e800 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotDeleterIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogSnapshotDeleterIT.java @@ -41,13 +41,13 @@ void shouldDeleteOldSnapshotsForOneTable() throws Exception { table.setNumber(TRANSACTION_LOG_SNAPSHOT_EXPIRY_IN_DAYS, 1); StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table); PartitionsBuilder partitionsBuilder = new PartitionsBuilder(schema).rootFirst("root"); - stateStore.initialise(partitionsBuilder.buildList()); + update(stateStore).initialise(partitionsBuilder.buildList()); FileReferenceFactory factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test1.parquet", 123L)); createSnapshotsAt(table, Instant.parse("2024-04-25T11:24:00Z")); update(stateStore).clearFileData(); - stateStore.initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); + update(stateStore).initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test3.parquet", 789L)); createSnapshotsAt(table, Instant.parse("2024-04-27T11:24:00Z")); @@ -79,13 +79,13 @@ void shouldNotDeleteOldSnapshotsIfTheyAreNotOldEnoughForOneTable() throws Except table.setNumber(TRANSACTION_LOG_SNAPSHOT_EXPIRY_IN_DAYS, 1); StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table); PartitionsBuilder partitionsBuilder = new PartitionsBuilder(schema).rootFirst("root"); - stateStore.initialise(partitionsBuilder.buildList()); + update(stateStore).initialise(partitionsBuilder.buildList()); FileReferenceFactory factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test1.parquet", 123L)); createSnapshotsAt(table, Instant.parse("2024-04-27T11:23:00Z")); update(stateStore).clearFileData(); - stateStore.initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); + update(stateStore).initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test3.parquet", 789L)); createSnapshotsAt(table, Instant.parse("2024-04-27T11:24:00Z")); @@ -155,12 +155,12 @@ void shouldNotDeleteSnapshotMetadataIfSnapshotFileFailedToDelete() throws Except table.setNumber(TRANSACTION_LOG_SNAPSHOT_EXPIRY_IN_DAYS, 1); StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table); PartitionsBuilder partitionsBuilder = new PartitionsBuilder(schema).rootFirst("root"); - stateStore.initialise(partitionsBuilder.buildList()); + update(stateStore).initialise(partitionsBuilder.buildList()); FileReferenceFactory factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test1.parquet", 123L)); createSnapshotsAt(table, Instant.parse("2024-04-24T11:24:00Z")); update(stateStore).clearFileData(); - stateStore.initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); + update(stateStore).initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test3.parquet", 789L)); createSnapshotsAt(table, Instant.parse("2024-04-25T11:24:00Z")); @@ -198,12 +198,12 @@ void shouldDeleteOldSnapshotMetadataIfSnapshotFilesHaveAlreadyBeenDeleted() thro table.setNumber(TRANSACTION_LOG_SNAPSHOT_EXPIRY_IN_DAYS, 1); StateStore stateStore = createStateStoreWithInMemoryTransactionLog(table); PartitionsBuilder partitionsBuilder = new PartitionsBuilder(schema).rootFirst("root"); - stateStore.initialise(partitionsBuilder.buildList()); + update(stateStore).initialise(partitionsBuilder.buildList()); FileReferenceFactory factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test1.parquet", 123L)); createSnapshotsAt(table, Instant.parse("2024-04-24T11:24:00Z")); update(stateStore).clearFileData(); - stateStore.initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); + update(stateStore).initialise(partitionsBuilder.splitToNewChildren("root", "L", "R", 123L).buildList()); factory = FileReferenceFactory.from(stateStore); update(stateStore).addFile(factory.rootFile("test3.parquet", 789L)); createSnapshotsAt(table, Instant.parse("2024-04-25T11:24:00Z")); diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java index 003796a837..299eff594d 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java @@ -196,7 +196,7 @@ void shouldNotDeleteTransactionsWhenNoSnapshotExistsYet() throws Exception { void shouldDeleteOldPartitionTransactionWhenTwoAreBeforeLatestSnapshot() throws Exception { // Given we have two partitions transactions PartitionsBuilder partitions = new PartitionsBuilder(schema).rootFirst("root"); - setupAtTime(Instant.parse("2024-06-24T15:45:00Z"), () -> stateStore.initialise(partitions.buildList())); + setupAtTime(Instant.parse("2024-06-24T15:45:00Z"), () -> update(stateStore).initialise(partitions.buildList())); setupAtTime(Instant.parse("2024-06-24T15:46:00Z"), () -> partitions .splitToNewChildren("root", "L", "R", "m") .applySplit(stateStore, "root")); diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplittingTest.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplittingTest.java index ef80b2a30a..f52ab293b1 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplittingTest.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/partitioning/WaitForPartitionSplittingTest.java @@ -52,7 +52,7 @@ class SystemIsOnlySplitting { void shouldFindSplitsNotFinishedWhenOnePartitionStillNeedsSplitting() { // Given tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); - stateStore.initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); + update(stateStore).initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); update(stateStore).addFile(fileFactory().partitionFile("root", "test.parquet", 11)); // When @@ -84,11 +84,11 @@ void shouldFindSplitFinishedWhenOnePartitionWasSplitButSplittingCompactionHasNot tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); StateStore before = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - before.initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); + update(before).initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); update(before).addFile(fileFactory(before).partitionFile("root", "test.parquet", 11)); StateStore after = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - after.initialise(new PartitionsBuilder(tableProperties) + update(after).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .buildList()); @@ -108,7 +108,7 @@ void shouldFindSplitsNotFinishedWhenTwoPartitionsNeededSplittingAndOneIsFinished tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); StateStore before = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - before.initialise(new PartitionsBuilder(tableProperties) + update(before).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .buildList()); @@ -116,7 +116,7 @@ void shouldFindSplitsNotFinishedWhenTwoPartitionsNeededSplittingAndOneIsFinished update(before).addFile(fileFactory(before).partitionFile("right", "right.parquet", 11)); StateStore after = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - after.initialise(new PartitionsBuilder(tableProperties) + update(after).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .splitToNewChildren("left", "left left", "left right", "left split") @@ -138,7 +138,7 @@ void shouldFindSplitsFinishedWhenTwoPartitionsNeededSplittingAndBothAreFinished( tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); StateStore before = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - before.initialise(new PartitionsBuilder(tableProperties) + update(before).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .buildList()); @@ -146,7 +146,7 @@ void shouldFindSplitsFinishedWhenTwoPartitionsNeededSplittingAndBothAreFinished( update(before).addFile(fileFactory(before).partitionFile("right", "right.parquet", 11)); StateStore after = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - after.initialise(new PartitionsBuilder(tableProperties) + update(after).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .splitToNewChildren("left", "left left", "left right", "left split") @@ -173,11 +173,11 @@ void shouldFindSplitFinishedWhenOnePartitionWasSplitButANewSplitIsNeeded() { tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); StateStore before = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - before.initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); + update(before).initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); update(before).addFile(fileFactory(before).partitionFile("root", "test.parquet", 11)); StateStore after = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - after.initialise(new PartitionsBuilder(tableProperties) + update(after).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .buildList()); @@ -198,14 +198,14 @@ void shouldFindSplitFinishedWhenTableIsReinitialisedAndDataMovedToRoot() { tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); StateStore before = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - before.initialise(new PartitionsBuilder(tableProperties) + update(before).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point") .buildList()); update(before).addFile(fileFactory(before).partitionFile("left", "left.parquet", 11)); StateStore after = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - after.initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); + update(after).initialise(new PartitionsBuilder(tableProperties).singlePartition("root").buildList()); update(after).addFile(fileFactory(after).partitionFile("root", "test.parquet", 11)); // When @@ -222,14 +222,14 @@ void shouldFindSplitNotFinishedWhenTableIsReinitialisedChangingRegionButPartitio tableProperties.setNumber(PARTITION_SPLIT_THRESHOLD, 10); StateStore before = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - before.initialise(new PartitionsBuilder(tableProperties) + update(before).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point before") .buildList()); update(before).addFile(fileFactory(before).partitionFile("left", "left.parquet", 11)); StateStore after = InMemoryTransactionLogStateStore.create(tableProperties, new InMemoryTransactionLogs()); - after.initialise(new PartitionsBuilder(tableProperties) + update(after).initialise(new PartitionsBuilder(tableProperties) .rootFirst("root") .splitToNewChildren("root", "left", "right", "split point after") .buildList()); diff --git a/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java b/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java index 674939ba7b..1d9abebff0 100644 --- a/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java +++ b/java/trino/src/test/java/sleeper/trino/testutils/PopulatedSleeperExternalResource.java @@ -65,6 +65,7 @@ import static sleeper.core.properties.table.TableProperty.TABLE_NAME; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; import static sleeper.localstack.test.LocalStackAwsV1ClientHelper.buildAwsV1Client; import static sleeper.localstack.test.LocalStackAwsV2ClientHelper.buildAwsV2Client; @@ -153,7 +154,7 @@ public void beforeAll(ExtensionContext context) throws Exception { tableDefinition); StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDBClient, configuration); StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); - stateStore.initialise(new PartitionsFromSplitPoints(tableDefinition.schema, tableDefinition.splitPoints).construct()); + update(stateStore).initialise(new PartitionsFromSplitPoints(tableDefinition.schema, tableDefinition.splitPoints).construct()); ingestData(instanceProperties, stateStoreProvider, tableProperties, tableDefinition.recordStream.iterator()); } catch (Exception e) { throw new RuntimeException(e);