From 8a48450659b6542d84e1f15ca8e1ee805d26443a Mon Sep 17 00:00:00 2001 From: Abhishek Jain Date: Thu, 13 Feb 2025 23:38:36 +0530 Subject: [PATCH] Close temporal metrics scope on job completion --- .../cluster/GobblinTemporalTaskRunner.java | 15 +++++++++++---- .../temporal/joblauncher/GobblinJobLauncher.java | 4 +++- .../joblauncher/GobblinTemporalJobLauncher.java | 12 ++++++++++++ .../joblauncher/GobblinTemporalJobScheduler.java | 6 +++++- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java index 65d91bab305..d8d608be9f9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java @@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge { private final boolean isMetricReportingFailureFatal; private final boolean isEventReportingFailureFatal; private final List workers; + private final WorkflowServiceStubs workflowServiceStubs; public GobblinTemporalTaskRunner(String applicationName, String applicationId, @@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName, ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL); this.workers = new ArrayList<>(); + String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); + this.workflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri); + logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", this.isTaskDriver ? "taskDriver" : "worker", applicationName, @@ -241,12 +245,9 @@ public void start() private TemporalWorker initiateWorker() throws Exception { logger.info("Starting Temporal Worker"); - String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); - WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri); - String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); - WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace); + WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(workflowServiceStubs, namespace); String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); @@ -292,6 +293,12 @@ public synchronized void stop() { this.containerMetrics.get().stopMetricsReporting(); } + try { + this.workflowServiceStubs.getOptions().getMetricsScope().close(); + } catch (Exception e) { + logger.error("Exception occurred while closing MetricsScope", e); + } + workers.forEach(TemporalWorker::shutdown); logger.info("All services are stopped."); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index c358b368a3a..68a628950e4 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -137,7 +137,9 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir, @Override public void close() throws IOException { try { - executeCancellation(); + cancelJob(jobListener); + } catch (Exception e) { + log.error("Exception occurred while cancelling job", e); } finally { try { cleanupWorkingDirectory(); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 2d17fe20a30..bd454bc2cc2 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.joblauncher; +import java.io.IOException; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -188,4 +189,15 @@ protected void removeTasksFromCurrentJob(List workUnitIdsToRemove) { protected void addTasksToCurrentJob(List workUnitsToAdd) { log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob"); } + + @Override + public void close() throws IOException { + try { + this.workflowServiceStubs.getOptions().getMetricsScope().close(); + } catch (Exception e) { + log.error("Exception occurred while closing MetricsScope ", e); + } finally { + super.close(); + } + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index 34a1cec4dc2..c12135abea4 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -218,7 +218,11 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { throw new RuntimeException(e); } })); - launcher.launchJob(listener); + try { + launcher.launchJob(listener); + } finally { + launcher.close(); + } } } catch (Exception je) { LOGGER.error("Failed to schedule or run job " + jobUri, je);