diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index dd4eb17e9..bb2fe9ead 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -800,17 +800,26 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption.. @Override public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) { - Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); - ReadObjectRequest request = getReadObjectRequest(blob, opts); - GrpcCallContext grpcCallContext = Retrying.newCallContext(); + Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName()); + try (Scope unused = otelSpan.makeCurrent()) { + Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); + ReadObjectRequest request = getReadObjectRequest(blob, opts); + GrpcCallContext grpcCallContext = Retrying.newCallContext(); - return new GrpcBlobReadChannel( - storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), - getOptions(), - retryAlgorithmManager.getFor(request), - responseContentLifecycleManager, - request, - !opts.autoGzipDecompression()); + return new GrpcBlobReadChannel( + storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), + getOptions(), + retryAlgorithmManager.getFor(request), + responseContentLifecycleManager, + request, + !opts.autoGzipDecompression()); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override @@ -853,25 +862,35 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption. @Override public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { - Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); - GrpcCallContext grpcCallContext = - opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - Hasher hasher = Hasher.noop(); - // in JSON, the starting of the resumable session happens before the invocation of write can - // happen. Emulate the same thing here. - // 1. create the future - ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req, opts); - // 2. await the result of the future - ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite); - // 3. wrap the result in another future container before constructing the BlobWriteChannel - ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite); - return new GrpcBlobWriteChannel( - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext), - getOptions(), - retryAlgorithmManager.idempotent(), - () -> wrapped, - hasher); + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); + try (Scope unused = otelSpan.makeCurrent()) { + Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + Hasher hasher = Hasher.noop(); + // in JSON, the starting of the resumable session happens before the invocation of write can + // happen. Emulate the same thing here. + // 1. create the future + ApiFuture<ResumableWrite> startResumableWrite = + startResumableWrite(grpcCallContext, req, opts); + // 2. await the result of the future + ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite); + // 3. wrap the result in another future container before constructing the BlobWriteChannel + ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite); + return new GrpcBlobWriteChannel( + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext), + getOptions(), + retryAlgorithmManager.idempotent(), + () -> wrapped, + hasher); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw StorageException.coalesce(e); + } finally { + otelSpan.end(); + } } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index fbb93d1b2..ae153a801 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -51,6 +51,8 @@ import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope; +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest; import com.google.common.base.CharMatcher; @@ -743,10 +745,19 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption... @Override public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) { - Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob); - StorageObject storageObject = Conversions.json().blobId().encode(blob); - ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); - return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); + Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName()); + try (Scope unused = otelSpan.makeCurrent()) { + Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob); + StorageObject storageObject = Conversions.json().blobId().encode(blob); + ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); + return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + otelSpan.end(); + } } @Override @@ -777,40 +788,58 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption. @Override public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { - Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo); - final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); + try (Scope unused = otelSpan.makeCurrent()) { + Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo); + final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); - StorageObject encode = codecs.blobInfo().encode(updated); - // open the resumable session outside the write channel - // the exception behavior of open is different from #write(ByteBuffer) - Supplier<String> uploadIdSupplier = - ResumableMedia.startUploadForBlobInfo( - getOptions(), - updated, - optionsMap, - retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); + StorageObject encode = codecs.blobInfo().encode(updated); + // open the resumable session outside the write channel + // the exception behavior of open is different from #write(ByteBuffer) + Supplier<String> uploadIdSupplier = + ResumableMedia.startUploadForBlobInfo( + getOptions(), + updated, + optionsMap, + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + otelSpan.end(); + } } @Override public StorageWriteChannel writer(URL signedURL) { - // TODO: is it possible to know if a signed url is configured to have a constraint which makes - // it idempotent? - ResultRetryAlgorithm<?> forResumableUploadSessionCreate = - retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); - // open the resumable session outside the write channel - // the exception behavior of open is different from #write(ByteBuffer) - String signedUrlString = signedURL.toString(); - Supplier<String> uploadIdSupplier = - ResumableMedia.startUploadForSignedUrl( - getOptions(), signedURL, forResumableUploadSessionCreate); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); + try (Scope unused = otelSpan.makeCurrent()) { + // TODO: is it possible to know if a signed url is configured to have a constraint which makes + // it idempotent? + ResultRetryAlgorithm<?> forResumableUploadSessionCreate = + retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); + // open the resumable session outside the write channel + // the exception behavior of open is different from #write(ByteBuffer) + String signedUrlString = signedURL.toString(); + Supplier<String> uploadIdSupplier = + ResumableMedia.startUploadForSignedUrl( + getOptions(), signedURL, forResumableUploadSessionCreate); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); + } catch (Exception e) { + otelSpan.recordException(e); + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); + throw e; + } finally { + otelSpan.end(); + } } @Override diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java index c0f599e3a..7441fd272 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java @@ -19,6 +19,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import com.google.cloud.NoCredentials; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.CopyRequest; @@ -170,6 +172,31 @@ public void runCopy() { Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("copy"))); } + @Test + public void runWriter() throws IOException { + BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build(); + try (WriteChannel writer = storage.writer(info)) { + // Do nothing + } + TestExporter testExported = (TestExporter) exporter; + List<SpanData> spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer"))); + } + + @Test + public void runReader() throws IOException { + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build(); + storage.create(blobInfo, helloWorldTextBytes); + try (ReadChannel reader = storage.reader(blobId)) { + // Do nothing + } + TestExporter testExported = (TestExporter) exporter; + List<SpanData> spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader"))); + } + private void checkCommonAttributes(List<SpanData> spanData) { for (SpanData span : spanData) { Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java index a042d490f..241f28ad5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java @@ -19,6 +19,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import com.google.cloud.NoCredentials; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Storage.BlobSourceOption; import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.CopyRequest; @@ -174,6 +176,31 @@ public void runCreateFromInputStream() throws IOException { Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom"))); } + @Test + public void runWriter() throws IOException { + BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build(); + try (WriteChannel writer = storage.writer(info)) { + // Do nothing + } + TestExporter testExported = (TestExporter) exporter; + List<SpanData> spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer"))); + } + + @Test + public void runReader() throws IOException { + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build(); + storage.create(blobInfo, helloWorldTextBytes); + try (ReadChannel reader = storage.reader(blobId)) { + // Do nothing + } + TestExporter testExported = (TestExporter) exporter; + List<SpanData> spanData = testExported.getExportedSpans(); + checkCommonAttributes(spanData); + Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader"))); + } + private void checkCommonAttributes(List<SpanData> spanData) { for (SpanData span : spanData) { Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service")); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java index 186e1225b..5cfe6bff0 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/TestBench.java @@ -16,6 +16,7 @@ package com.google.cloud.storage.it.runner.registry; +import static com.google.cloud.RetryHelper.runWithRetries; import static java.util.Objects.requireNonNull; import com.google.api.client.http.ByteArrayContent; @@ -25,6 +26,10 @@ import com.google.api.client.http.HttpRequestFactory; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.core.NanoClock; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.Tuple; import com.google.cloud.conformance.storage.v1.InstructionList; import com.google.cloud.conformance.storage.v1.Method; @@ -43,13 +48,19 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.SocketException; +import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.threeten.bp.Duration; /** * A {@link ManagedLifecycle} which integrates with the <a target="_blank" @@ -172,7 +183,11 @@ public List<RetryTestResource> listRetryTests() throws IOException { } private boolean startGRPCServer(int gRPCPort) throws IOException { - return true; + GenericUrl url = new GenericUrl(baseUri + "/start_grpc?port=9090"); + HttpRequest req = requestFactory.buildGetRequest(url); + HttpResponse resp = req.execute(); + resp.disconnect(); + return resp.getStatusCode() == 200; } @Override @@ -181,10 +196,164 @@ public Object get() { } @Override - public void start() {} + public void start() { + try { + tempDirectory = Files.createTempDirectory(containerName); + outPath = tempDirectory.resolve("stdout"); + errPath = tempDirectory.resolve("stderr"); + File outFile = outPath.toFile(); + File errFile = errPath.toFile(); + LOGGER.info("Redirecting server stdout to: " + outFile.getAbsolutePath()); + LOGGER.info("Redirecting server stderr to: " + errFile.getAbsolutePath()); + String dockerImage = String.format("%s:%s", dockerImageName, dockerImageTag); + // First try and pull the docker image, this validates docker is available and running + // on the host, as well as gives time for the image to be downloaded independently of + // trying to start the container. (Below, when we first start the container we then attempt + // to issue a call against the api before we yield to run our tests.) + try { + Process p = + new ProcessBuilder() + .command("docker", "pull", dockerImage) + .redirectOutput(outFile) + .redirectError(errFile) + .start(); + p.waitFor(5, TimeUnit.MINUTES); + if (!ignorePullError && p.exitValue() != 0) { + dumpServerLogs(outPath, errPath); + throw new IllegalStateException( + String.format( + "Non-zero status while attempting to pull docker image '%s'", dockerImage)); + } + } catch (InterruptedException | IllegalThreadStateException e) { + dumpServerLogs(outPath, errPath); + throw new IllegalStateException( + String.format("Timeout while attempting to pull docker image '%s'", dockerImage)); + } + int port = URI.create(baseUri).getPort(); + int gRPCPort = URI.create(gRPCBaseUri).getPort(); + final List<String> command = + ImmutableList.of( + "docker", + "run", + "-i", + "--rm", + "--publish", + port + ":9000", + "--publish", + gRPCPort + ":9090", + String.format("--name=%s", containerName), + dockerImage); + process = + new ProcessBuilder() + .command(command) + .redirectOutput(outFile) + .redirectError(errFile) + .start(); + LOGGER.log(Level.INFO, command.toString()); + try { + // wait a small amount of time for the server to come up before probing + Thread.sleep(500); + // wait for the server to come up + List<RetryTestResource> existingResources = + runWithRetries( + TestBench.this::listRetryTests, + RetrySettings.newBuilder() + .setTotalTimeout(Duration.ofSeconds(30)) + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.5) + .setMaxRetryDelay(Duration.ofSeconds(5)) + .build(), + new BasicResultRetryAlgorithm<List<RetryTestResource>>() { + @Override + public boolean shouldRetry( + Throwable previousThrowable, List<RetryTestResource> previousResponse) { + return previousThrowable instanceof SocketException; + } + }, + NanoClock.getDefaultClock()); + if (!existingResources.isEmpty()) { + LOGGER.info( + "Test Server already has retry tests in it, is it running outside the tests?"); + } + // Start gRPC Service + if (!startGRPCServer(gRPCPort)) { + throw new IllegalStateException( + "Failed to start server within a reasonable amount of time. Host url(gRPC): " + + gRPCBaseUri); + } + } catch (RetryHelperException e) { + dumpServerLogs(outPath, errPath); + throw new IllegalStateException( + "Failed to connect to server within a reasonable amount of time. Host url: " + baseUri, + e.getCause()); + } + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } @Override - public void stop() {} + public void stop() { + try { + process.destroy(); + process.waitFor(2, TimeUnit.SECONDS); + boolean attemptForceStopContainer = false; + try { + int processExitValue = process.exitValue(); + if (processExitValue != 0) { + attemptForceStopContainer = true; + } + System.out.println("processExitValue = " + processExitValue); + LOGGER.warning("Container exit value = " + processExitValue); + } catch (IllegalThreadStateException e) { + attemptForceStopContainer = true; + } + if (attemptForceStopContainer) { + LOGGER.warning("Container did not gracefully exit, attempting to explicitly stop it."); + System.out.println("Container did not gracefully exit, attempting to explicitly stop it."); + ImmutableList<String> command = ImmutableList.of("docker", "kill", containerName); + System.out.println("command = " + command); + LOGGER.log(Level.WARNING, command.toString()); + Process shutdownProcess = new ProcessBuilder(command).start(); + shutdownProcess.waitFor(5, TimeUnit.SECONDS); + int shutdownProcessExitValue = shutdownProcess.exitValue(); + LOGGER.warning("Container exit value = " + shutdownProcessExitValue); + } + // wait for the server to shutdown + runWithRetries( + () -> { + try { + listRetryTests(); + } catch (SocketException e) { + // desired result + return null; + } + throw new NotShutdownException(); + }, + RetrySettings.newBuilder() + .setTotalTimeout(Duration.ofSeconds(30)) + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.5) + .setMaxRetryDelay(Duration.ofSeconds(5)) + .build(), + new BasicResultRetryAlgorithm<List<?>>() { + @Override + public boolean shouldRetry(Throwable previousThrowable, List<?> previousResponse) { + return previousThrowable instanceof NotShutdownException; + } + }, + NanoClock.getDefaultClock()); + try { + Files.delete(errPath); + Files.delete(outPath); + Files.delete(tempDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + } private void dumpServerLogs(Path outFile, Path errFile) throws IOException { try {