Skip to content

Commit

Permalink
Merge pull request #4294 from gchq/4291-job-run-type-checks
Browse files Browse the repository at this point in the history
Issue 4291 - Remove type checks from job run model
  • Loading branch information
patchwork01 authored Feb 20, 2025
2 parents 7cd2917 + 8c88ff1 commit 6c20ef5
Show file tree
Hide file tree
Showing 68 changed files with 769 additions and 437 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import sleeper.core.statestore.testutils.InMemoryTransactionLogs;
import sleeper.core.statestore.transactionlog.transaction.impl.AddFilesTransaction;
import sleeper.core.tracker.ingest.job.InMemoryIngestJobTracker;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;
import sleeper.core.tracker.job.run.RecordsProcessed;

import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import sleeper.clients.util.table.TableWriter;
import sleeper.clients.util.table.TableWriterFactory;
import sleeper.core.tracker.compaction.job.query.CompactionJobCommittedStatus;
import sleeper.core.tracker.compaction.job.query.CompactionJobRun;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatusType;
import sleeper.core.tracker.job.run.AverageRecordRate;
import sleeper.core.tracker.job.run.JobRun;
import sleeper.core.tracker.job.run.JobRunReport;
import sleeper.core.util.DurationStatistics;

import java.io.PrintStream;
Expand Down Expand Up @@ -121,22 +122,21 @@ private void printSingleJobSummary(CompactionJobStatus jobStatus) {
out.printf("State: %s%n", jobStatus.getFurthestStatusType());
out.printf("Creation time: %s%n", jobStatus.getCreateUpdateTime());
out.printf("Partition ID: %s%n", jobStatus.getPartitionId());
jobStatus.getJobRuns().forEach(this::printJobRun);
jobStatus.getRunsLatestFirst().forEach(this::printJobRun);
out.println("--------------------------");
}

private void printJobRun(JobRun run) {
private void printJobRun(CompactionJobRun run) {
runReporter.printProcessJobRunWithUpdatePrinter(run,
printUpdateType(CompactionJobCommittedStatus.class, committedStatus -> printCommitStatus(run, committedStatus)));
CompactionJobStatusType runStatusType = CompactionJobStatusType.statusTypeOfJobRun(run);
if (runStatusType == CompactionJobStatusType.IN_PROGRESS) {
if (run.getStatusType() == CompactionJobStatusType.IN_PROGRESS) {
out.println("Not finished");
} else if (runStatusType == CompactionJobStatusType.UNCOMMITTED) {
} else if (run.getStatusType() == CompactionJobStatusType.UNCOMMITTED) {
out.println("Not committed");
}
}

private void printCommitStatus(JobRun run, CompactionJobCommittedStatus committedStatus) {
private void printCommitStatus(JobRunReport run, CompactionJobCommittedStatus committedStatus) {
out.printf("State store commit time: %s%n", committedStatus.getCommitTime());
if (run.isFinished()) {
Duration delay = Duration.between(run.getFinishTime(), committedStatus.getCommitTime());
Expand Down Expand Up @@ -180,20 +180,20 @@ private void printRateAndDelayStatistics(List<CompactionJobStatus> jobs) {

private static AverageRecordRate recordRate(List<CompactionJobStatus> jobs) {
return AverageRecordRate.of(jobs.stream()
.flatMap(job -> job.getJobRuns().stream()));
.flatMap(job -> job.getRunsLatestFirst().stream()));
}

private void writeJob(CompactionJobStatus job, TableWriter.Builder table) {
if (job.getJobRuns().isEmpty()) {
if (job.getRunsLatestFirst().isEmpty()) {
table.row(row -> {
row.value(stateField, job.getFurthestStatusType());
writeJobFields(job, row);
});
} else {
job.getJobRuns().forEach(run -> table.row(row -> {
job.getRunsLatestFirst().forEach(run -> table.row(row -> {
writeJobFields(job, row);
row.value(stateField, CompactionJobStatusType.statusTypeOfJobRun(run));
row.value(commitTimeField, run.getLastStatusOfType(CompactionJobCommittedStatus.class)
row.value(stateField, run.getStatusType());
row.value(commitTimeField, run.getCommittedStatus()
.map(CompactionJobCommittedStatus::getCommitTime)
.orElse(null));
runReporter.writeRunFields(run, row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ private static int getTotalJobsRun(List<CompactionTaskStatus> tasks) {

private static AverageRecordRate recordRate(List<CompactionTaskStatus> tasks) {
return AverageRecordRate.of(tasks.stream()
.map(CompactionTaskStatus::asAggregatedJobRun));
.map(CompactionTaskStatus::asJobRunReport));
}

private void writeRow(CompactionTaskStatus task, TableRow.Builder builder) {
builder.value(STATE, task.isFinished() ? "FINISHED" : "RUNNING")
.value(JOB_RUNS, task.getJobRunsOrNull())
.value(JOB_DURATION, StandardJobRunReporter.getOrNull(task.getFinishedStatus(),
status -> StandardJobRunReporter.formatDurationString(status.getTimeSpentOnJobs())));
jobRunReporter.writeRunFields(task.asAggregatedJobRun(), builder);
jobRunReporter.writeRunFields(task.asJobRunReport(), builder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package sleeper.clients.status.report.ingest.job;

import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import sleeper.clients.status.report.job.JsonJobRunTime;
import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.clients.util.ClientsGsonConfig;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobUpdateType;
import sleeper.core.tracker.ingest.job.query.IngestJobStartedStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobUpdateType;
import sleeper.core.tracker.job.run.JobRunSummary;
import sleeper.core.tracker.job.run.JobRunTime;
import sleeper.core.tracker.job.run.JobRuns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,20 @@
import sleeper.clients.util.table.TableRow;
import sleeper.clients.util.table.TableWriter;
import sleeper.clients.util.table.TableWriterFactory;
import sleeper.core.tracker.ingest.job.IngestJobFilesWrittenAndAdded;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatusType;
import sleeper.core.tracker.ingest.job.query.IngestJobAddedFilesStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobRejectedStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobRun;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobValidatedStatus;
import sleeper.core.tracker.job.run.AverageRecordRate;
import sleeper.core.tracker.job.run.JobRun;

import java.io.PrintStream;
import java.util.List;
import java.util.Map;

import static sleeper.clients.status.report.job.StandardJobRunReporter.printUpdateType;
import static sleeper.clients.status.report.job.StandardJobRunReporter.updatePrinters;
import static sleeper.core.tracker.ingest.job.IngestJobStatusType.IN_PROGRESS;
import static sleeper.core.tracker.ingest.job.query.IngestJobStatusType.IN_PROGRESS;

public class StandardIngestJobStatusReporter implements IngestJobStatusReporter {

Expand Down Expand Up @@ -123,17 +121,17 @@ private void printDetailedSummary(List<IngestJobStatus> statusList) {
private void printDetailedSummary(IngestJobStatus status) {
out.printf("Details for job %s:%n", status.getJobId());
out.printf("State: %s%n", status.getFurthestRunStatusType());
out.printf("Number of input files: %d%n", status.getInputFilesCount());
for (JobRun run : status.getJobRuns()) {
out.printf("Number of input files: %d%n", status.getInputFileCount());
for (IngestJobRun run : status.getRunsLatestFirst()) {
printProcessJobRun(run);
}
}

private void printProcessJobRun(JobRun run) {
private void printProcessJobRun(IngestJobRun run) {
runReporter.printProcessJobRunWithUpdatePrinter(run, updatePrinters(
printUpdateType(IngestJobValidatedStatus.class, this::printValidation),
printUpdateType(IngestJobAddedFilesStatus.class, this::printAddedFiles)));
if (IngestJobStatusType.statusTypeOfJobRun(run) == IN_PROGRESS) {
if (run.getStatusType() == IN_PROGRESS) {
out.println("Not finished");
}
}
Expand Down Expand Up @@ -205,20 +203,20 @@ private void printRejectedSummary(List<IngestJobStatus> statusList, IngestQueueM

private static AverageRecordRate recordRate(List<IngestJobStatus> jobs) {
return AverageRecordRate.of(jobs.stream()
.flatMap(job -> job.getJobRuns().stream()));
.flatMap(job -> job.getRunsLatestFirst().stream()));
}

private void writeJob(IngestJobStatus job, TableWriter.Builder table) {
job.getJobRuns().forEach(run -> table.row(row -> {
job.getRunsLatestFirst().forEach(run -> table.row(row -> {
writeJobFields(job, row);
row.value(stateField, IngestJobStatusType.statusTypeOfJobRun(run));
row.value(addedFilesCount, IngestJobFilesWrittenAndAdded.from(run).getFilesAddedToStateStore());
row.value(stateField, run.getStatusType());
row.value(addedFilesCount, run.getFilesWrittenAndAdded().getFilesAddedToStateStore());
runReporter.writeRunFields(run, row);
}));
}

private void writeJobFields(IngestJobStatus job, TableRow.Builder builder) {
builder.value(jobIdField, job.getJobId())
.value(inputFilesCount, job.getInputFilesCount());
.value(inputFilesCount, job.getInputFileCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ private static int getTotalJobsRun(List<IngestTaskStatus> tasks) {

private static AverageRecordRate recordRate(List<IngestTaskStatus> tasks) {
return AverageRecordRate.of(tasks.stream()
.map(IngestTaskStatus::asAggregatedJobRun));
.map(IngestTaskStatus::asJobRunReport));
}

private void writeRow(IngestTaskStatus task, TableRow.Builder builder) {
builder.value(STATE, task.isFinished() ? "FINISHED" : "RUNNING")
.value(JOB_RUNS, task.getJobRunsOrNull())
.value(JOB_DURATION, StandardJobRunReporter.getOrNull(task.getFinishedStatus(),
status -> StandardJobRunReporter.formatDurationString(status.getTimeSpentOnJobs())));
jobRunReporter.writeRunFields(task.asAggregatedJobRun(), builder);
jobRunReporter.writeRunFields(task.asJobRunReport(), builder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import sleeper.clients.util.table.TableFieldDefinition;
import sleeper.clients.util.table.TableRow;
import sleeper.clients.util.table.TableWriterFactory;
import sleeper.core.tracker.job.run.JobRun;
import sleeper.core.tracker.job.run.JobRunReport;
import sleeper.core.tracker.job.run.JobRunSummary;
import sleeper.core.tracker.job.status.JobRunEndUpdate;
import sleeper.core.tracker.job.status.JobRunStartedUpdate;
Expand Down Expand Up @@ -65,7 +65,7 @@ public StandardJobRunReporter(PrintStream out) {
this.out = out;
}

public void writeRunFields(JobRun run, TableRow.Builder builder) {
public void writeRunFields(JobRunReport run, TableRow.Builder builder) {
builder.value(TASK_ID, run.getTaskId())
.value(START_TIME, run.getStartTime())
.value(FINISH_TIME, run.getFinishTime());
Expand All @@ -79,7 +79,7 @@ public void writeRunFields(JobRun run, TableRow.Builder builder) {
}
}

public void printProcessJobRunWithUpdatePrinter(JobRun run, UpdatePrinter updatePrinter) {
public void printProcessJobRunWithUpdatePrinter(JobRunReport run, UpdatePrinter updatePrinter) {
printProcessJobRun(run, updatePrinters(updatePrinter, defaultUpdatePrinter()));
}

Expand All @@ -89,7 +89,7 @@ private UpdatePrinter defaultUpdatePrinter() {
printUpdateTypeInRun(JobRunEndUpdate.class, this::printProcessFinished));
}

private void printProcessJobRun(JobRun run, UpdatePrinter updatePrinter) {
private void printProcessJobRun(JobRunReport run, UpdatePrinter updatePrinter) {
out.println();
if (run.getTaskId() != null) {
out.printf("Run on task %s%n", run.getTaskId());
Expand All @@ -101,7 +101,7 @@ private void printProcessJobRun(JobRun run, UpdatePrinter updatePrinter) {
}
}

public static <T extends JobStatusUpdate> UpdatePrinter printUpdateTypeInRun(Class<T> type, BiConsumer<JobRun, T> printer) {
public static <T extends JobStatusUpdate> UpdatePrinter printUpdateTypeInRun(Class<T> type, BiConsumer<JobRunReport, T> printer) {
return (run, update) -> {
if (type.isInstance(update)) {
printer.accept(run, type.cast(update));
Expand All @@ -128,15 +128,15 @@ public static UpdatePrinter updatePrinters(UpdatePrinter... printers) {
}

public interface UpdatePrinter {
boolean print(JobRun run, JobStatusUpdate update);
boolean print(JobRunReport run, JobStatusUpdate update);
}

public void printProcessStarted(JobRunStartedUpdate update) {
out.printf("Start time: %s%n", update.getStartTime());
out.printf("Start update time: %s%n", update.getUpdateTime());
}

public void printProcessFinished(JobRun run, JobRunEndUpdate update) {
public void printProcessFinished(JobRunReport run, JobRunEndUpdate update) {
JobRunSummary summary = run.getFinishedSummary();
out.printf("Finish time: %s%n", summary.getFinishTime());
out.printf("Finish update time: %s%n", update.getUpdateTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.Arrays;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.time.Clock;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import sleeper.core.table.TableStatus;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.query.CompactionJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import sleeper.core.properties.instance.CdkDefinedInstanceProperty;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.IngestJobTracker;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;
import sleeper.core.tracker.ingest.task.IngestTaskStatus;
import sleeper.core.tracker.ingest.task.IngestTaskTracker;
import sleeper.task.common.QueueMessageCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package sleeper.clients.status.report.ingest.job;

import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobAcceptedStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobRejectedStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;
import sleeper.core.tracker.job.run.JobRunTime;
import sleeper.core.tracker.job.run.RecordsProcessed;
import sleeper.ingest.core.job.IngestJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.clients.testutil.ToStringConsoleOutput;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.junit.jupiter.api.Test;

import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.junit.jupiter.api.Test;

import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.junit.jupiter.api.Test;

import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;
import sleeper.ingest.core.job.IngestJob;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.junit.jupiter.api.Test;

import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStatus;

import java.io.IOException;
import java.util.List;
Expand Down
Loading

0 comments on commit 6c20ef5

Please sign in to comment.