Skip to content

Commit

Permalink
Merge pull request #4301 from gchq/4281-Simplify-production-code-to-add-a-transaction-sync-asynchronously
Browse files Browse the repository at this point in the history

Issue 4281 - Simplify production code to add a transaction sync/asynchronously
  • Loading branch information
rtjd6554 authored Feb 26, 2025
2 parents 05c2af9 + 2ddc42c commit 3d8483e
Show file tree
Hide file tree
Showing 51 changed files with 425 additions and 541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.commit.StateStoreCommitRequest;
Expand Down Expand Up @@ -116,16 +115,15 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOExce
Instant finishTime = getTime.get();
boolean asyncCommit = tableProperties.getBoolean(BULK_IMPORT_FILES_COMMIT_ASYNC);
try {
AddFilesTransaction transaction = AddFilesTransaction.builder()
.jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime)
.fileReferences(output.fileReferences())
.build();
if (asyncCommit) {
AddFilesTransaction transaction = AddFilesTransaction.builder()
.jobId(job.getId()).taskId(taskId).jobRunId(jobRunId).writtenTime(finishTime)
.files(AllReferencesToAFile.newFilesWithReferences(output.fileReferences()))
.build();
asyncSender.send(StateStoreCommitRequest.create(table.getTableUniqueId(), transaction));
LOGGER.info("Submitted asynchronous request to state store committer to add {} files for job {} in table {}", output.numFiles(), job.getId(), table);
} else {
stateStoreProvider.getStateStore(tableProperties)
.addFiles(output.fileReferences());
transaction.synchronousCommit(stateStoreProvider.getStateStore(tableProperties));
LOGGER.info("Added {} files to statestore for job {} in table {}", output.numFiles(), job.getId(), table);
}
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.testutils.FixedTablePropertiesProvider;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.AllReferencesToAFile;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
Expand Down Expand Up @@ -181,7 +180,7 @@ void shouldCommitNewFilesAsynchronouslyWhenConfigured() throws Exception {
assertThat(commitRequestQueue).containsExactly(StateStoreCommitRequest.create(tableProperties.get(TABLE_ID),
AddFilesTransaction.builder()
.jobId(job.getId()).taskId("test-task").jobRunId("test-run").writtenTime(finishTime)
.files(AllReferencesToAFile.newFilesWithReferences(outputFiles))
.fileReferences(outputFiles)
.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,18 @@

public class DeleteTable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeleteTable.class);

private final AmazonS3 s3Client;
private final InstanceProperties instanceProperties;
private final TablePropertiesStore tablePropertiesStore;
private final StateStoreProvider stateStoreProvider;

public DeleteTable(AmazonS3 s3Client, AmazonDynamoDB dynamoDB, InstanceProperties instanceProperties) {
this(instanceProperties, s3Client, S3TableProperties.createStore(instanceProperties, s3Client, dynamoDB),
this(s3Client, instanceProperties, S3TableProperties.createStore(instanceProperties, s3Client, dynamoDB),
StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDB, getConfigurationForClient()));
}

public DeleteTable(InstanceProperties instanceProperties, AmazonS3 s3Client, TablePropertiesStore tablePropertiesStore, StateStoreProvider stateStoreProvider) {
public DeleteTable(AmazonS3 s3Client, InstanceProperties instanceProperties, TablePropertiesStore tablePropertiesStore, StateStoreProvider stateStoreProvider) {
this.s3Client = s3Client;
this.instanceProperties = instanceProperties;
this.tablePropertiesStore = tablePropertiesStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.transaction.impl.ClearFilesTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.ClearPartitionsTransaction;
import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;
import sleeper.statestore.StateStoreFactory;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Function;

import static sleeper.clients.util.BucketUtils.deleteObjectsInBucketWithPrefix;
import static sleeper.configuration.utils.AwsV1ClientHelper.buildAwsV1Client;
Expand Down Expand Up @@ -70,7 +73,11 @@ public ReinitialiseTable(
}
}

public void run() throws IOException {
/**
 * Reinitialises the table using a default partition layout: a single partition
 * built from the table's schema (per {@code InitialisePartitionsTransaction.singlePartition}).
 * Delegates to {@code run(Function)} so callers can supply alternative partition layouts.
 */
public void run() {
run(tableProperties -> InitialisePartitionsTransaction.singlePartition(tableProperties.getSchema()));
}

public void run(Function<TableProperties, InitialisePartitionsTransaction> buildPartitions) {
InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, instanceId);
TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoDBClient);
TableProperties tableProperties = tablePropertiesProvider.getByName(tableName);
Expand All @@ -83,23 +90,18 @@ public void run() throws IOException {

LOGGER.info("State store type: {}", stateStore.getClass().getName());

new ClearFilesTransaction().synchronousCommit(stateStore);
if (deletePartitions) {
stateStore.clearSleeperTable();
} else {
stateStore.clearFileData();
ClearPartitionsTransaction.create().synchronousCommit(stateStore);
}
deleteObjectsInBucketWithPrefix(s3Client, instanceProperties.get(DATA_BUCKET), tableProperties.get(TABLE_ID),
key -> key.matches(tableProperties.get(TABLE_ID) + "/partition.*/.*"));
if (deletePartitions) {
LOGGER.info("Fully reinitialising table");
initialiseStateStore(tableProperties, stateStore);
buildPartitions.apply(tableProperties).synchronousCommit(stateStore);
}
}

protected void initialiseStateStore(TableProperties tableProperties, StateStore stateStore) throws IOException {
stateStore.initialise();
}

public static void main(String[] args) {
if (args.length < 2 || args.length > 3) {
throw new IllegalArgumentException("Usage: <instance-id> <table-name> <optional-delete-partitions-true-or-false>");
Expand Down Expand Up @@ -127,7 +129,7 @@ public static void main(String[] args) {
ReinitialiseTable reinitialiseTable = new ReinitialiseTable(s3Client, dynamoDBClient, instanceId, tableName, deletePartitions);
reinitialiseTable.run();
LOGGER.info("Table reinitialised successfully");
} catch (RuntimeException | IOException e) {
} catch (RuntimeException e) {
LOGGER.error("\nAn Error occurred while trying to reinitialise the table. " +
"The error message is as follows:\n\n" + e.getMessage()
+ "\n\nCause:" + e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import sleeper.clients.status.partitions.ExportPartitions;
import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionSerDe;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -45,8 +45,9 @@
* is reinitialised using the partitions in the provided file. This file should
* have been created using the class {@link ExportPartitions}.
*/
public class ReinitialiseTableFromExportedPartitions extends ReinitialiseTable {
public class ReinitialiseTableFromExportedPartitions {
private static final Logger LOGGER = LoggerFactory.getLogger(ReinitialiseTableFromExportedPartitions.class);
private final ReinitialiseTable reinitialiseTable;
private final String partitionsFile;

public ReinitialiseTableFromExportedPartitions(
Expand All @@ -55,16 +56,15 @@ public ReinitialiseTableFromExportedPartitions(
String instanceId,
String tableName,
String partitionsFile) {
super(s3Client, dynamoDBClient, instanceId, tableName, true);
this.reinitialiseTable = new ReinitialiseTable(s3Client, dynamoDBClient, instanceId, tableName, true);
this.partitionsFile = partitionsFile;
}

@Override
protected void initialiseStateStore(TableProperties tableProperties, StateStore stateStore) throws IOException {
stateStore.initialise(readPartitions(tableProperties.getSchema()));
/**
 * Reinitialises the table, rebuilding the state store partitions from the
 * partitions previously exported to the configured partitions file.
 */
public void run() {
reinitialiseTable.run(tableProperties -> new InitialisePartitionsTransaction(readPartitions(tableProperties.getSchema())));
}

private List<Partition> readPartitions(Schema schema) throws IOException {
private List<Partition> readPartitions(Schema schema) {
PartitionSerDe partitionSerDe = new PartitionSerDe(schema);
List<Partition> partitions = new ArrayList<>();
LOGGER.info("Attempting to read partitions from file {}", partitionsFile);
Expand All @@ -75,6 +75,8 @@ private List<Partition> readPartitions(Schema schema) throws IOException {
partitions.add(partitionSerDe.fromJson(line));
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
LOGGER.info("Read {} partitions from file", partitions.size());

Expand All @@ -101,10 +103,11 @@ public static void main(String[] args) {
AmazonDynamoDB dynamoDBClient = buildAwsV1Client(AmazonDynamoDBClientBuilder.standard());

try {
ReinitialiseTable reinitialiseTable = new ReinitialiseTableFromExportedPartitions(s3Client, dynamoDBClient, instanceId, tableName, exportedPartitionsFile);
ReinitialiseTableFromExportedPartitions reinitialiseTable = new ReinitialiseTableFromExportedPartitions(
s3Client, dynamoDBClient, instanceId, tableName, exportedPartitionsFile);
reinitialiseTable.run();
LOGGER.info("Table reinitialised successfully");
} catch (RuntimeException | IOException e) {
} catch (RuntimeException e) {
LOGGER.error("\nAn Error occurred while trying to reinitialise the table. " +
"The error message is as follows:\n\n" + e.getMessage()
+ "\n\nCause:" + e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionsFromSplitPoints;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.transaction.impl.InitialisePartitionsTransaction;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

import static sleeper.configuration.ReadSplitPoints.readSplitPoints;
Expand All @@ -37,8 +39,9 @@
* and all the information in the state store. Then the state store for the table
* is reinitialised using the split points in the provided file.
*/
public class ReinitialiseTableFromSplitPoints extends ReinitialiseTable {
public class ReinitialiseTableFromSplitPoints {
private static final Logger LOGGER = LoggerFactory.getLogger(ReinitialiseTableFromSplitPoints.class);
private final ReinitialiseTable reinitialiseTable;
private final boolean splitPointStringsBase64Encoded;
private final String splitPointsFileLocation;

Expand All @@ -49,15 +52,22 @@ public ReinitialiseTableFromSplitPoints(
String tableName,
String splitPointsFileLocation,
boolean splitPointStringsBase64Encoded) {
super(s3Client, dynamoDBClient, instanceId, tableName, true);
this.reinitialiseTable = new ReinitialiseTable(s3Client, dynamoDBClient, instanceId, tableName, true);
this.splitPointStringsBase64Encoded = splitPointStringsBase64Encoded;
this.splitPointsFileLocation = splitPointsFileLocation;
}

@Override
protected void initialiseStateStore(TableProperties tableProperties, StateStore stateStore) throws IOException {
List<Object> splitPoints = readSplitPoints(tableProperties, splitPointsFileLocation, splitPointStringsBase64Encoded);
stateStore.initialise(new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct());
/**
 * Reinitialises the table, rebuilding the state store partitions from the
 * split points read from the configured split points file.
 */
public void run() {
reinitialiseTable.run(tableProperties -> new InitialisePartitionsTransaction(readPartitions(tableProperties)));
}

/**
 * Reads split points from the configured file location and constructs the
 * resulting partition tree for the table's schema.
 *
 * @param tableProperties properties of the table being reinitialised
 * @return the partitions built from the split points
 * @throws UncheckedIOException if the split points file cannot be read
 */
private List<Partition> readPartitions(TableProperties tableProperties) {
try {
// Split point strings may be base64-encoded depending on how the file was written
List<Object> splitPoints = readSplitPoints(tableProperties, splitPointsFileLocation, splitPointStringsBase64Encoded);
return new PartitionsFromSplitPoints(tableProperties.getSchema(), splitPoints).construct();
} catch (IOException e) {
// Wrap so run() can expose an unchecked signature
throw new UncheckedIOException(e);
}
}

public static void main(String[] args) {
Expand All @@ -83,11 +93,11 @@ public static void main(String[] args) {
AmazonDynamoDB dynamoDBClient = buildAwsV1Client(AmazonDynamoDBClientBuilder.standard());

try {
ReinitialiseTable reinitialiseTable = new ReinitialiseTableFromSplitPoints(s3Client, dynamoDBClient, instanceId, tableName,
ReinitialiseTableFromSplitPoints reinitialiseTable = new ReinitialiseTableFromSplitPoints(s3Client, dynamoDBClient, instanceId, tableName,
splitPointsFile, splitPointsFileBase64Encoded);
reinitialiseTable.run();
LOGGER.info("Table reinitialised successfully");
} catch (RuntimeException | IOException e) {
} catch (RuntimeException e) {
LOGGER.error("\nAn Error occurred while trying to reinitialise the table. " +
"The error message is as follows:\n\n" + e.getMessage()
+ "\n\nCause:" + e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import sleeper.core.schema.Schema;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.log.TransactionLogEntry;
import sleeper.core.statestore.transactionlog.log.TransactionLogRange;
import sleeper.core.table.TableNotFoundException;
import sleeper.core.table.TableStatus;
import sleeper.core.util.ObjectFactory;
Expand All @@ -41,6 +43,7 @@
import sleeper.ingest.runner.IngestRecords;
import sleeper.localstack.test.LocalStackTestBase;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.transactionlog.DynamoDBTransactionLogStore;
import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator;
import sleeper.statestore.transactionlog.snapshots.DynamoDBTransactionLogSnapshotCreator;

Expand Down Expand Up @@ -113,6 +116,8 @@ void shouldDeleteOnlyTable() throws Exception {
assertThatThrownBy(() -> propertiesStore.loadByName("table-1"))
.isInstanceOf(TableNotFoundException.class);
assertThat(streamTableObjects(table)).isEmpty();
assertThat(streamTableFileTransactions(table)).isEmpty();
assertThat(streamTablePartitionTransactions(table)).isEmpty();
}

@Test
Expand Down Expand Up @@ -144,9 +149,13 @@ void shouldDeleteOneTableWhenAnotherTableIsPresent() throws Exception {
assertThatThrownBy(() -> propertiesStore.loadByName("table-1"))
.isInstanceOf(TableNotFoundException.class);
assertThat(streamTableObjects(table1)).isEmpty();
assertThat(streamTableFileTransactions(table1)).isEmpty();
assertThat(streamTablePartitionTransactions(table1)).isEmpty();
assertThat(propertiesStore.loadByName("table-2"))
.isEqualTo(table2);
assertThat(streamTableObjects(table2)).isNotEmpty();
assertThat(streamTableFileTransactions(table2)).isNotEmpty();
assertThat(streamTablePartitionTransactions(table2)).isNotEmpty();
}

@Test
Expand Down Expand Up @@ -196,7 +205,7 @@ void shouldFailToDeleteTableThatDoesNotExist() {
}

private void deleteTable(String tableName) throws Exception {
new DeleteTable(instanceProperties, s3Client, propertiesStore,
new DeleteTable(s3Client, instanceProperties, propertiesStore,
StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient, hadoopConf))
.delete(tableName);
}
Expand Down Expand Up @@ -237,4 +246,14 @@ private Stream<S3ObjectSummary> streamTableObjects(TableProperties tableProperti
.getObjectSummaries().stream()
.filter(s3ObjectSummary -> s3ObjectSummary.getSize() > 0);
}

/**
 * Streams every entry in the table's file transaction log, starting from the
 * first transaction. Used by the tests above to assert the log is emptied when
 * a table is deleted, and retained for other tables.
 */
private Stream<TransactionLogEntry> streamTableFileTransactions(TableProperties tableProperties) {
return DynamoDBTransactionLogStore.forFiles(instanceProperties, tableProperties, dynamoClient, s3Client)
.readTransactions(TransactionLogRange.fromMinimum(1));
}

/**
 * Streams every entry in the table's partition transaction log, starting from
 * the first transaction. Used by the tests above to assert the log is emptied
 * when a table is deleted, and retained for other tables.
 */
private Stream<TransactionLogEntry> streamTablePartitionTransactions(TableProperties tableProperties) {
return DynamoDBTransactionLogStore.forPartitions(instanceProperties, tableProperties, dynamoClient, s3Client)
.readTransactions(TransactionLogRange.fromMinimum(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void commit(CompactionJob job, CompactionJobFinishedEvent finishedEvent)
}
} else {
LOGGER.debug("Committing compaction job {} inside compaction task", job.getId());
stateStoreProvider.getStateStore(tableProperties).atomicallyReplaceFileReferencesWithNewOnes(List.of(request));
new ReplaceFileReferencesTransaction(List.of(request))
.synchronousCommit(stateStoreProvider.getStateStore(tableProperties));
tracker.jobCommitted(job.committedEventBuilder(timeSupplier.get())
.jobRunId(finishedEvent.getJobRunId())
.taskId(finishedEvent.getTaskId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface AssignJobIdToFiles {

static AssignJobIdToFiles synchronous(StateStore stateStore) {
return (assignJobIdRequests, tableStatus) -> {
stateStore.assignJobIds(assignJobIdRequests);
new AssignJobIdsTransaction(assignJobIdRequests).synchronousCommit(stateStore);
};
}

Expand Down
Loading

0 comments on commit 3d8483e

Please sign in to comment.