diff --git a/gobblin-temporal/build.gradle b/gobblin-temporal/build.gradle index fa34245e914..19687ad3af1 100644 --- a/gobblin-temporal/build.gradle +++ b/gobblin-temporal/build.gradle @@ -64,6 +64,8 @@ dependencies { } compile externalDependency.tdigest compile externalDependency."temporal-sdk" + compile externalDependency.micrometerCore + compile externalDependency.micrometerRegistry testCompile project(path: ':gobblin-cluster', configuration: 'tests') testCompile project(":gobblin-example") diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index 0201cda0fb8..82ea6460902 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -74,4 +74,14 @@ public interface GobblinTemporalConfigurationKeys { String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds"; int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + + /** + * Temporal metrics config properties + */ + String TEMPORAL_METRICS_PREFIX = PREFIX + "metrics."; + String TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT = TEMPORAL_METRICS_PREFIX + "otlp"; + String TEMPORAL_METRICS_OTLP_HEADERS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers"; + String TEMPORAL_METRICS_REPORT_INTERVAL_SECS = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".report.interval.seconds"; + int DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS = 10; + String TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY = TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions"; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java index 030d4aae182..c8a1052ae9e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java @@ -30,6 +30,8 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; @@ -41,7 +43,10 @@ import javax.net.ssl.TrustManagerFactory; import org.apache.gobblin.cluster.GobblinClusterUtils; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator; +import org.apache.gobblin.temporal.workflows.metrics.TemporalMetricsHelper; +import org.apache.gobblin.util.ConfigUtils; public class TemporalWorkflowClientFactory { @@ -100,10 +105,19 @@ public static WorkflowServiceStubs createServiceInstance(String connectionUri) t .ciphers(SSL_CONFIG_DEFAULT_CIPHER_SUITES) .build(); + // Initialize metrics + int reportInterval = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS); + Scope metricsScope = new RootScopeBuilder() + .reporter(TemporalMetricsHelper.getStatsReporter(config)) + .tags(TemporalMetricsHelper.getDimensions(config)) + .reportEvery(com.uber.m3.util.Duration.ofSeconds(reportInterval)); + WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder() .setTarget(connectionUri) .setEnableHttps(true) .setSslContext(sslContext) + .setMetricsScope(metricsScope) .build(); return WorkflowServiceStubs.newServiceStubs(options); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java new file mode 100644 index 00000000000..443c5613da9 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelper.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.workflows.metrics; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; +import org.jetbrains.annotations.NotNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; +import com.uber.m3.tally.StatsReporter; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.registry.otlp.OtlpConfig; +import io.micrometer.registry.otlp.OtlpMeterRegistry; +import io.temporal.common.reporter.MicrometerClientStatsReporter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.util.ConfigUtils; + + +@Slf4j +public class TemporalMetricsHelper { + + /** + * Retrieves a map of dimension names and their corresponding values from the provided config. + * The dimensions are defined as a comma-separated string in the config, and the method + * fetches the corresponding values for each dimension. + * A missing dimension in config will have empty string as value. + * + * @param config Config object + * @return a map where the key is the dimension name and the value is the corresponding value from the config + */ + public static Map getDimensions(Config config) { + String dimensionsString = ConfigUtils.getString(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_DIMENSIONS_KEY, ""); + + // Split the string by "," and create a map by fetching values from config + return Arrays.stream(dimensionsString.split(",")) + .map(String::trim) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toMap(key -> key, key -> ConfigUtils.getString(config, key, ""), (l, r)-> r)); + } + + /** + * Creates and returns a {@link StatsReporter} instance configured with an {@link OtlpMeterRegistry}. + * This reporter can be used to report metrics via the OpenTelemetry Protocol (OTLP) to a metrics backend. + * + * @param config Config object + * @return a {@link StatsReporter} instance, configured with an OTLP meter registry and ready to report metrics. + */ + public static StatsReporter getStatsReporter(Config config) { + OtlpConfig otlpConfig = getOtlpConfig(config); + MeterRegistry meterRegistry = new OtlpMeterRegistry(otlpConfig, Clock.SYSTEM); + return new MicrometerClientStatsReporter(meterRegistry); + } + + @VisibleForTesting + static OtlpConfig getOtlpConfig(Config config) { + return new OtlpConfig() { + @Override + public String get(@NotNull String key) { + return ConfigUtils.getString(config, key, null); + } + + @NotNull + @Override + public String prefix() { + return GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT; + } + + @NotNull + @Override + public Map headers() { + String headers = get(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_HEADERS_KEY); + return parseHeaders(headers); + } + + @NotNull + @Override + public Duration step() { + int reportInterval = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_REPORT_INTERVAL_SECS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_METRICS_REPORT_INTERVAL_SECS); + return Duration.ofSeconds(reportInterval); + } + }; + } + + private static Map parseHeaders(String headersString) { + try { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(headersString, HashMap.class); + } catch (Exception e) { + String errMsg = "Failed to parse headers: " + headersString; + log.error(errMsg, e); + throw new RuntimeException(errMsg); + } + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java new file mode 100644 index 00000000000..1435d0f4df1 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/workflows/metrics/TemporalMetricsHelperTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.temporal.workflows.metrics; + + +import io.micrometer.registry.otlp.OtlpConfig; + +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Map; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + + +/** Test {@link TemporalMetricsHelper} */ +public class TemporalMetricsHelperTest { + + private Config config; + + @BeforeClass + public void setup() { + config = ConfigFactory.empty() + .withValue("prefix", ConfigValueFactory.fromAnyRef("gobblin.temporal.metrics.otlp")) + .withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".headers", + ConfigValueFactory.fromAnyRef("{\"abc\":\"123\", \"pqr\":\"456\"}")) + .withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".resourceAttributes", + ConfigValueFactory.fromAnyRef("service.name=gobblin-service")) + .withValue("dim1", ConfigValueFactory.fromAnyRef("val1")) + .withValue("dim2", ConfigValueFactory.fromAnyRef("val2")) + .withValue(GobblinTemporalConfigurationKeys.TEMPORAL_METRICS_OTLP_PREFIX_WITHOUT_DOT + ".dimensions", + ConfigValueFactory.fromAnyRef("dim1,dim2,missingDimension")); + } + + @Test + public void testGetDimensions() { + Map dimensions = TemporalMetricsHelper.getDimensions(config); + + Assert.assertNotNull(dimensions); + Assert.assertEquals(3, dimensions.size()); + Assert.assertEquals("val1", dimensions.get("dim1")); + Assert.assertEquals("val2", dimensions.get("dim2")); + Assert.assertEquals("", dimensions.get("missingDimension")); + } + + @Test + public void testGetDimensionsEmptyConfig() { + Map dimensions = TemporalMetricsHelper.getDimensions(ConfigFactory.empty()); + + Assert.assertNotNull(dimensions); + Assert.assertEquals(0, dimensions.size()); + } + + @Test + public void testGetOtlpConfig() { + OtlpConfig otlpConfig = TemporalMetricsHelper.getOtlpConfig(config); + + Map headers = otlpConfig.headers(); + Assert.assertNotNull(headers); + Assert.assertEquals(2, headers.size()); + Assert.assertEquals("123", headers.get("abc")); + Assert.assertEquals("456", headers.get("pqr")); + + Assert.assertEquals("gobblin-service", otlpConfig.resourceAttributes().get("service.name")); + } +} diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle index c3ff18b543f..2263dcdfa01 100644 --- a/gradle/scripts/defaultBuildProperties.gradle +++ b/gradle/scripts/defaultBuildProperties.gradle @@ -41,7 +41,8 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project) .register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository")) .register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus")) .register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce dependencies version")) - .register(new BuildProperty("openTelemetryVersion", "1.29.0", "OpenTelemetry dependencies version")) + .register(new BuildProperty("openTelemetryVersion", "1.30.0", "OpenTelemetry dependencies version")) + .register(new BuildProperty("micrometerVersion", "1.11.1", "Micrometer dependencies version")) task buildProperties(description: 'Lists main properties that can be used to customize the build') { doLast { BUILD_PROPERTIES.printHelp(); @@ -74,5 +75,6 @@ BUILD_PROPERTIES.ensureDefined('kafka1Version') BUILD_PROPERTIES.ensureDefined('pegasusVersion') BUILD_PROPERTIES.ensureDefined('salesforceVersion') BUILD_PROPERTIES.ensureDefined('openTelemetryVersion') +BUILD_PROPERTIES.ensureDefined('micrometerVersion') ext.buildProperties = BUILD_PROPERTIES diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index e4b328c5e14..3d71988c142 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -120,6 +120,8 @@ ext.externalDependency = [ "opentelemetrySdk": "io.opentelemetry:opentelemetry-sdk:" + openTelemetryVersion, "opentelemetryExporterOtlp": "io.opentelemetry:opentelemetry-exporter-otlp:" + openTelemetryVersion, "opentelemetrySdkTesting": "io.opentelemetry:opentelemetry-sdk-testing:" + openTelemetryVersion, + "micrometerCore": "io.micrometer:micrometer-core:" + micrometerVersion, + "micrometerRegistry": "io.micrometer:micrometer-registry-otlp:" + micrometerVersion, "jsch": "com.jcraft:jsch:0.1.54", "jdo2": "javax.jdo:jdo2-api:2.1", "azkaban": "com.linkedin.azkaban:azkaban:2.5.0",