Skip to content

Commit

Permalink
Use wrapper for atomicallyReplaceFileReferencesWithNewOnes
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Feb 21, 2025
1 parent 7315580 commit 9c90d14
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;

public class CompactionRunnerTestUtils {

Expand Down Expand Up @@ -59,7 +60,7 @@ public static void assignJobIdToInputFiles(StateStore stateStore, CompactionJob
}

public static void assignJobIdsToInputFiles(StateStore stateStore, CompactionJob... jobs) throws Exception {
stateStore.assignJobIds(Stream.of(jobs)
update(stateStore).assignJobIds(Stream.of(jobs)
.map(CompactionJob::createAssignJobIdRequest)
.collect(toUnmodifiableList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import sleeper.core.record.Record;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.transaction.impl.ReplaceFileReferencesTransaction;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.InMemoryCompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
Expand Down Expand Up @@ -63,6 +61,7 @@
import java.util.UUID;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;

public class InMemoryCompaction {
private final List<CompactionJob> queuedJobs = new ArrayList<>();
Expand Down Expand Up @@ -184,10 +183,9 @@ private JobRunSummary compact(CompactionJob job, TableProperties tableProperties
Schema schema = tableProperties.getSchema();
Partition partition = getPartitionForJob(stateStore, job);
RecordsProcessed recordsProcessed = mergeInputFiles(job, partition, schema);
stateStore.addTransaction(AddTransactionRequest.withTransaction(new ReplaceFileReferencesTransaction(List.of(
update(stateStore).atomicallyReplaceFileReferencesWithNewOnes(List.of(
job.replaceFileReferencesRequestBuilder(recordsProcessed.getRecordsWritten())
.taskId(run.getTaskId()).build())))
.build());
.taskId(run.getTaskId()).build()));
Instant finishTime = startTime.plus(Duration.ofMinutes(1));
return new JobRunSummary(recordsProcessed, startTime, finishTime);
}
Expand Down

0 comments on commit 9c90d14

Please sign in to comment.