diff --git a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java index 11a374e801..ca0b8cfe3e 100644 --- a/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java +++ b/java/bulk-import/bulk-import-runner/src/main/java/sleeper/bulkimport/runner/BulkImportJobDriver.java @@ -37,7 +37,6 @@ import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; import sleeper.core.properties.table.TablePropertiesProvider; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreProvider; import sleeper.core.statestore.commit.StateStoreCommitRequest; @@ -116,16 +115,15 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce Instant finishTime = getTime.get(); boolean asyncCommit = tableProperties.getBoolean(BULK_IMPORT_FILES_COMMIT_ASYNC); try { + AddFilesTransaction transaction = AddFilesTransaction.builder() + .jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime) + .fileReferences(output.fileReferences()) + .build(); if (asyncCommit) { - AddFilesTransaction transaction = AddFilesTransaction.builder() - .jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime) - .files(AllReferencesToAFile.newFilesWithReferences(output.fileReferences())) - .build(); asyncSender.send(StateStoreCommitRequest.create(table.getTableUniqueId(), transaction)); LOGGER.info("Submitted asynchronous request to state store committer to add {} files for job {} in table {}", output.numFiles(), job.getId(), table); } else { - stateStoreProvider.getStateStore(tableProperties) - .addFiles(output.fileReferences()); + transaction.synchronousCommit(stateStoreProvider.getStateStore(tableProperties)); LOGGER.info("Added {} files to statestore for job {} in table {}", output.numFiles(), job.getId(), table); } } catch (RuntimeException e) { diff --git a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java index 8899b6d287..4661e37873 100644 --- a/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java +++ b/java/bulk-import/bulk-import-runner/src/test/java/sleeper/bulkimport/runner/BulkImportJobDriverTest.java @@ -23,7 +23,6 @@ import sleeper.core.properties.table.TableProperties; import sleeper.core.properties.testutils.FixedTablePropertiesProvider; import sleeper.core.schema.Schema; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; @@ -181,7 +180,7 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception { assertThat(commitRequestQueue).containsExactly(StateStoreCommitRequest.create(tableProperties.get(TABLE_ID), AddFilesTransaction.builder() .jobId(job.getId()).taskId("test-task").jobRunId("test-run").writtenTime(finishTime) - .files(AllReferencesToAFile.newFilesWithReferences(outputFiles)) + .fileReferences(outputFiles) .build())); } diff --git a/java/clients/src/main/java/sleeper/clients/status/update/DeleteTable.java 
b/java/clients/src/main/java/sleeper/clients/status/update/DeleteTable.java index eb2a61d3a9..bd01889d86 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/DeleteTable.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/DeleteTable.java @@ -41,17 +41,18 @@ public class DeleteTable { private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTable.class); + private final AmazonS3 s3Client; private final InstanceProperties instanceProperties; private final TablePropertiesStore tablePropertiesStore; private final StateStoreProvider stateStoreProvider; public DeleteTable(AmazonS3 s3Client, AmazonDynamoDB dynamoDB, InstanceProperties instanceProperties) { - this(instanceProperties, s3Client, S3TableProperties.createStore(instanceProperties, s3Client, dynamoDB), + this(s3Client, instanceProperties, S3TableProperties.createStore(instanceProperties, s3Client, dynamoDB), StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDB, getConfigurationForClient())); } - public DeleteTable(InstanceProperties instanceProperties, AmazonS3 s3Client, TablePropertiesStore tablePropertiesStore, StateStoreProvider stateStoreProvider) { + public DeleteTable(AmazonS3 s3Client, InstanceProperties instanceProperties, TablePropertiesStore tablePropertiesStore, StateStoreProvider stateStoreProvider) { this.s3Client = s3Client; this.instanceProperties = instanceProperties; this.tablePropertiesStore = tablePropertiesStore; diff --git a/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTable.java b/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTable.java index b0665fe8aa..6f81e0fe32 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTable.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTable.java @@ -30,10 +30,13 @@ import sleeper.core.properties.table.TableProperties; import sleeper.core.properties.table.TablePropertiesProvider; import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.transactionlog.transaction.impl.ClearFilesTransaction; +import sleeper.core.statestore.transactionlog.transaction.impl.ClearPartitionsTransaction; +import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; import sleeper.statestore.StateStoreFactory; -import java.io.IOException; import java.util.Objects; +import java.util.function.Function; import static sleeper.clients.util.BucketUtils.deleteObjectsInBucketWithPrefix; import static sleeper.configuration.utils.AwsV1ClientHelper.buildAwsV1Client; @@ -70,7 +73,11 @@ public ReinitialiseTable( } } - public void run() throws IOException { + public void run() { + run(tableProperties -> InitialisePartitionsTransaction.singlePartition(tableProperties.getSchema())); + } + + public void run(Function buildPartitions) { InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, instanceId); TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoDBClient); TableProperties tableProperties = tablePropertiesProvider.getByName(tableName); @@ -83,23 +90,18 @@ public void run() throws IOException { LOGGER.info("State store type: {}", stateStore.getClass().getName()); + new ClearFilesTransaction().synchronousCommit(stateStore); if (deletePartitions) { - stateStore.clearSleeperTable(); - } else { - stateStore.clearFileData(); + ClearPartitionsTransaction.create().synchronousCommit(stateStore); } 
deleteObjectsInBucketWithPrefix(s3Client, instanceProperties.get(DATA_BUCKET), tableProperties.get(TABLE_ID), key -> key.matches(tableProperties.get(TABLE_ID) + "/partition.*/.*")); if (deletePartitions) { LOGGER.info("Fully reinitialising table"); - initialiseStateStore(tableProperties, stateStore); + buildPartitions.apply(tableProperties).synchronousCommit(stateStore); } } - protected void initialiseStateStore(TableProperties tableProperties, StateStore stateStore) throws IOException { - stateStore.initialise(); - } - public static void main(String[] args) { if (args.length < 2 || args.length > 3) { throw new IllegalArgumentException("Usage: "); @@ -127,7 +129,7 @@ public static void main(String[] args) { ReinitialiseTable reinitialiseTable = new ReinitialiseTable(s3Client, dynamoDBClient, instanceId, tableName, deletePartitions); reinitialiseTable.run(); LOGGER.info("Table reinitialised successfully"); - } catch (RuntimeException | IOException e) { + } catch (RuntimeException e) { LOGGER.error("\nAn Error occurred while trying to reinitialise the table. " + "The error message is as follows:\n\n" + e.getMessage() + "\n\nCause:" + e.getCause()); diff --git a/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromExportedPartitions.java b/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromExportedPartitions.java index 44b17304a8..74d13f6b30 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromExportedPartitions.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromExportedPartitions.java @@ -25,14 +25,14 @@ import sleeper.clients.status.partitions.ExportPartitions; import sleeper.core.partition.Partition; import sleeper.core.partition.PartitionSerDe; -import sleeper.core.properties.table.TableProperties; import sleeper.core.schema.Schema; -import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -45,8 +45,9 @@ * is reinitialised using the partitions in the provided file. This file should * have been created using the class {@link ExportPartitions}. 
*/ -public class ReinitialiseTableFromExportedPartitions extends ReinitialiseTable { +public class ReinitialiseTableFromExportedPartitions { private static final Logger LOGGER = LoggerFactory.getLogger(ReinitialiseTableFromExportedPartitions.class); + private final ReinitialiseTable reinitialiseTable; private final String partitionsFile; public ReinitialiseTableFromExportedPartitions( @@ -55,16 +56,15 @@ public ReinitialiseTableFromExportedPartitions( String instanceId, String tableName, String partitionsFile) { - super(s3Client, dynamoDBClient, instanceId, tableName, true); + this.reinitialiseTable = new ReinitialiseTable(s3Client, dynamoDBClient, instanceId, tableName, true); this.partitionsFile = partitionsFile; } - @Override - protected void initialiseStateStore(TableProperties tableProperties, StateStore stateStore) throws IOException { - stateStore.initialise(readPartitions(tableProperties.getSchema())); + public void run() { + reinitialiseTable.run(tableProperties -> new InitialisePartitionsTransaction(readPartitions(tableProperties.getSchema()))); } - private List readPartitions(Schema schema) throws IOException { + private List readPartitions(Schema schema) { PartitionSerDe partitionSerDe = new PartitionSerDe(schema); List partitions = new ArrayList<>(); LOGGER.info("Attempting to read partitions from file {}", partitionsFile); @@ -75,6 +75,8 @@ private List readPartitions(Schema schema) throws IOException { partitions.add(partitionSerDe.fromJson(line)); } } + } catch (IOException e) { + throw new UncheckedIOException(e); } LOGGER.info("Read {} partitions from file", partitions.size()); @@ -101,10 +103,11 @@ public static void main(String[] args) { AmazonDynamoDB dynamoDBClient = buildAwsV1Client(AmazonDynamoDBClientBuilder.standard()); try { - ReinitialiseTable reinitialiseTable = new ReinitialiseTableFromExportedPartitions(s3Client, dynamoDBClient, instanceId, tableName, exportedPartitionsFile); + ReinitialiseTableFromExportedPartitions reinitialiseTable = new ReinitialiseTableFromExportedPartitions( + s3Client, dynamoDBClient, instanceId, tableName, exportedPartitionsFile); reinitialiseTable.run(); LOGGER.info("Table reinitialised successfully"); - } catch (RuntimeException | IOException e) { + } catch (RuntimeException e) { LOGGER.error("\nAn Error occurred while trying to reinitialise the table. " + "The error message is as follows:\n\n" + e.getMessage() + "\n\nCause:" + e.getCause()); diff --git a/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromSplitPoints.java b/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromSplitPoints.java index 8dbd3d49d9..9cfcf5a2eb 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromSplitPoints.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/ReinitialiseTableFromSplitPoints.java @@ -22,11 +22,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sleeper.core.partition.Partition; import sleeper.core.partition.PartitionsFromSplitPoints; import sleeper.core.properties.table.TableProperties; -import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import static sleeper.configuration.ReadSplitPoints.readSplitPoints; @@ -37,8 +39,9 @@ * and all the information in the state store. 
Then the state store for the table * is reinitialised using the split points in the provided file. */ -public class ReinitialiseTableFromSplitPoints extends ReinitialiseTable { +public class ReinitialiseTableFromSplitPoints { private static final Logger LOGGER = LoggerFactory.getLogger(ReinitialiseTableFromSplitPoints.class); + private final ReinitialiseTable reinitialiseTable; private final boolean splitPointStringsBase64Encoded; private final String splitPointsFileLocation; @@ -49,15 +52,22 @@ public ReinitialiseTableFromSplitPoints( String tableName, String splitPointsFileLocation, boolean splitPointStringsBase64Encoded) { - super(s3Client, dynamoDBClient, instanceId, tableName, true); + this.reinitialiseTable = new ReinitialiseTable(s3Client, dynamoDBClient, instanceId, tableName, true); this.splitPointStringsBase64Encoded = splitPointStringsBase64Encoded; this.splitPointsFileLocation = splitPointsFileLocation; } - @Override - protected void initialiseStateStore(TableProperties tableProperties, StateStore stateStore) throws IOException { - List splitPoints = readSplitPoints(tableProperties, splitPointsFileLocation, splitPointStringsBase64Encoded); - stateStore.initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct()); + public void run() { + reinitialiseTable.run(tableProperties -> new InitialisePartitionsTransaction(readPartitions(tableProperties))); + } + + private List readPartitions(TableProperties tableProperties) { + try { + List splitPoints = readSplitPoints(tableProperties, splitPointsFileLocation, splitPointStringsBase64Encoded); + return new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } public static void main(String[] args) { @@ -83,11 +93,11 @@ public static void main(String[] args) { AmazonDynamoDB dynamoDBClient = buildAwsV1Client(AmazonDynamoDBClientBuilder.standard()); try { - ReinitialiseTable reinitialiseTable = new ReinitialiseTableFromSplitPoints(s3Client, dynamoDBClient, instanceId, tableName, + ReinitialiseTableFromSplitPoints reinitialiseTable = new ReinitialiseTableFromSplitPoints(s3Client, dynamoDBClient, instanceId, tableName, splitPointsFile, splitPointsFileBase64Encoded); reinitialiseTable.run(); LOGGER.info("Table reinitialised successfully"); - } catch (RuntimeException | IOException e) { + } catch (RuntimeException e) { LOGGER.error("\nAn Error occurred while trying to reinitialise the table. 
" + "The error message is as follows:\n\n" + e.getMessage() + "\n\nCause:" + e.getCause()); diff --git a/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java b/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java index 52490a2c49..446b32aa96 100644 --- a/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java +++ b/java/clients/src/test/java/sleeper/clients/status/update/DeleteTableIT.java @@ -33,6 +33,8 @@ import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.transactionlog.log.TransactionLogEntry; +import sleeper.core.statestore.transactionlog.log.TransactionLogRange; import sleeper.core.table.TableNotFoundException; import sleeper.core.table.TableStatus; import sleeper.core.util.ObjectFactory; @@ -41,6 +43,7 @@ import sleeper.ingest.runner.IngestRecords; import sleeper.localstack.test.LocalStackTestBase; import sleeper.statestore.StateStoreFactory; +import sleeper.statestore.transactionlog.DynamoDBTransactionLogStore; import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator; import sleeper.statestore.transactionlog.snapshots.DynamoDBTransactionLogSnapshotCreator; @@ -113,6 +116,8 @@ void shouldDeleteOnlyTable() throws Exception { assertThatThrownBy(() -> propertiesStore.loadByName("table-1")) .isInstanceOf(TableNotFoundException.class); assertThat(streamTableObjects(table)).isEmpty(); + assertThat(streamTableFileTransactions(table)).isEmpty(); + assertThat(streamTablePartitionTransactions(table)).isEmpty(); } @Test @@ -144,9 +149,13 @@ void shouldDeleteOneTableWhenAnotherTableIsPresent() throws Exception { assertThatThrownBy(() -> propertiesStore.loadByName("table-1")) .isInstanceOf(TableNotFoundException.class); assertThat(streamTableObjects(table1)).isEmpty(); + assertThat(streamTableFileTransactions(table1)).isEmpty(); + assertThat(streamTablePartitionTransactions(table1)).isEmpty(); assertThat(propertiesStore.loadByName("table-2")) .isEqualTo(table2); assertThat(streamTableObjects(table2)).isNotEmpty(); + assertThat(streamTableFileTransactions(table2)).isNotEmpty(); + assertThat(streamTablePartitionTransactions(table2)).isNotEmpty(); } @Test @@ -196,7 +205,7 @@ void shouldFailToDeleteTableThatDoesNotExist() { } private void deleteTable(String tableName) throws Exception { - new DeleteTable(instanceProperties, s3Client, propertiesStore, + new DeleteTable(s3Client, instanceProperties, propertiesStore, StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, hadoopConf)) .delete(tableName); } @@ -237,4 +246,14 @@ private Stream streamTableObjects(TableProperties tableProperti .getObjectSummaries().stream() .filter(s3ObjectSummary -> s3ObjectSummary.getSize() > 0); } + + private Stream streamTableFileTransactions(TableProperties tableProperties) { + return DynamoDBTransactionLogStore.forFiles(instanceProperties, tableProperties, dynamoClient, s3Client) + .readTransactions(TransactionLogRange.fromMinimum(1)); + } + + private Stream streamTablePartitionTransactions(TableProperties tableProperties) { + return DynamoDBTransactionLogStore.forPartitions(instanceProperties, tableProperties, dynamoClient, s3Client) + .readTransactions(TransactionLogRange.fromMinimum(1)); + } } diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/CompactionJobCommitterOrSendToLambda.java 
b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/CompactionJobCommitterOrSendToLambda.java index cb4ce0f5d3..fa4bcd1d33 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/CompactionJobCommitterOrSendToLambda.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/CompactionJobCommitterOrSendToLambda.java @@ -86,7 +86,8 @@ public void commit(CompactionJob job, CompactionJobFinishedEvent finishedEvent) } } else { LOGGER.debug("Committing compaction job {} inside compaction task", job.getId()); - stateStoreProvider.getStateStore(tableProperties).atomicallyReplaceFileReferencesWithNewOnes(List.of(request)); + new ReplaceFileReferencesTransaction(List.of(request)) + .synchronousCommit(stateStoreProvider.getStateStore(tableProperties)); tracker.jobCommitted(job.committedEventBuilder(timeSupplier.get()) .jobRunId(finishedEvent.getJobRunId()) .taskId(finishedEvent.getTaskId()) diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java index 366988bdd1..044d9d1f11 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java @@ -35,7 +35,7 @@ public interface AssignJobIdToFiles { static AssignJobIdToFiles synchronous(StateStore stateStore) { return (assignJobIdRequests, tableStatus) -> { - stateStore.assignJobIds(assignJobIdRequests); + new AssignJobIdsTransaction(assignJobIdRequests).synchronousCommit(stateStore); }; } diff --git a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java index 48c56ec0ca..4c69a2af56 100644 --- a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java @@ -19,8 +19,6 @@ import org.slf4j.LoggerFactory; import sleeper.core.partition.Partition; -import sleeper.core.statestore.exception.ReplaceRequestsFailedException; -import sleeper.core.statestore.exception.SplitRequestsFailedException; import sleeper.core.statestore.transactionlog.AddTransactionRequest; import java.time.Instant; @@ -31,7 +29,7 @@ /** * A state store implementation that delegates to implementations of a file reference and partition store. 
*/ -public class DelegatingStateStore implements StateStore { +public abstract class DelegatingStateStore implements StateStore { public static final Logger LOGGER = LoggerFactory.getLogger(DelegatingStateStore.class); private final FileReferenceStore fileReferenceStore; private final PartitionStore partitionStore; @@ -41,56 +39,6 @@ public DelegatingStateStore(FileReferenceStore fileReferenceStore, PartitionStor this.partitionStore = partitionStore; } - @Override - public void addFiles(List fileReferences) throws StateStoreException { - if (fileReferences.isEmpty()) { - LOGGER.info("Ignoring addFiles call with no files"); - return; - } - fileReferenceStore.addFiles(fileReferences); - } - - @Override - public void addFilesWithReferences(List files) throws StateStoreException { - if (files.isEmpty()) { - LOGGER.info("Ignoring addFilesWithReferences call with no files"); - return; - } - fileReferenceStore.addFilesWithReferences(files); - } - - @Override - public void splitFileReferences(List splitRequests) throws SplitRequestsFailedException { - if (splitRequests.isEmpty()) { - LOGGER.info("Ignoring splitFileReferences call with no requests"); - return; - } - fileReferenceStore.splitFileReferences(splitRequests); - } - - @Override - public void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws ReplaceRequestsFailedException { - fileReferenceStore.atomicallyReplaceFileReferencesWithNewOnes(requests); - } - - @Override - public void assignJobIds(List requests) throws StateStoreException { - if (requests.isEmpty()) { - LOGGER.info("Ignoring assignJobIds call with no requests"); - return; - } - fileReferenceStore.assignJobIds(requests); - } - - @Override - public void deleteGarbageCollectedFileReferenceCounts(List filenames) throws StateStoreException { - if (filenames.isEmpty()) { - LOGGER.info("Ignoring deleteGarbageCollectedFileReferenceCounts call with no files"); - return; - } - fileReferenceStore.deleteGarbageCollectedFileReferenceCounts(filenames); - } - @Override public List getFileReferences() throws StateStoreException { return fileReferenceStore.getFileReferences(); @@ -116,29 +64,6 @@ public AllReferencesToAllFiles getAllFilesWithMaxUnreferenced(int maxUnreference return fileReferenceStore.getAllFilesWithMaxUnreferenced(maxUnreferencedFiles); } - @Override - public void initialise() throws StateStoreException { - if (!hasNoFiles()) { - throw new StateStoreException("Cannot initialise state store when files are present"); - } - partitionStore.initialise(); - fileReferenceStore.initialise(); - } - - @Override - public void initialise(List partitions) throws StateStoreException { - if (!hasNoFiles()) { - throw new StateStoreException("Cannot initialise state store when files are present"); - } - partitionStore.initialise(partitions); - fileReferenceStore.initialise(); - } - - @Override - public void atomicallyUpdatePartitionAndCreateNewOnes(Partition splitPartition, Partition newPartition1, Partition newPartition2) throws StateStoreException { - partitionStore.atomicallyUpdatePartitionAndCreateNewOnes(splitPartition, newPartition1, newPartition2); - } - @Override public List getAllPartitions() throws StateStoreException { return partitionStore.getAllPartitions(); @@ -159,16 +84,6 @@ public boolean hasNoFiles() throws StateStoreException { return fileReferenceStore.hasNoFiles(); } - @Override - public void clearFileData() throws StateStoreException { - fileReferenceStore.clearFileData(); - } - - @Override - public void clearPartitionData() throws StateStoreException { - 
partitionStore.clearPartitionData(); - } - @Override public void fixFileUpdateTime(Instant now) { fileReferenceStore.fixFileUpdateTime(now); diff --git a/java/core/src/main/java/sleeper/core/statestore/FileReferenceStore.java b/java/core/src/main/java/sleeper/core/statestore/FileReferenceStore.java index cfec020bfc..d3c5e8e7dd 100644 --- a/java/core/src/main/java/sleeper/core/statestore/FileReferenceStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/FileReferenceStore.java @@ -23,14 +23,7 @@ * Stores information about the data files in a Sleeper table. This includes a count of the number of references * to the file, and internal references which assign all the data in the file to non-overlapping partitions. */ -public interface FileReferenceStore extends FileReferenceStoreQueries, FileReferenceStoreUpdates { - - /** - * Performs extra setup steps that are needed before the file reference store can be used. - * - * @throws StateStoreException if initialisation fails - */ - void initialise() throws StateStoreException; +public interface FileReferenceStore extends FileReferenceStoreQueries { /** * Used to fix the time of file updates. Should only be called during tests. diff --git a/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java b/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java deleted file mode 100644 index 7429523df1..0000000000 --- a/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package sleeper.core.statestore; - -import sleeper.core.statestore.exception.FileAlreadyExistsException; -import sleeper.core.statestore.exception.FileHasReferencesException; -import sleeper.core.statestore.exception.FileNotFoundException; -import sleeper.core.statestore.exception.FileReferenceAssignedToJobException; -import sleeper.core.statestore.exception.FileReferenceNotFoundException; -import sleeper.core.statestore.exception.ReplaceRequestsFailedException; -import sleeper.core.statestore.exception.SplitRequestsFailedException; - -import java.util.List; - -/** - * Handles updates to the data files in a Sleeper table. This includes a count of the number of references - * to the file, and internal references which assign all the data in the file to non-overlapping partitions. - */ -public interface FileReferenceStoreUpdates { - - /** - * Adds files to the Sleeper table, with any number of references. Each reference to be added should be for a file - * which does not yet exist in the table. - *
<p>
- * When adding multiple references for a file, a file must never be referenced in two partitions where one is a - * descendent of another. This means each record in a file must only be covered by one reference. A partition covers - * a range of records. A partition which is the child of another covers a sub-range within the parent partition. - * - * @param fileReferences The file references to be added - * @throws FileAlreadyExistsException if a file already exists - * @throws StateStoreException if the update fails for another reason - */ - default void addFiles(List fileReferences) throws StateStoreException { - addFilesWithReferences(AllReferencesToAFile.newFilesWithReferences(fileReferences)); - } - - /** - * Adds files to the Sleeper table, with any number of references. Each new file should be specified once, with all - * its references. - *
<p>
- * A file must never be referenced in two partitions where one is a descendent of another. This means each record in - * a file must only be covered by one reference. A partition covers a range of records. A partition which is the - * child of another covers a sub-range within the parent partition. - * - * @param files The files to be added - * @throws FileAlreadyExistsException if a file already exists - * @throws StateStoreException if the update fails for another reason - */ - void addFilesWithReferences(List files) throws StateStoreException; - - /** - * Performs atomic updates to split file references. This is used to push file references down the partition tree, - * eg. where records are ingested to a non-leaf partition, or when a partition is split. A file referenced in a - * larger, non-leaf partition may be split between smaller partitions which cover non-overlapping sub-ranges of the - * original partition. This includes these records in compactions of the descendent partitions. - *
<p>
- * The aim is to combine all records into a small number of files for each leaf partition, where the leaves of the - * partition tree should represent a separation of the data into manageable chunks. Compaction operates on file - * references to pull records from multiple files into one, when they are referenced in the same partition. This - * reduces the number of files in the system, and improves statistics and indexing within each partition. This - * should result in faster queries, and more accurate partitioning when a partition is split. - *
<p>
- * Each {@link SplitFileReferenceRequest} will remove one file reference, and create new references to the same file - * in descendent partitions. The reference counts will be tracked accordingly. - *
<p>
- * The ranges covered by the partitions of the new references must not overlap, so there - * must never be two references to the same file where one partition is a descendent of the other. - *
<p>
- * Note that it is possible that the necessary updates may not fit in a single transaction. Each - * {@link SplitFileReferenceRequest} is guaranteed to be done atomically in one transaction, but it is possible that - * some may succeed and some may fail. If a single {@link SplitFileReferenceRequest} adds too many references to - * apply in one transaction, this will also fail. - * - * @param splitRequests A list of {@link SplitFileReferenceRequest}s to apply - * @throws SplitRequestsFailedException if any of the requests fail, even if some succeeded - */ - void splitFileReferences(List splitRequests) throws SplitRequestsFailedException; - - /** - * Atomically applies the results of jobs. Removes file references for a job's input files, and adds a reference to - * an output file. This will be used for compaction. - *
<p>
- * This will validate that the input files were assigned to the job. - *
<p>
- * This will decrement the number of references for each of the input files. If no other references exist for those - * files, they will become available for garbage collection. - * - * @param requests requests for jobs to each have their results atomically applied - * @throws ReplaceRequestsFailedException if any of the updates fail - */ - void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws ReplaceRequestsFailedException; - - /** - * Atomically updates the job field of file references, as long as the job field is currently unset. This will be - * used for compaction job input files. - * - * @param requests A list of {@link AssignJobIdRequest}s which should each be applied - * atomically - * @throws FileReferenceNotFoundException if a reference does not exist - * @throws FileReferenceAssignedToJobException if a reference is already assigned to a job - * @throws StateStoreException if the update fails for another reason - */ - void assignJobIds(List requests) throws StateStoreException; - - /** - * Records that files were garbage collected and have been deleted. The reference counts for those files should be - * deleted. - *
<p>
- * If there are any remaining internal references for the files on partitions, this should fail, as it should not be - * possible to reach that state. - *
<p>
- * If the reference count is non-zero for any other reason, it may be that the count was incremented after the file - * was ready for garbage collection. This should fail in that case as well, as we would like this to not be - * possible. - * - * @param filenames The names of files that were deleted. - * @throws FileNotFoundException if a file does not exist - * @throws FileHasReferencesException if a file still has references - * @throws StateStoreException if the update fails for another reason - */ - void deleteGarbageCollectedFileReferenceCounts(List filenames) throws StateStoreException; - - /** - * Clears all file data from the file reference store. Note that this does not delete any of the actual files. - */ - void clearFileData() throws StateStoreException; -} diff --git a/java/core/src/main/java/sleeper/core/statestore/PartitionStore.java b/java/core/src/main/java/sleeper/core/statestore/PartitionStore.java index 0810ff6bfb..9ec33deeb0 100644 --- a/java/core/src/main/java/sleeper/core/statestore/PartitionStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/PartitionStore.java @@ -16,7 +16,6 @@ package sleeper.core.statestore; import sleeper.core.partition.Partition; -import sleeper.core.partition.PartitionTree; import sleeper.core.statestore.transactionlog.AddTransactionRequest; import java.time.Instant; @@ -27,19 +26,6 @@ */ public interface PartitionStore { - /** - * Atomically splits a partition to create child partitions. Updates the existing partition to record it as split, - * and creates new leaf partitions. - * - * @param splitPartition The {@link Partition} to be updated (must refer to the new leaves as children). - * @param newPartition1 The first new {@link Partition} (must be a leaf partition). - * @param newPartition2 The second new {@link Partition} (must be a leaf partition). - * @throws StateStoreException if split is not valid or update fails - */ - void atomicallyUpdatePartitionAndCreateNewOnes(Partition splitPartition, - Partition newPartition1, - Partition newPartition2) throws StateStoreException; - /** * Returns all partitions. * @@ -65,29 +51,6 @@ void atomicallyUpdatePartitionAndCreateNewOnes(Partition splitPartition, */ Partition getPartition(String partitionId) throws StateStoreException; - /** - * Initialises the store with a single partition covering all keys. This is the root partition which may be split - * in the future. - * - * @throws StateStoreException if update fails - */ - void initialise() throws StateStoreException; - - /** - * Initialises the store with a list of all partitions. This must be a complete {@link PartitionTree}. - * - * @param partitions The initial list of {@link Partition}s - * @throws StateStoreException if partitions not provided or update fails - */ - void initialise(List partitions) throws StateStoreException; - - /** - * Clears all partition data from the store. Note that this will invalidate any file references held in the store, - * so this should only be used when no files are present. The store must be initialised before the Sleeper table can - * be used again. Any file references will need to be added again. - */ - void clearPartitionData() throws StateStoreException; - /** * Used to fix the time of partition updates. Should only be called during tests. 
* diff --git a/java/core/src/main/java/sleeper/core/statestore/SplitFileReferences.java b/java/core/src/main/java/sleeper/core/statestore/SplitFileReferences.java index df5c50dabb..ec7f8f19b2 100644 --- a/java/core/src/main/java/sleeper/core/statestore/SplitFileReferences.java +++ b/java/core/src/main/java/sleeper/core/statestore/SplitFileReferences.java @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; import sleeper.core.partition.Partition; +import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction; import java.util.ArrayList; import java.util.List; @@ -67,9 +68,7 @@ public void split() throws StateStoreException { .map(fileReference -> splitFileInPartition(fileReference, partition))) .forEach(splitRequests::add); LOGGER.info("Found {} files in non-leaf partitions that need splitting", splitRequests.size()); - if (!splitRequests.isEmpty()) { - stateStore.splitFileReferences(splitRequests); - } + new SplitFileReferencesTransaction(splitRequests).synchronousCommit(stateStore); } private static SplitFileReferenceRequest splitFileInPartition(FileReference file, Partition partition) { diff --git a/java/core/src/main/java/sleeper/core/statestore/StateStore.java b/java/core/src/main/java/sleeper/core/statestore/StateStore.java index 84fc724b4d..aae5a472eb 100644 --- a/java/core/src/main/java/sleeper/core/statestore/StateStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/StateStore.java @@ -24,16 +24,14 @@ * and the {@link sleeper.core.partition.Partition}s). */ public interface StateStore extends FileReferenceStore, PartitionStore { + /** * Clears all file data and partition data from the state store. Note that this does not delete any of the actual * files, and after calling this method the store must be initialised before the Sleeper table can be used again. * * @throws StateStoreException if the update fails */ - default void clearSleeperTable() throws StateStoreException { - clearFileData(); - clearPartitionData(); - } + void clearSleeperTable(); /** * Applies a transaction to the state store. 
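The pattern running through this patch is to replace direct mutating calls on StateStore with transaction objects that are committed via synchronousCommit. Below is a minimal sketch of what a caller looks like after the change, using only classes and methods visible in this diff (AddFilesTransaction.fromReferences, InitialisePartitionsTransaction.singlePartition, synchronousCommit); the wrapper class, method name and variables are illustrative assumptions, not part of the patch.

    import java.util.List;

    import sleeper.core.schema.Schema;
    import sleeper.core.statestore.FileReference;
    import sleeper.core.statestore.StateStore;
    import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
    import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;

    // Illustrative caller, not part of this patch.
    public class SynchronousCommitSketch {

        public static void initialiseAndAddFiles(StateStore stateStore, Schema schema, List<FileReference> references) {
            // Previously stateStore.initialise(): commit a transaction creating a single root partition
            InitialisePartitionsTransaction.singlePartition(schema).synchronousCommit(stateStore);
            // Previously stateStore.addFiles(references): build the same transaction an asynchronous commit would send
            AddFilesTransaction.fromReferences(references).synchronousCommit(stateStore);
        }
    }

This mirrors the synchronous branches updated above in BulkImportJobDriver and ReinitialiseTable, where the transaction that would otherwise go to the state store committer queue is applied directly to the state store.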
diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStore.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStore.java index 4c3dcb0a14..622d3cfed9 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogFileReferenceStore.java @@ -18,25 +18,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.AllReferencesToAllFiles; -import sleeper.core.statestore.AssignJobIdRequest; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceStore; -import sleeper.core.statestore.ReplaceFileReferencesRequest; -import sleeper.core.statestore.SplitFileReferenceRequest; import sleeper.core.statestore.StateStoreException; -import sleeper.core.statestore.exception.ReplaceRequestsFailedException; -import sleeper.core.statestore.exception.SplitRequestsFailedException; import sleeper.core.statestore.transactionlog.log.TransactionLogEntry; import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; -import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction; -import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction; import sleeper.core.statestore.transactionlog.transaction.impl.ClearFilesTransaction; -import sleeper.core.statestore.transactionlog.transaction.impl.DeleteFilesTransaction; -import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction; -import sleeper.core.statestore.transactionlog.transaction.impl.SplitFileReferencesTransaction; import java.time.Clock; import java.time.Instant; @@ -59,46 +48,6 @@ class TransactionLogFileReferenceStore implements FileReferenceStore { this.head = head; } - @Override - public void addFilesWithReferences(List files) throws StateStoreException { - AddFilesTransaction transaction = new AddFilesTransaction(files); - transaction.validateFiles(head.state()); - head.addTransaction(clock.instant(), transaction); - } - - @Override - public void assignJobIds(List requests) throws StateStoreException { - AssignJobIdsTransaction transaction = new AssignJobIdsTransaction(requests); - if (transaction.isEmpty()) { - LOGGER.info("Ignoring assignJobIds call with no file assignments, received requests: {}", requests); - } else { - head.addTransaction(clock.instant(), transaction); - } - } - - @Override - public void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws ReplaceRequestsFailedException { - try { - for (ReplaceFileReferencesRequest request : requests) { - request.validateNewReference(); - request.validateStateChange(head.state()); - } - head.addTransaction(clock.instant(), new ReplaceFileReferencesTransaction(requests)); - } catch (StateStoreException e) { - throw new ReplaceRequestsFailedException(requests, e); - } - } - - @Override - public void clearFileData() throws StateStoreException { - head.addTransaction(clock.instant(), new ClearFilesTransaction()); - } - - @Override - public void deleteGarbageCollectedFileReferenceCounts(List filenames) throws StateStoreException { - head.addTransaction(clock.instant(), new DeleteFilesTransaction(filenames)); - } - @Override public void fixFileUpdateTime(Instant time) { clock = 
Clock.fixed(time, ZoneId.of("UTC")); @@ -131,19 +80,6 @@ public boolean hasNoFiles() throws StateStoreException { return files().isEmpty(); } - @Override - public void initialise() throws StateStoreException { - } - - @Override - public void splitFileReferences(List splitRequests) throws SplitRequestsFailedException { - try { - head.addTransaction(clock.instant(), new SplitFileReferencesTransaction(splitRequests)); - } catch (StateStoreException e) { - throw new SplitRequestsFailedException(List.of(), splitRequests, e); - } - } - void updateFromLog() throws StateStoreException { head.update(); } @@ -157,6 +93,10 @@ void applyEntryFromLog(TransactionLogEntry logEntry, StateListenerBeforeApply li head.applyTransactionUpdatingIfNecessary(logEntry, listener); } + void clearTransactionLog() { + head.clearTransactionLog(new ClearFilesTransaction(), clock.instant()); + } + private StateStoreFiles files() throws StateStoreException { head.update(); return head.state(); diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java index 56e8e65bf6..b8f7a31610 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogHead.java @@ -307,6 +307,12 @@ public long getLastTransactionNumber() { return lastTransactionNumber; } + void clearTransactionLog(StateStoreTransaction updateState, Instant time) { + logStore.deleteTransactionsAtOrBefore(Long.MAX_VALUE); + updateState.apply(state, time); + lastTransactionNumber = 0; + } + /** * Builder to initialise the head to read from a transaction log and snapshots. * diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java index 9f3935c866..eb2e19ce32 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogPartitionStore.java @@ -16,14 +16,12 @@ package sleeper.core.statestore.transactionlog; import sleeper.core.partition.Partition; -import sleeper.core.schema.Schema; import sleeper.core.statestore.PartitionStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.transactionlog.log.TransactionLogEntry; import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply; import sleeper.core.statestore.transactionlog.state.StateStorePartitions; -import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; -import sleeper.core.statestore.transactionlog.transaction.impl.SplitPartitionTransaction; +import sleeper.core.statestore.transactionlog.transaction.impl.ClearPartitionsTransaction; import java.time.Clock; import java.time.Instant; @@ -35,25 +33,13 @@ */ class TransactionLogPartitionStore implements PartitionStore { - private final Schema schema; private final TransactionLogHead head; private Clock clock = Clock.systemUTC(); - TransactionLogPartitionStore(Schema schema, TransactionLogHead head) { - this.schema = schema; + TransactionLogPartitionStore(TransactionLogHead head) { this.head = head; } - @Override - public void atomicallyUpdatePartitionAndCreateNewOnes(Partition splitPartition, Partition newPartition1, Partition newPartition2) throws 
StateStoreException { - head.addTransaction(clock.instant(), new SplitPartitionTransaction(splitPartition, List.of(newPartition1, newPartition2))); - } - - @Override - public void clearPartitionData() throws StateStoreException { - head.addTransaction(clock.instant(), new InitialisePartitionsTransaction(List.of())); - } - @Override public List getAllPartitions() throws StateStoreException { return partitions().all().stream().toList(); @@ -70,16 +56,6 @@ public Partition getPartition(String partitionId) throws StateStoreException { .orElseThrow(() -> new StateStoreException("Partition not found: " + partitionId)); } - @Override - public void initialise() throws StateStoreException { - head.addTransaction(clock.instant(), InitialisePartitionsTransaction.singlePartition(schema)); - } - - @Override - public void initialise(List partitions) throws StateStoreException { - head.addTransaction(clock.instant(), new InitialisePartitionsTransaction(partitions)); - } - @Override public void fixPartitionUpdateTime(Instant time) { clock = Clock.fixed(time, ZoneId.of("UTC")); @@ -103,6 +79,10 @@ void applyEntryFromLog(TransactionLogEntry logEntry, StateListenerBeforeApply li head.applyTransactionUpdatingIfNecessary(logEntry, listener); } + void clearTransactionLog() { + head.clearTransactionLog(ClearPartitionsTransaction.create(), clock.instant()); + } + private StateStorePartitions partitions() throws StateStoreException { head.update(); return head.state(); diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogStateStore.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogStateStore.java index dfd665ef08..b3a1ed7b2b 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogStateStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/TransactionLogStateStore.java @@ -15,7 +15,6 @@ */ package sleeper.core.statestore.transactionlog; -import sleeper.core.schema.Schema; import sleeper.core.statestore.DelegatingStateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.transactionlog.log.TransactionBodyStore; @@ -61,12 +60,11 @@ private TransactionLogStateStore(Builder builder) { } private TransactionLogStateStore(Builder builder, TransactionLogHead.Builder headBuilder) { - this(builder.schema, - headBuilder.forFiles() - .logStore(builder.filesLogStore) - .snapshotLoader(builder.filesSnapshotLoader) - .stateUpdateClock(builder.filesStateUpdateClock) - .build(), + this(headBuilder.forFiles() + .logStore(builder.filesLogStore) + .snapshotLoader(builder.filesSnapshotLoader) + .stateUpdateClock(builder.filesStateUpdateClock) + .build(), headBuilder.forPartitions() .logStore(builder.partitionsLogStore) .snapshotLoader(builder.partitionsSnapshotLoader) @@ -74,9 +72,9 @@ private TransactionLogStateStore(Builder builder, TransactionLogHead.Builder .build()); } - private TransactionLogStateStore(Schema schema, TransactionLogHead filesHead, TransactionLogHead partitionsHead) { + private TransactionLogStateStore(TransactionLogHead filesHead, TransactionLogHead partitionsHead) { this(new TransactionLogFileReferenceStore(filesHead), - new TransactionLogPartitionStore(schema, partitionsHead)); + new TransactionLogPartitionStore(partitionsHead)); } private TransactionLogStateStore(TransactionLogFileReferenceStore files, TransactionLogPartitionStore partitions) { @@ -111,6 +109,12 @@ public void applyEntryFromLog(TransactionLogEntry logEntry, StateListenerBeforeA } } 
+ @Override + public void clearSleeperTable() { + files.clearTransactionLog(); + partitions.clearTransactionLog(); + } + public static Builder builder() { return new Builder(); } @@ -120,7 +124,6 @@ public static Builder builder() { */ public static class Builder { private TableStatus sleeperTable; - private Schema schema; private TransactionLogStore filesLogStore; private TransactionLogStore partitionsLogStore; private TransactionBodyStore transactionBodyStore; @@ -149,17 +152,6 @@ public Builder sleeperTable(TableStatus sleeperTable) { return this; } - /** - * Sets the schema of the Sleeper table. Used to initialise partitions. - * - * @param schema the schema - * @return the builder - */ - public Builder schema(Schema schema) { - this.schema = schema; - return this; - } - /** * Sets the transaction log for file reference transactions. * diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java index 6edc62b3f1..76de8c6017 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AddFilesTransaction.java @@ -19,8 +19,12 @@ import org.slf4j.LoggerFactory; import sleeper.core.statestore.AllReferencesToAFile; +import sleeper.core.statestore.FileReference; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.exception.FileAlreadyExistsException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; +import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply; import sleeper.core.statestore.transactionlog.state.StateStoreFile; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -34,7 +38,12 @@ import java.util.Objects; /** - * A transaction to add files to the state store. + * Adds files to the Sleeper table, with any number of references. Each new file should be specified once, with all + * its references. Once a file has been added, it may not be added again, even as a reference on a different partition. + *
<p>
+ * A file must never be referenced in two partitions where one is a descendent of another. This means each record in + * a file must only be covered by one reference. A partition covers a range of records. A partition which is the + * child of another covers a sub-range within the parent partition. */ public class AddFilesTransaction implements FileReferenceTransaction { @@ -63,6 +72,30 @@ public static Builder builder() { return new Builder(); } + /** + * Creates a transaction to add files with the given references. + * + * @param fileReferences the file references + * @return the transaction + */ + public static AddFilesTransaction fromReferences(List fileReferences) { + return builder().fileReferences(fileReferences).build(); + } + + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + * @throws FileAlreadyExistsException if a file already exists + * @throws StateStoreException if the update fails for another reason + */ + public void synchronousCommit(StateStore stateStore) throws StateStoreException { + stateStore.addTransaction(AddTransactionRequest.withTransaction(this) + .beforeApplyListener(StateListenerBeforeApply.withFilesState(state -> validateFiles(state))) + .build()); + } + @Override public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException { // We want to update the job tracker whether the new files are valid or not, and the job tracker is updated @@ -75,8 +108,8 @@ public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException * regardless of whether the files may be added, so that any failure can be reported to the job tracker after the * fact. * - * @param stateStoreFiles the state before the transaction - * @throws StateStoreException thrown if the files should not be added + * @param stateStoreFiles the state before the transaction + * @throws FileAlreadyExistsException if a file already exists */ public void validateFiles(StateStoreFiles stateStoreFiles) throws StateStoreException { for (AllReferencesToAFile file : files) { @@ -265,6 +298,16 @@ public Builder files(List files) { return this; } + /** + * Sets the files to add to the state store. 
+ * + * @param fileReferences the file references to add + * @return this builder + */ + public Builder fileReferences(List fileReferences) { + return files(AllReferencesToAFile.newFilesWithReferences(fileReferences)); + } + public AddFilesTransaction build() { return new AddFilesTransaction(this); } diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java index 1f74f41318..d6fb83b847 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java @@ -20,9 +20,11 @@ import sleeper.core.statestore.AssignJobIdRequest; import sleeper.core.statestore.FileReference; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.exception.FileReferenceAssignedToJobException; import sleeper.core.statestore.exception.FileReferenceNotFoundException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStoreFile; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -34,7 +36,8 @@ import static java.util.stream.Collectors.toUnmodifiableList; /** - * A transaction to assign files to jobs. + * Atomically updates the job field of file references, as long as the job field is currently unset. This will be + * used for compaction job input files. */ public class AssignJobIdsTransaction implements FileReferenceTransaction { public static final Logger LOGGER = LoggerFactory.getLogger(AssignJobIdsTransaction.class); @@ -47,6 +50,19 @@ public AssignJobIdsTransaction(List requests) { .collect(toUnmodifiableList()); } + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. 
+ * + * @param stateStore the state store + * @throws FileReferenceNotFoundException if a reference does not exist + * @throws FileReferenceAssignedToJobException if a reference is already assigned to a job + * @throws StateStoreException if the update fails for another reason + */ + public void synchronousCommit(StateStore stateStore) { + stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build()); + } + @Override public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException { for (AssignJobIdRequest request : requests) { diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearFilesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearFilesTransaction.java index ebdb8b20ae..dbe2827c8a 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearFilesTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearFilesTransaction.java @@ -15,7 +15,9 @@ */ package sleeper.core.statestore.transactionlog.transaction.impl; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -23,7 +25,7 @@ import java.util.Objects; /** - * A transaction to delete all files in the state store. + * Clears all file data from the file reference store. Note that this does not delete any of the actual files. */ public class ClearFilesTransaction implements FileReferenceTransaction { @@ -31,6 +33,17 @@ public class ClearFilesTransaction implements FileReferenceTransaction { public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException { } + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + * @throws StateStoreException if the update fails + */ + public void synchronousCommit(StateStore stateStore) throws StateStoreException { + stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build()); + } + @Override public void apply(StateStoreFiles stateStoreFiles, Instant updateTime) { stateStoreFiles.clear(); diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearPartitionsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearPartitionsTransaction.java new file mode 100644 index 0000000000..add6241906 --- /dev/null +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ClearPartitionsTransaction.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.core.statestore.transactionlog.transaction.impl; + +import java.util.List; + +/** + * Clears all partition data from the store. Note that this will invalidate any file references held in the store, + * so this should only be used when no files are present. The store must be initialised before the Sleeper table can + * be used again. Any file references will need to be added again. + */ +public class ClearPartitionsTransaction { + + private ClearPartitionsTransaction() { + } + + /** + * Creates a transaction to clear partitions in the table. + * + * @return the transaction + */ + public static InitialisePartitionsTransaction create() { + return new InitialisePartitionsTransaction(List.of()); + } + +} diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/DeleteFilesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/DeleteFilesTransaction.java index e102d0390d..2ab4e26715 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/DeleteFilesTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/DeleteFilesTransaction.java @@ -15,9 +15,11 @@ */ package sleeper.core.statestore.transactionlog.transaction.impl; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.exception.FileHasReferencesException; import sleeper.core.statestore.exception.FileNotFoundException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStoreFile; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -27,7 +29,15 @@ import java.util.Objects; /** - * A transaction to delete files that have no remaining references. This is used to perform garbage collection. + * Records that files were garbage collected and have been deleted. The reference counts for those files should be + * deleted. + *

+ * If there are any remaining internal references for the files on partitions, this should fail, as it should not be + * possible to reach that state. + *

+ * If the reference count is non-zero for any other reason, it may be that the count was incremented after the file + * was ready for garbage collection. This should fail in that case as well, as we would like this to not be + * possible. */ public class DeleteFilesTransaction implements FileReferenceTransaction { @@ -37,6 +47,19 @@ public DeleteFilesTransaction(List filenames) { this.filenames = filenames; } + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + * @throws FileNotFoundException if a file does not exist + * @throws FileHasReferencesException if a file still has references + * @throws StateStoreException if the update fails for another reason + */ + public void synchronousCommit(StateStore stateStore) throws StateStoreException { + stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build()); + } + @Override public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException { for (String filename : filenames) { diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java index c37e9162e1..0bf0c246f2 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/InitialisePartitionsTransaction.java @@ -20,6 +20,7 @@ import sleeper.core.schema.Schema; import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStorePartitions; import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction; @@ -29,8 +30,9 @@ import java.util.Objects; /** - * A transaction to set all partitions in a Sleeper table. This should specify the whole partition tree. Any partitions - * that were present before will be deleted. + * Sets all partitions in a Sleeper table. These should build into a complete partition tree, where there is a single + * root and all partitions are connected to that either directly or indirectly via other partitions. Any partitions that + * were present before will be deleted. */ public class InitialisePartitionsTransaction implements PartitionTransaction { @@ -41,7 +43,8 @@ public InitialisePartitionsTransaction(List partitions) { } /** - * Creates a transaction to initialise the table with a single root partition. + * Creates a transaction to initialise the table with a single partition covering all keys. This is the root + * partition which may be split in the future. * * @param schema the table schema * @return the transaction @@ -50,6 +53,17 @@ public static InitialisePartitionsTransaction singlePartition(Schema schema) { return new InitialisePartitionsTransaction(new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct()); } + /** + * Commits this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. 
+ * + * @param stateStore the state store + * @throws StateStoreException if the update fails + */ + public void synchronousCommit(StateStore stateStore) throws StateStoreException { + stateStore.addPartitionsTransaction(AddTransactionRequest.withTransaction(this).build()); + } + @Override public void validate(StateStorePartitions stateStorePartitions) { } diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java index 5764694295..9a273087d2 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/ReplaceFileReferencesTransaction.java @@ -19,12 +19,16 @@ import org.slf4j.LoggerFactory; import sleeper.core.statestore.ReplaceFileReferencesRequest; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.exception.FileAlreadyExistsException; import sleeper.core.statestore.exception.FileNotFoundException; import sleeper.core.statestore.exception.FileReferenceNotAssignedToJobException; import sleeper.core.statestore.exception.FileReferenceNotFoundException; import sleeper.core.statestore.exception.NewReferenceSameAsOldReferenceException; +import sleeper.core.statestore.exception.ReplaceRequestsFailedException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; +import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply; import sleeper.core.statestore.transactionlog.state.StateStoreFile; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -38,8 +42,13 @@ import static java.util.stream.Collectors.toUnmodifiableList; /** - * A transaction to remove a number of file references that were assigned to a job, and replace them with a new file. - * This can be used to apply the results of a compaction. + * Atomically applies the results of jobs. Removes file references for a job's input files, and adds a reference to + * an output file. This will be used for compaction. + *

+ * This will validate that the input files were assigned to the job. + *

+ * This will decrement the number of references for each of the input files. If no other references exist for those + * files, they will become available for garbage collection. */ public class ReplaceFileReferencesTransaction implements FileReferenceTransaction { public static final Logger LOGGER = LoggerFactory.getLogger(ReplaceFileReferencesTransaction.class); @@ -50,8 +59,29 @@ public ReplaceFileReferencesTransaction(List jobs) this.jobs = jobs.stream() .map(job -> job.withNoUpdateTime()) .collect(toUnmodifiableList()); - for (ReplaceFileReferencesRequest job : jobs) { - job.validateNewReference(); + try { + for (ReplaceFileReferencesRequest job : jobs) { + job.validateNewReference(); + } + } catch (StateStoreException e) { + throw new ReplaceRequestsFailedException(jobs, e); + } + } + + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + * @throws ReplaceRequestsFailedException if any of the updates fail + */ + public void synchronousCommit(StateStore stateStore) throws ReplaceRequestsFailedException { + try { + stateStore.addTransaction(AddTransactionRequest.withTransaction(this) + .beforeApplyListener(StateListenerBeforeApply.withFilesState(state -> validateStateChange(state))) + .build()); + } catch (StateStoreException e) { + throw new ReplaceRequestsFailedException(jobs, e); } } diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitFileReferencesTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitFileReferencesTransaction.java index 229cb6f9c6..2e6181334c 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitFileReferencesTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitFileReferencesTransaction.java @@ -17,11 +17,14 @@ import sleeper.core.statestore.FileReference; import sleeper.core.statestore.SplitFileReferenceRequest; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.exception.FileNotFoundException; import sleeper.core.statestore.exception.FileReferenceAlreadyExistsException; import sleeper.core.statestore.exception.FileReferenceAssignedToJobException; import sleeper.core.statestore.exception.FileReferenceNotFoundException; +import sleeper.core.statestore.exception.SplitRequestsFailedException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStoreFile; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -33,8 +36,27 @@ import static java.util.stream.Collectors.toUnmodifiableList; /** - * A transaction to replace file references with new references to the same file on other partitions. Used when a file - * is referenced on a partition that has child partitions, to push it down the partition tree. + * Performs atomic updates to split file references. This is used to push file references down the partition tree, + * eg. where records are ingested to a non-leaf partition, or when a partition is split. 
A file referenced in a + * larger, non-leaf partition may be split between smaller partitions which cover non-overlapping sub-ranges of the + * original partition. This allows those records to be included in compactions of the descendant partitions. + *

+ * The aim is to combine all records into a small number of files for each leaf partition, where the leaves of the + * partition tree should represent a separation of the data into manageable chunks. Compaction operates on file + * references to pull records from multiple files into one, when they are referenced in the same partition. This + * reduces the number of files in the system, and improves statistics and indexing within each partition. This + * should result in faster queries, and more accurate partitioning when a partition is split. + *

+ * Each {@link SplitFileReferenceRequest} will remove one file reference, and create new references to the same file + * in descendant partitions. The reference counts will be tracked accordingly. + *

+ * The ranges covered by the partitions of the new references must not overlap, so there must never be two references to + * the same file where one partition is a descendant of the other. + *

+ * Currently this is always done atomically, but future implementations of the state store may not fit this in a single + * transaction. Each {@link SplitFileReferenceRequest} is guaranteed to be done atomically in one transaction, but it is + * possible that some may succeed and some may fail. If a single {@link SplitFileReferenceRequest} adds too many + * references to apply in one transaction, this will also fail. */ public class SplitFileReferencesTransaction implements FileReferenceTransaction { @@ -46,6 +68,21 @@ public SplitFileReferencesTransaction(List requests) .collect(toUnmodifiableList()); } + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + * @throws SplitRequestsFailedException if any of the requests fail, even if some succeeded + */ + public void synchronousCommit(StateStore stateStore) { + try { + stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build()); + } catch (StateStoreException e) { + throw new SplitRequestsFailedException(List.of(), requests, e); + } + } + @Override public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException { for (SplitFileReferenceRequest request : requests) { diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitPartitionTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitPartitionTransaction.java index f7d2f25867..f6f6025e16 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitPartitionTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/SplitPartitionTransaction.java @@ -16,7 +16,9 @@ package sleeper.core.statestore.transactionlog.transaction.impl; import sleeper.core.partition.Partition; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStorePartitions; import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction; @@ -29,8 +31,11 @@ import static java.util.stream.Collectors.toUnmodifiableSet; /** - * A transaction to add new partitions to the tree. This is done by setting child partitions for a partition that was - * previously a leaf partition. This will turn the leaf partition into a parent and add new leaf partitions below it. + * Atomically splits a partition to create child partitions. This is done by setting child partitions for a partition + * that was previously a leaf partition. + *

+ * The new partitions must be leaf partitions. The parent partition must already exist, and will be replaced with the + * updated version. The new version must refer to the new leaf partitions as children. */ public class SplitPartitionTransaction implements PartitionTransaction { @@ -42,6 +47,17 @@ public SplitPartitionTransaction(Partition parent, List newChildren) this.newChildren = newChildren; } + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + * @throws StateStoreException if the split is not valid or the update fails + */ + public void synchronousCommit(StateStore stateStore) { + stateStore.addPartitionsTransaction(AddTransactionRequest.withTransaction(this).build()); + } + @Override public void validate(StateStorePartitions stateStorePartitions) throws StateStoreException { Partition existingParent = stateStorePartitions.byId(parent.getId()) diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogSnapshotSetup.java b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogSnapshotSetup.java index 9007e6dfd9..43672b3179 100644 --- a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogSnapshotSetup.java +++ b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogSnapshotSetup.java @@ -58,7 +58,6 @@ public static InMemoryTransactionLogSnapshotSetup setupSnapshotWithFreshState( InMemoryTransactionBodyStore transactionBodyStore = new InMemoryTransactionBodyStore(); StateStore stateStore = TransactionLogStateStore.builder() .sleeperTable(sleeperTable) - .schema(schema) .filesLogStore(fileTransactions) .partitionsLogStore(partitionTransactions) .transactionBodyStore(transactionBodyStore) diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogs.java b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogs.java index 663d6f87f1..9eee3d0078 100644 --- a/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogs.java +++ b/java/core/src/test/java/sleeper/core/statestore/testutils/InMemoryTransactionLogs.java @@ -90,7 +90,6 @@ public static InMemoryTransactionLogs recordRetryWaits(List retryWaits public TransactionLogStateStore.Builder stateStoreBuilder(TableStatus sleeperTable, Schema schema) { return TransactionLogStateStore.builder() .sleeperTable(sleeperTable) - .schema(schema) .filesLogStore(filesLogStore) .filesSnapshotLoader(filesSnapshots) .partitionsLogStore(partitionsLogStore) diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java index cf1b6f0287..c78321bb75 100644 --- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java +++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java @@ -34,9 +34,6 @@ import sleeper.core.statestore.exception.FileReferenceNotFoundException; import sleeper.core.statestore.exception.ReplaceRequestsFailedException; import sleeper.core.statestore.exception.SplitRequestsFailedException; -import sleeper.core.statestore.transactionlog.AddTransactionRequest; -import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply; -import 
sleeper.core.statestore.transactionlog.transaction.StateStoreTransaction; import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction; import sleeper.core.statestore.transactionlog.transaction.impl.AssignJobIdsTransaction; import sleeper.core.statestore.transactionlog.transaction.impl.ClearFilesTransaction; @@ -78,7 +75,7 @@ public static StateStoreUpdatesWrapper update(StateStore stateStore) { * @throws StateStoreException if the update fails */ public void initialise(Schema schema) throws StateStoreException { - addTransaction(InitialisePartitionsTransaction.singlePartition(schema)); + InitialisePartitionsTransaction.singlePartition(schema).synchronousCommit(stateStore); } /** @@ -89,7 +86,7 @@ public void initialise(Schema schema) throws StateStoreException { * @throws StateStoreException if the update fails */ public void initialise(List partitions) throws StateStoreException { - addTransaction(new InitialisePartitionsTransaction(partitions)); + new InitialisePartitionsTransaction(partitions).synchronousCommit(stateStore); } /** @@ -102,7 +99,7 @@ public void initialise(List partitions) throws StateStoreException { * @throws StateStoreException if split is not valid or update fails */ public void atomicallyUpdatePartitionAndCreateNewOnes(Partition splitPartition, Partition newPartition1, Partition newPartition2) throws StateStoreException { - addTransaction(new SplitPartitionTransaction(splitPartition, List.of(newPartition1, newPartition2))); + new SplitPartitionTransaction(splitPartition, List.of(newPartition1, newPartition2)).synchronousCommit(stateStore); } /** @@ -129,7 +126,7 @@ public void addFile(FileReference fileReference) throws StateStoreException { * @throws StateStoreException if the update fails for another reason */ public void addFiles(List fileReferences) throws StateStoreException { - addFilesWithReferences(AllReferencesToAFile.newFilesWithReferences(fileReferences)); + AddFilesTransaction.fromReferences(fileReferences).synchronousCommit(stateStore); } /** @@ -145,10 +142,7 @@ public void addFiles(List fileReferences) throws StateStoreExcept * @throws StateStoreException if the update fails for another reason */ public void addFilesWithReferences(List files) throws StateStoreException { - AddFilesTransaction transaction = new AddFilesTransaction(files); - stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction) - .beforeApplyListener(StateListenerBeforeApply.withFilesState(state -> transaction.validateFiles(state))) - .build()); + new AddFilesTransaction(files).synchronousCommit(stateStore); } /** @@ -178,11 +172,7 @@ public void addFilesWithReferences(List files) throws Stat * @throws SplitRequestsFailedException if any of the requests fail, even if some succeeded */ public void splitFileReferences(List splitRequests) throws SplitRequestsFailedException { - try { - addTransaction(new SplitFileReferencesTransaction(splitRequests)); - } catch (StateStoreException e) { - throw new SplitRequestsFailedException(List.of(), splitRequests, e); - } + new SplitFileReferencesTransaction(splitRequests).synchronousCommit(stateStore); } /** @@ -196,7 +186,7 @@ public void splitFileReferences(List splitRequests) t * @throws StateStoreException if the update fails for another reason */ public void assignJobIds(List requests) throws StateStoreException { - addTransaction(new AssignJobIdsTransaction(requests)); + new AssignJobIdsTransaction(requests).synchronousCommit(stateStore); } /** @@ -212,14 +202,7 @@ public void assignJobIds(List 
requests) throws StateStoreExc * @throws ReplaceRequestsFailedException if any of the updates fail */ public void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws ReplaceRequestsFailedException { - try { - ReplaceFileReferencesTransaction transaction = new ReplaceFileReferencesTransaction(requests); - stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction) - .beforeApplyListener(StateListenerBeforeApply.withFilesState(state -> transaction.validateStateChange(state))) - .build()); - } catch (StateStoreException e) { - throw new ReplaceRequestsFailedException(requests, e); - } + new ReplaceFileReferencesTransaction(requests).synchronousCommit(stateStore); } /** @@ -239,7 +222,7 @@ public void atomicallyReplaceFileReferencesWithNewOnes(List filenames) throws StateStoreException { - addTransaction(new DeleteFilesTransaction(filenames)); + new DeleteFilesTransaction(filenames).synchronousCommit(stateStore); } /** @@ -249,28 +232,27 @@ public void deleteGarbageCollectedFileReferenceCounts(List filenames) th * @throws StateStoreException if the update fails */ public void clearSleeperTable() throws StateStoreException { - clearFileData(); - clearPartitionData(); + stateStore.clearSleeperTable(); } /** * Clears all file data from the file reference store. Note that this does not delete any of the actual files. + * + * @throws StateStoreException if the update fails */ public void clearFileData() throws StateStoreException { - addTransaction(new ClearFilesTransaction()); + new ClearFilesTransaction().synchronousCommit(stateStore); } /** * Clears all partition data from the store. Note that this will invalidate any file references held in the store, * so this should only be used when no files are present. The store must be initialised before the Sleeper table can * be used again. Any file references will need to be added again. 
+ * + * @throws StateStoreException if the update fails */ public void clearPartitionData() throws StateStoreException { - addTransaction(new InitialisePartitionsTransaction(List.of())); - } - - private void addTransaction(StateStoreTransaction transaction) { - stateStore.addTransaction(AddTransactionRequest.withTransaction(transaction).build()); + new InitialisePartitionsTransaction(List.of()).synchronousCommit(stateStore); } } diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java index bc5721e3c7..91b3088c66 100644 --- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/TransactionLogStateStoreLogSpecificTest.java @@ -456,7 +456,7 @@ class StoreTransactionBodySeparately { void shouldAddFileTransactionWhoseBodyIsHeldInS3() { // Given FileReference file = fileFactory().rootFile("file.parquet", 100); - FileReferenceTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file))); + FileReferenceTransaction transaction = AddFilesTransaction.fromReferences(List.of(file)); String key = "table/transactions/myTransaction.json"; transactionBodyStore.store(key, tableId, transaction); @@ -486,7 +486,7 @@ void shouldAddPartitionTransactionWhoseBodyIsHeldInS3() { void shouldFailToLoadTransactionIfBodyIsNotInBodyStore() { // Given FileReference file = fileFactory().rootFile("file.parquet", 100); - FileReferenceTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file))); + FileReferenceTransaction transaction = AddFilesTransaction.fromReferences(List.of(file)); String key = "table/transactions/myTransaction.json"; store.addTransaction(AddTransactionRequest.withTransaction(transaction).bodyKey(key).build()); @@ -500,7 +500,7 @@ void shouldFailToLoadTransactionIfBodyIsNotInBodyStore() { void shouldFailToLoadTransactionIfTypeHeldInLogDoesNotMatchTypeInBodyStore() { // Given FileReference file = fileFactory().rootFile("file.parquet", 100); - FileReferenceTransaction transactionInStore = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file))); + FileReferenceTransaction transactionInStore = AddFilesTransaction.fromReferences(List.of(file)); FileReferenceTransaction transactionInLog = new ClearFilesTransaction(); String key = "table/transactions/myTransaction.json"; transactionBodyStore.store(key, tableId, transactionInStore); diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/CompactionJobCommitTransactionTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/CompactionJobCommitTransactionTest.java index d8a684b24c..8e2a453bcf 100644 --- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/CompactionJobCommitTransactionTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/CompactionJobCommitTransactionTest.java @@ -22,6 +22,7 @@ import sleeper.core.schema.type.LongType; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.exception.NewReferenceSameAsOldReferenceException; +import sleeper.core.statestore.exception.ReplaceRequestsFailedException; import 
sleeper.core.statestore.testutils.InMemoryTransactionLogStateStoreCompactionTrackerTestBase; import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.TransactionLogStateStore; @@ -219,7 +220,8 @@ void shouldFailWhenFileToBeMarkedReadyForGCHasSameFileNameAsNewFile() { // When / Then assertThatThrownBy(() -> new ReplaceFileReferencesTransaction(List.of( replaceJobFileReferences("job1", List.of("file1"), file)))) - .isInstanceOf(NewReferenceSameAsOldReferenceException.class); + .isInstanceOf(ReplaceRequestsFailedException.class) + .cause().isInstanceOf(NewReferenceSameAsOldReferenceException.class); } @Test diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/IngestJobCommitTransactionTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/IngestJobCommitTransactionTest.java index 6144fc89c3..c855801ee2 100644 --- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/IngestJobCommitTransactionTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/IngestJobCommitTransactionTest.java @@ -85,7 +85,7 @@ void shouldCommitIngestJob() { void shouldNotUpdateTrackerWhenCommitIsNotForAnyIngestJob() { // Given we have a commit request without an ingest job (e.g. from an endless stream of records) FileReference file = factory.rootFile("file.parquet", 100L); - AddFilesTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file))); + AddFilesTransaction transaction = AddFilesTransaction.fromReferences(List.of(file)); // When addTransactionWithTracking(transaction); @@ -99,7 +99,7 @@ void shouldNotUpdateTrackerWhenCommitIsNotForAnyIngestJob() { void shouldFailWhenFileAlreadyExists() { // Given FileReference file = factory.rootFile("file.parquet", 100L); - addTransactionWithTracking(new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file)))); + addTransactionWithTracking(AddFilesTransaction.fromReferences(List.of(file))); trackJobRun("test-job", "test-run", 1, file); AddFilesTransaction transaction = AddFilesTransaction.builder() .files(AllReferencesToAFile.newFilesWithReferences(List.of(file))) diff --git a/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/TransactionSerDeTest.java b/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/TransactionSerDeTest.java index 8b9a2bc6f8..abea959f60 100644 --- a/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/TransactionSerDeTest.java +++ b/java/core/src/test/java/sleeper/core/statestore/transactionlog/transaction/TransactionSerDeTest.java @@ -290,9 +290,8 @@ class NoSchema { void shouldSerialiseFileTransactionWithoutSchema() { // Given PartitionTree partitions = new PartitionsBuilder(schemaWithKey("key")).singlePartition("root").buildTree(); - FileReferenceTransaction transaction = new AddFilesTransaction( - AllReferencesToAFile.newFilesWithReferences(List.of( - FileReferenceFactory.from(partitions).rootFile("file.parquet", 100)))); + FileReferenceTransaction transaction = AddFilesTransaction.fromReferences(List.of( + FileReferenceFactory.from(partitions).rootFile("file.parquet", 100))); // When String json = serDe.toJson(transaction); diff --git a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java index 68230677f6..9f53f5b0e1 
100644 --- a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java +++ b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollector.java @@ -132,7 +132,7 @@ private void deleteBatch(List batch, TableProperties tableProperties, St tableProperties.get(TABLE_ID), new DeleteFilesTransaction(deletedFilenames))); LOGGER.info("Submitted asynchronous request to state store committer for {} deleted files in table {}", deletedFilenames.size(), tableProperties.getStatus()); } else { - stateStore.deleteGarbageCollectedFileReferenceCounts(deletedFilenames); + new DeleteFilesTransaction(deletedFilenames).synchronousCommit(stateStore); LOGGER.info("Applied deletion to state store"); } } catch (Exception e) { diff --git a/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java b/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java index 05bce7503f..e2060563e5 100644 --- a/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java +++ b/java/ingest/ingest-runner/src/main/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStore.java @@ -41,14 +41,14 @@ public interface AddFilesToStateStore { void addFiles(List references) throws StateStoreException; static AddFilesToStateStore synchronous(StateStore stateStore) { - return stateStore::addFiles; + return references -> AddFilesTransaction.fromReferences(references).synchronousCommit(stateStore); } static AddFilesToStateStore synchronous( StateStore stateStore, IngestJobTracker tracker, IngestJobAddedFilesEvent.Builder statusUpdateBuilder) { return references -> { List files = AllReferencesToAFile.newFilesWithReferences(references); - stateStore.addFilesWithReferences(files); + new AddFilesTransaction(files).synchronousCommit(stateStore); tracker.jobAddedFiles(statusUpdateBuilder.files(files).build()); }; } diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreTest.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreTest.java index 0960bc8b53..84f1bfbdd8 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreTest.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/impl/commit/AddFilesToStateStoreTest.java @@ -22,7 +22,6 @@ import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; import sleeper.core.schema.Schema; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.commit.StateStoreCommitRequest; @@ -56,7 +55,7 @@ void shouldSendCommitRequestToQueue() throws Exception { // Then assertThat(stateStoreCommitQueue) .containsExactly(StateStoreCommitRequest.create(tableProperties.get(TABLE_ID), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file1))))); + AddFilesTransaction.fromReferences(List.of(file1)))); } private AddFilesToStateStore bySqs() { diff --git a/java/splitter/splitter-core/src/main/java/sleeper/splitter/core/split/SplitPartition.java b/java/splitter/splitter-core/src/main/java/sleeper/splitter/core/split/SplitPartition.java index 0bc58041f4..d2540d2f0c 100644 --- a/java/splitter/splitter-core/src/main/java/sleeper/splitter/core/split/SplitPartition.java +++ 
b/java/splitter/splitter-core/src/main/java/sleeper/splitter/core/split/SplitPartition.java @@ -102,7 +102,7 @@ private void apply(SplitPartitionResult result) { LOGGER.info("New partition: {}", rightChild); if (!tableProperties.getBoolean(PARTITION_SPLIT_ASYNC_COMMIT)) { - stateStore.atomicallyUpdatePartitionAndCreateNewOnes(parentPartition, leftChild, rightChild); + new SplitPartitionTransaction(parentPartition, List.of(leftChild, rightChild)).synchronousCommit(stateStore); } else { sendAsyncCommit.sendCommit(StateStoreCommitRequest.create(tableProperties.get(TABLE_ID), new SplitPartitionTransaction(parentPartition, List.of(leftChild, rightChild)))); diff --git a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java index d41dae4d14..b1fef0ac62 100644 --- a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java +++ b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java @@ -334,7 +334,7 @@ void shouldApplyIngestStreamAddFilesCommitRequest() throws Exception { // Given we have a commit request without an ingest job (e.g. from an endless stream of records) StateStore stateStore = createTableGetStateStore("test-table"); FileReference outputFile = fileFactory.rootFile("output.parquet", 123L); - AddFilesTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(outputFile))); + AddFilesTransaction transaction = AddFilesTransaction.fromReferences(List.of(outputFile)); // When apply(StateStoreCommitRequest.create("test-table", transaction)); diff --git a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterThroughputIT.java b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterThroughputIT.java index 13ae09715c..1694dd5839 100644 --- a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterThroughputIT.java +++ b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterThroughputIT.java @@ -30,7 +30,6 @@ import sleeper.core.properties.table.TablePropertiesProvider; import sleeper.core.schema.Schema; import sleeper.core.schema.type.StringType; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStoreProvider; import sleeper.core.statestore.commit.StateStoreCommitRequest; @@ -80,13 +79,13 @@ private Stats runAddFilesRequestsWithNoJobGetStats(int numberOfRequests) throws StateStoreCommitter committer = committer(); FileReferenceFactory fileFactory = FileReferenceFactory.from(new PartitionsBuilder(schema).singlePartition("root").buildTree()); committer.apply(StateStoreCommitRequest.create(tableId, - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences( - List.of(fileFactory.rootFile("prewarm-file.parquet", 123)))))); + AddFilesTransaction.fromReferences( + List.of(fileFactory.rootFile("prewarm-file.parquet", 123))))); return runRequestsGetStats(committer, IntStream.rangeClosed(1, numberOfRequests) .mapToObj(i -> StateStoreCommitRequest.create(tableId, - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences( - List.of(fileFactory.rootFile("file-" + i + ".parquet", i))))))); + AddFilesTransaction.fromReferences( + 
List.of(fileFactory.rootFile("file-" + i + ".parquet", i)))))); } private Stats runRequestsGetStats(StateStoreCommitter committer, Stream requests) throws Exception { diff --git a/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java b/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java index 9bc6da3ea1..5893a8a86e 100644 --- a/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java +++ b/java/statestore-lambda/src/test/java/sleeper/statestore/lambda/committer/StateStoreCommitterLambdaTest.java @@ -30,7 +30,6 @@ import sleeper.core.properties.testutils.FixedTablePropertiesProvider; import sleeper.core.schema.Schema; import sleeper.core.schema.type.StringType; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.FilesReportTestHelper; @@ -149,7 +148,7 @@ private SQSEvent event(SQSMessage... messages) { private SQSMessage addFilesMessage(String messageId, FileReference... files) { StateStoreCommitRequest request = StateStoreCommitRequest.create(tableProperties.get(TABLE_ID), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(files)))); + AddFilesTransaction.fromReferences(List.of(files))); SQSMessage message = new SQSMessage(); message.setMessageId(messageId); message.setBody(new StateStoreCommitRequestSerDe(tableProperties).toJson(request)); diff --git a/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStore.java b/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStore.java index 9ff97f706e..a392cfcd15 100644 --- a/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStore.java +++ b/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStore.java @@ -26,6 +26,7 @@ import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; import sleeper.parquet.utils.HadoopConfigurationProvider; import static sleeper.configuration.utils.AwsV1ClientHelper.buildAwsV1Client; @@ -61,7 +62,7 @@ public static void main(String[] args) { Configuration conf = HadoopConfigurationProvider.getConfigurationForClient(); StateStore stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, conf).getStateStore(tableProperties); - stateStore.initialise(); + InitialisePartitionsTransaction.singlePartition(tableProperties.getSchema()).synchronousCommit(stateStore); } finally { dynamoDBClient.shutdown(); s3Client.shutdown(); diff --git a/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromExportedPartitions.java b/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromExportedPartitions.java index 6f09a25914..4146d42fe4 100644 --- a/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromExportedPartitions.java +++ b/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromExportedPartitions.java @@ -28,6 +28,7 @@ import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; import 
sleeper.parquet.utils.HadoopConfigurationProvider; import java.io.BufferedReader; @@ -86,7 +87,7 @@ public static void main(String[] args) throws IOException { } System.out.println("Read " + partitions.size() + " partitions from file"); - stateStore.initialise(partitions); + new InitialisePartitionsTransaction(partitions).synchronousCommit(stateStore); } finally { dynamoDBClient.shutdown(); s3Client.shutdown(); diff --git a/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromSplitPoints.java b/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromSplitPoints.java index 23e27b93e2..12e9912f23 100644 --- a/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromSplitPoints.java +++ b/java/statestore/src/main/java/sleeper/statestore/InitialiseStateStoreFromSplitPoints.java @@ -27,8 +27,8 @@ import sleeper.core.partition.PartitionsFromSplitPoints; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; -import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreProvider; +import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction; import sleeper.parquet.utils.HadoopConfigurationProvider; import java.io.IOException; @@ -64,8 +64,8 @@ public InitialiseStateStoreFromSplitPoints( * Initialises the state store. */ public void run() { - StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); - stateStore.initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct()); + List partitions = new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct(); + new InitialisePartitionsTransaction(partitions).synchronousCommit(stateStoreProvider.getStateStore(tableProperties)); } /** diff --git a/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStateStoreNoSnapshots.java b/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStateStoreNoSnapshots.java index a011a83f88..66e41e886f 100644 --- a/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStateStoreNoSnapshots.java +++ b/java/statestore/src/main/java/sleeper/statestore/transactionlog/DynamoDBTransactionLogStateStoreNoSnapshots.java @@ -56,7 +56,6 @@ public static TransactionLogStateStore.Builder builderFrom( AmazonDynamoDB dynamoDB, AmazonS3 s3) { return TransactionLogStateStore.builder() .sleeperTable(tableProperties.getStatus()) - .schema(tableProperties.getSchema()) .timeBetweenSnapshotChecks(Duration.ofSeconds(tableProperties.getLong(TIME_BETWEEN_SNAPSHOT_CHECKS_SECS))) .timeBetweenTransactionChecks(Duration.ofMillis(tableProperties.getLong(TIME_BETWEEN_TRANSACTION_CHECKS_MS))) .minTransactionsAheadToLoadSnapshot(tableProperties.getLong(MIN_TRANSACTIONS_AHEAD_TO_LOAD_SNAPSHOT)) diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/S3TransactionBodyStoreIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/S3TransactionBodyStoreIT.java index ed436dfa20..28014644f1 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/S3TransactionBodyStoreIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/S3TransactionBodyStoreIT.java @@ -24,7 +24,6 @@ import sleeper.core.partition.PartitionsBuilder; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TableProperties; -import 
sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.transactionlog.log.StoreTransactionBodyResult; import sleeper.core.statestore.transactionlog.log.TransactionBodyStore; @@ -102,8 +101,8 @@ class SupportOnlyFileTransactions { void shouldStoreFileTransaction() { // Given PartitionTree partitions = new PartitionsBuilder(tableProperties).singlePartition("root").buildTree(); - FileReferenceTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of( - FileReferenceFactory.from(partitions).rootFile("test.parquet", 100)))); + FileReferenceTransaction transaction = AddFilesTransaction.fromReferences(List.of( + FileReferenceFactory.from(partitions).rootFile("test.parquet", 100))); // When StateStoreTransaction found = saveAndLoad(store, transaction); @@ -137,8 +136,8 @@ class RetainSerialisedTransaction { void shouldNotStoreTransactionSmallerThanThreshold() { // Given PartitionTree partitions = new PartitionsBuilder(tableProperties).singlePartition("root").buildTree(); - AddFilesTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of( - FileReferenceFactory.from(partitions).rootFile("test.parquet", 100)))); + AddFilesTransaction transaction = AddFilesTransaction.fromReferences(List.of( + FileReferenceFactory.from(partitions).rootFile("test.parquet", 100))); // When StoreTransactionBodyResult result = store.storeIfTooBig(tableId, transaction); @@ -154,10 +153,10 @@ void shouldNotStoreTransactionSmallerThanThreshold() { void shouldStoreTransactionLargerThanThreshold() { // Given PartitionTree partitions = new PartitionsBuilder(tableProperties).singlePartition("root").buildTree(); - AddFilesTransaction transaction = new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of( + AddFilesTransaction transaction = AddFilesTransaction.fromReferences(List.of( FileReferenceFactory.from(partitions).rootFile("test1.parquet", 100), FileReferenceFactory.from(partitions).rootFile("test2.parquet", 200), - FileReferenceFactory.from(partitions).rootFile("test3.parquet", 300)))); + FileReferenceFactory.from(partitions).rootFile("test3.parquet", 300))); // When StoreTransactionBodyResult result = store.storeIfTooBig(tableId, transaction); diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java index 569ce65084..76ca6645c0 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/TransactionLogStateStoreDynamoDBSpecificIT.java @@ -27,7 +27,6 @@ import sleeper.core.properties.table.TableProperties; import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; @@ -126,7 +125,7 @@ void shouldAddATransactionAlreadyHeldInS3() { PartitionTree tree = new PartitionsBuilder(schema).singlePartition("root").buildTree(); update(stateStore).initialise(tree.getAllPartitions()); FileReference file = fileFactory(tree).rootFile("test.parquet", 100); - FileReferenceTransaction transaction = new 
AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file))); + FileReferenceTransaction transaction = AddFilesTransaction.fromReferences(List.of(file)); String key = TransactionBodyStore.createObjectKey(tableProperties); TransactionBodyStore transactionBodyStore = new S3TransactionBodyStore(instanceProperties, s3Client, TransactionSerDeProvider.forOneTable(tableProperties)); diff --git a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java index 299eff594d..eeea7531a3 100644 --- a/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java +++ b/java/statestore/src/test/java/sleeper/statestore/transactionlog/snapshots/TransactionLogTransactionDeleterTest.java @@ -23,7 +23,6 @@ import sleeper.core.properties.table.TableProperties; import sleeper.core.schema.Schema; import sleeper.core.schema.type.StringType; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.StateStore; @@ -81,7 +80,7 @@ void shouldDeleteOldTransactionWhenTwoAreBeforeLatestSnapshot() throws Exception // Then the transaction is deleted assertThat(filesLogStore.readTransactions(toUpdateLocalStateAt(0))) .containsExactly(new TransactionLogEntry(2, Instant.parse("2024-06-24T15:46:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file2))))); + AddFilesTransaction.fromReferences(List.of(file2)))); } @Test @@ -106,9 +105,9 @@ void shouldNotDeleteOldTransactionWhenNoSnapshotIsOldEnough() throws Exception { assertThat(filesLogStore.readTransactions(toUpdateLocalStateAt(0))) .containsExactly( new TransactionLogEntry(1, Instant.parse("2024-06-24T15:45:45Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file1)))), + AddFilesTransaction.fromReferences(List.of(file1))), new TransactionLogEntry(2, Instant.parse("2024-06-24T15:46:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file2))))); + AddFilesTransaction.fromReferences(List.of(file2)))); } @Test @@ -136,9 +135,9 @@ void shouldDeleteOldTransactionWhenOldSnapshotIsOldEnough() throws Exception { // Then a transaction is deleted behind the first snapshot assertThat(filesLogStore.readTransactions(toUpdateLocalStateAt(0))).containsExactly( new TransactionLogEntry(2, Instant.parse("2024-06-24T15:46:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file2)))), + AddFilesTransaction.fromReferences(List.of(file2))), new TransactionLogEntry(3, Instant.parse("2024-06-24T15:47:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file3))))); + AddFilesTransaction.fromReferences(List.of(file3)))); } @Test @@ -163,9 +162,9 @@ void shouldNotDeleteOldTransactionWhenSnapshotIsOldEnoughButTransactionNotFarEno assertThat(filesLogStore.readTransactions(toUpdateLocalStateAt(0))) .containsExactly( new TransactionLogEntry(1, Instant.parse("2024-06-24T15:45:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file1)))), + AddFilesTransaction.fromReferences(List.of(file1))), new TransactionLogEntry(2, Instant.parse("2024-06-24T15:46:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file2))))); 
+ AddFilesTransaction.fromReferences(List.of(file2)))); } @Test @@ -187,9 +186,9 @@ void shouldNotDeleteTransactionsWhenNoSnapshotExistsYet() throws Exception { assertThat(filesLogStore.readTransactions(toUpdateLocalStateAt(0))) .containsExactly( new TransactionLogEntry(1, Instant.parse("2024-06-24T15:45:45Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file1)))), + AddFilesTransaction.fromReferences(List.of(file1))), new TransactionLogEntry(2, Instant.parse("2024-06-24T15:46:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file2))))); + AddFilesTransaction.fromReferences(List.of(file2)))); } @Test @@ -246,9 +245,9 @@ void shouldDeleteOldTransactionWhenTwoSnapshotsAreOldEnough() throws Exception { // Then transactions are deleted behind the second snapshot assertThat(filesLogStore.readTransactions(toUpdateLocalStateAt(0))).containsExactly( new TransactionLogEntry(2, Instant.parse("2024-06-24T15:46:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file2)))), + AddFilesTransaction.fromReferences(List.of(file2))), new TransactionLogEntry(3, Instant.parse("2024-06-24T15:47:00Z"), - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file3))))); + AddFilesTransaction.fromReferences(List.of(file3)))); } private void setupAtTime(Instant time, SetupFunction setup) throws Exception { diff --git a/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriverIT.java b/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriverIT.java index 653f5b5154..d2565af7d9 100644 --- a/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriverIT.java +++ b/java/system-test/system-test-drivers/src/test/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriverIT.java @@ -24,7 +24,6 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; import sleeper.core.statestore.commit.StateStoreCommitRequest; @@ -76,7 +75,7 @@ void shouldSendCommitToSqsQueue(SleeperSystemTest sleeper) { .extracting(this::getMessageGroupId, this::readCommitRequest) .containsExactly(tuple(tableId, StateStoreCommitRequest.create(tableId, - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file)))))); + AddFilesTransaction.fromReferences(List.of(file))))); } @Test @@ -96,7 +95,7 @@ void shouldSendMoreCommitsThanBatchSize(SleeperSystemTest sleeper) { .extracting(this::getMessageGroupId, this::readCommitRequest) .containsExactlyInAnyOrderElementsOf(files.stream().map(file -> tuple(tableId, StateStoreCommitRequest.create(tableId, - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(List.of(file)))))) + AddFilesTransaction.fromReferences(List.of(file))))) .collect(toUnmodifiableList())); } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/ingest/SystemTestIngestToStateStore.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/ingest/SystemTestIngestToStateStore.java index b0b81d141c..fb46d6b250 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/ingest/SystemTestIngestToStateStore.java +++ 
b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/ingest/SystemTestIngestToStateStore.java @@ -18,7 +18,6 @@ import sleeper.core.partition.Partition; import sleeper.core.partition.PartitionTree; -import sleeper.core.statestore.AllReferencesToAFile; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction; @@ -87,7 +86,7 @@ public SystemTestIngestToStateStore addFileOnEveryPartition(String name, long nu private void addFiles(List fileReferences) { instance.getStateStore().addTransaction(AddTransactionRequest.withTransaction( - new AddFilesTransaction(AllReferencesToAFile.newFilesWithReferences(fileReferences))) + AddFilesTransaction.fromReferences(fileReferences)) .build()); } }
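Below is a minimal usage sketch of the new AddFilesTransaction entry points; it is not part of the change set above. It assumes the caller already has a StateStore, the table's unique ID and a list of FileReference objects, and the Consumer standing in for the commit request sender is an assumption rather than an API introduced here. It shows the synchronous path (as in AddFilesToStateStore.synchronous) next to the asynchronous path that wraps the same transaction in a StateStoreCommitRequest for the state store committer.

import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.commit.StateStoreCommitRequest;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;

import java.util.List;
import java.util.function.Consumer;

public class AddFilesExample {

    /** Commits new file references directly against the state store, without going to the commit queue. */
    public static void addFilesSynchronously(StateStore stateStore, List<FileReference> references) throws StateStoreException {
        AddFilesTransaction.fromReferences(references).synchronousCommit(stateStore);
    }

    /** Builds the same transaction, but hands it to the state store committer as a commit request. */
    public static void addFilesAsynchronously(
            String tableId, List<FileReference> references, Consumer<StateStoreCommitRequest> sendCommitRequest) {
        // The sender is assumed to put the request on the committer's queue (e.g. SQS in the tests above).
        sendCommitRequest.accept(StateStoreCommitRequest.create(tableId, AddFilesTransaction.fromReferences(references)));
    }
}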
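A second sketch, under the same caveats, showing how the clear and initialise transactions above can be combined to reset a table's state store. Only a StateStore and the table's Schema are assumed; the method names are illustrative.

import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.transaction.impl.ClearFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.ClearPartitionsTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;

public class ResetTableStateExample {

    /** Clears all file data, leaving the partition tree in place. The underlying files are not deleted. */
    public static void clearFiles(StateStore stateStore) throws StateStoreException {
        new ClearFilesTransaction().synchronousCommit(stateStore);
    }

    /** Clears files and partitions. The table must be re-initialised before it can be used again. */
    public static void clearFilesAndPartitions(StateStore stateStore) throws StateStoreException {
        // File references are cleared first, since clearing partitions would invalidate any remaining references.
        new ClearFilesTransaction().synchronousCommit(stateStore);
        // ClearPartitionsTransaction.create() builds an InitialisePartitionsTransaction with no partitions.
        ClearPartitionsTransaction.create().synchronousCommit(stateStore);
    }

    /** Re-creates the partition tree as a single root partition covering all keys. */
    public static void initialiseSingleRootPartition(StateStore stateStore, Schema schema) throws StateStoreException {
        InitialisePartitionsTransaction.singlePartition(schema).synchronousCommit(stateStore);
    }
}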
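A final sketch for the partition splitting path used in SplitPartition when asynchronous commit is disabled. It assumes the caller has already built the updated parent partition (referring to the new leaves as children) and the two new leaf partitions, for example via the partition splitting logic shown above.

import sleeper.core.partition.Partition;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.transaction.impl.SplitPartitionTransaction;

import java.util.List;

public class SplitPartitionExample {

    /** Replaces the parent with its updated version and adds the two new leaf partitions in one atomic update. */
    public static void splitPartition(StateStore stateStore, Partition updatedParent, Partition leftChild, Partition rightChild) {
        new SplitPartitionTransaction(updatedParent, List.of(leftChild, rightChild)).synchronousCommit(stateStore);
    }
}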