Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:gchq/sleeper into 3500-cdk-stack…
Browse files Browse the repository at this point in the history
…-for-bulk-export
  • Loading branch information
ab295382 committed Feb 20, 2025
2 parents 6f7ee9f + 4e3c3e9 commit 9efdb52
Show file tree
Hide file tree
Showing 26 changed files with 618 additions and 216 deletions.
40 changes: 22 additions & 18 deletions .github/workflows/dependency-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,25 @@ jobs:
- name: Resolve dependencies
run: mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
working-directory: ./java
- name: Update CVEs database
run: mvn --batch-mode dependency-check:update-only -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
working-directory: ./java
- name: Build with Maven
run: mvn --batch-mode verify dependency-check:aggregate -Pquick,skipShade -DskipRust -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
working-directory: ./java
- name: Cache Maven dependencies & CVEs database
uses: actions/cache/save@v3
if: ${{ always() }}
with:
path: ${{ runner.temp }}/.m2/repository
key: ${{ steps.restore-cache.outputs.cache-primary-key }}
- name: Upload dependency check report
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: Dependency check report
path: java/target/dependency-check-report.html
- name: Explain disabled workflow
run: |
echo "Temporarily disabled checks as the NVD API is too slow."
echo "See the following issue: https://github.com/gchq/sleeper/issues/4293"
# - name: Update CVEs database
# run: mvn --batch-mode dependency-check:update-only -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
# working-directory: ./java
# - name: Build with Maven
# run: mvn --batch-mode verify dependency-check:aggregate -Pquick,skipShade -DskipRust -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
# working-directory: ./java
# - name: Cache Maven dependencies & CVEs database
# uses: actions/cache/save@v3
# if: ${{ always() }}
# with:
# path: ${{ runner.temp }}/.m2/repository
# key: ${{ steps.restore-cache.outputs.cache-primary-key }}
# - name: Upload dependency check report
# if: ${{ always() }}
# uses: actions/upload-artifact@v4
# with:
# name: Dependency check report
# path: java/target/dependency-check-report.html
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sleeper.core.partition.Partition;
import sleeper.core.statestore.exception.ReplaceRequestsFailedException;
import sleeper.core.statestore.exception.SplitRequestsFailedException;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;

import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -186,4 +187,16 @@ public void fixFileUpdateTime(Instant now) {
public void fixPartitionUpdateTime(Instant now) {
// Delegates to the underlying partition store, passing the given time through unchanged.
partitionStore.fixPartitionUpdateTime(now);
}

/**
* Checks the transaction against this state store, then delegates the request to the file reference store.
*
* @param request the request holding the transaction to add
*/
@Override
public void addFilesTransaction(AddTransactionRequest request) {
// The pre-check runs first; if it throws, the request is never passed on to the file reference store.
request.getTransaction().checkBefore(this);
fileReferenceStore.addFilesTransaction(request);
}

/**
* Checks the transaction against this state store, then delegates the request to the partition store.
*
* @param request the request holding the transaction to add
*/
@Override
public void addPartitionsTransaction(AddTransactionRequest request) {
// The pre-check runs first; if it throws, the request is never passed on to the partition store.
request.getTransaction().checkBefore(this);
partitionStore.addPartitionsTransaction(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package sleeper.core.statestore;

import sleeper.core.statestore.transactionlog.AddTransactionRequest;

import java.time.Instant;

/**
Expand All @@ -36,4 +38,11 @@ public interface FileReferenceStore extends FileReferenceStoreQueries, FileRefer
* @param time the time that any future file updates will be considered to occur
*/
void fixFileUpdateTime(Instant time);

/**
* Adds a file transaction to the transaction log.
*
* @param request the request holding the transaction to add
*/
void addFilesTransaction(AddTransactionRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionTree;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;

import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -99,4 +100,11 @@ default Partition getPartition(String partitionId) throws StateStoreException {
*/
default void fixPartitionUpdateTime(Instant time) {
}

/**
* Adds a partitions transaction to the transaction log.
*
* @param request the request holding the transaction to add
*/
void addPartitionsTransaction(AddTransactionRequest request);
}
18 changes: 18 additions & 0 deletions java/core/src/main/java/sleeper/core/statestore/StateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package sleeper.core.statestore;

import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;

/**
* Stores information about the data files and their status (i.e. {@link FileReference}s,
* and the {@link sleeper.core.partition.Partition}s).
Expand All @@ -30,4 +34,18 @@ default void clearSleeperTable() throws StateStoreException {
clearFileData();
clearPartitionData();
}

/**
 * Applies a transaction to the state store. The request is passed to {@code addFilesTransaction} when it
 * holds a file reference transaction, or to {@code addPartitionsTransaction} when it holds a partition
 * transaction.
 *
 * @param request the request
 * @throws IllegalArgumentException if the transaction is neither a file reference transaction nor a
 *         partition transaction
 * @see AddTransactionRequest
 */
default void addTransaction(AddTransactionRequest request) {
    Object transaction = request.getTransaction();
    if (transaction instanceof FileReferenceTransaction) {
        addFilesTransaction(request);
    } else if (transaction instanceof PartitionTransaction) {
        addPartitionsTransaction(request);
    } else {
        // Fail loudly rather than silently dropping a transaction that was never written to any log.
        String type = transaction == null ? "null" : transaction.getClass().getName();
        throw new IllegalArgumentException("Unrecognised transaction type: " + type);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Optional;

/**
* Holds a transaction that should be added to the log. The transaction may or may not already be held in S3.
* Holds a transaction that should be added to the log. The transaction may or may not be held in S3. When writing to a
* log we could point to the existing file in S3 rather than writing the transaction directly, particularly if it's too
* big.
*/
public class AddTransactionRequest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ void updateFromLog() throws StateStoreException {
head.update();
}

void addTransaction(AddTransactionRequest request) {
@Override
public void addFilesTransaction(AddTransactionRequest request) {
head.addTransaction(clock.instant(), request);
}

Expand All @@ -161,5 +162,4 @@ private StateStoreFiles files() throws StateStoreException {
head.update();
return head.state();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package sleeper.core.statestore.transactionlog;

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.PartitionStore;
import sleeper.core.statestore.StateStoreException;
Expand All @@ -29,7 +28,6 @@
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

Expand Down Expand Up @@ -71,7 +69,7 @@ public List<Partition> getLeafPartitions() throws StateStoreException {

@Override
public void initialise() throws StateStoreException {
initialise(new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct());
head.addTransaction(clock.instant(), InitialisePartitionsTransaction.singlePartition(schema));
}

@Override
Expand All @@ -93,7 +91,8 @@ public void updateFromLog() throws StateStoreException {
head.update();
}

void addTransaction(AddTransactionRequest request) {
@Override
public void addPartitionsTransaction(AddTransactionRequest request) {
head.addTransaction(clock.instant(), request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import sleeper.core.statestore.transactionlog.state.StateListenerBeforeApply;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;
import sleeper.core.table.TableStatus;
import sleeper.core.util.ExponentialBackoffWithJitter;
import sleeper.core.util.ExponentialBackoffWithJitter.WaitRange;
Expand Down Expand Up @@ -97,20 +95,6 @@ public void updateFromLogs() throws StateStoreException {
partitions.updateFromLog();
}

/**
 * Adds a transaction to the transaction log. The transaction may or may not already be held in S3. If it is already
 * held in S3, we don't need to write it to S3 again. The request is passed to the file reference store or the
 * partition store depending on the type of transaction it holds.
 *
 * @param request the request
 * @throws IllegalArgumentException if the transaction is neither a file reference transaction nor a
 *         partition transaction
 */
public void addTransaction(AddTransactionRequest request) {
    Object transaction = request.getTransaction();
    if (transaction instanceof FileReferenceTransaction) {
        files.addTransaction(request);
    } else if (transaction instanceof PartitionTransaction) {
        partitions.addTransaction(request);
    } else {
        // Fail loudly rather than silently dropping a transaction that was never written to any log.
        String type = transaction == null ? "null" : transaction.getClass().getName();
        throw new IllegalArgumentException("Unrecognised transaction type: " + type);
    }
}

/**
* Applies a transaction log entry to the local state, and applies some action based on the state before it. Will
* read from the transaction log if entries are missing between the last read entry and the given entry. The given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package sleeper.core.statestore.transactionlog.transaction;

import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;

import java.time.Instant;
Expand All @@ -41,4 +42,16 @@ public interface StateStoreTransaction<T> {
* @param updateTime the time that the update was added to the state store
*/
void apply(T state, Instant updateTime);

/**
* Validates against the state store before attempting to add this transaction. Should not be relied upon
* but can be used as an extra check against state that this transaction does not operate on. Note that
* any checks from this are before the state is updated as part of the transaction. This cannot validate
* against the state the transaction is applied to as it may be updated by another process in the meantime.
*
* @param stateStore the state store
* @throws StateStoreException if the check against the state store fails
* @see #validate
*/
default void checkBefore(StateStore stateStore) throws StateStoreException {
// No check is made by default; transactions that need a pre-check override this method.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package sleeper.core.statestore.transactionlog.transaction.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.statestore.AssignJobIdRequest;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStoreException;
Expand All @@ -35,6 +38,7 @@
* A transaction to assign files to jobs.
*/
public class AssignJobIdsTransaction implements FileReferenceTransaction {
public static final Logger LOGGER = LoggerFactory.getLogger(AssignJobIdsTransaction.class);

private final List<AssignJobIdRequest> requests;

Expand All @@ -54,6 +58,7 @@ public static Optional<AssignJobIdsTransaction> ignoringEmptyRequests(List<Assig
.filter(request -> !request.getFilenames().isEmpty())
.collect(toUnmodifiableList());
if (filtered.isEmpty()) {
LOGGER.warn("Attempted to create transaction with no file assignments, received requests: {}", requests);
return Optional.empty();
} else {
return Optional.of(new AssignJobIdsTransaction(filtered));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
package sleeper.core.statestore.transactionlog.transaction.impl;

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.transactionlog.state.StateStorePartitions;
import sleeper.core.statestore.transactionlog.transaction.PartitionTransaction;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -35,6 +40,16 @@ public InitialisePartitionsTransaction(List<Partition> partitions) {
this.partitions = partitions;
}

/**
 * Builds a transaction that initialises a table with a single root partition, i.e. the partitions
 * constructed from the schema with no split points.
 *
 * @param schema the table schema
 * @return the transaction
 */
public static InitialisePartitionsTransaction singlePartition(Schema schema) {
    List<Partition> rootOnly = new PartitionsFromSplitPoints(schema, Collections.emptyList()).construct();
    return new InitialisePartitionsTransaction(rootOnly);
}

@Override
public void validate(StateStorePartitions stateStorePartitions) {
// No validation is performed against the partition state for this transaction.
}
Expand All @@ -45,6 +60,13 @@ public void apply(StateStorePartitions stateStorePartitions, Instant updateTime)
partitions.forEach(stateStorePartitions::put);
}

/**
 * Refuses to initialise partitions while the state store still holds files.
 *
 * @param stateStore the state store to check
 * @throws StateStoreException if the state store reports that files are present
 */
@Override
public void checkBefore(StateStore stateStore) throws StateStoreException {
    if (stateStore.hasNoFiles()) {
        return;
    }
    throw new StateStoreException("Cannot initialise state store when files are present");
}

@Override
public int hashCode() {
return Objects.hash(partitions);
Expand All @@ -66,4 +88,5 @@ public boolean equals(Object obj) {
public String toString() {
    // Produces the class name followed by the partitions in braces, exactly as plain concatenation would.
    StringBuilder text = new StringBuilder("InitialisePartitionsTransaction{partitions=");
    text.append(partitions).append("}");
    return text.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

import sleeper.core.statestore.ReplaceFileReferencesRequest;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
import sleeper.core.statestore.exception.FileNotFoundException;
import sleeper.core.statestore.exception.FileReferenceNotAssignedToJobException;
import sleeper.core.statestore.exception.FileReferenceNotFoundException;
import sleeper.core.statestore.exception.NewReferenceSameAsOldReferenceException;
import sleeper.core.statestore.transactionlog.state.StateStoreFile;
import sleeper.core.statestore.transactionlog.state.StateStoreFiles;
import sleeper.core.statestore.transactionlog.transaction.FileReferenceTransaction;
Expand Down Expand Up @@ -114,6 +119,24 @@ public void reportJobsAllFailed(CompactionJobTracker tracker, TableStatus sleepe
}
}

/**
 * Checks every job in this transaction against the current state, in order. For each job the new
 * reference is validated first, then the state change itself. Used during a synchronous commit to
 * report on any failures.
 *
 * @param stateStoreFiles the state
 * @throws FileNotFoundException if an input file does not exist
 * @throws FileReferenceNotFoundException if an input file is not referenced on the same partition
 * @throws FileReferenceNotAssignedToJobException if an input file is not assigned to the job on this partition
 * @throws FileAlreadyExistsException if the new file already exists in the state store
 * @throws NewReferenceSameAsOldReferenceException if any of the input files are the same as the output file
 */
public void validateStateChange(StateStoreFiles stateStoreFiles) {
    jobs.forEach(request -> {
        request.validateNewReference();
        request.validateStateChange(stateStoreFiles);
    });
}

@Override
public int hashCode() {
return Objects.hash(jobs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;

/**
* A convenience class for specifying a partition tree. This includes methods to define a tree to be readable in a test,
* including shorthand which would not be possible with {@link PartitionFactory}.
Expand Down Expand Up @@ -122,7 +124,7 @@ public void applySplit(StateStore stateStore, String partitionId) {
Partition toSplit = partitionById(partitionId);
Partition left = partitionById(toSplit.getChildPartitionIds().get(0));
Partition right = partitionById(toSplit.getChildPartitionIds().get(1));
stateStore.atomicallyUpdatePartitionAndCreateNewOnes(toSplit, left, right);
update(stateStore).atomicallyUpdatePartitionAndCreateNewOnes(toSplit, left, right);
}

/**
Expand Down
Loading

0 comments on commit 9efdb52

Please sign in to comment.