Skip to content

Commit

Permalink
bes: Allow additional headers to be passed to the server.
Browse files Browse the repository at this point in the history
There is already a --remote_header option that passes extra headers to the remote cache and executor. This revision allows extra headers to be passed along to a BES server with the --bes_header option.

RELNOTES[NEW]: Add `--bes_header` flag to pass extra headers to the BES server.
  • Loading branch information
benjaminp committed Aug 12, 2021
1 parent c07d728 commit 5608cd7
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceGrpcClient;
import io.grpc.ManagedChannel;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;

/**
* Bazel's BES module.
Expand All @@ -37,6 +40,10 @@ public class BazelBuildEventServiceModule
abstract static class BackendConfig {
abstract String besBackend();

abstract @Nullable String besProxy();

abstract List<Map.Entry<String, String>> besHeaders();

abstract AuthAndTLSOptions authAndTLSOptions();
}

Expand All @@ -53,24 +60,27 @@ protected BuildEventServiceClient getBesClient(
BuildEventServiceOptions besOptions, AuthAndTLSOptions authAndTLSOptions) throws IOException {
BackendConfig newConfig =
new AutoValue_BazelBuildEventServiceModule_BackendConfig(
besOptions.besBackend, authAndTLSOptions);
besOptions.besBackend, besOptions.besProxy, besOptions.besHeaders, authAndTLSOptions);
if (client == null || !Objects.equals(config, newConfig)) {
clearBesClient();
config = newConfig;
client =
new BuildEventServiceGrpcClient(
newGrpcChannel(besOptions, authAndTLSOptions),
GoogleAuthUtils.newCallCredentials(authAndTLSOptions));
newGrpcChannel(config),
GoogleAuthUtils.newCallCredentials(config.authAndTLSOptions()),
config.besHeaders());
}
return client;
}

// newGrpcChannel is only defined so it can be overridden in tests to not use a real network link.
@VisibleForTesting
protected ManagedChannel newGrpcChannel(
BuildEventServiceOptions besOptions, AuthAndTLSOptions authAndTLSOptions) throws IOException {
protected ManagedChannel newGrpcChannel(BackendConfig config) throws IOException {
return GoogleAuthUtils.newChannel(
besOptions.besBackend, besOptions.besProxy, authAndTLSOptions, /* interceptors= */ null);
config.besBackend(),
config.besProxy(),
config.authAndTLSOptions(),
/* interceptors= */ null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.devtools.common.options.OptionsBase;
import java.time.Duration;
import java.util.List;
import java.util.Map;

/** Options used by {@link BuildEventServiceModule}. */
public class BuildEventServiceOptions extends OptionsBase {
Expand Down Expand Up @@ -51,6 +52,19 @@ public class BuildEventServiceOptions extends OptionsBase {
)
public Duration besTimeout;

@Option(
name = "bes_header",
converter = Converters.AssignmentConverter.class,
defaultValue = "null",
documentationCategory = OptionDocumentationCategory.LOGGING,
effectTags = {OptionEffectTag.AFFECTS_OUTPUTS},
help =
"Specify a header in NAME=VALUE form that will be included in BES requests. "
+ "Multiple headers can be passed by specifying the flag multiple times. Multiple "
+ "values for the same name will be converted to a comma-separated list.",
allowMultiple = true)
public List<Map.Entry<String, String>> besHeaders;

@Option(
name = "bes_best_effort",
defaultValue = "false",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.CallCredentials;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/** Implementation of BuildEventServiceClient that uploads data using gRPC. */
Expand All @@ -48,11 +53,24 @@ public class BuildEventServiceGrpcClient implements BuildEventServiceClient {
private final PublishBuildEventBlockingStub besBlocking;

public BuildEventServiceGrpcClient(
ManagedChannel channel, @Nullable CallCredentials callCredentials) {
this(
withCallCredentials(PublishBuildEventGrpc.newStub(channel), callCredentials),
withCallCredentials(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials),
channel);
ManagedChannel channel,
@Nullable CallCredentials callCredentials,
List<Map.Entry<String, String>> headers) {
ClientInterceptor interceptor = null;
if (!headers.isEmpty()) {
Metadata extraHeaders = new Metadata();
headers.forEach(
header ->
extraHeaders.put(
Metadata.Key.of(header.getKey(), Metadata.ASCII_STRING_MARSHALLER),
header.getValue()));
interceptor = MetadataUtils.newAttachHeadersInterceptor(extraHeaders);
}
this.besAsync =
configureStub(PublishBuildEventGrpc.newStub(channel), callCredentials, interceptor);
this.besBlocking =
configureStub(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials, interceptor);
this.channel = channel;
}

@VisibleForTesting
Expand All @@ -65,9 +83,10 @@ protected BuildEventServiceGrpcClient(
this.channel = channel;
}

private static <T extends AbstractStub<T>> T withCallCredentials(
T stub, @Nullable CallCredentials callCredentials) {
private static <T extends AbstractStub<T>> T configureStub(
T stub, @Nullable CallCredentials callCredentials, @Nullable ClientInterceptor interceptor) {
stub = callCredentials != null ? stub.withCallCredentials(callCredentials) : stub;
stub = interceptor != null ? stub.withInterceptors(interceptor) : stub;
return stub;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ java_test(
"//src/test/java/com/google/devtools/build/lib/testutil",
"//third_party:guava",
"//third_party:junit4",
"//third_party:mockito",
"//third_party:truth",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,45 @@
package com.google.devtools.build.lib.buildeventservice;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceGrpcClient;
import com.google.devtools.build.v1.PublishBuildEventGrpc;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/** Tests {@link BuildEventServiceGrpcClient}. */
@RunWith(JUnit4.class)
public class BuildEventServiceGrpcClientTest {

private BuildEventServiceGrpcClient grpcClient;

@Mock private PublishBuildEventGrpc.PublishBuildEventImplBase fakeServer;
private static final PublishBuildEventGrpc.PublishBuildEventImplBase NOOP_SERVER =
new PublishBuildEventGrpc.PublishBuildEventImplBase() {
@Override
public StreamObserver<PublishBuildToolEventStreamRequest> publishBuildToolEventStream(
StreamObserver<PublishBuildToolEventStreamResponse> responseObserver) {
responseObserver.onCompleted();
return NULL_OBSERVER;
}
};

private static final StreamObserver<PublishBuildToolEventStreamRequest> NULL_OBSERVER =
new StreamObserver<PublishBuildToolEventStreamRequest>() {
Expand All @@ -59,54 +67,98 @@ public void onError(Throwable t) {}
public void onCompleted() {}
};

@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
private static final class TestServer implements AutoCloseable {
private final Server server;
private final ManagedChannel channel;

TestServer(Server server, ManagedChannel channel) {
this.server = server;
this.channel = channel;
}

ManagedChannel getChannel() {
return channel;
}

@Override
public void close() {
channel.shutdown();
server.shutdown();
}
}

/** Test helper that sets up a in-process test server. */
private static TestServer startTestServer(ServerServiceDefinition service) throws Exception {
String uniqueName = UUID.randomUUID().toString();
InProcessServerBuilder.forName(uniqueName)
.directExecutor()
.addService(fakeServer)
.build()
.start();

ManagedChannel channel = InProcessChannelBuilder.forName(uniqueName).directExecutor().build();
grpcClient = new BuildEventServiceGrpcClient(channel, null);
Server server =
InProcessServerBuilder.forName(uniqueName).directExecutor().addService(service).build();
server.start();
return new TestServer(
server, InProcessChannelBuilder.forName(uniqueName).directExecutor().build());
}

@After
public void tearDown() {
grpcClient.shutdown();
Mockito.validateMockitoUsage();
@Test
public void besHeaders() throws Exception {
ArrayList<Metadata> seenHeaders = new ArrayList<>();
try (TestServer server =
startTestServer(
ServerInterceptors.intercept(
NOOP_SERVER,
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
synchronized (seenHeaders) {
seenHeaders.add(headers);
}
return next.startCall(call, headers);
}
}))) {
BuildEventServiceGrpcClient grpcClient =
new BuildEventServiceGrpcClient(
server.getChannel(),
null,
ImmutableList.of(Maps.immutableEntry("metadata-foo", "bar")));
assertThat(grpcClient.openStream(ack -> {}).getStatus().get()).isEqualTo(Status.OK);
assertThat(seenHeaders).hasSize(1);
Metadata headers = seenHeaders.get(0);
assertThat(headers.get(Metadata.Key.of("metadata-foo", Metadata.ASCII_STRING_MARSHALLER)))
.isEqualTo("bar");
}
}

@Test
@SuppressWarnings("unchecked")
public void testImmediateSuccess() throws Exception {
when(fakeServer.publishBuildToolEventStream(any()))
.thenAnswer(
invocation -> {
StreamObserver<PublishBuildToolEventStreamResponse> responseObserver =
(StreamObserver<PublishBuildToolEventStreamResponse>)
invocation.getArguments()[0];
responseObserver.onCompleted();
return NULL_OBSERVER;
});
assertThat(grpcClient.openStream(ack -> {}).getStatus().get()).isEqualTo(Status.OK);
public void immediateSuccess() throws Exception {
try (TestServer server = startTestServer(NOOP_SERVER.bindService())) {
assertThat(
new BuildEventServiceGrpcClient(server.getChannel(), null, ImmutableList.of())
.openStream(ack -> {})
.getStatus()
.get())
.isEqualTo(Status.OK);
}
}

@Test
@SuppressWarnings("unchecked")
public void testImmediateFailure() throws Exception {
Throwable failure = new StatusException(Status.INTERNAL);
when(fakeServer.publishBuildToolEventStream(any()))
.thenAnswer(
invocation -> {
StreamObserver<PublishBuildToolEventStreamResponse> responseObserver =
(StreamObserver<PublishBuildToolEventStreamResponse>)
invocation.getArguments()[0];
responseObserver.onError(failure);
return NULL_OBSERVER;
});
assertThat(grpcClient.openStream(ack -> {}).getStatus().get()).isEqualTo(Status.INTERNAL);
public void immediateFailure() throws Exception {
try (TestServer server =
startTestServer(
new PublishBuildEventGrpc.PublishBuildEventImplBase() {
@Override
public StreamObserver<PublishBuildToolEventStreamRequest> publishBuildToolEventStream(
StreamObserver<PublishBuildToolEventStreamResponse> responseObserver) {
responseObserver.onError(new StatusException(Status.INTERNAL));
return NULL_OBSERVER;
}
}.bindService())) {
assertThat(
new BuildEventServiceGrpcClient(server.getChannel(), null, ImmutableList.of())
.openStream(ack -> {})
.getStatus()
.get())
.isEqualTo(Status.INTERNAL);
}
}
}

0 comments on commit 5608cd7

Please sign in to comment.