diff --git a/java/core/src/main/java/sleeper/core/tracker/compaction/job/query/CompactionJobCommittedStatus.java b/java/core/src/main/java/sleeper/core/tracker/compaction/job/query/CompactionJobCommittedStatus.java index d6b8a9c891..6016797539 100644 --- a/java/core/src/main/java/sleeper/core/tracker/compaction/job/query/CompactionJobCommittedStatus.java +++ b/java/core/src/main/java/sleeper/core/tracker/compaction/job/query/CompactionJobCommittedStatus.java @@ -15,12 +15,12 @@ */ package sleeper.core.tracker.compaction.job.query; -import sleeper.core.tracker.job.status.JobStatusUpdate; +import sleeper.core.tracker.job.status.JobRunStatusUpdate; import java.time.Instant; import java.util.Objects; -public class CompactionJobCommittedStatus implements JobStatusUpdate { +public class CompactionJobCommittedStatus implements JobRunStatusUpdate { private final Instant commitTime; private final Instant updateTime; @@ -34,11 +34,6 @@ public static CompactionJobCommittedStatus commitAndUpdateTime(Instant commitTim return new CompactionJobCommittedStatus(commitTime, updateTime); } - @Override - public boolean isPartOfRun() { - return true; - } - @Override public Instant getUpdateTime() { return updateTime; diff --git a/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobAddedFilesStatus.java b/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobAddedFilesStatus.java index 03714183a3..32d4ec3412 100644 --- a/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobAddedFilesStatus.java +++ b/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobAddedFilesStatus.java @@ -15,7 +15,7 @@ */ package sleeper.core.tracker.ingest.job.query; -import sleeper.core.tracker.job.status.JobStatusUpdate; +import sleeper.core.tracker.job.status.JobRunStatusUpdate; import java.time.Instant; import java.util.Objects; @@ -23,7 +23,7 @@ /** * A status update for when an ingest job has committed files to the state store. */ -public class IngestJobAddedFilesStatus implements JobStatusUpdate { +public class IngestJobAddedFilesStatus implements JobRunStatusUpdate { private final Instant writtenTime; private final Instant updateTime; @@ -52,11 +52,6 @@ public int getFileCount() { return fileCount; } - @Override - public boolean isPartOfRun() { - return true; - } - @Override public int hashCode() { return Objects.hash(writtenTime, updateTime, fileCount); diff --git a/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobRejectedStatus.java b/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobRejectedStatus.java index 68a2f0e7bd..ec828400bf 100644 --- a/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobRejectedStatus.java +++ b/java/core/src/main/java/sleeper/core/tracker/ingest/job/query/IngestJobRejectedStatus.java @@ -89,11 +89,6 @@ public List getFailureReasons() { return reasons; } - @Override - public boolean isPartOfRun() { - return true; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/java/core/src/main/java/sleeper/core/tracker/job/run/JobRunsBuilder.java b/java/core/src/main/java/sleeper/core/tracker/job/run/JobRunsBuilder.java index ab592071db..4acb7a298e 100644 --- a/java/core/src/main/java/sleeper/core/tracker/job/run/JobRunsBuilder.java +++ b/java/core/src/main/java/sleeper/core/tracker/job/run/JobRunsBuilder.java @@ -15,6 +15,7 @@ */ package sleeper.core.tracker.job.run; +import sleeper.core.tracker.job.status.JobRunStatusUpdate; import sleeper.core.tracker.job.status.JobStatusUpdateRecord; import java.util.ArrayList; @@ -46,7 +47,7 @@ class JobRunsBuilder { private final List orderedBuilders = new ArrayList<>(); void add(JobStatusUpdateRecord record) { - if (!record.getStatusUpdate().isPartOfRun()) { + if (!(record.getStatusUpdate() instanceof JobRunStatusUpdate)) { return; } getBuilderIfCorrelatable(record) @@ -59,7 +60,7 @@ void add(JobStatusUpdateRecord record) { } private Optional getBuilderIfCorrelatable(JobStatusUpdateRecord record) { - if (!record.getStatusUpdate().isPartOfRun()) { + if (!(record.getStatusUpdate() instanceof JobRunStatusUpdate)) { return Optional.empty(); } String jobRunId = record.getJobRunId(); diff --git a/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunEndUpdate.java b/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunEndUpdate.java index 31d3dcd419..615785c2d6 100644 --- a/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunEndUpdate.java +++ b/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunEndUpdate.java @@ -26,7 +26,7 @@ /** * Interface for a status update that marks the end of a process run. */ -public interface JobRunEndUpdate extends JobStatusUpdate { +public interface JobRunEndUpdate extends JobRunStatusUpdate { /** * Gets the time the job finished in the task. @@ -53,8 +53,4 @@ default boolean isSuccessful() { default List getFailureReasons() { return List.of(); } - - default boolean isPartOfRun() { - return true; - } } diff --git a/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStartedUpdate.java b/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStartedUpdate.java index 74510b3ded..6b92427a24 100644 --- a/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStartedUpdate.java +++ b/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStartedUpdate.java @@ -20,15 +20,11 @@ /** * Interface for a status update that marks the start of a run of a job. */ -public interface JobRunStartedUpdate extends JobStatusUpdate { +public interface JobRunStartedUpdate extends JobRunStatusUpdate { /** * Get the start time of this run. * * @return the start time */ Instant getStartTime(); - - default boolean isPartOfRun() { - return true; - } } diff --git a/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStatusUpdate.java b/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStatusUpdate.java new file mode 100644 index 0000000000..ca8efafad3 --- /dev/null +++ b/java/core/src/main/java/sleeper/core/tracker/job/status/JobRunStatusUpdate.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.core.tracker.job.status; + +/** + * Interface representing a status update for a job during a run of the job. + */ +public interface JobRunStatusUpdate extends JobStatusUpdate { + +} diff --git a/java/core/src/main/java/sleeper/core/tracker/job/status/JobStatusUpdate.java b/java/core/src/main/java/sleeper/core/tracker/job/status/JobStatusUpdate.java index 3624daf161..f273e75c61 100644 --- a/java/core/src/main/java/sleeper/core/tracker/job/status/JobStatusUpdate.java +++ b/java/core/src/main/java/sleeper/core/tracker/job/status/JobStatusUpdate.java @@ -18,7 +18,8 @@ import java.time.Instant; /** - * Interface representing a status update for a job. + * Interface representing a status update for a job. Note that if the update is part of a run of the job, it should + * implement {@link JobRunStatusUpdate}. */ public interface JobStatusUpdate { @@ -28,8 +29,4 @@ public interface JobStatusUpdate { * @return the update time */ Instant getUpdateTime(); - - default boolean isPartOfRun() { - return false; - } } diff --git a/java/core/src/test/java/sleeper/core/tracker/job/run/JobRunsTest.java b/java/core/src/test/java/sleeper/core/tracker/job/run/JobRunsTest.java index db3f386d4b..836a29feff 100644 --- a/java/core/src/test/java/sleeper/core/tracker/job/run/JobRunsTest.java +++ b/java/core/src/test/java/sleeper/core/tracker/job/run/JobRunsTest.java @@ -23,10 +23,10 @@ import sleeper.core.tracker.job.status.AggregatedTaskJobsFinishedStatus; import sleeper.core.tracker.job.status.JobRunFailedStatus; import sleeper.core.tracker.job.status.JobRunStartedUpdate; -import sleeper.core.tracker.job.status.JobStatusUpdate; +import sleeper.core.tracker.job.status.TestJobRunStatus; import sleeper.core.tracker.job.status.TestJobStartedAndFinishedStatus; import sleeper.core.tracker.job.status.TestJobStartedStatus; -import sleeper.core.tracker.job.status.TestJobStatus; +import sleeper.core.tracker.job.status.TestJobStatusNotInRun; import java.time.Duration; import java.time.Instant; @@ -40,8 +40,6 @@ import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.failedStatus; import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.finishedStatus; import static sleeper.core.tracker.job.status.JobStatusUpdateTestHelper.startedStatus; -import static sleeper.core.tracker.job.status.TestJobStatus.notPartOfRunWithUpdateTime; -import static sleeper.core.tracker.job.status.TestJobStatus.partOfRunWithUpdateTime; import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.DEFAULT_TASK_ID; import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forJobRunOnTask; import static sleeper.core.tracker.job.status.TestJobStatusUpdateRecords.forNoRunNoTask; @@ -173,7 +171,7 @@ void shouldIncludeUpdateForRunBeforeStartTimeWhenOccurredOnAnotherProcessWithOut // And the other process has an out of sync clock such that the update occurred before the start update TestJobStartedStatus started = startedStatus( Instant.parse("2024-06-19T13:26:00Z")); - TestJobStatus update = partOfRunWithUpdateTime( + TestJobRunStatus update = new TestJobRunStatus( Instant.parse("2024-06-19T13:25:59Z")); // When @@ -195,7 +193,7 @@ void shouldIncludeUpdateForRunAfterItIsFinished() { Instant.parse("2024-06-19T13:26:00Z")); AggregatedTaskJobsFinishedStatus finished = finishedStatus( started, Duration.ofMinutes(1), 123, 123); - TestJobStatus update = partOfRunWithUpdateTime( + TestJobRunStatus update = new TestJobRunStatus( Instant.parse("2024-06-19T13:28:00Z")); // When @@ -213,7 +211,7 @@ void shouldIncludeUpdateForRunAfterItIsFinished() { @Test void shouldExcludeUpdateNotPartOfARun() { // Given - TestJobStatus notPartOfRun = notPartOfRunWithUpdateTime(Instant.parse("2024-06-19T14:06:00Z")); + TestJobStatusNotInRun notPartOfRun = new TestJobStatusNotInRun(Instant.parse("2024-06-19T14:06:00Z")); TestJobStartedStatus started = startedStatus(Instant.parse("2024-06-19T14:06:01Z")); // When @@ -280,7 +278,7 @@ class FlagUpdatesAsPartOfRun { @Test void shouldNotCreateRunIfStatusUpdateNotFlaggedAsPartOfRun() { // Given - JobStatusUpdate notStartedUpdate = notPartOfRunWithUpdateTime(Instant.parse("2022-09-24T09:23:30.001Z")); + TestJobStatusNotInRun notStartedUpdate = new TestJobStatusNotInRun(Instant.parse("2022-09-24T09:23:30.001Z")); // When JobRuns runs = runsFromSingleRunUpdates(notStartedUpdate); @@ -293,7 +291,7 @@ void shouldNotCreateRunIfStatusUpdateNotFlaggedAsPartOfRun() { void shouldCreateRunWithCustomStatusUpdatePartOfRun() { // Given TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z")); - TestJobStatus customStatus = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z")); + TestJobRunStatus customStatus = new TestJobRunStatus(Instant.parse("2022-09-24T10:23:30Z")); // When JobRuns runs = runsFromSingleRunUpdates(startedStatus, customStatus); @@ -309,7 +307,7 @@ void shouldCreateRunWithCustomStatusUpdatePartOfRun() { void shouldCreateRunWithCustomStatusUpdateNotPartOfRun() { // Given TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z")); - TestJobStatus customStatus = notPartOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z")); + TestJobStatusNotInRun customStatus = new TestJobStatusNotInRun(Instant.parse("2022-09-24T10:23:30Z")); // When JobRuns runs = runsFromUpdates(forRunOnTask("a-task", startedStatus), onNoTask(customStatus)); @@ -325,7 +323,7 @@ void shouldCreateRunWithCustomStatusUpdateNotPartOfRun() { void shouldCreateRunWithCustomStatusUpdateNotPartOfRunButStillOnTask() { // Given TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z")); - TestJobStatus customStatus = notPartOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z")); + TestJobStatusNotInRun customStatus = new TestJobStatusNotInRun(Instant.parse("2022-09-24T10:23:30Z")); // When JobRuns runs = runsFromUpdates(forRunOnTask("a-task", startedStatus, customStatus)); @@ -441,14 +439,14 @@ class RetrieveStatusUpdatesByClass { void shouldReturnLastStatusUpdateByClass() { // Given TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z")); - TestJobStatus customStatus = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z")); + TestJobRunStatus customStatus = new TestJobRunStatus(Instant.parse("2022-09-24T10:23:30Z")); // When JobRuns runs = runsFromSingleRunUpdates(startedStatus, customStatus); // Then assertThat(runs.getLatestRun() - .flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobStatus.class))) + .flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobRunStatus.class))) .get().isEqualTo(customStatus); } @@ -456,15 +454,15 @@ void shouldReturnLastStatusUpdateByClass() { void shouldReturnLastStatusUpdateByClassWithMultipleUpdatesForClass() { // Given TestJobStartedStatus startedStatus = startedStatus(Instant.parse("2022-09-24T09:23:30Z")); - TestJobStatus customStatus1 = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:23:30Z")); - TestJobStatus customStatus2 = partOfRunWithUpdateTime(Instant.parse("2022-09-24T10:25:30Z")); + TestJobRunStatus customStatus1 = new TestJobRunStatus(Instant.parse("2022-09-24T10:23:30Z")); + TestJobRunStatus customStatus2 = new TestJobRunStatus(Instant.parse("2022-09-24T10:25:30Z")); // When JobRuns runs = runsFromSingleRunUpdates(startedStatus, customStatus1, customStatus2); // Then assertThat(runs.getLatestRun() - .flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobStatus.class))) + .flatMap(latestRun -> latestRun.getLastStatusOfType(TestJobRunStatus.class))) .get().isEqualTo(customStatus2); } diff --git a/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobRunStatus.java b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobRunStatus.java new file mode 100644 index 0000000000..e4825c100b --- /dev/null +++ b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobRunStatus.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.core.tracker.job.status; + +import java.time.Instant; + +/** + * A test implementation of a job status update on a run. + * + * @param updateTime the time of the update + */ +public record TestJobRunStatus(Instant updateTime) implements JobRunStatusUpdate { + + @Override + public Instant getUpdateTime() { + return updateTime; + } + +} diff --git a/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStartedAndFinishedStatus.java b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStartedAndFinishedStatus.java index ac3c92ce73..0af53efe63 100644 --- a/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStartedAndFinishedStatus.java +++ b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStartedAndFinishedStatus.java @@ -65,11 +65,6 @@ public RecordsProcessed getRecordsProcessed() { return summary.getRecordsProcessed(); } - @Override - public boolean isPartOfRun() { - return true; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStatus.java b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStatus.java deleted file mode 100644 index 0210032368..0000000000 --- a/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStatus.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sleeper.core.tracker.job.status; - -import java.time.Instant; -import java.util.Objects; - -/** - * A test implementation of a job status update. - */ -public class TestJobStatus implements JobStatusUpdate { - private final Instant updateTime; - private final boolean isPartOfRun; - - private TestJobStatus(Instant updateTime, boolean isPartOfRun) { - this.updateTime = updateTime; - this.isPartOfRun = isPartOfRun; - } - - /** - * Creates an instance of this class that is marked as part of a job run. - * - * @param updateTime the update time - * @return an instance of this class that is marked as part of a job run - */ - public static TestJobStatus partOfRunWithUpdateTime(Instant updateTime) { - return new TestJobStatus(updateTime, true); - } - - /** - * Creates an instance of this class that is not marked as part of a job run. - * - * @param updateTime the update time - * @return an instance of this class that is not marked as part of a job run - */ - public static TestJobStatus notPartOfRunWithUpdateTime(Instant updateTime) { - return new TestJobStatus(updateTime, false); - } - - @Override - public Instant getUpdateTime() { - return updateTime; - } - - @Override - public boolean isPartOfRun() { - return isPartOfRun; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TestJobStatus that = (TestJobStatus) o; - return Objects.equals(updateTime, that.updateTime); - } - - @Override - public int hashCode() { - return Objects.hash(updateTime); - } - - @Override - public String toString() { - return "TestJobStatus{" + - "updateTime=" + updateTime + - '}'; - } -} diff --git a/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStatusNotInRun.java b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStatusNotInRun.java new file mode 100644 index 0000000000..a09ba47fc1 --- /dev/null +++ b/java/core/src/test/java/sleeper/core/tracker/job/status/TestJobStatusNotInRun.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sleeper.core.tracker.job.status; + +import java.time.Instant; + +/** + * A test implementation of a job status update that is not part of a run. + * + * @param updateTime the time of the update + */ +public record TestJobStatusNotInRun(Instant updateTime) implements JobStatusUpdate { + + @Override + public Instant getUpdateTime() { + return updateTime; + } +}