Skip to content

Commit

Permalink
[GOBBLIN-2190] Implement ActivityType & add HeartBeat for Temporal Ac…
Browse files Browse the repository at this point in the history
…tivities (#4093)

Made StartToClose timeout configurable and added heartbeat
  • Loading branch information
Blazer-007 authored Feb 25, 2025
1 parent eb60a2c commit f2bcdc7
Show file tree
Hide file tree
Showing 22 changed files with 455 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,34 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds";
int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10;
String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions";

/**
* Activities timeout configs
*/
String TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = PREFIX + "activity.heartbeat.timeout.minutes";
int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES = 5;
String TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = PREFIX + "activity.heartbeat.interval.minutes";
int DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES = 1;
String ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = "activity.starttoclose.timeout.minutes";
int DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES = 360;
String TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "generate.workunits." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "recommend.scaling." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "delete.work.dirs." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "process.workunit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES =
PREFIX + "commit." + ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS = PREFIX + "activity.retry.options";
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "initial.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS = 3;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.interval.seconds";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 100;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "backoff.coefficient";
double DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import java.time.Duration;
import java.util.Properties;

import lombok.Getter;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.util.PropertiesUtils;


/**
* Enum representing different types of activities in the Temporal workflow.
* Each activity type corresponds to a specific operation that can be performed.
*/
public enum ActivityType {
/** Activity type for generating work units. */
GENERATE_WORKUNITS(GobblinTemporalConfigurationKeys.TEMPORAL_GENERATE_WORKUNITS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Activity type for recommending scaling operations. */
RECOMMEND_SCALING(GobblinTemporalConfigurationKeys.TEMPORAL_RECOMMEND_SCALING_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Activity type for deleting work directories. */
DELETE_WORK_DIRS(GobblinTemporalConfigurationKeys.TEMPORAL_DELETE_WORK_DIRS_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Activity type for processing a work unit. */
PROCESS_WORKUNIT(GobblinTemporalConfigurationKeys.TEMPORAL_PROCESS_WORKUNIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Activity type for committing step. */
COMMIT(GobblinTemporalConfigurationKeys.TEMPORAL_COMMIT_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES),

/** Default placeholder activity type. */
DEFAULT_ACTIVITY(GobblinTemporalConfigurationKeys.ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES);

@Getter private final String startToCloseTimeoutConfigKey;

ActivityType(String startToCloseTimeoutConfigKey) {
this.startToCloseTimeoutConfigKey = startToCloseTimeoutConfigKey;
}

public ActivityOptions buildActivityOptions(Properties props) {
return ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
.setHeartbeatTimeout(getHeartbeatTimeout(props))
.setRetryOptions(buildRetryOptions(props))
.build();
}

private Duration getStartToCloseTimeout(Properties props) {
return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props, this.startToCloseTimeoutConfigKey,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_STARTTOCLOSE_TIMEOUT_MINUTES));
}

private Duration getHeartbeatTimeout(Properties props) {
return Duration.ofMinutes(PropertiesUtils.getPropAsInt(props,
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_TIMEOUT_MINUTES));
}

private RetryOptions buildRetryOptions(Properties props) {
int maximumIntervalSeconds = PropertiesUtils.getPropAsInt(props,
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS);

int initialIntervalSeconds = Math.min(PropertiesUtils.getPropAsInt(props,
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_INITIAL_INTERVAL_SECONDS),
maximumIntervalSeconds);

return RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(initialIntervalSeconds))
.setMaximumInterval(Duration.ofSeconds(maximumIntervalSeconds))
.setBackoffCoefficient(PropertiesUtils.getPropAsDouble(props,
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT))
.setMaximumAttempts(PropertiesUtils.getPropAsInt(props,
GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

Expand All @@ -41,6 +44,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.Closer;

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import io.temporal.failure.ApplicationFailure;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
Expand Down Expand Up @@ -85,12 +91,21 @@ public class CommitActivityImpl implements CommitActivity {

@Override
public CommitStats commit(WUProcessingSpec workSpec) {
ActivityExecutionContext activityExecutionContext = Activity.getExecutionContext();
ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor")));
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
Optional<String> optJobName = Optional.empty();
AutomaticTroubleshooter troubleshooter = null;
try (FileSystem fs = Help.loadFileSystem(workSpec)) {
JobState jobState = Help.loadJobState(workSpec, fs);

int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running Commit Activity"),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);

optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
Expand Down Expand Up @@ -147,6 +162,7 @@ public CommitStats commit(WUProcessingSpec workSpec) {
String errCorrelator = String.format("Commit [%s]", calcCommitId(workSpec));
EventSubmitter eventSubmitter = workSpec.getEventSubmitterContext().create();
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, errCorrelator);
ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, com.google.common.base.Optional.of(log));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.FileSystem;
Expand All @@ -38,6 +41,8 @@
import com.google.common.io.Closer;
import com.tdunning.math.stats.TDigest;
import io.temporal.failure.ApplicationFailure;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
Expand Down Expand Up @@ -66,6 +71,7 @@
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.writer.initializer.WriterInitializer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;

Expand Down Expand Up @@ -122,11 +128,19 @@ private static class WorkUnitsWithInsights {

@Override
public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
ActivityExecutionContext activityExecutionContext = Activity.getExecutionContext();
ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("GenerateWorkUnitsActivityHeartBeatExecutor")));
// TODO: decide whether to acquire a job lock (as MR did)!
// TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
log.info("Created jobState: {}", jobState.toJsonString(true));

int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running GenerateWorkUnits"),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);

Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
log.info("Using work dir root path for job '{}' - '{}'", jobState.getJobId(), workDirRoot);

Expand Down Expand Up @@ -177,6 +191,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi
} finally {
EventSubmitter eventSubmitter = eventSubmitterContext.create();
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, jobState.getJobId());
ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, com.google.common.base.Optional.of(log));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.common.collect.Lists;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -56,6 +61,7 @@
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JobLauncherUtils;


Expand All @@ -68,20 +74,28 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {

@Override
public int processWorkUnit(WorkUnitClaimCheck wu) {
ActivityExecutionContext activityExecutionContext = Activity.getExecutionContext();
ScheduledExecutorService heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("CommitActivityHeartBeatExecutor")));
AutomaticTroubleshooter troubleshooter = null;
EventSubmitter eventSubmitter = wu.getEventSubmitterContext().create();
String correlator = String.format("(M)WU [%s]", wu.getCorrelator());
try (FileSystem fs = Help.loadFileSystemForce(wu)) {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
log.info("{} - loaded; found {} workUnits", correlator, workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
int heartBeatInterval = JobStateUtils.getHeartBeatInterval(jobState);
heartBeatExecutor.scheduleAtFixedRate(() -> activityExecutionContext.heartbeat("Running ProcessWorkUnit Activity"),
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
return execute(workUnits, wu, jobState, fs, troubleshooter.getIssueRepository());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
Help.finalizeTroubleshooting(troubleshooter, eventSubmitter, log, correlator);
ExecutorsUtils.shutdownExecutorService(heartBeatExecutor, com.google.common.base.Optional.of(log));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void submitJob(List<WorkUnit> workunits) {
Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec)));

ProcessWorkUnitsWorkflow workflow = this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
workflow.process(wuSpec);
workflow.process(wuSpec, jobProps);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.JobLauncherUtils;
Expand Down Expand Up @@ -238,4 +239,9 @@ public static SharedResourcesBroker<GobblinScopeTypes> getSharedResourcesBroker(
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
}

public static int getHeartBeatInterval(JobState jobState) {
return jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_ACTIVITY_HEARTBEAT_INTERVAL_MINUTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.gobblin.temporal.ddm.work;

import java.net.URI;
import java.util.Optional;

import org.apache.hadoop.fs.Path;

Expand All @@ -36,8 +35,6 @@
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
import org.apache.gobblin.temporal.util.nesting.work.Workload;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


Expand Down Expand Up @@ -75,7 +72,7 @@ public Path getJobStatePath() {
return new Path(new Path(workUnitsDir).getParent(), AbstractJobLauncher.JOB_STATE_FILE_NAME);
}

/** Configuration for {@link org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr, Workload, int, int, int, Optional)}*/
/** Configuration for {@link org.apache.gobblin.temporal.util.nesting.work.NestingExecWorkloadInput} */
@Data
@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.temporal.ddm.workflow;

import java.util.Properties;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

Expand All @@ -35,5 +37,5 @@ public interface CommitStepWorkflow {
* @return number of workunits committed
*/
@WorkflowMethod
CommitStats commit(WUProcessingSpec workSpec);
CommitStats commit(WUProcessingSpec workSpec, Properties props);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.temporal.ddm.workflow;

import java.util.Properties;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

Expand All @@ -30,5 +32,5 @@
public interface ProcessWorkUnitsWorkflow {
/** @return the number of {@link WorkUnit}s cumulatively processed successfully */
@WorkflowMethod
CommitStats process(WUProcessingSpec wuSpec);
CommitStats process(WUProcessingSpec workSpec, Properties props);
}
Loading

0 comments on commit f2bcdc7

Please sign in to comment.