diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java index 366988bdd1..044d9d1f11 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/creation/AssignJobIdToFiles.java @@ -35,7 +35,7 @@ public interface AssignJobIdToFiles { static AssignJobIdToFiles synchronous(StateStore stateStore) { return (assignJobIdRequests, tableStatus) -> { - stateStore.assignJobIds(assignJobIdRequests); + new AssignJobIdsTransaction(assignJobIdRequests).synchronousCommit(stateStore); }; } diff --git a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java index 48c56ec0ca..ebbe3131c2 100644 --- a/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java +++ b/java/core/src/main/java/sleeper/core/statestore/DelegatingStateStore.java @@ -73,15 +73,6 @@ public void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws StateStoreException { - if (requests.isEmpty()) { - LOGGER.info("Ignoring assignJobIds call with no requests"); - return; - } - fileReferenceStore.assignJobIds(requests); - } - @Override public void deleteGarbageCollectedFileReferenceCounts(List filenames) throws StateStoreException { if (filenames.isEmpty()) { diff --git a/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java b/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java index 7429523df1..7d90fad852 100644 --- a/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java +++ b/java/core/src/main/java/sleeper/core/statestore/FileReferenceStoreUpdates.java @@ -18,8 +18,6 @@ import sleeper.core.statestore.exception.FileAlreadyExistsException; import sleeper.core.statestore.exception.FileHasReferencesException; import sleeper.core.statestore.exception.FileNotFoundException; -import sleeper.core.statestore.exception.FileReferenceAssignedToJobException; -import sleeper.core.statestore.exception.FileReferenceNotFoundException; import sleeper.core.statestore.exception.ReplaceRequestsFailedException; import sleeper.core.statestore.exception.SplitRequestsFailedException; @@ -103,18 +101,6 @@ default void addFiles(List fileReferences) throws StateStoreExcep */ void atomicallyReplaceFileReferencesWithNewOnes(List requests) throws ReplaceRequestsFailedException; - /** - * Atomically updates the job field of file references, as long as the job field is currently unset. This will be - * used for compaction job input files. - * - * @param requests A list of {@link AssignJobIdRequest}s which should each be applied - * atomically - * @throws FileReferenceNotFoundException if a reference does not exist - * @throws FileReferenceAssignedToJobException if a reference is already assigned to a job - * @throws StateStoreException if the update fails for another reason - */ - void assignJobIds(List requests) throws StateStoreException; - /** * Records that files were garbage collected and have been deleted. The reference counts for those files should be * deleted. diff --git a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java index 1f74f41318..f4df850836 100644 --- a/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java +++ b/java/core/src/main/java/sleeper/core/statestore/transactionlog/transaction/impl/AssignJobIdsTransaction.java @@ -20,9 +20,11 @@ import sleeper.core.statestore.AssignJobIdRequest; import sleeper.core.statestore.FileReference; +import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreException; import sleeper.core.statestore.exception.FileReferenceAssignedToJobException; import sleeper.core.statestore.exception.FileReferenceNotFoundException; +import sleeper.core.statestore.transactionlog.AddTransactionRequest; import sleeper.core.statestore.transactionlog.state.StateStoreFile; import sleeper.core.statestore.transactionlog.state.StateStoreFiles; import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction; @@ -72,6 +74,16 @@ public void apply(StateStoreFiles stateStoreFiles, Instant updateTime) { } } + /** + * Commit this transaction directly to the state store without going to the commit queue. This will throw any + * validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit. + * + * @param stateStore the state store + */ + public void synchronousCommit(StateStore stateStore) { + stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build()); + } + @Override public boolean isEmpty() { return requests.isEmpty(); diff --git a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java index cf1b6f0287..4b150bd80a 100644 --- a/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java +++ b/java/core/src/test/java/sleeper/core/statestore/testutils/StateStoreUpdatesWrapper.java @@ -196,7 +196,7 @@ public void splitFileReferences(List splitRequests) t * @throws StateStoreException if the update fails for another reason */ public void assignJobIds(List requests) throws StateStoreException { - addTransaction(new AssignJobIdsTransaction(requests)); + new AssignJobIdsTransaction(requests).synchronousCommit(stateStore); } /**