Skip to content

Commit

Permalink
Emit events to time GoT WorkUnit prep and to record volume of work …
Browse files Browse the repository at this point in the history
…discovered, after reworking `TemporalEventTimer.Factory` for use within an `Activity`
  • Loading branch information
phet committed Jan 1, 2025
1 parent 07bdfff commit 8ce206d
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public static class FlowEventConstants {
public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
public static final String WORKUNIT_PLAN_START_TIME = "workunitPlanStartTime";
public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
public static final String WORKUNITS_GENERATED_SUMMARY = "workUnitsGeneratedSummary";
public static final String JOB_END_TIME = "jobEndTime";
public static final String JOB_LAST_PROGRESS_EVENT_TIME = "jobLastProgressEventTime";
public static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
Expand All @@ -60,6 +61,8 @@
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;


Expand Down Expand Up @@ -127,6 +130,9 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState);
WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
log.info("Discovered WorkUnits: {}", wuSizeSummary);
// IMPORTANT: send prior to `writeWorkUnits`, so the volume of work discovered (and bin packed) gets durably measured. even if serialization were to
// exceed available memory and this activity execution were to fail, a subsequent re-attempt would know the amount of work, to guide re-config/attempt
createWorkPreparedSizeDistillationTimer(wuSizeSummary, eventSubmitterContext).stop();

JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete
Expand All @@ -150,26 +156,28 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
protected List<WorkUnit> generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer,
Set<String> pathsToCleanUp)
throws ReflectiveOperationException {
// report (timer) metrics for "Work Discovery", *planning only* - NOT including WU prep, like serialization, `DestinationDatasetHandlerService`ing, etc.
// IMPORTANT: for accurate timing, SEPARATELY emit `.createWorkPreparationTimer`, to record time prior to measuring the WU size required for that one
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
EventTimer workDiscoveryTimer = timerFactory.createWorkDiscoveryTimer();
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
: new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();

// TODO: report (timer) metrics for workunits creation
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // indicates a problem getting the WUs
String errMsg = "Failure in getting work units for job " + jobState.getJobId();
log.error(errMsg);
// TODO: decide whether a non-retryable failure is too severe... (in most circumstances, it's likely what we want)
// TODO: decide whether a non-retryable failure is too severe... (some sources may merit retry)
throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: Source.getWorkUnits()");
}
workDiscoveryTimer.stop();

if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure)
log.warn("No work units created for job " + jobState.getJobId());
return Lists.newArrayList();
}

// TODO: count total bytes for progress tracking!

boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create()));
Expand Down Expand Up @@ -264,6 +272,19 @@ protected static WorkUnitsSizeDigest digestWorkUnitsSize(List<WorkUnit> workUnit
return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, constituentWorkUnitsDigest);
}

/**
 * Creates (without stopping) a work-preparation timer that carries the distilled form of `wuSizeSummary` as JSON metadata,
 * keyed by `TimingEvent.WORKUNITS_GENERATED_SUMMARY`.
 *
 * Background - a pair of log messages produced within `CopySource::getWorkUnits` inspired this:
 *   1. Statistics for ConcurrentBoundedPriorityIterable: {ResourcePool: {softBound: [ ... ], hardBound: [ ...]},totalResourcesUsed: [ ... ], \
 *      maxRequirementPerDimension: [entities: 231943.0, bytesCopied: 1.22419622769628E14], ... }
 *   2. org.apache.gobblin.data.management.copy.CopySource - Bin packed work units. Initial work units: 27252, packed work units: 13175, \
 *      max weight per bin: 500000000, max work units per bin: 100.
 * Instead of merely logging, this info is durably emitted, to inform re-config for any potential re-attempt (should WU serialization OOM).
 *
 * @param wuSizeSummary the size summary whose {@link WorkUnitsSizeSummary#distill()} result becomes the metadata
 * @param eventSubmitterContext context given to the `TemporalEventTimer.WithinActivityFactory` that creates the timer
 * @return the timer with metadata attached; the caller chooses when to `stop()` it
 */
protected static EventTimer createWorkPreparedSizeDistillationTimer(
    WorkUnitsSizeSummary wuSizeSummary, EventSubmitterContext eventSubmitterContext) {
  TemporalEventTimer.Factory withinActivityFactory = new TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
  // create the timer BEFORE distilling, exactly mirroring the order in which a single chained expression would evaluate
  EventTimer workPreparationTimer = withinActivityFactory.createWorkPreparationTimer();
  return workPreparationTimer.withMetadataAsJson(TimingEvent.WORKUNITS_GENERATED_SUMMARY, wuSizeSummary.distill());
}

/**
 * Reads the number of size-summary quantiles configured in `state`.
 *
 * @param state properties to consult
 * @return the int found under `GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES`, with
 *         `GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES` supplied as the default
 */
public static int getConfiguredNumSizeSummaryQuantiles(State state) {
  final String quantilesKey = GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES;
  final int fallbackNumQuantiles = GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES;
  return state.getPropAsInt(quantilesKey, fallbackNumQuantiles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ public class WorkUnitsSizeSummary {
@NonNull private List<Double> topLevelQuantilesMinSizes;
@NonNull private List<Double> constituentQuantilesMinSizes;

/**
 * Total size, counts, means, and medians: the most telling measurements packaged for ready consumption / observability.
 *
 * A flat holder of seven numbers. {@link WorkUnitsSizeSummary#distill()} constructs it from the enclosing summary's `totalSize`,
 * its two work unit counts, and its mean-size and median-size getters (passed in that order).
 */
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public static class Distillation {
// NOTE(review): `@NonNull` cannot guard a primitive against null; presumably it marks each non-`final` field as "required", so that
// `@RequiredArgsConstructor` generates the seven-arg constructor `distill()` calls - confirm against Lombok docs before removing.
// Field declaration order matters: it is the positional order `distill()` passes its arguments in.

// taken from the enclosing summary's `totalSize`
@NonNull private long totalSize;
// taken from the enclosing summary's `topLevelWorkUnitsCount`
@NonNull private long topLevelWorkUnitsCount;
// taken from the enclosing summary's `constituentWorkUnitsCount`
@NonNull private long constituentWorkUnitsCount;
// result of the enclosing summary's `getTopLevelWorkUnitsMeanSize()`
@NonNull private double topLevelWorkUnitsMeanSize;
// result of the enclosing summary's `getConstituentWorkUnitsMeanSize()`
@NonNull private double constituentWorkUnitsMeanSize;
// result of the enclosing summary's `getTopLevelWorkUnitsMedianSize()`
@NonNull private double topLevelWorkUnitsMedianSize;
// result of the enclosing summary's `getConstituentWorkUnitsMedianSize()`
@NonNull private double constituentWorkUnitsMedianSize;
}

/**
 * Condenses this summary into a {@link Distillation}: total size, the top-level and constituent work unit counts,
 * and the mean and median size for each of those two groupings.
 *
 * @return a newly constructed {@link Distillation}
 */
@JsonIgnore // (because no-arg method resembles 'java bean property')
public Distillation distill() {
  final double topLevelMeanSize = this.getTopLevelWorkUnitsMeanSize();
  final double constituentMeanSize = this.getConstituentWorkUnitsMeanSize();
  final double topLevelMedianSize = this.getTopLevelWorkUnitsMedianSize();
  final double constituentMedianSize = this.getConstituentWorkUnitsMedianSize();
  return new Distillation(
      this.totalSize, this.topLevelWorkUnitsCount, this.constituentWorkUnitsCount,
      topLevelMeanSize, constituentMeanSize,
      topLevelMedianSize, constituentMedianSize);
}

@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getTopLevelWorkUnitsMeanSize() {
return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@

package org.apache.gobblin.temporal.ddm.workflow.impl;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
Expand Down Expand Up @@ -60,12 +59,10 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);

if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
.submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
}
if (commitGobblinStats.getOptFailure().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
.build();

private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofHours(1))
.setStartToCloseTimeout(Duration.ofMinutes(10))
.setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
.build();
private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = Workflow.newActivityStub(DeleteWorkDirsActivity.class, DELETE_WORK_DIRS_ACTIVITY_OPTS);

@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = Optional.empty();
Expand Down Expand Up @@ -207,7 +207,7 @@ protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSum
ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
double permittedOveragePercentage = .2;
Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime());
Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private CommitStats performCommitIfAnyWorkUnitsProcessed(WUProcessingSpec workSp
private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec workSpec) {
if (workSpec.isToDoJobLevelTiming()) {
EventSubmitterContext eventSubmitterContext = workSpec.getEventSubmitterContext();
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
return Optional.of(timerFactory.createJobTimer());
} else {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String getGreeting(String name, EventSubmitterContext eventSubmitterConte
/**
* Example of the {@link TemporalEventTimer.Factory} invoking child activity for instrumentation.
*/
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(eventSubmitterContext);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
try (TemporalEventTimer timer = timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
timer.withMetadata("name", name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,30 @@
*/
public interface EventTimer extends Closeable {
/**
 * Add additional metadata that will be emitted with the timer (and so be available for post-processing), once {@link #stop()}ped
 * @param key name under which to record the metadata
 * @param metadata the value to record, already in string form
 * @return an {@link EventTimer}, so further calls may be chained (NOTE(review): presumably this same timer - confirm against implementations)
 */
EventTimer withMetadata(String key, String metadata);

/**
 * Add additional metadata, after stringifying as JSON, that will be emitted with the timer, once {@link #stop()}ped
 * @param <T> type of the value to stringify
 * @param key name under which to record the metadata
 * @param metadata (to convert to JSON)
 * @return an {@link EventTimer}, so further calls may be chained (NOTE(review): presumably this same timer - confirm against implementations)
 */
<T> EventTimer withMetadataAsJson(String key, T metadata);

/**
 * Stops the timer and executes any post-processing (e.g. event submission)
 */
void stop();

/**
 * Alias to {@link #stop()}: simply delegates to it. Reads more naturally where a timer is created and immediately
 * emitted, rather than used to time a span of work.
 */
default void submit() {
stop();
}

/**
 * Alias to {@link #stop()}: simply delegates to it. Implements {@link Closeable#close()}, which is what allows a timer
 * to be managed by a try-with-resources statement.
 */
@Override
default void close() {
  stop();
}
Expand Down
Loading

0 comments on commit 8ce206d

Please sign in to comment.