diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java index c3d9eacb8c0910..ffb8ee87ca71ff 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java @@ -13,26 +13,31 @@ // limitations under the License. package com.google.devtools.build.lib.remote; +import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable; +import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture; +import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle; +import static com.google.devtools.build.lib.remote.util.Utils.grpcAwareErrorMessage; + import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.PathConverter; -import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; +import com.google.devtools.build.lib.events.Event; +import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.Path; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -40,41 +45,42 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; -/** A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. */ +/** A {@link BuildEventArtifactUploader} backed by {@link RemoteCache}. */ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted implements BuildEventArtifactUploader { - private final ListeningExecutorService uploadExecutor; + private final Executor executor; + private final ExtendedEventHandler reporter; + private final boolean verboseFailures; + private final RemoteCache remoteCache; private final String buildRequestId; private final String commandId; - private final ByteStreamUploader uploader; private final String remoteServerInstanceName; - private final MissingDigestsFinder missingDigestsFinder; private final AtomicBoolean shutdown = new AtomicBoolean(); + private final Scheduler scheduler; ByteStreamBuildEventArtifactUploader( - ByteStreamUploader uploader, - MissingDigestsFinder missingDigestsFinder, + Executor executor, + ExtendedEventHandler reporter, + boolean verboseFailures, + RemoteCache remoteCache, String remoteServerInstanceName, String buildRequestId, - String commandId, - int maxUploadThreads) { - this.uploader = Preconditions.checkNotNull(uploader); + String commandId) { + this.executor = executor; + this.reporter = reporter; + this.verboseFailures = verboseFailures; + this.remoteCache = remoteCache; this.buildRequestId = buildRequestId; this.commandId = commandId; this.remoteServerInstanceName = remoteServerInstanceName; - // Limit the maximum threads number to 1000 (chosen arbitrarily) - this.uploadExecutor = - MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - Math.min(maxUploadThreads, 1000), - new ThreadFactoryBuilder().setNameFormat("bes-artifact-uploader-%d").build())); - this.missingDigestsFinder = missingDigestsFinder; + this.scheduler = Schedulers.from(executor); } /** Returns {@code true} if Bazel knows that the file is stored on a remote system. */ @@ -143,100 +149,112 @@ private static void processQueryResult( } } - /** - * For files where {@link PathMetadata#isRemote()} returns {@code false} this method checks if the - * remote cache already contains the file. If so {@link PathMetadata#isRemote()} is set to {@code - * true}. - */ - private ListenableFuture> queryRemoteCache( - ImmutableList> allPaths) - throws ExecutionException, InterruptedException { - RequestMetadata metadata = - TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null); - RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata); + private static boolean shouldUpload(PathMetadata path) { + return path.getDigest() != null && !path.isRemote() && !path.isDirectory(); + } - List knownRemotePaths = new ArrayList<>(allPaths.size()); + private Single> queryRemoteCache( + RemoteCache remoteCache, RemoteActionExecutionContext context, List paths) { + List knownRemotePaths = new ArrayList<>(paths.size()); List filesToQuery = new ArrayList<>(); Set digestsToQuery = new HashSet<>(); - for (ListenableFuture pathMetadataFuture : allPaths) { - // This line is guaranteed to not block, as this code is only called after all futures in - // allPaths have completed. - PathMetadata pathMetadata = pathMetadataFuture.get(); - if (pathMetadata.isRemote() || pathMetadata.isDirectory()) { - knownRemotePaths.add(pathMetadata); + for (PathMetadata path : paths) { + if (shouldUpload(path)) { + filesToQuery.add(path); + digestsToQuery.add(path.getDigest()); } else { - filesToQuery.add(pathMetadata); - digestsToQuery.add(pathMetadata.getDigest()); + knownRemotePaths.add(path); } } + if (digestsToQuery.isEmpty()) { - return Futures.immediateFuture(knownRemotePaths); + return Single.just(knownRemotePaths); } - return Futures.transform( - missingDigestsFinder.findMissingDigests(context, digestsToQuery), - (missingDigests) -> { - processQueryResult(missingDigests, filesToQuery, knownRemotePaths); - return knownRemotePaths; - }, - MoreExecutors.directExecutor()); + return toSingle(() -> remoteCache.findMissingDigests(context, digestsToQuery), executor) + .onErrorResumeNext( + error -> { + reporterUploadError(error); + // Assuming all digests are missing if failed to query + return Single.just(ImmutableSet.copyOf(digestsToQuery)); + }) + .map( + missingDigests -> { + processQueryResult(missingDigests, filesToQuery, knownRemotePaths); + return knownRemotePaths; + }); } - /** - * Uploads any files from {@code allPaths} where {@link PathMetadata#isRemote()} returns {@code - * false}. - */ - private ListenableFuture> uploadLocalFiles(Iterable allPaths) { + private void reporterUploadError(Throwable error) { + if (error instanceof CancellationException) { + return; + } + + String errorMessage = + "Uploading BEP referenced local files: " + grpcAwareErrorMessage(error, verboseFailures); + + reporter.handle(Event.warn(errorMessage)); + } + + private Single> uploadLocalFiles( + RemoteCache remoteCache, RemoteActionExecutionContext context, List paths) { + return Flowable.fromIterable(paths) + .flatMapSingle( + path -> { + if (!shouldUpload(path)) { + return Single.just(path); + } + + return toCompletable( + () -> remoteCache.uploadFile(context, path.getDigest(), path.getPath()), + executor) + .toSingleDefault(path) + .onErrorResumeNext( + error -> { + reporterUploadError(error); + return Single.just( + new PathMetadata( + path.getPath(), + /*digest=*/ null, + path.isDirectory(), + path.isRemote())); + }); + }) + .collect(Collectors.toList()); + } + + private Single upload(Set files) { + if (files.isEmpty()) { + return Single.just(PathConverter.NO_CONVERSION); + } + RequestMetadata metadata = TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null); RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata); - ImmutableList.Builder> allPathsUploaded = - ImmutableList.builder(); - for (PathMetadata path : allPaths) { - if (!path.isRemote() && !path.isDirectory()) { - Chunker chunker = - Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build(); - final ListenableFuture upload; - upload = - uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false); - allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor)); - } else { - allPathsUploaded.add(Futures.immediateFuture(path)); - } - } - return Futures.allAsList(allPathsUploaded.build()); + return Single.using( + remoteCache::retain, + remoteCache -> + Flowable.fromIterable(files) + .map( + file -> { + try { + return readPathMetadata(file); + } catch (IOException e) { + reporterUploadError(e); + return new PathMetadata( + file, /*digest=*/ null, /*directory=*/ false, /*remote=*/ false); + } + }) + .collect(Collectors.toList()) + .flatMap(paths -> queryRemoteCache(remoteCache, context, paths)) + .flatMap(paths -> uploadLocalFiles(remoteCache, context, paths)) + .map(paths -> new PathConverterImpl(remoteServerInstanceName, paths)), + RemoteCache::release); } @Override public ListenableFuture upload(Map files) { - if (files.isEmpty()) { - return Futures.immediateFuture(PathConverter.NO_CONVERSION); - } - // Collect metadata about each path - ImmutableList.Builder> allPathMetadata = ImmutableList.builder(); - for (Path file : files.keySet()) { - ListenableFuture pathMetadata = - uploadExecutor.submit(() -> readPathMetadata(file)); - allPathMetadata.add(pathMetadata); - } - - // Query the remote cache to check which files need to be uploaded - ImmutableList> allPaths = allPathMetadata.build(); - ListenableFuture> allPathsUpdatedMetadata = - Futures.whenAllSucceed(allPaths) - .callAsync(() -> queryRemoteCache(allPaths), MoreExecutors.directExecutor()); - - // Upload local files (if any) - ListenableFuture> allPathsMetadata = - Futures.transformAsync( - allPathsUpdatedMetadata, - (paths) -> uploadLocalFiles(paths), - MoreExecutors.directExecutor()); - - return Futures.transform( - allPathsMetadata, - (metadata) -> new PathConverterImpl(remoteServerInstanceName, metadata), - MoreExecutors.directExecutor()); + return toListenableFuture(upload(files.keySet()).subscribeOn(scheduler)); } @Override @@ -249,8 +267,7 @@ protected void deallocate() { if (shutdown.getAndSet(true)) { return; } - uploader.release(); - uploadExecutor.shutdown(); + remoteCache.release(); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java index e2a1e27769f3dc..31edf398e75653 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderFactory.java @@ -14,31 +14,34 @@ package com.google.devtools.build.lib.remote; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; -import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; -import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory; import com.google.devtools.build.lib.runtime.CommandEnvironment; +import java.util.concurrent.Executor; -/** - * A factory for {@link ByteStreamBuildEventArtifactUploader}. - */ -class ByteStreamBuildEventArtifactUploaderFactory implements - BuildEventArtifactUploaderFactory { +/** A factory for {@link ByteStreamBuildEventArtifactUploader}. */ +class ByteStreamBuildEventArtifactUploaderFactory implements BuildEventArtifactUploaderFactory { - private final ByteStreamUploader uploader; + private final Executor executor; + private final ExtendedEventHandler reporter; + private final boolean verboseFailures; + private final RemoteCache remoteCache; private final String remoteServerInstanceName; private final String buildRequestId; private final String commandId; - private final MissingDigestsFinder missingDigestsFinder; ByteStreamBuildEventArtifactUploaderFactory( - ByteStreamUploader uploader, - MissingDigestsFinder missingDigestsFinder, + Executor executor, + ExtendedEventHandler reporter, + boolean verboseFailures, + RemoteCache remoteCache, String remoteServerInstanceName, String buildRequestId, String commandId) { - this.uploader = uploader; - this.missingDigestsFinder = missingDigestsFinder; + this.executor = executor; + this.reporter = reporter; + this.verboseFailures = verboseFailures; + this.remoteCache = remoteCache; this.remoteServerInstanceName = remoteServerInstanceName; this.buildRequestId = buildRequestId; this.commandId = commandId; @@ -47,11 +50,12 @@ class ByteStreamBuildEventArtifactUploaderFactory implements @Override public BuildEventArtifactUploader create(CommandEnvironment env) { return new ByteStreamBuildEventArtifactUploader( - uploader.retain(), - missingDigestsFinder, + executor, + reporter, + verboseFailures, + remoteCache.retain(), remoteServerInstanceName, buildRequestId, - commandId, - env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads); + commandId); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index fb183a5a0c2746..28e84497bf89d1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -118,6 +118,16 @@ public ByteStreamUploader( this.retrier = retrier; } + @VisibleForTesting + ReferenceCountedChannel getChannel() { + return channel; + } + + @VisibleForTesting + RemoteRetrier getRetrier() { + return retrier; + } + /** * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index 28754866bd2aed..ad124412835a55 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -17,7 +17,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; -import static com.google.common.base.Throwables.getStackTraceAsString; import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static com.google.common.util.concurrent.Futures.transform; @@ -1100,23 +1099,8 @@ private void reportUploadError(Throwable error) { return; } - String errorMessage; - if (error instanceof IOException) { - errorMessage = grpcAwareErrorMessage((IOException) error); - } else { - errorMessage = error.getMessage(); - } - - if (isNullOrEmpty(errorMessage)) { - errorMessage = error.getClass().getSimpleName(); - } - - if (verboseFailures) { - // On --verbose_failures print the whole stack trace - errorMessage += "\n" + getStackTraceAsString(error); - } - - errorMessage = "Writing to Remote Cache: " + errorMessage; + String errorMessage = + "Writing to Remote Cache: " + grpcAwareErrorMessage(error, verboseFailures); reporter.handle(Event.warn(errorMessage)); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index b98fa872ebeb4f..d2508bda3b1d4a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -532,9 +532,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, uploader.retain()); uploader.release(); - buildEventArtifactUploaderFactoryDelegate.init( - new ByteStreamBuildEventArtifactUploaderFactory( - uploader, cacheClient, remoteBytestreamUriPrefix, buildRequestId, invocationId)); if (enableRemoteExecution) { if (enableDiskCache) { @@ -619,6 +616,16 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { executorService, env, remoteCache, retryScheduler, digestUtil); } + buildEventArtifactUploaderFactoryDelegate.init( + new ByteStreamBuildEventArtifactUploaderFactory( + executorService, + env.getReporter(), + verboseFailures, + actionContextProvider.getRemoteCache(), + remoteBytestreamUriPrefix, + buildRequestId, + invocationId)); + if (enableRemoteDownloader) { remoteDownloaderSupplier.set( new GrpcRemoteDownloader( diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java index 11c42043744c82..7eb07d4d95e05d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java @@ -27,6 +27,7 @@ import io.reactivex.rxjava3.core.CompletableOnSubscribe; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.core.SingleEmitter; +import io.reactivex.rxjava3.core.SingleObserver; import io.reactivex.rxjava3.core.SingleOnSubscribe; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.Exceptions; @@ -192,7 +193,7 @@ public void onFailure(Throwable throwable) { * the {@link Completable} will automatically be cancelled. */ public static ListenableFuture toListenableFuture(Completable completable) { - CompletableFuture future = new CompletableFuture(); + CompletableFuture future = new CompletableFuture<>(); completable.subscribe( new CompletableObserver() { @Override @@ -214,7 +215,35 @@ public void onError(Throwable e) { return future; } - private static final class CompletableFuture extends AbstractFuture { + /** + * Returns a {@link ListenableFuture} that is complete once the {@link Single} has succeeded. + * + *

Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to + * the {@link Single} will automatically be cancelled. + */ + public static ListenableFuture toListenableFuture(Single single) { + CompletableFuture future = new CompletableFuture<>(); + single.subscribe( + new SingleObserver() { + @Override + public void onSubscribe(Disposable d) { + future.setCancelCallback(d); + } + + @Override + public void onSuccess(@NonNull T t) { + future.set(t); + } + + @Override + public void onError(Throwable e) { + future.setException(e); + } + }); + return future; + } + + private static final class CompletableFuture extends AbstractFuture { private final AtomicReference cancelCallback = new AtomicReference<>(); private void setCancelCallback(Disposable cancelCallback) { @@ -239,7 +268,7 @@ protected void afterDone() { // Allow set to be called by other members. @Override - protected boolean set(@Nullable Void t) { + protected boolean set(@Nullable T t) { return super.set(t); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java index dafe13de966a44..78e5cb534cab17 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java @@ -13,6 +13,8 @@ // limitations under the License. package com.google.devtools.build.lib.remote.util; +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Throwables.getStackTraceAsString; import static java.util.stream.Collectors.joining; import build.bazel.remote.execution.v2.Action; @@ -380,6 +382,26 @@ public static String grpcAwareErrorMessage(IOException e) { return e.getMessage(); } + public static String grpcAwareErrorMessage(Throwable error, boolean verboseFailures) { + String errorMessage; + if (error instanceof IOException) { + errorMessage = grpcAwareErrorMessage((IOException) error); + } else { + errorMessage = error.getMessage(); + } + + if (isNullOrEmpty(errorMessage)) { + errorMessage = error.getClass().getSimpleName(); + } + + if (verboseFailures) { + // On --verbose_failures print the whole stack trace + errorMessage += "\n" + getStackTraceAsString(error); + } + + return errorMessage; + } + @SuppressWarnings("ProtoParseWithRegistry") public static ListenableFuture downloadAsActionResult( ActionKey actionDigest, diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index 01b785281010aa..c4212d0aef6716 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -14,20 +14,21 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import build.bazel.remote.execution.v2.Digest; import com.google.bytestream.ByteStreamProto.WriteRequest; import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.eventbus.EventBus; import com.google.common.hash.HashCode; import com.google.common.io.BaseEncoding; import com.google.common.util.concurrent.Futures; @@ -46,37 +47,40 @@ import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType; import com.google.devtools.build.lib.buildeventstream.PathConverter; import com.google.devtools.build.lib.clock.JavaClock; +import com.google.devtools.build.lib.events.Reporter; +import com.google.devtools.build.lib.events.StoredEventHandler; import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff; import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService; import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; +import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.RxNoGlobalErrorsRule; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; +import com.google.devtools.common.options.Options; import io.grpc.Server; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; import io.reactivex.rxjava3.core.Single; -import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Random; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -86,9 +90,13 @@ /** Test for {@link ByteStreamBuildEventArtifactUploader}. */ @RunWith(JUnit4.class) public class ByteStreamBuildEventArtifactUploaderTest { + @Rule public final RxNoGlobalErrorsRule rxNoGlobalErrorsRule = new RxNoGlobalErrorsRule(); private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256); + private final Reporter reporter = new Reporter(new EventBus()); + private final StoredEventHandler eventHandler = new StoredEventHandler(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private ListeningScheduledExecutorService retryService; @@ -102,7 +110,7 @@ public class ByteStreamBuildEventArtifactUploaderTest { @Before public final void setUp() throws Exception { - MockitoAnnotations.initMocks(this); + reporter.addHandler(eventHandler); String serverName = "Server for " + this.getClass(); server = @@ -194,7 +202,12 @@ public void testUploadDirectoryDoesNotCrash() throws Exception { dir.createDirectoryAndParents(); Map filesToUpload = new HashMap<>(); filesToUpload.put(dir, new LocalFile(dir, LocalFileType.OUTPUT)); - ByteStreamUploader uploader = mock(ByteStreamUploader.class); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory); + ByteStreamUploader uploader = + new ByteStreamUploader( + "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader); PathConverter pathConverter = artifactUploader.upload(filesToUpload).get(); @@ -203,9 +216,9 @@ public void testUploadDirectoryDoesNotCrash() throws Exception { } @Test - public void someUploadsFail() throws Exception { - // Test that if one of multiple file uploads fails, the upload future fails and that the - // error is propagated correctly. + public void someUploadsFail_succeedsWithWarningMessages() throws Exception { + // Test that if one of multiple file uploads fails, the upload future succeeds but the + // error is reported correctly. int numUploads = 10; Map blobsByHash = new HashMap<>(); @@ -260,14 +273,11 @@ public void onCompleted() { "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader); - ExecutionException e = - assertThrows(ExecutionException.class, () -> artifactUploader.upload(filesToUpload).get()); - // The gRPC library uses StatusRuntimeException to raise errors. However, throughout the Bazel - // codebase runtime exceptions are considered bugs. This test ensures that a SRE is converted - // to a checked exception type. - assertThat(e.getCause()).isInstanceOf(IOException.class); - assertThat(e.getCause().getCause()).isInstanceOf(StatusRuntimeException.class); - assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode()); + artifactUploader.upload(filesToUpload).get(); + + assertThat(eventHandler.getEvents()).isNotEmpty(); + assertThat(eventHandler.getEvents().get(0).getMessage()) + .contains("Uploading BEP referenced local files: "); artifactUploader.release(); @@ -282,7 +292,13 @@ public void remoteFileShouldNotBeUploaded_actionFs() throws Exception { // arrange - ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory); + ByteStreamUploader uploader = + spy( + new ByteStreamUploader( + "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier)); RemoteActionInputFetcher actionInputFetcher = Mockito.mock(RemoteActionInputFetcher.class); ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader); @@ -316,7 +332,7 @@ public void remoteFileShouldNotBeUploaded_actionFs() throws Exception { + digest.getHash() + "/" + digest.getSizeBytes()); - verifyNoMoreInteractions(uploader); + verify(uploader, times(0)).uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean()); } @Test @@ -334,9 +350,16 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception StaticMissingDigestsFinder digestQuerier = Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest))); - ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class); - when(uploader.uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean())) - .thenReturn(Futures.immediateFuture(null)); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory); + ByteStreamUploader uploader = + spy( + new ByteStreamUploader( + "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier)); + doReturn(Futures.immediateFuture(null)) + .when(uploader) + .uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean()); ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader, digestQuerier); @@ -369,19 +392,38 @@ private Artifact createRemoteArtifact( return a; } + private ByteStreamBuildEventArtifactUploader newArtifactUploader(ByteStreamUploader uploader) { + return newArtifactUploader(uploader, new AllMissingDigestsFinder()); + } + private ByteStreamBuildEventArtifactUploader newArtifactUploader( ByteStreamUploader uploader, MissingDigestsFinder missingDigestsFinder) { - return new ByteStreamBuildEventArtifactUploader( - uploader, - missingDigestsFinder, - "localhost/instance", - "none", - "none", - /* maxUploadThreads= */ 100); - } + RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); + GrpcCacheClient cacheClient = + spy( + new GrpcCacheClient( + uploader.getChannel().retain(), + CallCredentialsProvider.NO_CREDENTIALS, + remoteOptions, + uploader.getRetrier(), + DIGEST_UTIL, + uploader)); + doAnswer( + invocationOnMock -> + missingDigestsFinder.findMissingDigests( + invocationOnMock.getArgument(0), invocationOnMock.getArgument(1))) + .when(cacheClient) + .findMissingDigests(any(), any()); + RemoteCache remoteCache = new RemoteCache(reporter, cacheClient, remoteOptions, DIGEST_UTIL); - private ByteStreamBuildEventArtifactUploader newArtifactUploader(ByteStreamUploader uploader) { - return newArtifactUploader(uploader, AllMissingDigestsFinder.INSTANCE); + return new ByteStreamBuildEventArtifactUploader( + MoreExecutors.directExecutor(), + reporter, + /*verboseFailures=*/ true, + remoteCache, + /*remoteServerInstanceName=*/ "localhost/instance", + /*buildRequestId=*/ "none", + /*commandId=*/ "none"); } private static class StaticMissingDigestsFinder implements MissingDigestsFinder {