children = new ArrayList<>(apiTracerFactories.size());
+
+ for (ApiTracerFactory factory : apiTracerFactories) {
+ children.add(factory.newTracer(parent, spanName, operationType));
+ }
+ return new CompositeTracer(children);
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
new file mode 100644
index 00000000000..71bdc93bfca
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporter.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.cloud.spanner.SpannerMetricsConstant.SPANNER_METRICS;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.rpc.PermissionDeniedException;
+import com.google.auth.Credentials;
+import com.google.cloud.monitoring.v3.MetricServiceClient;
+import com.google.cloud.monitoring.v3.MetricServiceSettings;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.monitoring.v3.CreateTimeSeriesRequest;
+import com.google.monitoring.v3.ProjectName;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.protobuf.Empty;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.threeten.bp.Duration;
+
+/**
+ * Spanner Cloud Monitoring OpenTelemetry Exporter.
+ *
+ * The exporter will look for all spanner owned metrics under spanner.googleapis.com
+ * instrumentation scope and upload it via the Google Cloud Monitoring API.
+ */
+@InternalApi
+public final class SpannerCloudMonitoringExporter implements MetricExporter {
+
+ private static final Logger logger =
+ Logger.getLogger(SpannerCloudMonitoringExporter.class.getName());
+
+ // This system property can be used to override the monitoring endpoint
+ // to a different environment. It's meant for internal testing only.
+ private static final String MONITORING_ENDPOINT =
+ MoreObjects.firstNonNull(
+ System.getProperty("test"), MetricServiceSettings.getDefaultEndpoint());
+
+ // This the quota limit from Cloud Monitoring. More details in
+ // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
+ private static final int EXPORT_BATCH_SIZE_LIMIT = 200;
+ private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean(false);
+ private CompletableResultCode lastExportCode;
+
+ private final MetricServiceClient client;
+ private final String spannerProjectId;
+ private final String taskId;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ public static SpannerCloudMonitoringExporter create(
+ String projectId, @Nullable Credentials credentials) throws IOException {
+ MetricServiceSettings.Builder settingsBuilder = MetricServiceSettings.newBuilder();
+ CredentialsProvider credentialsProvider;
+ if (credentials == null) {
+ credentialsProvider = NoCredentialsProvider.create();
+ } else {
+ credentialsProvider = FixedCredentialsProvider.create(credentials);
+ }
+ settingsBuilder.setCredentialsProvider(credentialsProvider);
+ settingsBuilder.setEndpoint(MONITORING_ENDPOINT);
+
+ org.threeten.bp.Duration timeout = Duration.ofMinutes(1);
+ // TODO: createServiceTimeSeries needs special handling if the request failed. Leaving
+ // it as not retried for now.
+ settingsBuilder.createServiceTimeSeriesSettings().setSimpleTimeoutNoRetries(timeout);
+
+ return new SpannerCloudMonitoringExporter(
+ projectId,
+ MetricServiceClient.create(settingsBuilder.build()),
+ SpannerCloudMonitoringExporterUtils.getDefaultTaskValue());
+ }
+
+ @VisibleForTesting
+ SpannerCloudMonitoringExporter(String projectId, MetricServiceClient client, String taskId) {
+ this.client = client;
+ this.taskId = taskId;
+ this.spannerProjectId = projectId;
+ }
+
+ @Override
+ public CompletableResultCode export(Collection collection) {
+ if (isShutdown.get()) {
+ logger.log(Level.WARNING, "Exporter is shutting down");
+ return CompletableResultCode.ofFailure();
+ }
+
+ lastExportCode = exportSpannerResourceMetrics(collection);
+ return lastExportCode;
+ }
+
+ /** Export metrics associated with a Spanner resource. */
+ private CompletableResultCode exportSpannerResourceMetrics(Collection collection) {
+ // Filter spanner metrics
+ List spannerMetricData =
+ collection.stream()
+ .filter(md -> SPANNER_METRICS.contains(md.getName()))
+ .collect(Collectors.toList());
+
+ // Skips exporting if there's none
+ if (spannerMetricData.isEmpty()) {
+ return CompletableResultCode.ofSuccess();
+ }
+
+ // Verifies metrics project id are the same as the spanner project id set on this client
+ if (!spannerMetricData.stream()
+ .flatMap(metricData -> metricData.getData().getPoints().stream())
+ .allMatch(
+ pd -> spannerProjectId.equals(SpannerCloudMonitoringExporterUtils.getProjectId(pd)))) {
+ logger.log(Level.WARNING, "Metric data has different a projectId. Skip exporting.");
+ return CompletableResultCode.ofFailure();
+ }
+
+ List spannerTimeSeries;
+ try {
+ spannerTimeSeries =
+ SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(spannerMetricData, taskId);
+ } catch (Throwable e) {
+ logger.log(
+ Level.WARNING,
+ "Failed to convert spanner table metric data to cloud monitoring timeseries.",
+ e);
+ return CompletableResultCode.ofFailure();
+ }
+
+ ProjectName projectName = ProjectName.of(spannerProjectId);
+
+ ApiFuture> futureList = exportTimeSeriesInBatch(projectName, spannerTimeSeries);
+
+ CompletableResultCode spannerExportCode = new CompletableResultCode();
+ ApiFutures.addCallback(
+ futureList,
+ new ApiFutureCallback>() {
+ @Override
+ public void onFailure(Throwable throwable) {
+ if (spannerExportFailureLogged.compareAndSet(false, true)) {
+ String msg = "createServiceTimeSeries request failed for spanner metrics.";
+ if (throwable instanceof PermissionDeniedException) {
+ msg +=
+ String.format(
+ " Need monitoring metric writer permission on project=%s. Follow TODO to set up permissions.",
+ projectName.getProject());
+ }
+ logger.log(Level.WARNING, msg, throwable);
+ }
+ spannerExportCode.fail();
+ }
+
+ @Override
+ public void onSuccess(List empty) {
+ spannerExportFailureLogged.set(false);
+ spannerExportCode.succeed();
+ }
+ },
+ MoreExecutors.directExecutor());
+
+ return spannerExportCode;
+ }
+
+ private ApiFuture> exportTimeSeriesInBatch(
+ ProjectName projectName, List timeSeries) {
+ List> batchResults = new ArrayList<>();
+
+ for (List batch : Iterables.partition(timeSeries, EXPORT_BATCH_SIZE_LIMIT)) {
+ CreateTimeSeriesRequest req =
+ CreateTimeSeriesRequest.newBuilder()
+ .setName(projectName.toString())
+ .addAllTimeSeries(batch)
+ .build();
+ ApiFuture f = this.client.createServiceTimeSeriesCallable().futureCall(req);
+ batchResults.add(f);
+ }
+
+ return ApiFutures.allAsList(batchResults);
+ }
+
+ @Override
+ public CompletableResultCode flush() {
+ if (lastExportCode != null) {
+ return lastExportCode;
+ }
+ return CompletableResultCode.ofSuccess();
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ if (!isShutdown.compareAndSet(false, true)) {
+ logger.log(Level.WARNING, "shutdown is called multiple times");
+ return CompletableResultCode.ofSuccess();
+ }
+ CompletableResultCode flushResult = flush();
+ CompletableResultCode shutdownResult = new CompletableResultCode();
+ flushResult.whenComplete(
+ () -> {
+ Throwable throwable = null;
+ try {
+ client.shutdown();
+ } catch (Throwable e) {
+ logger.log(Level.WARNING, "failed to shutdown the monitoring client", e);
+ throwable = e;
+ }
+ if (throwable != null) {
+ shutdownResult.fail();
+ } else {
+ shutdownResult.succeed();
+ }
+ });
+ return CompletableResultCode.ofAll(Arrays.asList(flushResult, shutdownResult));
+ }
+
+ /**
+ * For Google Cloud Monitoring always return CUMULATIVE to keep track of the cumulative value of a
+ * metric over time.
+ */
+ @Override
+ public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
+ return AggregationTemporality.CUMULATIVE;
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
new file mode 100644
index 00000000000..ae16b540fa1
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterUtils.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.api.MetricDescriptor.MetricKind.CUMULATIVE;
+import static com.google.api.MetricDescriptor.MetricKind.GAUGE;
+import static com.google.api.MetricDescriptor.MetricKind.UNRECOGNIZED;
+import static com.google.api.MetricDescriptor.ValueType.DISTRIBUTION;
+import static com.google.api.MetricDescriptor.ValueType.DOUBLE;
+import static com.google.api.MetricDescriptor.ValueType.INT64;
+import static com.google.cloud.spanner.SpannerMetricsConstant.CLIENT_NAME_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.CLIENT_UID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.GAX_METER_NAME;
+import static com.google.cloud.spanner.SpannerMetricsConstant.INSTANCE_CONFIG_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.INSTANCE_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.LOCATION_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.PROJECT_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.SPANNER_RESOURCE_TYPE;
+
+import com.google.api.Distribution;
+import com.google.api.Distribution.BucketOptions;
+import com.google.api.Distribution.BucketOptions.Explicit;
+import com.google.api.Metric;
+import com.google.api.MetricDescriptor.MetricKind;
+import com.google.api.MetricDescriptor.ValueType;
+import com.google.api.MonitoredResource;
+import com.google.monitoring.v3.Point;
+import com.google.monitoring.v3.TimeInterval;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.monitoring.v3.TypedValue;
+import com.google.protobuf.util.Timestamps;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.data.SumData;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class SpannerCloudMonitoringExporterUtils {
+
+ private static final Logger logger =
+ Logger.getLogger(SpannerCloudMonitoringExporterUtils.class.getName());
+
+ static String getProjectId(PointData pointData) {
+ return pointData.getAttributes().get(PROJECT_ID_KEY);
+ }
+
+ /**
+ * In most cases this should look like ${UUID}@${hostname}. The hostname will be retrieved from
+ * the jvm name and fallback to the local hostname.
+ */
+ static String getDefaultTaskValue() {
+ // Something like '@'
+ final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+ // If jvm doesn't have the expected format, fallback to the local hostname
+ if (jvmName.indexOf('@') < 1) {
+ String hostname = "localhost";
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ logger.log(Level.INFO, "Unable to get the hostname.", e);
+ }
+ // Generate a random number and use the same format "random_number@hostname".
+ return UUID.randomUUID() + "@" + hostname;
+ }
+ return UUID.randomUUID() + jvmName;
+ }
+
+ static List convertToSpannerTimeSeries(List collection, String taskId) {
+ List allTimeSeries = new ArrayList<>();
+
+ for (MetricData metricData : collection) {
+ // Get common metrics data from GAX library
+ if (!metricData.getInstrumentationScopeInfo().getName().equals(GAX_METER_NAME)) {
+ // Filter out metric data for instruments that are not part of the spanner metrics list
+ continue;
+ }
+ metricData.getData().getPoints().stream()
+ .map(pointData -> convertPointToSpannerTimeSeries(metricData, pointData, taskId))
+ .forEach(allTimeSeries::add);
+ }
+
+ return allTimeSeries;
+ }
+
+ private static TimeSeries convertPointToSpannerTimeSeries(
+ MetricData metricData, PointData pointData, String taskId) {
+ TimeSeries.Builder builder =
+ TimeSeries.newBuilder()
+ .setMetricKind(convertMetricKind(metricData))
+ .setValueType(convertValueType(metricData.getType()));
+ Metric.Builder metricBuilder = Metric.newBuilder().setType(metricData.getName());
+
+ Attributes attributes = pointData.getAttributes();
+ MonitoredResource.Builder monitoredResourceBuilder =
+ MonitoredResource.newBuilder().setType(SPANNER_RESOURCE_TYPE);
+
+ // TODO: Move these to SPANNER_PROMOTED_RESOURCE_LABELS
+ monitoredResourceBuilder.putLabels(LOCATION_ID_KEY.getKey(), "us-central1");
+ monitoredResourceBuilder.putLabels(INSTANCE_CONFIG_ID_KEY.getKey(), "us-central1");
+ monitoredResourceBuilder.putLabels(PROJECT_ID_KEY.getKey(), "span-cloud-testing");
+ monitoredResourceBuilder.putLabels(INSTANCE_ID_KEY.getKey(), "surbhi-testing");
+
+ // for (AttributeKey> key : attributes.asMap().keySet()) {
+ // if (SPANNER_PROMOTED_RESOURCE_LABELS.contains(key)) {
+ // monitoredResourceBuilder.putLabels(key.getKey(), String.valueOf(attributes.get(key)));
+ // } else {
+ // metricBuilder.putLabels(key.getKey(), String.valueOf(attributes.get(key)));
+ // }
+ // }
+
+ builder.setResource(monitoredResourceBuilder.build());
+
+ metricBuilder.putLabels(CLIENT_UID_KEY.getKey(), taskId);
+ metricBuilder.putLabels(CLIENT_NAME_KEY.getKey(), "java");
+ builder.setMetric(metricBuilder.build());
+
+ TimeInterval timeInterval =
+ TimeInterval.newBuilder()
+ .setStartTime(Timestamps.fromNanos(pointData.getStartEpochNanos()))
+ .setEndTime(Timestamps.fromNanos(pointData.getEpochNanos()))
+ .build();
+
+ builder.addPoints(createPoint(metricData.getType(), pointData, timeInterval));
+
+ return builder.build();
+ }
+
+ private static MetricKind convertMetricKind(MetricData metricData) {
+ switch (metricData.getType()) {
+ case HISTOGRAM:
+ case EXPONENTIAL_HISTOGRAM:
+ return convertHistogramType(metricData.getHistogramData());
+ case LONG_GAUGE:
+ case DOUBLE_GAUGE:
+ return GAUGE;
+ case LONG_SUM:
+ return convertSumDataType(metricData.getLongSumData());
+ case DOUBLE_SUM:
+ return convertSumDataType(metricData.getDoubleSumData());
+ default:
+ return UNRECOGNIZED;
+ }
+ }
+
+ private static MetricKind convertHistogramType(HistogramData histogramData) {
+ if (histogramData.getAggregationTemporality() == AggregationTemporality.CUMULATIVE) {
+ return CUMULATIVE;
+ }
+ return UNRECOGNIZED;
+ }
+
+ private static MetricKind convertSumDataType(SumData> sum) {
+ if (!sum.isMonotonic()) {
+ return GAUGE;
+ }
+ if (sum.getAggregationTemporality() == AggregationTemporality.CUMULATIVE) {
+ return CUMULATIVE;
+ }
+ return UNRECOGNIZED;
+ }
+
+ private static ValueType convertValueType(MetricDataType metricDataType) {
+ switch (metricDataType) {
+ case LONG_GAUGE:
+ case LONG_SUM:
+ return INT64;
+ case DOUBLE_GAUGE:
+ case DOUBLE_SUM:
+ return DOUBLE;
+ case HISTOGRAM:
+ case EXPONENTIAL_HISTOGRAM:
+ return DISTRIBUTION;
+ default:
+ return ValueType.UNRECOGNIZED;
+ }
+ }
+
+ private static Point createPoint(
+ MetricDataType type, PointData pointData, TimeInterval timeInterval) {
+ Point.Builder builder = Point.newBuilder().setInterval(timeInterval);
+ switch (type) {
+ case HISTOGRAM:
+ case EXPONENTIAL_HISTOGRAM:
+ return builder
+ .setValue(
+ TypedValue.newBuilder()
+ .setDistributionValue(convertHistogramData((HistogramPointData) pointData))
+ .build())
+ .build();
+ case DOUBLE_GAUGE:
+ case DOUBLE_SUM:
+ return builder
+ .setValue(
+ TypedValue.newBuilder()
+ .setDoubleValue(((DoublePointData) pointData).getValue())
+ .build())
+ .build();
+ case LONG_GAUGE:
+ case LONG_SUM:
+ return builder
+ .setValue(TypedValue.newBuilder().setInt64Value(((LongPointData) pointData).getValue()))
+ .build();
+ default:
+ logger.log(Level.WARNING, "unsupported metric type");
+ return builder.build();
+ }
+ }
+
+ private static Distribution convertHistogramData(HistogramPointData pointData) {
+ return Distribution.newBuilder()
+ .setCount(pointData.getCount())
+ .setMean(pointData.getCount() == 0L ? 0.0D : pointData.getSum() / pointData.getCount())
+ .setBucketOptions(
+ BucketOptions.newBuilder()
+ .setExplicitBuckets(Explicit.newBuilder().addAllBounds(pointData.getBoundaries())))
+ .addAllBucketCounts(pointData.getCounts())
+ .build();
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerMetricsConstant.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerMetricsConstant.java
new file mode 100644
index 00000000000..b5d4e764ea8
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerMetricsConstant.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import com.google.common.collect.ImmutableSet;
+import io.opentelemetry.api.common.AttributeKey;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class SpannerMetricsConstant {
+
+ public static final String METER_NAME = "spanner.googleapis.com/internal/client/";
+
+ public static final String GAX_METER_NAME = "gax-java";
+
+ static final String OPERATION_LATENCIES_NAME = "operation_latencies";
+ static final String ATTEMPT_LATENCIES_NAME = "attempt_latencies";
+ static final String OPERATION_COUNT_NAME = "operation_count";
+ static final String ATTEMPT_COUNT_NAME = "attempt_count";
+
+ public static final Set SPANNER_METRICS =
+ ImmutableSet.of(
+ OPERATION_LATENCIES_NAME,
+ ATTEMPT_LATENCIES_NAME,
+ OPERATION_COUNT_NAME,
+ ATTEMPT_COUNT_NAME)
+ .stream()
+ .map(m -> SpannerMetricsConstant.METER_NAME + m)
+ .collect(Collectors.toSet());
+
+ // TODO: This will change to `spanner_client`
+ public static final String SPANNER_RESOURCE_TYPE = "spanner_instance";
+
+ public static final AttributeKey PROJECT_ID_KEY = AttributeKey.stringKey("project_id");
+ public static final AttributeKey INSTANCE_ID_KEY = AttributeKey.stringKey("instance_id");
+ public static final AttributeKey LOCATION_ID_KEY = AttributeKey.stringKey("location");
+ public static final AttributeKey INSTANCE_CONFIG_ID_KEY =
+ AttributeKey.stringKey("instance_config");
+
+ // These metric labels will be promoted to the spanner monitored resource fields
+ public static final Set> SPANNER_PROMOTED_RESOURCE_LABELS =
+ ImmutableSet.of(PROJECT_ID_KEY, INSTANCE_ID_KEY, INSTANCE_CONFIG_ID_KEY, LOCATION_ID_KEY);
+
+ public static final AttributeKey DATABASE_KEY = AttributeKey.stringKey("database");
+ public static final AttributeKey CLIENT_UID_KEY = AttributeKey.stringKey("client_uid");
+ public static final AttributeKey CLIENT_NAME_KEY = AttributeKey.stringKey("client_name");
+ public static final AttributeKey METHOD_KEY = AttributeKey.stringKey("method");
+ public static final AttributeKey STATUS_KEY = AttributeKey.stringKey("status");
+ public static final AttributeKey DIRECT_PATH_ENABLED_KEY =
+ AttributeKey.stringKey("directpath_enabled");
+ public static final AttributeKey DIRECT_PATH_USED_KEY =
+ AttributeKey.stringKey("directpath_used");
+
+ public static final Set COMMON_ATTRIBUTES =
+ ImmutableSet.of(
+ PROJECT_ID_KEY,
+ INSTANCE_ID_KEY,
+ LOCATION_ID_KEY,
+ INSTANCE_CONFIG_ID_KEY,
+ CLIENT_UID_KEY,
+ METHOD_KEY,
+ STATUS_KEY,
+ DATABASE_KEY,
+ CLIENT_NAME_KEY,
+ DIRECT_PATH_ENABLED_KEY,
+ DIRECT_PATH_USED_KEY);
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
index 79e04805943..adcee9565a2 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java
@@ -30,6 +30,8 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.BaseApiTracerFactory;
+import com.google.api.gax.tracing.MetricsTracerFactory;
+import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceDefaults;
@@ -53,6 +55,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -68,11 +71,23 @@
import io.grpc.MethodDescriptor;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -81,6 +96,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -155,7 +171,9 @@ public class SpannerOptions extends ServiceOptions {
private final boolean useVirtualThreads;
private final OpenTelemetry openTelemetry;
private final boolean enableApiTracing;
+ private final boolean enableBuiltInMetrics;
private final boolean enableExtendedTracing;
+ private OpenTelemetry builtInOpenTelemetry;
enum TracingFramework {
OPEN_CENSUS,
@@ -662,6 +680,7 @@ protected SpannerOptions(Builder builder) {
openTelemetry = builder.openTelemetry;
enableApiTracing = builder.enableApiTracing;
enableExtendedTracing = builder.enableExtendedTracing;
+ enableBuiltInMetrics = builder.enableBuiltInMetrics;
}
/**
@@ -694,6 +713,10 @@ default boolean isEnableExtendedTracing() {
default boolean isEnableApiTracing() {
return false;
}
+
+ default boolean isEnableBuiltInMetrics() {
+ return true;
+ }
}
/**
@@ -707,6 +730,7 @@ private static class SpannerEnvironmentImpl implements SpannerEnvironment {
"SPANNER_OPTIMIZER_STATISTICS_PACKAGE";
private static final String SPANNER_ENABLE_EXTENDED_TRACING = "SPANNER_ENABLE_EXTENDED_TRACING";
private static final String SPANNER_ENABLE_API_TRACING = "SPANNER_ENABLE_API_TRACING";
+ private static final String SPANNER_ENABLE_BUILTIN_METRICS = "SPANNER_ENABLE_BUILTIN_METRICS";
private SpannerEnvironmentImpl() {}
@@ -732,6 +756,11 @@ public boolean isEnableExtendedTracing() {
public boolean isEnableApiTracing() {
return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_API_TRACING));
}
+
+ @Override
+ public boolean isEnableBuiltInMetrics() {
+ return Boolean.parseBoolean(System.getenv(SPANNER_ENABLE_BUILTIN_METRICS));
+ }
}
/** Builder for {@link SpannerOptions} instances. */
@@ -797,6 +826,7 @@ public static class Builder
private OpenTelemetry openTelemetry;
private boolean enableApiTracing = SpannerOptions.environment.isEnableApiTracing();
private boolean enableExtendedTracing = SpannerOptions.environment.isEnableExtendedTracing();
+ private boolean enableBuiltInMetrics = SpannerOptions.environment.isEnableBuiltInMetrics();
private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
@@ -862,6 +892,7 @@ protected Builder() {
this.useVirtualThreads = options.useVirtualThreads;
this.enableApiTracing = options.enableApiTracing;
this.enableExtendedTracing = options.enableExtendedTracing;
+ this.enableBuiltInMetrics = options.enableBuiltInMetrics;
}
@Override
@@ -1382,6 +1413,12 @@ public Builder setEnableExtendedTracing(boolean enableExtendedTracing) {
return this;
}
+ /** Disables client built in metrics. */
+ public Builder disableBuiltInMetrics() {
+ this.enableBuiltInMetrics = false;
+ return this;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
@@ -1614,6 +1651,7 @@ public boolean isAttemptDirectPath() {
public OpenTelemetry getOpenTelemetry() {
if (this.openTelemetry != null) {
return this.openTelemetry;
+
} else {
return GlobalOpenTelemetry.get();
}
@@ -1621,8 +1659,17 @@ public OpenTelemetry getOpenTelemetry() {
@Override
public ApiTracerFactory getApiTracerFactory() {
+ List apiTracerFactories = new ArrayList();
+
// Prefer any direct ApiTracerFactory that might have been set on the builder.
- return MoreObjects.firstNonNull(super.getApiTracerFactory(), getDefaultApiTracerFactory());
+ apiTracerFactories.add(
+ MoreObjects.firstNonNull(super.getApiTracerFactory(), getDefaultApiTracerFactory()));
+
+ // Add Metrics Tracer factory
+ if (isEnableBuiltInMetrics()) {
+ apiTracerFactories.add(getMetricsApiTracerFactory());
+ }
+ return new CompositeTracerFactory(apiTracerFactories);
}
private ApiTracerFactory getDefaultApiTracerFactory() {
@@ -1641,6 +1688,66 @@ private ApiTracerFactory getDefaultApiTracerFactory() {
return BaseApiTracerFactory.getInstance();
}
+ public ApiTracerFactory getMetricsApiTracerFactory() {
+ return new MetricsTracerFactory(
+ new OpenTelemetryMetricsRecorder(
+ getBuiltInOpenTelemetry(), SpannerMetricsConstant.METER_NAME));
+ }
+
+ public OpenTelemetry getBuiltInOpenTelemetry() {
+ if (this.builtInOpenTelemetry == null) {
+
+ // Use custom exporter
+ MetricExporter metricExporter = null;
+ try {
+ metricExporter = SpannerCloudMonitoringExporter.create(getDefaultProject(), credentials);
+
+ Aggregation AGGREGATION_WITH_MILLIS_HISTOGRAM =
+ Aggregation.explicitBucketHistogram(
+ ImmutableList.of(
+ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, 16.0, 20.0, 25.0, 30.0,
+ 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, 300.0, 400.0, 500.0,
+ 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0,
+ 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0));
+
+ SdkMeterProviderBuilder sdkMeterProviderBuilder = SdkMeterProvider.builder();
+ InstrumentSelector selector =
+ InstrumentSelector.builder()
+ .setName("spanner/operation_latency")
+ .setMeterName(SpannerMetricsConstant.GAX_METER_NAME)
+ .setType(InstrumentType.HISTOGRAM)
+ .setUnit("ms")
+ .build();
+ Set attributesFilter =
+ ImmutableSet.builder()
+ .addAll(
+ SpannerMetricsConstant.COMMON_ATTRIBUTES.stream()
+ .map(AttributeKey::getKey)
+ .collect(Collectors.toSet()))
+ .build();
+ View view =
+ View.builder()
+ .setName(
+ SpannerMetricsConstant.METER_NAME
+ + SpannerMetricsConstant.OPERATION_LATENCIES_NAME)
+ .setAggregation(AGGREGATION_WITH_MILLIS_HISTOGRAM)
+ .setAttributeFilter(attributesFilter)
+ .build();
+ sdkMeterProviderBuilder.registerView(selector, view);
+ SdkMeterProvider sdkMeterProvider =
+ sdkMeterProviderBuilder
+ .registerMetricReader(PeriodicMetricReader.create(metricExporter))
+ .build();
+
+ this.builtInOpenTelemetry =
+ OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();
+ } catch (IOException e) {
+ this.builtInOpenTelemetry = OpenTelemetry.noop();
+ }
+ }
+ return this.builtInOpenTelemetry;
+ }
+
/**
* Returns true if an {@link com.google.api.gax.tracing.ApiTracer} should be created and set on
* the Spanner client. Enabling this only has effect if an OpenTelemetry or OpenCensus trace
@@ -1650,6 +1757,15 @@ public boolean isEnableApiTracing() {
return enableApiTracing;
}
+ /**
+ * Returns true if an {@link com.google.api.gax.tracing.ApiTracer} should be created and set on
+ * the Spanner client. Enabling this only has effect if an OpenTelemetry or OpenCensus trace
+ * exporter has been configured.
+ */
+ public boolean isEnableBuiltInMetrics() {
+ return enableApiTracing;
+ }
+
@BetaApi
public boolean isUseVirtualThreads() {
return useVirtualThreads;
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
index 76b6c65a9b8..9753d1ed740 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java
@@ -15,6 +15,7 @@
*/
package com.google.cloud.spanner.spi.v1;
+import static com.google.api.gax.grpc.GrpcCallContext.TRACER_KEY;
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.DATABASE_ID;
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.INSTANCE_ID;
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.METHOD;
@@ -22,6 +23,7 @@
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT;
import static com.google.cloud.spanner.spi.v1.SpannerRpcViews.SPANNER_GFE_LATENCY;
+import com.google.api.gax.tracing.MetricsTracer;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.SpannerRpcMetrics;
import com.google.common.cache.Cache;
@@ -96,8 +98,10 @@ public void start(Listener responseListener, Metadata headers) {
DatabaseName databaseName = extractDatabaseName(headers);
String key = databaseName + method.getFullMethodName();
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
+ MetricsTracer metricsTracer = (MetricsTracer) callOptions.getOption(TRACER_KEY);
Attributes attributes =
getMetricAttributes(key, method.getFullMethodName(), databaseName);
+ addBuiltInMetricAttributes(metricsTracer, databaseName);
super.start(
new SimpleForwardingClientCallListener(responseListener) {
@Override
@@ -197,4 +201,11 @@ private Attributes getMetricAttributes(String key, String method, DatabaseName d
return attributesBuilder.build();
});
}
+
+ private void addBuiltInMetricAttributes(MetricsTracer metricsTracer, DatabaseName databaseName) {
+ // Built in metrics Attributes.
+ metricsTracer.addAttributes("database", databaseName.getDatabase());
+ metricsTracer.addAttributes("instance_id", databaseName.getInstance());
+ metricsTracer.addAttributes("project_id", databaseName.getProject());
+ }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
new file mode 100644
index 00000000000..596c4116b70
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerCloudMonitoringExporterTest.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.cloud.spanner.SpannerMetricsConstant.CLIENT_NAME_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.CLIENT_UID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.DATABASE_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.DIRECT_PATH_ENABLED_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.DIRECT_PATH_USED_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.INSTANCE_CONFIG_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.INSTANCE_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.LOCATION_ID_KEY;
+import static com.google.cloud.spanner.SpannerMetricsConstant.OPERATION_LATENCIES_NAME;
+import static com.google.cloud.spanner.SpannerMetricsConstant.PROJECT_ID_KEY;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.auth.Credentials;
+import com.google.cloud.monitoring.v3.MetricServiceClient;
+import com.google.cloud.monitoring.v3.stub.MetricServiceStub;
+import com.google.common.collect.ImmutableList;
+import com.google.monitoring.v3.CreateTimeSeriesRequest;
+import com.google.monitoring.v3.TimeSeries;
+import com.google.protobuf.Empty;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.common.export.MemoryMode;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.resources.Resource;
+import java.io.IOException;
+import java.util.Arrays;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class SpannerCloudMonitoringExporterTest {
+
+ private static final String projectId = "fake-project";
+ private static final String instanceId = "fake-instance";
+ private static final String locationId = "default";
+ private static final String tableId = "fake-table";
+ private static final String zone = "us-east-1";
+ private static final String cluster = "cluster-1";
+
+ private static final String clientName = "fake-client-name";
+ private static final String taskId = "fake-task-id";
+
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock private MetricServiceStub mockMetricServiceStub;
+ private MetricServiceClient fakeMetricServiceClient;
+ private SpannerCloudMonitoringExporter exporter;
+
+ private Attributes attributes;
+ private Resource resource;
+ private InstrumentationScopeInfo scope;
+
+ @Before
+ public void setUp() {
+ fakeMetricServiceClient = new FakeMetricServiceClient(mockMetricServiceStub);
+ exporter = new SpannerCloudMonitoringExporter(projectId, fakeMetricServiceClient, taskId);
+
+ attributes =
+ Attributes.builder()
+ .put(PROJECT_ID_KEY, projectId)
+ .put(INSTANCE_ID_KEY, instanceId)
+ .put(LOCATION_ID_KEY, locationId)
+ .put(INSTANCE_CONFIG_ID_KEY, cluster)
+ .put(DATABASE_KEY, zone)
+ .put(String.valueOf(DIRECT_PATH_ENABLED_KEY), true)
+ .put(String.valueOf(DIRECT_PATH_USED_KEY), true)
+ .build();
+
+ resource = Resource.create(Attributes.empty());
+
+ scope = InstrumentationScopeInfo.create(SpannerMetricsConstant.GAX_METER_NAME);
+ }
+
+ @After
+ public void tearDown() {}
+
+ @Test
+ public void createWhenNullCredentials() throws IOException {
+ Credentials credentials = null;
+ SpannerCloudMonitoringExporter actualExporter =
+ SpannerCloudMonitoringExporter.create(projectId, null);
+ assertThat(actualExporter.getMemoryMode()).isEqualTo(MemoryMode.IMMUTABLE_DATA);
+ }
+
+ @Test
+ public void createWhenValidCredentials() throws IOException {
+ Credentials credentials = new NoCredentialsProvider().getCredentials();
+ SpannerCloudMonitoringExporter actualExporter =
+ SpannerCloudMonitoringExporter.create(projectId, credentials);
+ assertThat(actualExporter.getMemoryMode()).isEqualTo(MemoryMode.IMMUTABLE_DATA);
+ }
+
+ @Test
+ public void testExportingSumData() {
+ ArgumentCaptor argumentCaptor =
+ ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);
+
+ UnaryCallable mockCallable = Mockito.mock(UnaryCallable.class);
+ Mockito.when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
+ ApiFuture future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
+ Mockito.when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);
+
+ long fakeValue = 11L;
+
+ long startEpoch = 10;
+ long endEpoch = 15;
+ LongPointData longPointData =
+ ImmutableLongPointData.create(startEpoch, endEpoch, attributes, fakeValue);
+
+ MetricData longData =
+ ImmutableMetricData.createLongSum(
+ resource,
+ scope,
+ "spanner.googleapis.com/internal/client/" + OPERATION_LATENCIES_NAME,
+ "description",
+ "1",
+ ImmutableSumData.create(
+ true, AggregationTemporality.CUMULATIVE, ImmutableList.of(longPointData)));
+
+ exporter.export(Arrays.asList(longData));
+
+ CreateTimeSeriesRequest request = argumentCaptor.getValue();
+
+ assertThat(request.getTimeSeriesList()).hasSize(1);
+
+ TimeSeries timeSeries = request.getTimeSeriesList().get(0);
+
+ assertThat(timeSeries.getResource().getLabelsMap())
+ .containsExactly(
+ PROJECT_ID_KEY.getKey(), projectId,
+ INSTANCE_ID_KEY.getKey(), instanceId,
+ LOCATION_ID_KEY.getKey(), locationId,
+ INSTANCE_CONFIG_ID_KEY.getKey(), cluster,
+ DATABASE_KEY.getKey(), zone,
+ DIRECT_PATH_ENABLED_KEY.getKey(), true,
+ DIRECT_PATH_USED_KEY.getKey(), true);
+
+ assertThat(timeSeries.getMetric().getLabelsMap()).hasSize(2);
+
+ assertThat(timeSeries.getMetric().getLabelsMap())
+ .containsAtLeast(CLIENT_NAME_KEY.getKey(), "java", CLIENT_UID_KEY.getKey(), taskId);
+ assertThat(timeSeries.getPoints(0).getValue().getInt64Value()).isEqualTo(fakeValue);
+ assertThat(timeSeries.getPoints(0).getInterval().getStartTime().getNanos())
+ .isEqualTo(startEpoch);
+ assertThat(timeSeries.getPoints(0).getInterval().getEndTime().getNanos()).isEqualTo(endEpoch);
+ }
+
+ @Test
+ public void getAggregationTemporality() throws IOException {
+ SpannerCloudMonitoringExporter actualExporter =
+ SpannerCloudMonitoringExporter.create(projectId, null);
+ assertThat(actualExporter.getAggregationTemporality(InstrumentType.COUNTER))
+ .isEqualTo(AggregationTemporality.CUMULATIVE);
+ }
+
+ private static class FakeMetricServiceClient extends MetricServiceClient {
+
+ protected FakeMetricServiceClient(MetricServiceStub stub) {
+ super(stub);
+ }
+ }
+}