From 3269764a1141acbeeb9f5f8f867ccfeb803b22d5 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Wed, 1 Jan 2025 21:39:23 -0800 Subject: [PATCH] [GOBBLIN-2186] Emit GoT GTEs to time `WorkUnit` prep and to record volume of Work Discovery (#4089) --- .../copy/iceberg/IcebergDatasetFinder.java | 4 +- .../gobblin/metrics/event/TimingEvent.java | 1 + .../gobblin/runtime/api/FsSpecConsumer.java | 69 ++++---- .../flow/BaseFlowToJobSpecCompiler.java | 1 - .../modules/template/StaticFlowTemplate.java | 1 - .../activity/impl/GenerateWorkUnitsImpl.java | 29 +++- .../ddm/work/WorkUnitsSizeSummary.java | 23 +++ .../workflow/impl/CommitStepWorkflowImpl.java | 17 +- .../impl/ExecuteGobblinWorkflowImpl.java | 6 +- .../impl/ProcessWorkUnitsWorkflowImpl.java | 2 +- .../helloworld/GreetingWorkflowImpl.java | 2 +- .../workflows/metrics/EventTimer.java | 15 +- .../workflows/metrics/TemporalEventTimer.java | 160 ++++++++++++++---- 13 files changed, 241 insertions(+), 89 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index e6afe37877b..f13a2ed8ebb 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -149,10 +149,10 @@ public Iterator getDatasetsIterator() throws IOException { */ protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog destinationIcebergCatalog, String destDbName, String destTableName, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, srcTableName); - 
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, srcTableName)); + Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Source Iceberg Table not found: {%s}.{%s}", srcDbName, srcTableName)); IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName); // TODO: Rethink strategy to enforce dest iceberg table - Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName)); + Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Destination Iceberg Table not found: {%s}.{%s}", destDbName, destTableName)); return createSpecificDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties)); } diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index 7a0d4cf68b1..154facfe55b 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -110,6 +110,7 @@ public static class FlowEventConstants { public static final String JOB_SKIPPED_TIME = "jobSkippedTime"; public static final String WORKUNIT_PLAN_START_TIME = "workunitPlanStartTime"; public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime"; + public static final String WORKUNITS_GENERATED_SUMMARY = "workUnitsGeneratedSummary"; public static final String JOB_END_TIME = "jobEndTime"; public static final String JOB_LAST_PROGRESS_EVENT_TIME = "jobLastProgressEventTime"; 
public static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage"; diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java index 514338b5c78..ce9048855ab 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java @@ -87,7 +87,7 @@ public Future>> changedSpecs() { fileStatuses = this.fs.listStatus(this.specDirPath, new AndPathFilter(new HiddenFilter(), new AvroUtils.AvroPathFilter())); } catch (IOException e) { - log.error("Error when listing files at path: {}", this.specDirPath.toString(), e); + log.error("Error when listing files at path: " + this.specDirPath.toString(), e); return null; } log.info("Found {} files at path {}", fileStatuses.length, this.specDirPath.toString()); @@ -102,42 +102,53 @@ public Future>> changedSpecs() { try { dataFileReader = new DataFileReader<>(new FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>()); } catch (IOException e) { - log.error("Error creating DataFileReader for: {}", fileStatus.getPath().toString(), e); + log.error("Error creating DataFileReader for: " + fileStatus.getPath().toString(), e); continue; } - AvroJobSpec avroJobSpec = null; - while (dataFileReader.hasNext()) { - avroJobSpec = dataFileReader.next(); - break; - } + try { // ensure `dataFileReader` is always closed! 
+ AvroJobSpec avroJobSpec = null; + while (dataFileReader.hasNext()) { + avroJobSpec = dataFileReader.next(); + break; + } + + if (avroJobSpec != null) { + JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri()); + Properties props = new Properties(); + props.putAll(avroJobSpec.getProperties()); + jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri()) + .withVersion(avroJobSpec.getVersion()) + .withDescription(avroJobSpec.getDescription()) + .withConfigAsProperties(props) + .withConfig(ConfigUtils.propertiesToConfig(props)); + + try { + if (!avroJobSpec.getTemplateUri().isEmpty()) { + jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri())); + } + } catch (URISyntaxException u) { + log.error("Error building a job spec: ", u); + continue; + } - if (avroJobSpec != null) { - JobSpec.Builder jobSpecBuilder = new JobSpec.Builder(avroJobSpec.getUri()); - Properties props = new Properties(); - props.putAll(avroJobSpec.getProperties()); - jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri()) - .withVersion(avroJobSpec.getVersion()) - .withDescription(avroJobSpec.getDescription()) - .withConfigAsProperties(props) - .withConfig(ConfigUtils.propertiesToConfig(props)); + String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY); + SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName); + JobSpec jobSpec = jobSpecBuilder.build(); + log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString()); + specList.add(new ImmutablePair(verb, jobSpec)); + this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath()); + } + } finally { try { - if (!avroJobSpec.getTemplateUri().isEmpty()) { - jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri())); + if (dataFileReader != null) { + dataFileReader.close(); + dataFileReader = null; } - } catch (URISyntaxException u) { - log.error("Error building a job spec: ", u); - continue; + } catch (IOException e) { + log.warn("Unable to close DataFileReader for: {} - {}", 
fileStatus.getPath().toString(), e.getMessage()); } - - String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY); - SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName); - - JobSpec jobSpec = jobSpecBuilder.build(); - log.debug("Successfully built jobspec: {}", jobSpec.getUri().toString()); - specList.add(new ImmutablePair(verb, jobSpec)); - this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath()); } } return new CompletedFuture<>(specList, null); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 6fe6a9a0827..ce747db1f78 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -72,7 +72,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected // to these data structures. 
- @Getter protected final Map topologySpecMap; protected final Config config; diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java index 0d0d91e2881..c282e2fe493 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java @@ -68,7 +68,6 @@ public class StaticFlowTemplate implements FlowTemplate { private String description; @Getter private transient FlowCatalogWithTemplates catalog; - @Getter private List jobTemplates; private transient Config rawConfig; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index 0a192a81bde..e0fa2ebb5e7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -43,6 +43,7 @@ import org.apache.gobblin.converter.initializer.ConverterInitializerFactory; import org.apache.gobblin.destination.DestinationDatasetHandlerService; import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter; @@ -60,6 +61,8 @@ import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; +import org.apache.gobblin.temporal.workflows.metrics.EventTimer; +import 
org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer; import org.apache.gobblin.writer.initializer.WriterInitializerFactory; @@ -127,6 +130,9 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState); WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles); log.info("Discovered WorkUnits: {}", wuSizeSummary); + // IMPORTANT: send prior to `writeWorkUnits`, so the volume of work discovered (and bin packed) gets durably measured. even if serialization were to + // exceed available memory and this activity execution were to fail, a subsequent re-attempt would know the amount of work, to guide re-config/attempt + createWorkPreparedSizeDistillationTimer(wuSizeSummary, eventSubmitterContext).stop(); JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs); JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete @@ -150,26 +156,28 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer, Set pathsToCleanUp) throws ReflectiveOperationException { + // report (timer) metrics for "Work Discovery", *planning only* - NOT including WU prep, like serialization, `DestinationDatasetHandlerService`ing, etc. 
+ // IMPORTANT: for accurate timing, SEPARATELY emit `.createWorkPreparationTimer`, to record time prior to measuring the WU size required for that one + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext); + EventTimer workDiscoveryTimer = timerFactory.createWorkDiscoveryTimer(); Source source = JobStateUtils.createSource(jobState); WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState) : new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build(); - // TODO: report (timer) metrics for workunits creation if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // indicates a problem getting the WUs String errMsg = "Failure in getting work units for job " + jobState.getJobId(); log.error(errMsg); - // TODO: decide whether a non-retryable failure is too severe... (in most circumstances, it's likely what we want) + // TODO: decide whether a non-retryable failure is too severe... (some sources may merit retry) throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: Source.getWorkUnits()"); } + workDiscoveryTimer.stop(); if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure) log.warn("No work units created for job " + jobState.getJobId()); return Lists.newArrayList(); } - // TODO: count total bytes for progress tracking! 
- boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup DestinationDatasetHandlerService datasetHandlerService = closer.register( new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create())); @@ -264,6 +272,19 @@ protected static WorkUnitsSizeDigest digestWorkUnitsSize(List workUnit return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest); } + protected static EventTimer createWorkPreparedSizeDistillationTimer( + WorkUnitsSizeSummary wuSizeSummary, EventSubmitterContext eventSubmitterContext) { + // Inspired by a pair of log messages produced within `CopySource::getWorkUnits`: + // 1. Statistics for ConcurrentBoundedPriorityIterable: {ResourcePool: {softBound: [ ... ], hardBound: [ ...]},totalResourcesUsed: [ ... ], \ + // maxRequirementPerDimension: [entities: 231943.0, bytesCopied: 1.22419622769628E14], ... } + // 2. org.apache.gobblin.data.management.copy.CopySource - Bin packed work units. Initial work units: 27252, packed work units: 13175, \ + // max weight per bin: 500000000, max work units per bin: 100. 
+ // rather than merely logging, durably emit this info, to inform re-config for any potential re-attempt (should WU serialization OOM) + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext); + return timerFactory.createWorkPreparationTimer() + .withMetadataAsJson(TimingEvent.WORKUNITS_GENERATED_SUMMARY, wuSizeSummary.distill()); + } + public static int getConfiguredNumSizeSummaryQuantiles(State state) { return state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java index 971ed5a04bd..16a20516043 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java @@ -53,6 +53,29 @@ public class WorkUnitsSizeSummary { @NonNull private List topLevelQuantilesMinSizes; @NonNull private List constituentQuantilesMinSizes; + /** Total size, counts, means, and medians: the most telling measurements packaged for ready consumption / observability */ + @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor + public static class Distillation { + @NonNull private long totalSize; + @NonNull private long topLevelWorkUnitsCount; + @NonNull private long constituentWorkUnitsCount; + @NonNull private double topLevelWorkUnitsMeanSize; + @NonNull private double constituentWorkUnitsMeanSize; + @NonNull private double topLevelWorkUnitsMedianSize; + @NonNull private double constituentWorkUnitsMedianSize; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') 
+ public Distillation distill() { + return new Distillation(this.totalSize, this.topLevelWorkUnitsCount, this.constituentWorkUnitsCount, + this.getTopLevelWorkUnitsMeanSize(), this.getConstituentWorkUnitsMeanSize(), + this.getTopLevelWorkUnitsMedianSize(), this.getConstituentWorkUnitsMedianSize() + ); + } + @JsonIgnore // (because no-arg method resembles 'java bean property') public double getTopLevelWorkUnitsMeanSize() { return this.totalSize * 1.0 / this.topLevelWorkUnitsCount; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java index c2c21d28252..0f289018838 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -17,21 +17,20 @@ package org.apache.gobblin.temporal.ddm.workflow.impl; -import io.temporal.activity.ActivityOptions; -import io.temporal.common.RetryOptions; -import io.temporal.failure.ApplicationFailure; -import io.temporal.workflow.Workflow; - import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.workflow.Workflow; + import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.runtime.DatasetTaskSummary; -import org.apache.gobblin.runtime.util.GsonUtils; import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.work.CommitStats; import org.apache.gobblin.temporal.ddm.work.DatasetStats; @@ -60,12 +59,10 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow { @Override public CommitStats 
commit(WUProcessingSpec workSpec) { CommitStats commitGobblinStats = activityStub.commit(workSpec); - if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) { - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext()); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext()); timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY) - .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson( - convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))) + .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())) .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`) } if (commitGobblinStats.getOptFailure().isPresent()) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java index 6de7c51e36b..15184c6ccc9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java @@ -119,14 +119,14 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow { .build(); private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofHours(1)) + .setStartToCloseTimeout(Duration.ofMinutes(10)) .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS) .build(); private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, 
DELETE_WORK_DIRS_ACTIVITY_OPTS); @Override public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) { - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME` EventTimer jobSuccessTimer = timerFactory.createJobTimer(); Optional optGenerateWorkUnitResult = Optional.empty(); @@ -207,7 +207,7 @@ protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSum ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY, ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES)); double permittedOveragePercentage = .2; - Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime()); + Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant()); long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins; return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 0b8d58a8988..7b3f171967a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -121,7 +121,7 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSp private Optional createOptJobEventTimer(WUProcessingSpec workSpec) { if (workSpec.isToDoJobLevelTiming()) { 
EventSubmitterContext eventSubmitterContext = workSpec.getEventSubmitterContext(); - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); return Optional.of(timerFactory.createJobTimer()); } else { return Optional.empty(); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java index 0cb03a1d66b..c576a495503 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java @@ -57,7 +57,7 @@ public String getGreeting(String name, EventSubmitterContext eventSubmitterConte /** * Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation. 
*/ - TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext); + TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext); try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) { LOG.info("Executing getGreeting"); timer.withMetadata("name", name); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java index ac0f24bf816..cfd4f74dafc 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java @@ -42,17 +42,30 @@ */ public interface EventTimer extends Closeable { /** - * Add additional metadata that will be used for post-processing when the timer is stopped via {@link #stop()} + * Add additional metadata that will be emitted with the timer, once {@link #stop()}ped * @param key * @param metadata */ EventTimer withMetadata(String key, String metadata); + /** + * Add additional metadata, after stringifying as JSON, that will be emitted with the timer, once {@link #stop()}ped + * @param key + * @param metadata (to convert to JSON) + */ + EventTimer withMetadataAsJson(String key, T metadata); + /** * Stops the timer and execute any post-processing (e.g. 
event submission) */ void stop(); + /** alias to {@link #stop()} */ + default void submit() { + stop(); + } + + /** alias to {@link #stop()} */ default void close() { stop(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java index 93beaadd033..4a6aa8d0bb3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java @@ -19,7 +19,9 @@ import java.time.Duration; import java.time.Instant; +import java.util.function.Supplier; +import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -29,30 +31,29 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.util.GsonUtils; /** - * Boilerplate for tracking elapsed time of events that is compatible with {@link Workflow} - * by using activities to record time + * Encapsulates emission of {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s, e.g. to convey timing duration and/or event metadata to + * gobblin-service (GaaS). For use either within a {@link Workflow} (with emission in a sub-activity, for reliability) or within an + * {@link io.temporal.activity.Activity} (with direct event emission, since an activity may not itself launch further activities). That choice + * is governed by the {@link Factory} used to create the timer: either {@link WithinWorkflowFactory} or {@link WithinActivityFactory}. * - * This class is very similar to {@link TimingEvent} but uses {@link Workflow} compatible APIs. 
It's possible to refactor - * this class to inherit the {@link TimingEvent} but extra care would be needed to remove the {@link EventSubmitter} field - * since that class is not serializable without losing some information + * Implementation Note: While very similar to {@link TimingEvent}, it cannot inherit directly, since its {@link EventSubmitter} field is not serializable. + * @see EventTimer for details */ -@RequiredArgsConstructor +@RequiredArgsConstructor(access = AccessLevel.PROTECTED) public class TemporalEventTimer implements EventTimer { private final SubmitGTEActivity trackingEventActivity; private final GobblinEventBuilder eventBuilder; private final EventSubmitterContext eventSubmitterContext; + private final Supplier currentInstantSupplier; @Getter private final Instant startTime; - // Alias to stop() - public void submit() { - stop(); - } @Override public void stop() { - stop(getCurrentTime()); + stop(currentInstantSupplier.get()); } @Override @@ -61,6 +62,12 @@ public TemporalEventTimer withMetadata(String key, String metadata) { return this; } + @Override + public TemporalEventTimer withMetadataAsJson(String key, T metadata) { + this.eventBuilder.addMetadata(key, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(metadata)); + return this; + } + private void stop(Instant endTime) { this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, TimingEvent.METADATA_TIMING_EVENT); this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, Long.toString(this.startTime.toEpochMilli())); @@ -71,45 +78,34 @@ private void stop(Instant endTime) { trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext); } + /** - * {@link Workflow}-safe (i.e. 
deterministic) way for equivalent of {@link System#currentTimeMillis()} - * WARNING: DO NOT use from an {@link io.temporal.activity.Activity} + * Factory for creating {@link TemporalEventTimer}s, with convenience methods for well-known forms of GaaS-associated + * {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s. + * + * This class is abstract; choose the concrete form befitting the execution context, either {@link WithinWorkflowFactory} or {@link WithinActivityFactory}. */ - public static Instant getCurrentTime() { - return Instant.ofEpochMilli(Workflow.currentTimeMillis()); - } - - public static class Factory { - private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofHours(12)) // maximum timeout for the actual event submission to kafka, waiting out a kafka outage - .build(); - private final SubmitGTEActivity submitGTEActivity; + @RequiredArgsConstructor(access = AccessLevel.PROTECTED) + public abstract static class Factory { private final EventSubmitterContext eventSubmitterContext; - - public Factory(EventSubmitterContext eventSubmitterContext) { - this(eventSubmitterContext, DEFAULT_OPTS); - } - - public Factory(EventSubmitterContext eventSubmitterContext, ActivityOptions opts) { - this.submitGTEActivity = Workflow.newActivityStub(SubmitGTEActivity.class, opts); - this.eventSubmitterContext = eventSubmitterContext; - } + private final SubmitGTEActivity submitGTEActivity; + private final Supplier currentInstantSupplier; public TemporalEventTimer create(String eventName, Instant startTime) { GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName, eventSubmitterContext.getNamespace()); - return new TemporalEventTimer(submitGTEActivity, eventBuilder, this.eventSubmitterContext, startTime); + return new TemporalEventTimer(this.submitGTEActivity, eventBuilder, this.eventSubmitterContext, currentInstantSupplier, startTime); } public TemporalEventTimer create(String eventName) { - 
return create(eventName, getCurrentTime()); + return create(eventName, currentInstantSupplier.get()); } /** * Utility for creating a timer that emits separate events at the start and end of a job. This imitates the behavior in - * {@link org.apache.gobblin.runtime.AbstractJobLauncher} and emits events that are compatible with the - * {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to update GaaS flow statuses + * {@link org.apache.gobblin.runtime.AbstractJobLauncher} by emitting events that are compatible with the + * {@link org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor}, to update GaaS job/flow status and timing information. * - * @return a timer that emits an event at the beginning of the job and a completion event ends at the end of the job + * @return a timer that will emit a job completion (success) event once `.stop`ped; *PLUS* immediately emit a job started event */ public TemporalEventTimer createJobTimer() { TemporalEventTimer startTimer = create(TimingEvent.LauncherTimings.JOB_START); // update GaaS: `ExecutionStatus.RUNNING` @@ -117,5 +113,97 @@ public TemporalEventTimer createJobTimer() { // [upon `.stop()`] update GaaS: `ExecutionStatus.RUNNING`, `TimingEvent.JOB_END_TIME`: return create(TimingEvent.LauncherTimings.JOB_SUCCEEDED, startTimer.startTime); } + + /** + * Utility for creating a timer that emits an event to time the start and end of "Work Discovery" (i.e. + * {@link org.apache.gobblin.source.Source#getWorkunits}). It SHOULD span only planning - NOT {@link org.apache.gobblin.source.workunit.WorkUnit} + * preparation, such as serialization, etc. This imitates the behavior in {@link org.apache.gobblin.runtime.AbstractJobLauncher} by emitting events + * that are compatible with the {@link org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor}, to update GaaS job timing information, and + * ultimately participate in the {@link org.apache.gobblin.metrics.GaaSJobObservabilityEvent}. 
+ * + * @return a timer that will emit a "work units creation" event once `.stop`ped (upon (successful) "Work Discovery") + */ + public TemporalEventTimer createWorkDiscoveryTimer() { + return create(TimingEvent.LauncherTimings.WORK_UNITS_CREATION); // [upon `.stop()`] update GaaS: `TimingEvent.WORKUNIT_PLAN_{START,END}_TIME` + } + + /** + * Utility for creating an event to convey "Work Discovery" metadata, like {@link org.apache.gobblin.source.workunit.WorkUnit} count, size, and + * bin packing. To include such info, the caller MUST invoke {@link #withMetadataAsJson(String, Object)} with + * {@link org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary.Distillation}. + * + * The event emitted would inform GaaS of the volume of work discovered (and bin packed), in case `WorkUnit` serialization were to fail for exceeding + * available memory. The thence-known amount of work would guide config adjustment for a subsequent re-attempt. + * + * IMPORTANT: This event SHOULD be emitted separately from {@link #createWorkDiscoveryTimer()}, to preserve timing accuracy (of that other). + * + * @return an event to convey "work units preparation" (Work Discovery) metadata once `.stop`ped + */ + public TemporalEventTimer createWorkPreparationTimer() { + // [upon `.stop()`] simply (again) set GaaS: `ExecutionStatus.RUNNING` and convey `TimingEvent.WORKUNITS_GENERATED_SUMMARY` (metadata) + // (the true purpose in "sending" is to record observable metadata about WU count, size, and bin packing) + return create(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION); + } + } + + + /** + * Concrete {@link Factory} to use when executing within a {@link Workflow}. It will (synchronously) emit the {@link TemporalEventTimer} in an + * {@link io.temporal.activity.Activity}, both for reliability and to avoid blocking within the limited `Workflow` time allowance. In + * addition, it uses the `Workflow`-safe {@link Workflow#currentTimeMillis()}. 
+ */ + public static class WithinWorkflowFactory extends Factory { + private static final ActivityOptions DEFAULT_OPTS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for the actual event submission to kafka, waiting out a kafka outage + .build(); + + public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) { + this(eventSubmitterContext, DEFAULT_OPTS); + } + + public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, ActivityOptions opts) { + super(eventSubmitterContext, Workflow.newActivityStub(SubmitGTEActivity.class, opts), WithinWorkflowFactory::getCurrentInstant); + } + + /** + * {@link Workflow}-safe (i.e. deterministic) way for equivalent of {@link System#currentTimeMillis()}'s {@link Instant} + * + * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}, as that would throw: + * Caused by: java.lang.Error: Called from non workflow or workflow callback thread + * at io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130) + * at io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404) + * at io.temporal.internal.sync.WorkflowInternal.getWorkflowOutboundInterceptor(WorkflowInternal.java:400) + * at io.temporal.internal.sync.WorkflowInternal.currentTimeMillis(WorkflowInternal.java:205) + * at io.temporal.workflow.Workflow.currentTimeMillis(Workflow.java:524) + * ... + */ + public static Instant getCurrentInstant() { + return Instant.ofEpochMilli(Workflow.currentTimeMillis()); + } + } + + /** + * Concrete {@link Factory} to use when executing within an {@link io.temporal.activity.Activity}. It will (synchronously) emit the + * {@link TemporalEventTimer} directly within the current `Activity`, since an activity may not itself launch further activities. It uses + * the standard {@link System#currentTimeMillis()}, since `Workflow` determinism is not a concern within an `Activity`. 
+ */ + public static class WithinActivityFactory extends Factory { + public WithinActivityFactory(EventSubmitterContext eventSubmitterContext) { + // reference the `SubmitGTEActivity` impl directly, w/o temporal involved (i.e. NOT via `newActivityStub`), to permit use inside an activity; otherwise: + // io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor - Activity failure. ActivityId=..., activityType=..., attempt=4 + // java.lang.Error: Called from non workflow or workflow callback thread + // at io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130) + // at io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404) + // at io.temporal.internal.sync.WorkflowInternal.newActivityStub(WorkflowInternal.java:239) + // at io.temporal.workflow.Workflow.newActivityStub(Workflow.java:92) + // at org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer$Factory.<init>(TemporalEventTimer.java:89) + // ... + super(eventSubmitterContext, new SubmitGTEActivityImpl(), WithinActivityFactory::getCurrentInstant); + } + + /** @return (standard) {@link System#currentTimeMillis()}'s {@link Instant}, abstracted merely for code clarity */ + public static Instant getCurrentInstant() { + return Instant.ofEpochMilli(System.currentTimeMillis()); + } } }