Skip to content

Commit

Permalink
Remove code for correlating updates by task ID
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Feb 20, 2025
1 parent 295aa8e commit 0a6efb7
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package sleeper.core.tracker.job.run;

import sleeper.core.tracker.job.status.JobRunStartedUpdate;
import sleeper.core.tracker.job.status.JobStatusUpdate;
import sleeper.core.tracker.job.status.JobStatusUpdateRecord;

import java.util.ArrayList;
Expand Down Expand Up @@ -45,7 +43,6 @@
*/
class JobRunsBuilder {
private final Map<String, JobRun.Builder> builderByJobRunId = new HashMap<>();
private final Map<String, JobRun.Builder> builderByTaskId = new HashMap<>();
private final List<JobRun.Builder> orderedBuilders = new ArrayList<>();

void add(JobStatusUpdateRecord record) {
Expand All @@ -62,17 +59,10 @@ void add(JobStatusUpdateRecord record) {
}

private Optional<JobRun.Builder> getBuilderIfCorrelatable(JobStatusUpdateRecord record) {
String jobRunId = record.getJobRunId();
String taskId = record.getTaskId();
JobStatusUpdate statusUpdate = record.getStatusUpdate();
if (jobRunId != null) {
return Optional.of(builderByJobRunId.computeIfAbsent(jobRunId, id -> createOrderedBuilder()));
} else if (isStartedUpdateAndStartOfRun(statusUpdate)) {
JobRun.Builder builder = createOrderedBuilder();
builderByTaskId.put(taskId, builder);
return Optional.of(builder);
if (record.getStatusUpdate().isPartOfRun()) {
return Optional.of(builderByJobRunId.computeIfAbsent(record.getJobRunId(), id -> createOrderedBuilder()));
} else {
return Optional.ofNullable(builderByTaskId.get(taskId));
return Optional.empty();
}
}

Expand All @@ -89,9 +79,4 @@ JobRuns build() {
Collections.reverse(jobRuns);
return JobRuns.latestFirst(jobRuns);
}

private static boolean isStartedUpdateAndStartOfRun(JobStatusUpdate statusUpdate) {
return statusUpdate instanceof JobRunStartedUpdate
&& ((JobRunStartedUpdate) statusUpdate).isStartOfRun();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.startedStatus;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.TASK_ID_1;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.TASK_ID_2;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forRunOnTask;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.onTask;

public class JobRunsRecordsOutOfOrderTest {
Expand Down Expand Up @@ -58,7 +59,7 @@ public void shouldReportRunsWhenJobStartedReturnedFromDatabaseOutOfOrder() {
TestJobStartedStatus started3 = startedStatus(Instant.parse("2022-09-26T09:23:30.001Z"));

// When
JobRuns runs = runsFromUpdates(started3, started1, started2);
JobRuns runs = runsFromUpdates(forRunOnTask(started3), forRunOnTask(started1), forRunOnTask(started2));

// Then
assertThat(runs.getRunsLatestFirst())
Expand All @@ -78,7 +79,7 @@ public void shouldReportRunsWhenLastRunFinishedButReturnedFromDatabaseOutOfOrder
AggregatedTaskJobsFinishedStatus finished = finishedStatus(started3, Duration.ofSeconds(30), 450L, 300L);

// When
JobRuns runs = runsFromUpdates(started3, finished, started1, started2);
JobRuns runs = runsFromUpdates(forRunOnTask(started3, finished), forRunOnTask(started1), forRunOnTask(started2));

// Then
assertThat(runs.getRunsLatestFirst())
Expand Down
164 changes: 18 additions & 146 deletions java/core/src/test/java/sleeper/core/tracker/job/run/JobRunsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@
import static sleeper.core.tracker.job.status.TestJobStatus.notPartOfRunWithUpdateTime;
import static sleeper.core.tracker.job.status.TestJobStatus.partOfRunWithUpdateTime;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.DEFAULT_TASK_ID;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.TASK_ID_1;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.TASK_ID_2;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forJobRunOnTask;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forNoRunNoTask;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forRunOnNoTask;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forRunOnTask;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.onNoTask;
Expand Down Expand Up @@ -122,148 +121,6 @@ void shouldReportRunFailedBeforeStart() {
}
}

@DisplayName("Correlate job runs by task ID and time")
@Nested
class CorrelateRunsByTaskAndTime {

@Test
void shouldReportTwoRunsLatestFirstByStartTimeOnSameTask() {
// Given
TestJobStartedStatus started1 = startedStatus(Instant.parse("2022-09-24T09:23:30.001Z"));
TestJobStartedStatus started2 = startedStatus(Instant.parse("2022-09-24T09:24:30.001Z"));

// When
JobRuns runs = runsFromUpdates(started1, started2);

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(DEFAULT_TASK_ID, List.of(started2)),
tuple(DEFAULT_TASK_ID, List.of(started1)));
}

@Test
void shouldReportTwoRunsWhenJobFinishedMultipleTimesSameTask() {
// Given
TestJobStartedStatus started1 = startedStatus(Instant.parse("2022-09-23T09:23:30.001Z"));
AggregatedTaskJobsFinishedStatus finished1 = finishedStatus(started1, Duration.ofSeconds(30), 450L, 300L);
TestJobStartedStatus started2 = startedStatus(Instant.parse("2022-09-24T09:23:30.001Z"));
AggregatedTaskJobsFinishedStatus finished2 = finishedStatus(started2, Duration.ofSeconds(30), 450L, 300L);

// When
JobRuns runs = runsFromUpdates(started1, finished1, started2, finished2);

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(DEFAULT_TASK_ID, List.of(started2, finished2)),
tuple(DEFAULT_TASK_ID, List.of(started1, finished1)));
}

@Test
void shouldReportTwoTasksWithTwoRunsEachForSameJobWithInterleavingStartTimes() {
// Given
TestJobStartedStatus started1 = startedStatus(Instant.parse("2022-09-23T09:23:30.001Z"));
TestJobStartedStatus started2 = startedStatus(Instant.parse("2022-09-24T09:23:30.001Z"));
TestJobStartedStatus started3 = startedStatus(Instant.parse("2022-09-25T09:23:30.001Z"));
TestJobStartedStatus started4 = startedStatus(Instant.parse("2022-09-26T09:23:30.001Z"));

// When
JobRuns runs = runsFromUpdates(
onTask(TASK_ID_1, started1, started3),
onTask(TASK_ID_2, started2, started4));

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(TASK_ID_2, List.of(started4)),
tuple(TASK_ID_1, List.of(started3)),
tuple(TASK_ID_2, List.of(started2)),
tuple(TASK_ID_1, List.of(started1)));
}

@Test
void shouldReportTwoTasksWithOneFinishedRunEach() {
// Given
TestJobStartedStatus started1 = startedStatus(Instant.parse("2022-09-23T09:23:30.001Z"));
AggregatedTaskJobsFinishedStatus finished1 = finishedStatus(started1, Duration.ofSeconds(30), 450L, 300L);
TestJobStartedStatus started2 = startedStatus(Instant.parse("2022-09-24T09:23:30.001Z"));
AggregatedTaskJobsFinishedStatus finished2 = finishedStatus(started2, Duration.ofSeconds(30), 450L, 300L);

// When
JobRuns runs = runsFromUpdates(
onTask(TASK_ID_1, started1, finished1),
onTask(TASK_ID_2, started2, finished2));

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(TASK_ID_2, List.of(started2, finished2)),
tuple(TASK_ID_1, List.of(started1, finished1)));
}

@Test
void shouldReportRunsOnDifferentTasksWhenJobRunStartedAndFinishedDuringAnotherRun() {
// Given
TestJobStartedStatus started1 = startedStatus(Instant.parse("2022-09-23T09:23:00.001Z"));
TestJobStartedStatus started2 = startedStatus(Instant.parse("2022-09-23T09:23:30.001Z"));
AggregatedTaskJobsFinishedStatus finished1 = finishedStatus(started1, Duration.ofMinutes(2), 450L, 300L);
AggregatedTaskJobsFinishedStatus finished2 = finishedStatus(started2, Duration.ofSeconds(30), 450L, 300L);

// When
JobRuns runs = runsFromUpdates(
onTask(TASK_ID_1, started1, finished1),
onTask(TASK_ID_2, started2, finished2));

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(TASK_ID_2, List.of(started2, finished2)),
tuple(TASK_ID_1, List.of(started1, finished1)));
}

@Test
void shouldExcludeUpdateNotPartOfARunBeforeTask() {
// Given
TestJobStatus notPartOfRun = notPartOfRunWithUpdateTime(Instant.parse("2024-06-19T14:06:00Z"));
TestJobStartedStatus started = startedStatus(Instant.parse("2024-06-19T14:06:01Z"));

// When
JobRuns runs = runsFromUpdates(
onNoTask(notPartOfRun),
onTask(TASK_ID_1, started));

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(TASK_ID_1, List.of(started)));
}

@Test
void shouldExcludeUpdateNotPartOfARunAfterTaskStarted() {
// Given
TestJobStartedStatus started = startedStatus(Instant.parse("2024-06-19T14:06:00Z"));
TestJobStatus notPartOfRun = notPartOfRunWithUpdateTime(Instant.parse("2024-06-19T14:06:01Z"));

// When
JobRuns runs = runsFromUpdates(
onTask(TASK_ID_1, started),
onNoTask(notPartOfRun));

// Then
assertThat(runs.getRunsLatestFirst())
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(
tuple(TASK_ID_1, List.of(started)));
}
}

@DisplayName("Correlate job runs by run ID")
@Nested
class CorrelateRunsById {
Expand Down Expand Up @@ -481,6 +338,21 @@ void shouldCreateRunWithCustomStatusUpdateNotPartOfRunButStillOnTask() {
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly("a-task", List.of(startedStatus));
}

@Test
void shouldCreateRunWhenStatusUpdatePartOfRunButHasNoRunId() {
// Given
TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z"));

// When
JobRuns runs = runsFromUpdates(forNoRunNoTask("a-job", startedStatus));

// Then
assertThat(runs.getRunsLatestFirst())
.singleElement()
.extracting(JobRun::getTaskId, JobRun::getStatusUpdates)
.containsExactly(null, List.of(startedStatus));
}
}

@DisplayName("An update can be the start and end of a run")
Expand Down Expand Up @@ -514,7 +386,7 @@ void shouldStartAnotherRunAfterAStartedAndFinishedUpdate() {
TestJobStartedStatus started = startedStatus(Instant.parse("2022-09-24T09:24:00.001Z"));

// When
JobRuns runs = runsFromUpdates(startedAndFinished, started);
JobRuns runs = runsFromUpdates(forRunOnTask(startedAndFinished), forRunOnTask(started));

// Then
assertThat(runs.getRunsLatestFirst())
Expand All @@ -535,7 +407,7 @@ void shouldFinishAnotherRunAfterAStartedAndFinishedUpdate() {
AggregatedTaskJobsFinishedStatus finished = finishedStatus(started, Duration.ofSeconds(30), 450L, 300L);

// When
JobRuns runs = runsFromUpdates(startedAndFinished, started, finished);
JobRuns runs = runsFromUpdates(forRunOnTask(startedAndFinished), forRunOnTask(started, finished));

// Then
assertThat(runs.getRunsLatestFirst())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -78,7 +79,7 @@ public TestJobStatusUpdateRecords fromUpdates(JobStatusUpdate... statusUpdates)
*/
public static TaskUpdates forJobOnTask(
String jobId, String taskId, JobStatusUpdate... updates) {
return forJobRunOnTask(jobId, null, taskId, updates);
return forJobRunOnTask(jobId, UUID.randomUUID().toString(), taskId, updates);
}

/**
Expand Down Expand Up @@ -136,7 +137,7 @@ public static TaskUpdates forJob(
*/
public static TaskUpdates onTask(
String taskId, JobStatusUpdate... updates) {
return forJobOnTask(DEFAULT_JOB_ID, taskId, updates);
return forRunOnTask(UUID.randomUUID().toString(), taskId, updates);
}

/**
Expand All @@ -149,6 +150,16 @@ public static TaskUpdates onNoTask(JobStatusUpdate... updates) {
return forJobOnTask(DEFAULT_JOB_ID, null, updates);
}

/**
* Creates an instance of task updates with the default job ID.
*
* @param updates the job status updates
* @return a {@link TaskUpdates} instance
*/
public static TaskUpdates forRunOnTask(JobStatusUpdate... updates) {
return forJobRunOnTask(DEFAULT_JOB_ID, UUID.randomUUID().toString(), DEFAULT_TASK_ID, updates);
}

/**
* Creates an instance of task updates with the default job ID.
*
Expand Down

0 comments on commit 0a6efb7

Please sign in to comment.