Skip to content

Commit

Permalink
Remote: Update BEP uploader to use RemoteCache and more robust to upl…
Browse files Browse the repository at this point in the history
…oad errors

Local files referenced by build events are uploaded to the remote cache by `ByteStreamBuildEventArtifactUploader`, which uses `ByteStreamUploader` and `MissingDigestsFinder` internally.

This PR changes `ByteStreamBuildEventArtifactUploader` to use `RemoteCache` directly in order to benefit from recent improvements to `RemoteCache`.

Upload errors that used to crash Bazel or cause a non-zero exit code are now caught and reported.

Fixes #13920.

Related #11473.

Closes #13959.

PiperOrigin-RevId: 396526152
  • Loading branch information
coeuvre authored and copybara-github committed Sep 14, 2021
1 parent 4b3ed3b commit e855a26
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,68 +13,74 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
import static com.google.devtools.build.lib.remote.util.Utils.grpcAwareErrorMessage;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.Path;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/** A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. */
/** A {@link BuildEventArtifactUploader} backed by {@link RemoteCache}. */
class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
implements BuildEventArtifactUploader {

private final ListeningExecutorService uploadExecutor;
private final Executor executor;
private final ExtendedEventHandler reporter;
private final boolean verboseFailures;
private final RemoteCache remoteCache;
private final String buildRequestId;
private final String commandId;
private final ByteStreamUploader uploader;
private final String remoteServerInstanceName;
private final MissingDigestsFinder missingDigestsFinder;

private final AtomicBoolean shutdown = new AtomicBoolean();
private final Scheduler scheduler;

ByteStreamBuildEventArtifactUploader(
ByteStreamUploader uploader,
MissingDigestsFinder missingDigestsFinder,
Executor executor,
ExtendedEventHandler reporter,
boolean verboseFailures,
RemoteCache remoteCache,
String remoteServerInstanceName,
String buildRequestId,
String commandId,
int maxUploadThreads) {
this.uploader = Preconditions.checkNotNull(uploader);
String commandId) {
this.executor = executor;
this.reporter = reporter;
this.verboseFailures = verboseFailures;
this.remoteCache = remoteCache;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.remoteServerInstanceName = remoteServerInstanceName;
// Limit the maximum threads number to 1000 (chosen arbitrarily)
this.uploadExecutor =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
Math.min(maxUploadThreads, 1000),
new ThreadFactoryBuilder().setNameFormat("bes-artifact-uploader-%d").build()));
this.missingDigestsFinder = missingDigestsFinder;
this.scheduler = Schedulers.from(executor);
}

/** Returns {@code true} if Bazel knows that the file is stored on a remote system. */
Expand Down Expand Up @@ -143,100 +149,112 @@ private static void processQueryResult(
}
}

/**
* For files where {@link PathMetadata#isRemote()} returns {@code false} this method checks if the
* remote cache already contains the file. If so {@link PathMetadata#isRemote()} is set to {@code
* true}.
*/
private ListenableFuture<Iterable<PathMetadata>> queryRemoteCache(
ImmutableList<ListenableFuture<PathMetadata>> allPaths)
throws ExecutionException, InterruptedException {
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
/**
 * Tells whether {@code path} is a candidate for upload: it must carry a digest and be neither
 * marked remote nor a directory.
 */
private static boolean shouldUpload(PathMetadata path) {
  if (path.getDigest() == null) {
    return false;
  }
  if (path.isRemote()) {
    return false;
  }
  return !path.isDirectory();
}

List<PathMetadata> knownRemotePaths = new ArrayList<>(allPaths.size());
private Single<List<PathMetadata>> queryRemoteCache(
RemoteCache remoteCache, RemoteActionExecutionContext context, List<PathMetadata> paths) {
List<PathMetadata> knownRemotePaths = new ArrayList<>(paths.size());
List<PathMetadata> filesToQuery = new ArrayList<>();
Set<Digest> digestsToQuery = new HashSet<>();
for (ListenableFuture<PathMetadata> pathMetadataFuture : allPaths) {
// This line is guaranteed to not block, as this code is only called after all futures in
// allPaths have completed.
PathMetadata pathMetadata = pathMetadataFuture.get();
if (pathMetadata.isRemote() || pathMetadata.isDirectory()) {
knownRemotePaths.add(pathMetadata);
for (PathMetadata path : paths) {
if (shouldUpload(path)) {
filesToQuery.add(path);
digestsToQuery.add(path.getDigest());
} else {
filesToQuery.add(pathMetadata);
digestsToQuery.add(pathMetadata.getDigest());
knownRemotePaths.add(path);
}
}

if (digestsToQuery.isEmpty()) {
return Futures.immediateFuture(knownRemotePaths);
return Single.just(knownRemotePaths);
}
return Futures.transform(
missingDigestsFinder.findMissingDigests(context, digestsToQuery),
(missingDigests) -> {
processQueryResult(missingDigests, filesToQuery, knownRemotePaths);
return knownRemotePaths;
},
MoreExecutors.directExecutor());
return toSingle(() -> remoteCache.findMissingDigests(context, digestsToQuery), executor)
.onErrorResumeNext(
error -> {
reporterUploadError(error);
// Assuming all digests are missing if failed to query
return Single.just(ImmutableSet.copyOf(digestsToQuery));
})
.map(
missingDigests -> {
processQueryResult(missingDigests, filesToQuery, knownRemotePaths);
return knownRemotePaths;
});
}

/**
* Uploads any files from {@code allPaths} where {@link PathMetadata#isRemote()} returns {@code
* false}.
*/
private ListenableFuture<List<PathMetadata>> uploadLocalFiles(Iterable<PathMetadata> allPaths) {
/**
 * Reports a failure that occurred while uploading BEP-referenced local files as a warning event.
 *
 * <p>A {@link CancellationException} is ignored. Any other error is formatted with {@code
 * grpcAwareErrorMessage} (passing {@code verboseFailures}) and handed to {@code reporter}.
 *
 * @param error the failure to report
 */
private void reporterUploadError(Throwable error) {
  if (!(error instanceof CancellationException)) {
    String detail = grpcAwareErrorMessage(error, verboseFailures);
    reporter.handle(Event.warn("Uploading BEP referenced local files: " + detail));
  }
}

/**
 * Uploads each entry of {@code paths} accepted by {@code shouldUpload} through {@code
 * remoteCache.uploadFile} and collects the resulting metadata into a list.
 *
 * <p>Entries rejected by {@code shouldUpload} are passed through unchanged. If an individual
 * upload fails, the error is handed to {@code reporterUploadError} and that entry is replaced by
 * a new {@code PathMetadata} with the same path, directory and remote flags but a {@code null}
 * digest.
 *
 * @param remoteCache the cache that receives the files
 * @param context the execution context forwarded to every upload call
 * @param paths metadata of the files to consider
 * @return a {@link Single} emitting one {@code PathMetadata} per input entry
 */
private Single<List<PathMetadata>> uploadLocalFiles(
    RemoteCache remoteCache, RemoteActionExecutionContext context, List<PathMetadata> paths) {
  return Flowable.fromIterable(paths)
      .flatMapSingle(
          path -> {
            if (shouldUpload(path)) {
              Single<PathMetadata> uploaded =
                  toCompletable(
                          () -> remoteCache.uploadFile(context, path.getDigest(), path.getPath()),
                          executor)
                      .toSingleDefault(path);
              return uploaded.onErrorResumeNext(
                  failure -> {
                    reporterUploadError(failure);
                    // Built only when the upload failed: same entry, digest cleared.
                    PathMetadata withoutDigest =
                        new PathMetadata(
                            path.getPath(),
                            /*digest=*/ null,
                            path.isDirectory(),
                            path.isRemote());
                    return Single.just(withoutDigest);
                  });
            }
            return Single.just(path);
          })
      .collect(Collectors.toList());
}

private Single<PathConverter> upload(Set<Path> files) {
if (files.isEmpty()) {
return Single.just(PathConverter.NO_CONVERSION);
}

RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);

ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathsUploaded =
ImmutableList.builder();
for (PathMetadata path : allPaths) {
if (!path.isRemote() && !path.isDirectory()) {
Chunker chunker =
Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build();
final ListenableFuture<Void> upload;
upload =
uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false);
allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor));
} else {
allPathsUploaded.add(Futures.immediateFuture(path));
}
}
return Futures.allAsList(allPathsUploaded.build());
return Single.using(
remoteCache::retain,
remoteCache ->
Flowable.fromIterable(files)
.map(
file -> {
try {
return readPathMetadata(file);
} catch (IOException e) {
reporterUploadError(e);
return new PathMetadata(
file, /*digest=*/ null, /*directory=*/ false, /*remote=*/ false);
}
})
.collect(Collectors.toList())
.flatMap(paths -> queryRemoteCache(remoteCache, context, paths))
.flatMap(paths -> uploadLocalFiles(remoteCache, context, paths))
.map(paths -> new PathConverterImpl(remoteServerInstanceName, paths)),
RemoteCache::release);
}

@Override
public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
if (files.isEmpty()) {
return Futures.immediateFuture(PathConverter.NO_CONVERSION);
}
// Collect metadata about each path
ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathMetadata = ImmutableList.builder();
for (Path file : files.keySet()) {
ListenableFuture<PathMetadata> pathMetadata =
uploadExecutor.submit(() -> readPathMetadata(file));
allPathMetadata.add(pathMetadata);
}

// Query the remote cache to check which files need to be uploaded
ImmutableList<ListenableFuture<PathMetadata>> allPaths = allPathMetadata.build();
ListenableFuture<Iterable<PathMetadata>> allPathsUpdatedMetadata =
Futures.whenAllSucceed(allPaths)
.callAsync(() -> queryRemoteCache(allPaths), MoreExecutors.directExecutor());

// Upload local files (if any)
ListenableFuture<List<PathMetadata>> allPathsMetadata =
Futures.transformAsync(
allPathsUpdatedMetadata,
(paths) -> uploadLocalFiles(paths),
MoreExecutors.directExecutor());

return Futures.transform(
allPathsMetadata,
(metadata) -> new PathConverterImpl(remoteServerInstanceName, metadata),
MoreExecutors.directExecutor());
return toListenableFuture(upload(files.keySet()).subscribeOn(scheduler));
}

@Override
Expand All @@ -249,8 +267,7 @@ protected void deallocate() {
if (shutdown.getAndSet(true)) {
return;
}
uploader.release();
uploadExecutor.shutdown();
remoteCache.release();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,34 @@
package com.google.devtools.build.lib.remote;

import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import java.util.concurrent.Executor;

/**
* A factory for {@link ByteStreamBuildEventArtifactUploader}.
*/
class ByteStreamBuildEventArtifactUploaderFactory implements
BuildEventArtifactUploaderFactory {
/** A factory for {@link ByteStreamBuildEventArtifactUploader}. */
class ByteStreamBuildEventArtifactUploaderFactory implements BuildEventArtifactUploaderFactory {

private final ByteStreamUploader uploader;
private final Executor executor;
private final ExtendedEventHandler reporter;
private final boolean verboseFailures;
private final RemoteCache remoteCache;
private final String remoteServerInstanceName;
private final String buildRequestId;
private final String commandId;
private final MissingDigestsFinder missingDigestsFinder;

ByteStreamBuildEventArtifactUploaderFactory(
ByteStreamUploader uploader,
MissingDigestsFinder missingDigestsFinder,
Executor executor,
ExtendedEventHandler reporter,
boolean verboseFailures,
RemoteCache remoteCache,
String remoteServerInstanceName,
String buildRequestId,
String commandId) {
this.uploader = uploader;
this.missingDigestsFinder = missingDigestsFinder;
this.executor = executor;
this.reporter = reporter;
this.verboseFailures = verboseFailures;
this.remoteCache = remoteCache;
this.remoteServerInstanceName = remoteServerInstanceName;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
Expand All @@ -47,11 +50,12 @@ class ByteStreamBuildEventArtifactUploaderFactory implements
@Override
public BuildEventArtifactUploader create(CommandEnvironment env) {
return new ByteStreamBuildEventArtifactUploader(
uploader.retain(),
missingDigestsFinder,
executor,
reporter,
verboseFailures,
remoteCache.retain(),
remoteServerInstanceName,
buildRequestId,
commandId,
env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads);
commandId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ public ByteStreamUploader(
this.retrier = retrier;
}

/** Returns the channel held by this uploader; exposed for tests. */
@VisibleForTesting
ReferenceCountedChannel getChannel() {
  return this.channel;
}

/** Returns the retrier held by this uploader; exposed for tests. */
@VisibleForTesting
RemoteRetrier getRetrier() {
  return this.retrier;
}

/**
* Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
* The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
Expand Down
Loading

0 comments on commit e855a26

Please sign in to comment.