Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 4279 - State store update wrapper for tests #4285

Merged
merged 18 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ public void fixPartitionUpdateTime(Instant now) {

@Override
public void addFilesTransaction(AddTransactionRequest request) {
request.getTransaction().checkBefore(this);
fileReferenceStore.addFilesTransaction(request);
}

@Override
public void addPartitionsTransaction(AddTransactionRequest request) {
request.getTransaction().checkBefore(this);
partitionStore.addPartitionsTransaction(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package sleeper.core.statestore.transactionlog;

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.PartitionStore;
import sleeper.core.statestore.StateStoreException;
Expand All @@ -29,7 +28,6 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

Expand Down Expand Up @@ -71,7 +69,7 @@ public List<Partition> getLeafPartitions() throws StateStoreException {

@Override
public void initialise() throws StateStoreException {
initialise(new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct());
head.addTransaction(clock.instant(), InitialisePartitionsTransaction.singlePartition(schema));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package sleeper.core.statestore.transactionlog.transaction;

import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;

import java.time.Instant;
Expand All @@ -41,4 +42,16 @@ public interface StateStoreTransaction<T> {
* @param updateTime the time that the update was added to the state store
*/
void apply(T state, Instant updateTime);

/**
* Validates against the state store before attempting to add this transaction. Should not be relied upon
* but can be used as an extra check against state that this transaction does not operate on. Note that
* any checks from this are before the state is updated as part of the transaction. This cannot validate
* against the state the transaction is applied to as it may be updated by another process in the meantime.
*
* @param stateStore the state store
* @see #validate
*/
default void checkBefore(StateStore stateStore) throws StateStoreException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package sleeper.core.statestore.transactionlog.transaction.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStoreException;
Expand All @@ -35,6 +38,7 @@
* A transaction to assign files to jobs.
*/
public class AssignJobIdsTransaction implements FileReferenceTransaction {
public static final Logger LOGGER = LoggerFactory.getLogger(AssignJobIdsTransaction.class);

private final List<AssignJobIdRequest> requests;

Expand All @@ -54,6 +58,7 @@ public static Optional<AssignJobIdsTransaction> ignoringEmptyRequests(List<Assig
.filter(request -> !request.getFilenames().isEmpty())
.collect(toUnmodifiableList());
if (filtered.isEmpty()) {
LOGGER.warn("Attempted to create transaction with no file assignments, received requests: {}", requests);
return Optional.empty();
} else {
return Optional.of(new AssignJobIdsTransaction(filtered));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
package sleeper.core.statestore.transactionlog.transaction.impl;

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -35,6 +40,16 @@ public InitialisePartitionsTransaction(List<Partition> partitions) {
this.partitions = partitions;
}

/**
* Creates a transaction to initialise the table with a single root partition.
*
* @param schema the table schema
* @return the transaction
*/
public static InitialisePartitionsTransaction singlePartition(Schema schema) {
return new InitialisePartitionsTransaction(new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct());
}

@Override
public void validate(StateStorePartitions stateStorePartitions) {
}
Expand All @@ -45,6 +60,13 @@ public void apply(StateStorePartitions stateStorePartitions, Instant updateTime)
partitions.forEach(stateStorePartitions::put);
}

@Override
public void checkBefore(StateStore stateStore) throws StateStoreException {
if (!stateStore.hasNoFiles()) {
throw new StateStoreException("Cannot initialise state store when files are present");
}
}

@Override
public int hashCode() {
return Objects.hash(partitions);
Expand All @@ -66,4 +88,5 @@ public boolean equals(Object obj) {
public String toString() {
return "InitialisePartitionsTransaction{partitions=" + partitions + "}";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

import sleeper.core.statestore.ReplaceFileReferencesRequest;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
import sleeper.core.statestore.exception.FileNotFoundException;
import sleeper.core.statestore.exception.FileReferenceNotAssignedToJobException;
import sleeper.core.statestore.exception.FileReferenceNotFoundException;
import sleeper.core.statestore.exception.NewReferenceSameAsOldReferenceException;
import sleeper.core.statestore.transactionlog.state.StateStoreFile;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
Expand Down Expand Up @@ -114,6 +119,24 @@ public void reportJobsAllFailed(CompactionJobTracker tracker, TableStatus sleepe
}
}

/**
* Validates the transaction against the current state. Used during a synchronous commit to report on any
* failures.
*
* @param stateStoreFiles the state
* @throws FileNotFoundException if an input file does not exist
* @throws FileReferenceNotFoundException if an input file is not referenced on the same partition
* @throws FileReferenceNotAssignedToJobException if an input file is not assigned to the job on this partition
* @throws FileAlreadyExistsException if the new file already exists in the state store
* @throws NewReferenceSameAsOldReferenceException if any of the input files are the same as the output file
*/
public void validateStateChange(StateStoreFiles stateStoreFiles) {
for (ReplaceFileReferencesRequest job : jobs) {
job.validateNewReference();
job.validateStateChange(stateStoreFiles);
}
}

@Override
public int hashCode() {
return Objects.hash(jobs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;

/**
* A convenience class for specifying a partition tree. This includes methods to define a tree to be readable in a test,
* including shorthand which would not be possible with {@link PartitionFactory}.
Expand Down Expand Up @@ -122,7 +124,7 @@ public void applySplit(StateStore stateStore, String partitionId) {
Partition toSplit = partitionById(partitionId);
Partition left = partitionById(toSplit.getChildPartitionIds().get(0));
Partition right = partitionById(toSplit.getChildPartitionIds().get(1));
stateStore.atomicallyUpdatePartitionAndCreateNewOnes(toSplit, left, right);
update(stateStore).atomicallyUpdatePartitionAndCreateNewOnes(toSplit, left, right);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;

import static sleeper.core.statestore.FileReferenceTestData.DEFAULT_UPDATE_TIME;
import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;
import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName;

public class InMemoryTransactionLogStateStoreTestBase {
Expand All @@ -43,12 +44,12 @@ public class InMemoryTransactionLogStateStoreTestBase {

protected void initialiseWithSchema(Schema schema) {
createStore(new PartitionsBuilder(schema).singlePartition("root"));
store.initialise();
update(store).initialise(schema);
}

protected void initialiseWithPartitions(PartitionsBuilder partitions) {
createStore(partitions);
store.initialise(partitions.buildList());
update(store).initialise(partitions.buildList());
}

private void createStore(PartitionsBuilder partitions) {
Expand Down
Loading
Loading