Skip to content

Commit

Permalink
Remove isPartOfRun flag
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Feb 21, 2025
1 parent 092b51d commit 796237d
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package sleeper.core.tracker.compaction.job.query;

import sleeper.core.tracker.job.status.JobStatusUpdate;
import sleeper.core.tracker.job.status.JobRunStatusUpdate;

import java.time.Instant;
import java.util.Objects;

public class CompactionJobCommittedStatus implements JobStatusUpdate {
public class CompactionJobCommittedStatus implements JobRunStatusUpdate {

private final Instant commitTime;
private final Instant updateTime;
Expand All @@ -34,11 +34,6 @@ public static CompactionJobCommittedStatus commitAndUpdateTime(Instant commitTim
return new CompactionJobCommittedStatus(commitTime, updateTime);
}

@Override
public boolean isPartOfRun() {
return true;
}

@Override
public Instant getUpdateTime() {
return updateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package sleeper.core.tracker.ingest.job.query;

import sleeper.core.tracker.job.status.JobStatusUpdate;
import sleeper.core.tracker.job.status.JobRunStatusUpdate;

import java.time.Instant;
import java.util.Objects;

/**
* A status update for when an ingest job has committed files to the state store.
*/
public class IngestJobAddedFilesStatus implements JobStatusUpdate {
public class IngestJobAddedFilesStatus implements JobRunStatusUpdate {

private final Instant writtenTime;
private final Instant updateTime;
Expand Down Expand Up @@ -52,11 +52,6 @@ public int getFileCount() {
return fileCount;
}

@Override
public boolean isPartOfRun() {
return true;
}

@Override
public int hashCode() {
return Objects.hash(writtenTime, updateTime, fileCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ public List<String> getFailureReasons() {
return reasons;
}

@Override
public boolean isPartOfRun() {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package sleeper.core.tracker.job.run;

import sleeper.core.tracker.job.status.JobRunStatusUpdate;
import sleeper.core.tracker.job.status.JobStatusUpdateRecord;

import java.util.ArrayList;
Expand Down Expand Up @@ -46,7 +47,7 @@ class JobRunsBuilder {
private final List<JobRun.Builder> orderedBuilders = new ArrayList<>();

void add(JobStatusUpdateRecord record) {
if (!record.getStatusUpdate().isPartOfRun()) {
if (!(record.getStatusUpdate() instanceof JobRunStatusUpdate)) {
return;
}
getBuilderIfCorrelatable(record)
Expand All @@ -59,7 +60,7 @@ void add(JobStatusUpdateRecord record) {
}

private Optional<JobRun.Builder> getBuilderIfCorrelatable(JobStatusUpdateRecord record) {
if (!record.getStatusUpdate().isPartOfRun()) {
if (!(record.getStatusUpdate() instanceof JobRunStatusUpdate)) {
return Optional.empty();
}
String jobRunId = record.getJobRunId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Interface for a status update that marks the end of a process run.
*/
public interface JobRunEndUpdate extends JobStatusUpdate {
public interface JobRunEndUpdate extends JobRunStatusUpdate {

/**
* Gets the time the job finished in the task.
Expand All @@ -53,8 +53,4 @@ default boolean isSuccessful() {
default List<String> getFailureReasons() {
return List.of();
}

default boolean isPartOfRun() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@
/**
* Interface for a status update that marks the start of a run of a job.
*/
public interface JobRunStartedUpdate extends JobStatusUpdate {
public interface JobRunStartedUpdate extends JobRunStatusUpdate {
/**
* Get the start time of this run.
*
* @return the start time
*/
Instant getStartTime();

default boolean isPartOfRun() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.tracker.job.status;

/**
* Interface representing a status update for a job during a run of the job.
*/
public interface JobRunStatusUpdate extends JobStatusUpdate {

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.time.Instant;

/**
* Interface representing a status update for a job.
* Interface representing a status update for a job. Note that if the update is part of a run of the job, it should
* implement {@link JobRunStatusUpdate}.
*/
public interface JobStatusUpdate {

Expand All @@ -28,8 +29,4 @@ public interface JobStatusUpdate {
* @return the update time
*/
Instant getUpdateTime();

default boolean isPartOfRun() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import sleeper.core.tracker.job.status.AggregatedTaskJobsFinishedStatus;
import sleeper.core.tracker.job.status.JobRunFailedStatus;
import sleeper.core.tracker.job.status.JobRunStartedUpdate;
import sleeper.core.tracker.job.status.JobStatusUpdate;
import sleeper.core.tracker.job.status.TestJobRunStatus;
import sleeper.core.tracker.job.status.TestJobStartedAndFinishedStatus;
import sleeper.core.tracker.job.status.TestJobStartedStatus;
import sleeper.core.tracker.job.status.TestJobStatus;
import sleeper.core.tracker.job.status.TestJobStatusNotInRun;

import java.time.Duration;
import java.time.Instant;
Expand All @@ -40,8 +40,6 @@
import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.failedStatus;
import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.finishedStatus;
import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.startedStatus;
import static sleeper.core.tracker.job.status.TestJobStatus.notPartOfRunWithUpdateTime;
import static sleeper.core.tracker.job.status.TestJobStatus.partOfRunWithUpdateTime;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.DEFAULT_TASK_ID;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forJobRunOnTask;
import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forNoRunNoTask;
Expand Down Expand Up @@ -173,7 +171,7 @@ void shouldIncludeUpdateForRunBeforeStartTimeWhenOccurredOnAnotherProcessWithOut
// And the other process has an out of sync clock such that the update occurred before the start update
TestJobStartedStatus started = startedStatus(
Instant.parse("2024-06-19T13:26:00Z"));
TestJobStatus update = partOfRunWithUpdateTime(
TestJobRunStatus update = new TestJobRunStatus(
Instant.parse("2024-06-19T13:25:59Z"));

// When
Expand All @@ -195,7 +193,7 @@ void shouldIncludeUpdateForRunAfterItIsFinished() {
Instant.parse("2024-06-19T13:26:00Z"));
AggregatedTaskJobsFinishedStatus finished = finishedStatus(
started, Duration.ofMinutes(1), 123, 123);
TestJobStatus update = partOfRunWithUpdateTime(
TestJobRunStatus update = new TestJobRunStatus(
Instant.parse("2024-06-19T13:28:00Z"));

// When
Expand All @@ -213,7 +211,7 @@ void shouldIncludeUpdateForRunAfterItIsFinished() {
@Test
void shouldExcludeUpdateNotPartOfARun() {
// Given
TestJobStatus notPartOfRun = notPartOfRunWithUpdateTime(Instant.parse("2024-06-19T14:06:00Z"));
TestJobStatusNotInRun notPartOfRun = new TestJobStatusNotInRun(Instant.parse("2024-06-19T14:06:00Z"));
TestJobStartedStatus started = startedStatus(Instant.parse("2024-06-19T14:06:01Z"));

// When
Expand Down Expand Up @@ -280,7 +278,7 @@ class FlagUpdatesAsPartOfRun {
@Test
void shouldNotCreateRunIfStatusUpdateNotFlaggedAsPartOfRun() {
// Given
JobStatusUpdate notStartedUpdate = notPartOfRunWithUpdateTime(Instant.parse("2022-09-24T09:23:30.001Z"));
TestJobStatusNotInRun notStartedUpdate = new TestJobStatusNotInRun(Instant.parse("2022-09-24T09:23:30.001Z"));

// When
JobRuns runs = runsFromSingleRunUpdates(notStartedUpdate);
Expand All @@ -293,7 +291,7 @@ void shouldNotCreateRunIfStatusUpdateNotFlaggedAsPartOfRun() {
void shouldCreateRunWithCustomStatusUpdatePartOfRun() {
// Given
TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z"));
TestJobStatus customStatus = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z"));
TestJobRunStatus customStatus = new TestJobRunStatus(Instant.parse("2022-09-24T10:23:30Z"));

// When
JobRuns runs = runsFromSingleRunUpdates(startedStatus, customStatus);
Expand All @@ -309,7 +307,7 @@ void shouldCreateRunWithCustomStatusUpdatePartOfRun() {
void shouldCreateRunWithCustomStatusUpdateNotPartOfRun() {
// Given
TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z"));
TestJobStatus customStatus = notPartOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z"));
TestJobStatusNotInRun customStatus = new TestJobStatusNotInRun(Instant.parse("2022-09-24T10:23:30Z"));

// When
JobRuns runs = runsFromUpdates(forRunOnTask("a-task", startedStatus), onNoTask(customStatus));
Expand All @@ -325,7 +323,7 @@ void shouldCreateRunWithCustomStatusUpdateNotPartOfRun() {
void shouldCreateRunWithCustomStatusUpdateNotPartOfRunButStillOnTask() {
// Given
TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z"));
TestJobStatus customStatus = notPartOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z"));
TestJobStatusNotInRun customStatus = new TestJobStatusNotInRun(Instant.parse("2022-09-24T10:23:30Z"));

// When
JobRuns runs = runsFromUpdates(forRunOnTask("a-task", startedStatus, customStatus));
Expand Down Expand Up @@ -441,30 +439,30 @@ class RetrieveStatusUpdatesByClass {
void shouldReturnLastStatusUpdateByClass() {
// Given
TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z"));
TestJobStatus customStatus = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z"));
TestJobRunStatus customStatus = new TestJobRunStatus(Instant.parse("2022-09-24T10:23:30Z"));

// When
JobRuns runs = runsFromSingleRunUpdates(startedStatus, customStatus);

// Then
assertThat(runs.getLatestRun()
.flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobStatus.class)))
.flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobRunStatus.class)))
.get().isEqualTo(customStatus);
}

@Test
void shouldReturnLastStatusUpdateByClassWithMultipleUpdatesForClass() {
// Given
TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z"));
TestJobStatus customStatus1 = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z"));
TestJobStatus customStatus2 = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:25:30Z"));
TestJobRunStatus customStatus1 = new TestJobRunStatus(Instant.parse("2022-09-24T10:23:30Z"));
TestJobRunStatus customStatus2 = new TestJobRunStatus(Instant.parse("2022-09-24T10:25:30Z"));

// When
JobRuns runs = runsFromSingleRunUpdates(startedStatus, customStatus1, customStatus2);

// Then
assertThat(runs.getLatestRun()
.flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobStatus.class)))
.flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobRunStatus.class)))
.get().isEqualTo(customStatus2);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.tracker.job.status;

import java.time.Instant;

/**
* A test implementation of a job status update on a run.
*
* @param updateTime the time of the update
*/
public record TestJobRunStatus(Instant updateTime) implements JobRunStatusUpdate {

@Override
public Instant getUpdateTime() {
return updateTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public RecordsProcessed getRecordsProcessed() {
return summary.getRecordsProcessed();
}

@Override
public boolean isPartOfRun() {
return true;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit 796237d

Please sign in to comment.