Skip to content

Commit

Permalink
synchronousCommit for the clear partition and the initalise
Browse files Browse the repository at this point in the history
  • Loading branch information
rtjd6554 committed Feb 25, 2025
1 parent bf95cde commit 8bf7bf5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package sleeper.core.statestore.transactionlog.transaction.impl;

import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;

Expand All @@ -31,6 +33,16 @@ public class ClearFilesTransaction implements FileReferenceTransaction {
public void validate(StateStoreFiles stateStoreFiles) throws StateStoreException {
}

/**
* Commit this transaction directly to the state store without going to the commit queue. This will throw any
* validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit.
*
* @param stateStore the state store
*/
public void synchronousCommit(StateStore stateStore) {
stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build());
}

@Override
public void apply(StateStoreFiles stateStoreFiles, Instant updateTime) {
stateStoreFiles.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;

Expand Down Expand Up @@ -67,6 +68,16 @@ public void checkBefore(StateStore stateStore) throws StateStoreException {
}
}

/**
* Commit this transaction directly to the state store without going to the commit queue. This will throw any
* validation exceptions immediately, even if they wouldn't be as part of an asynchronous commit.
*
* @param stateStore the state store
*/
public void synchronousCommit(StateStore stateStore) {
stateStore.addFilesTransaction(AddTransactionRequest.withTransaction(this).build());
}

@Override
public int hashCode() {
return Objects.hash(partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void initialise(Schema schema) throws StateStoreException {
* @throws StateStoreException if the update fails
*/
public void initialise(List<Partition> partitions) throws StateStoreException {
addTransaction(new InitialisePartitionsTransaction(partitions));
new InitialisePartitionsTransaction(partitions).synchronousCommit(stateStore);
}

/**
Expand Down Expand Up @@ -257,7 +257,7 @@ public void clearSleeperTable() throws StateStoreException {
* Clears all file data from the file reference store. Note that this does not delete any of the actual files.
*/
public void clearFileData() throws StateStoreException {
addTransaction(new ClearFilesTransaction());
new ClearFilesTransaction().synchronousCommit(stateStore);
}

/**
Expand All @@ -266,7 +266,7 @@ public void clearFileData() throws StateStoreException {
* be used again. Any file references will need to be added again.
*/
public void clearPartitionData() throws StateStoreException {
addTransaction(new InitialisePartitionsTransaction(List.of()));
new InitialisePartitionsTransaction(List.of()).synchronousCommit(stateStore);
}

private void addTransaction(StateStoreTransaction<?> transaction) {
Expand Down

0 comments on commit 8bf7bf5

Please sign in to comment.