Skip to content

Commit

Permalink
Removed assignJobIds method and replace with transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
rtjd6554 committed Feb 21, 2025
1 parent dbbb4d7 commit c40999a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface AssignJobIdToFiles {

static AssignJobIdToFiles synchronous(StateStore stateStore) {
return (assignJobIdRequests, tableStatus) -> {
stateStore.assignJobIds(assignJobIdRequests);
new AssignJobIdsTransaction(assignJobIdRequests).synchronousCommit(stateStore);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,6 @@ public void atomicallyReplaceFileReferencesWithNewOnes(List<ReplaceFileReference
fileReferenceStore.atomicallyReplaceFileReferencesWithNewOnes(requests);
}

@Override
public void assignJobIds(List<AssignJobIdRequest> requests) throws StateStoreException {
if (requests.isEmpty()) {
LOGGER.info("Ignoring assignJobIds call with no requests");
return;
}
fileReferenceStore.assignJobIds(requests);
}

@Override
public void deleteGarbageCollectedFileReferenceCounts(List<String> filenames) throws StateStoreException {
if (filenames.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import sleeper.core.statestore.exception.FileAlreadyExistsException;
import sleeper.core.statestore.exception.FileHasReferencesException;
import sleeper.core.statestore.exception.FileNotFoundException;
import sleeper.core.statestore.exception.FileReferenceAssignedToJobException;
import sleeper.core.statestore.exception.FileReferenceNotFoundException;
import sleeper.core.statestore.exception.ReplaceRequestsFailedException;
import sleeper.core.statestore.exception.SplitRequestsFailedException;

Expand Down Expand Up @@ -103,18 +101,6 @@ default void addFiles(List<FileReference> fileReferences) throws StateStoreExcep
*/
void atomicallyReplaceFileReferencesWithNewOnes(List<ReplaceFileReferencesRequest> requests) throws ReplaceRequestsFailedException;

/**
* Atomically updates the job field of file references, as long as the job field is currently unset. This will be
* used for compaction job input files.
*
* @param requests A list of {@link AssignJobIdRequest}s which should each be applied
* atomically
* @throws FileReferenceNotFoundException if a reference does not exist
* @throws FileReferenceAssignedToJobException if a reference is already assigned to a job
* @throws StateStoreException if the update fails for another reason
*/
void assignJobIds(List<AssignJobIdRequest> requests) throws StateStoreException;

/**
* Records that files were garbage collected and have been deleted. The reference counts for those files should be
* deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileReferenceAssignedToJobException;
import sleeper.core.statestore.exception.FileReferenceNotFoundException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.state.StateStoreFile;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
Expand Down Expand Up @@ -72,6 +74,16 @@ public void apply(StateStoreFiles stateStoreFiles, Instant updateTime) {
}
}

/**
* Commit this transaction directly to the state store without going to the commit queue. This will throw any
* validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit.
*
* @param stateStore the state store
*/
public void synchronousCommit(StateStore stateStore) {
stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build());
}

@Override
public boolean isEmpty() {
return requests.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void splitFileReferences(List<SplitFileReferenceRequest> splitRequests) t
* @throws StateStoreException if the update fails for another reason
*/
public void assignJobIds(List<AssignJobIdRequest> requests) throws StateStoreException {
addTransaction(new AssignJobIdsTransaction(requests));
new AssignJobIdsTransaction(requests).synchronousCommit(stateStore);
}

/**
Expand Down

0 comments on commit c40999a

Please sign in to comment.