Skip to content

Commit

Permalink
Close temporal metrics scope on job completion
Browse files Browse the repository at this point in the history
  • Loading branch information
abhishekmjain committed Feb 13, 2025
1 parent adef734 commit 8a48450
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
private final boolean isMetricReportingFailureFatal;
private final boolean isEventReportingFailureFatal;
private final List<TemporalWorker> workers;
private final WorkflowServiceStubs workflowServiceStubs;

public GobblinTemporalTaskRunner(String applicationName,
String applicationId,
Expand Down Expand Up @@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName,
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
this.workers = new ArrayList<>();

String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
this.workflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);

logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}",
this.isTaskDriver ? "taskDriver" : "worker",
applicationName,
Expand Down Expand Up @@ -241,12 +245,9 @@ public void start()
private TemporalWorker initiateWorker() throws Exception {
logger.info("Starting Temporal Worker");

String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING);
WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri);

String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace);
WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(workflowServiceStubs, namespace);

String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
Expand Down Expand Up @@ -292,6 +293,12 @@ public synchronized void stop() {
this.containerMetrics.get().stopMetricsReporting();
}

try {
this.workflowServiceStubs.getOptions().getMetricsScope().close();
} catch (Exception e) {
logger.error("Exception occurred while closing MetricsScope", e);
}

workers.forEach(TemporalWorker::shutdown);

logger.info("All services are stopped.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
@Override
public void close() throws IOException {
try {
executeCancellation();
cancelJob(jobListener);
} catch (Exception e) {
log.error("Exception occurred while cancelling job", e);
} finally {
try {
cleanupWorkingDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.temporal.joblauncher;

import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -188,4 +189,15 @@ protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) {
protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) {
log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob");
}

@Override
public void close() throws IOException {
try {
this.workflowServiceStubs.getOptions().getMetricsScope().close();
} catch (Exception e) {
log.error("Exception occurred while closing MetricsScope ", e);
} finally {
super.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,11 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
throw new RuntimeException(e);
}
}));
launcher.launchJob(listener);
try {
launcher.launchJob(listener);
} finally {
launcher.close();
}
}
} catch (Exception je) {
LOGGER.error("Failed to schedule or run job " + jobUri, je);
Expand Down

0 comments on commit 8a48450

Please sign in to comment.