From 9c90d14a1769262f3e3ad7e29a6fc4aea131da2b Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 21 Feb 2025 11:48:00 +0000 Subject: [PATCH] Use wrapper for atomicallyReplaceFileReferencesWithNewOnes --- .../execution/testutils/CompactionRunnerTestUtils.java | 3 ++- .../dsl/testutil/drivers/InMemoryCompaction.java | 8 +++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactionRunnerTestUtils.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactionRunnerTestUtils.java index 28ea78cbfe..1b345c9982 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactionRunnerTestUtils.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactionRunnerTestUtils.java @@ -26,6 +26,7 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toUnmodifiableList; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class CompactionRunnerTestUtils { @@ -59,7 +60,7 @@ public static void assignJobIdToInputFiles(StateStore stateStore, CompactionJob } public static void assignJobIdsToInputFiles(StateStore stateStore, CompactionJob... jobs) throws Exception { - stateStore.assignJobIds(Stream.of(jobs) + update(stateStore).assignJobIds(Stream.of(jobs) .map(CompactionJob::createAssignJobIdRequest) .collect(toUnmodifiableList())); } diff --git a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java index bd3272e2b5..3b8b529c05 100644 --- a/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java +++ b/java/system-test/system-test-dsl/src/test/java/sleeper/systemtest/dsl/testutil/drivers/InMemoryCompaction.java @@ -31,8 +31,6 @@ import sleeper.core.record.Record; import sleeper.core.schema.Schema; import sleeper.core.statestore.StateStore; -import sleeper.core.statestore.transactionlog.AddTransactionRequest; -import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction; import sleeper.core.tracker.compaction.job.CompactionJobTracker; import sleeper.core.tracker.compaction.job.InMemoryCompactionJobTracker; import sleeper.core.tracker.compaction.job.query.CompactionJobStatus; @@ -63,6 +61,7 @@ import java.util.UUID; import static java.util.stream.Collectors.toUnmodifiableList; +import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update; public class InMemoryCompaction { private final List queuedJobs = new ArrayList<>(); @@ -184,10 +183,9 @@ private JobRunSummary compact(CompactionJob job, TableProperties tableProperties Schema schema = tableProperties.getSchema(); Partition partition = getPartitionForJob(stateStore, job); RecordsProcessed recordsProcessed = mergeInputFiles(job, partition, schema); - stateStore.addTransaction(AddTransactionRequest.withTransaction(new ReplaceFileReferencesTransaction(List.of( + update(stateStore).atomicallyReplaceFileReferencesWithNewOnes(List.of( job.replaceFileReferencesRequestBuilder(recordsProcessed.getRecordsWritten()) - .taskId(run.getTaskId()).build()))) - .build()); + .taskId(run.getTaskId()).build())); Instant finishTime = startTime.plus(Duration.ofMinutes(1)); return new JobRunSummary(recordsProcessed, startTime, finishTime); }