diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java index 65d91bab30..d8d608be9f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java @@ -126,6 +126,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge { private final boolean isMetricReportingFailureFatal; private final boolean isEventReportingFailureFatal; private final List workers; + private final WorkflowServiceStubs workflowServiceStubs; public GobblinTemporalTaskRunner(String applicationName, String applicationId, @@ -163,6 +164,9 @@ public GobblinTemporalTaskRunner(String applicationName, ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL); this.workers = new ArrayList<>(); + String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); + this.workflowServiceStubs = TemporalWorkflowClientFactory.createServiceInstance(connectionUri); + logger.info("GobblinTaskRunner({}): applicationName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", this.isTaskDriver ? "taskDriver" : "worker", applicationName, @@ -241,12 +245,9 @@ public void start() private TemporalWorker initiateWorker() throws Exception { logger.info("Starting Temporal Worker"); - String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); - WorkflowServiceStubs service = TemporalWorkflowClientFactory.createServiceInstance(connectionUri); - String namespace = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_NAMESPACE, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE); - WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(service, namespace); + WorkflowClient client = TemporalWorkflowClientFactory.createClientInstance(workflowServiceStubs, namespace); String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); @@ -292,6 +293,12 @@ public synchronized void stop() { this.containerMetrics.get().stopMetricsReporting(); } + try { + this.workflowServiceStubs.getOptions().getMetricsScope().close(); + } catch (Exception e) { + logger.error("Exception occurred while closing MetricsScope", e); + } + workers.forEach(TemporalWorker::shutdown); logger.info("All services are stopped."); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java index c358b368a3..68a628950e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinJobLauncher.java @@ -137,7 +137,9 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir, @Override public void close() throws IOException { try { - executeCancellation(); + cancelJob(jobListener); + } catch (Exception e) { + log.error("Exception occurred while cancelling job", e); } finally { try { cleanupWorkingDirectory(); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java index 2d17fe20a3..bd454bc2cc 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.joblauncher; +import java.io.IOException; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @@ -188,4 +189,15 @@ protected void removeTasksFromCurrentJob(List workUnitIdsToRemove) { protected void addTasksToCurrentJob(List workUnitsToAdd) { log.warn("NOT IMPLEMENTED: Temporal addTasksToCurrentJob"); } + + @Override + public void close() throws IOException { + try { + this.workflowServiceStubs.getOptions().getMetricsScope().close(); + } catch (Exception e) { + log.error("Exception occurred while closing MetricsScope ", e); + } finally { + super.close(); + } + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index 34a1cec4dc..c12135abea 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -218,7 +218,11 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { throw new RuntimeException(e); } })); - launcher.launchJob(listener); + try { + launcher.launchJob(listener); + } finally { + launcher.close(); + } } } catch (Exception je) { LOGGER.error("Failed to schedule or run job " + jobUri, je);