Passing Bazel metadata in gRPC headers.
TESTED=unit tests
RELNOTES: none
PiperOrigin-RevId: 169395919
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 4c05cd5..088c731 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -44,7 +44,6 @@
import com.google.devtools.common.options.OptionsProvider;
import java.io.IOException;
import java.util.Set;
-import java.util.UUID;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -86,7 +85,7 @@
commandEnvironment.getRuntime().getClock(),
commandEnvironment.getRuntime().getPathToUriConverter(),
commandEnvironment.getReporter(),
- commandEnvironment.getClientEnv().get("BAZEL_INTERNAL_BUILD_REQUEST_ID"),
+ commandEnvironment.getBuildRequestId().toString(),
commandEnvironment.getCommandId().toString(),
commandEnvironment.getCommandName());
if (streamer != null) {
@@ -126,8 +125,6 @@
/**
* Returns {@code null} if no stream could be created.
- *
- * @param buildRequestId if {@code null} or {@code ""} a random UUID is used instead.
*/
@Nullable
@VisibleForTesting
@@ -213,9 +210,6 @@
logger.fine(format("Will create BuildEventServiceTransport streaming to '%s'",
besOptions.besBackend));
- buildRequestId = isNullOrEmpty(buildRequestId)
- ? UUID.randomUUID().toString()
- : buildRequestId;
commandLineReporter.handle(
Event.info(
format(
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 3d8eeb3..62ff0c9 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -27,7 +27,6 @@
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/exec/apple",
"//src/main/java/com/google/devtools/build/lib/exec/local",
- "//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/common/options",
"//third_party:apache_httpclient",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 7f60e140..13670c4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -37,6 +37,7 @@
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
+import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
@@ -257,9 +258,11 @@
try {
ListenableScheduledFuture<?> schedulingResult =
retryService.schedule(
- () ->
- startAsyncUploadWithRetry(
- chunker, backoffTimes, overallUploadResult),
+ Context.current()
+ .wrap(
+ () ->
+ startAsyncUploadWithRetry(
+ chunker, backoffTimes, overallUploadResult)),
nextDelayMillis,
MILLISECONDS);
// In case the scheduled execution errors, we need to notify the overallUploadResult.
@@ -418,7 +421,7 @@
return resourceName;
}
};
- call.start(callListener, new Metadata());
+ call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
call.request(1);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index b49a103..21ecf631 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -89,18 +89,21 @@
private ContentAddressableStorageBlockingStub casBlockingStub() {
return ContentAddressableStorageGrpc.newBlockingStub(channel)
+ .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
private ByteStreamBlockingStub bsBlockingStub() {
return ByteStreamGrpc.newBlockingStub(channel)
+ .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
private ActionCacheBlockingStub acBlockingStub() {
return ActionCacheGrpc.newBlockingStub(channel)
+ .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(credentials)
.withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index 3dabf0a..a1a5427 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -58,12 +58,14 @@
private ExecutionBlockingStub execBlockingStub() {
return ExecutionGrpc.newBlockingStub(channel)
+ .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(callCredentials)
.withDeadlineAfter(callTimeoutSecs, TimeUnit.SECONDS);
}
private WatcherBlockingStub watcherBlockingStub() {
return WatcherGrpc.newBlockingStub(channel)
+ .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withCallCredentials(callCredentials);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
index 8bb5a27..b1173fb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionContextProvider.java
@@ -49,20 +49,32 @@
ExecutionOptions executionOptions =
checkNotNull(env.getOptions().getOptions(ExecutionOptions.class));
RemoteOptions remoteOptions = checkNotNull(env.getOptions().getOptions(RemoteOptions.class));
+ String buildRequestId = env.getBuildRequestId().toString();
+ String commandId = env.getCommandId().toString();
if (remoteOptions.experimentalRemoteSpawnCache) {
- RemoteSpawnCache spawnCache = new RemoteSpawnCache(env.getExecRoot(), remoteOptions, cache,
- executionOptions.verboseFailures, env.getReporter());
+ RemoteSpawnCache spawnCache =
+ new RemoteSpawnCache(
+ env.getExecRoot(),
+ remoteOptions,
+ cache,
+ buildRequestId,
+ commandId,
+ executionOptions.verboseFailures,
+ env.getReporter());
return ImmutableList.of(spawnCache);
} else {
- RemoteSpawnRunner spawnRunner = new RemoteSpawnRunner(
- env.getExecRoot(),
- remoteOptions,
- createFallbackRunner(env),
- executionOptions.verboseFailures,
- env.getReporter(),
- cache,
- executor);
+ RemoteSpawnRunner spawnRunner =
+ new RemoteSpawnRunner(
+ env.getExecRoot(),
+ remoteOptions,
+ createFallbackRunner(env),
+ executionOptions.verboseFailures,
+ env.getReporter(),
+ buildRequestId,
+ commandId,
+ cache,
+ executor);
return ImmutableList.of(new RemoteSpawnStrategy(spawnRunner));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 6b8eb87..f9949c3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -35,9 +35,12 @@
import io.grpc.CallCredentials;
import io.grpc.Channel;
import java.io.IOException;
+import java.util.logging.Logger;
/** RemoteModule provides distributed cache and remote execution for Bazel. */
public final class RemoteModule extends BlazeModule {
+ private static final Logger logger = Logger.getLogger(RemoteModule.class.getName());
+
@VisibleForTesting
static final class CasPathConverter implements PathConverter {
// Not final; unfortunately, the Bazel startup process requires us to create this object before
@@ -89,7 +92,9 @@
@Override
public void beforeCommand(CommandEnvironment env) {
env.getEventBus().register(this);
-
+ String buildRequestId = env.getBuildRequestId().toString();
+ String commandId = env.getCommandId().toString();
+ logger.info("Command: buildRequestId = " + buildRequestId + ", commandId = " + commandId);
RemoteOptions remoteOptions = env.getOptions().getOptions(RemoteOptions.class);
AuthAndTLSOptions authAndTlsOptions = env.getOptions().getOptions(AuthAndTLSOptions.class);
converter.options = remoteOptions;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index 774a580..74a8afc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -32,6 +32,7 @@
import com.google.devtools.remoteexecution.v1test.ActionResult;
import com.google.devtools.remoteexecution.v1test.Command;
import com.google.devtools.remoteexecution.v1test.Platform;
+import io.grpc.Context;
import java.io.IOException;
import java.util.Collection;
import java.util.NoSuchElementException;
@@ -54,6 +55,8 @@
private final Platform platform;
private final RemoteActionCache remoteCache;
+ private final String buildRequestId;
+ private final String commandId;
private final boolean verboseFailures;
@Nullable private final Reporter cmdlineReporter;
@@ -61,14 +64,22 @@
// Used to ensure that a warning is reported only once.
private final AtomicBoolean warningReported = new AtomicBoolean();
- RemoteSpawnCache(Path execRoot, RemoteOptions options, RemoteActionCache remoteCache,
- boolean verboseFailures, @Nullable Reporter cmdlineReporter) {
+ RemoteSpawnCache(
+ Path execRoot,
+ RemoteOptions options,
+ RemoteActionCache remoteCache,
+ String buildRequestId,
+ String commandId,
+ boolean verboseFailures,
+ @Nullable Reporter cmdlineReporter) {
this.execRoot = execRoot;
this.options = options;
this.platform = options.parseRemotePlatformOverride();
this.remoteCache = remoteCache;
this.verboseFailures = verboseFailures;
this.cmdlineReporter = cmdlineReporter;
+ this.buildRequestId = buildRequestId;
+ this.commandId = commandId;
}
@Override
@@ -91,61 +102,70 @@
// Look up action cache, and reuse the action output if it is found.
final ActionKey actionKey = Digests.computeActionKey(action);
- ActionResult result =
- this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
- if (result != null) {
- // We don't cache failed actions, so we know the outputs exist.
- // For now, download all outputs locally; in the future, we can reuse the digests to
- // just update the TreeNodeRepository and continue the build.
- try {
- remoteCache.download(result, execRoot, policy.getFileOutErr());
- SpawnResult spawnResult = new SpawnResult.Builder()
- .setStatus(Status.SUCCESS)
- .setExitCode(result.getExitCode())
- .build();
- return SpawnCache.success(spawnResult);
- } catch (CacheNotFoundException e) {
- // There's a cache miss. Fall back to local execution.
+ Context withMetadata =
+ TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionKey);
+ // Metadata will be available in context.current() until we detach.
+ // This is done via a thread-local variable.
+ Context previous = withMetadata.attach();
+ try {
+ ActionResult result =
+ this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
+ if (result != null) {
+ // We don't cache failed actions, so we know the outputs exist.
+ // For now, download all outputs locally; in the future, we can reuse the digests to
+ // just update the TreeNodeRepository and continue the build.
+ try {
+ remoteCache.download(result, execRoot, policy.getFileOutErr());
+ SpawnResult spawnResult =
+ new SpawnResult.Builder()
+ .setStatus(Status.SUCCESS)
+ .setExitCode(result.getExitCode())
+ .build();
+ return SpawnCache.success(spawnResult);
+ } catch (CacheNotFoundException e) {
+ // There's a cache miss. Fall back to local execution.
+ }
}
- }
- if (options.remoteUploadLocalResults) {
- return new CacheHandle() {
- @Override
- public boolean hasResult() {
- return false;
- }
+ if (options.remoteUploadLocalResults) {
+ return new CacheHandle() {
+ @Override
+ public boolean hasResult() {
+ return false;
+ }
- @Override
- public SpawnResult getResult() {
- throw new NoSuchElementException();
- }
+ @Override
+ public SpawnResult getResult() {
+ throw new NoSuchElementException();
+ }
- @Override
- public boolean willStore() {
- return true;
- }
+ @Override
+ public boolean willStore() {
+ return true;
+ }
- @Override
- public void store(SpawnResult result, Collection<Path> files)
- throws InterruptedException, IOException {
- try {
+ @Override
+ public void store(SpawnResult result, Collection<Path> files)
+ throws InterruptedException, IOException {
boolean uploadAction = Status.SUCCESS.equals(result.status()) && result.exitCode() == 0;
- remoteCache.upload(actionKey, execRoot, files, policy.getFileOutErr(), uploadAction);
- } catch (IOException e) {
- if (verboseFailures) {
- report(Event.debug("Upload to remote cache failed: " + e.getMessage()));
- } else {
- reportOnce(Event.warn("Some artifacts failed be uploaded to the remote cache."));
+ try {
+ remoteCache.upload(actionKey, execRoot, files, policy.getFileOutErr(), uploadAction);
+ } catch (IOException e) {
+ if (verboseFailures) {
+ report(Event.debug("Upload to remote cache failed: " + e.getMessage()));
+ } else {
+ reportOnce(Event.warn("Some artifacts failed be uploaded to the remote cache."));
+ }
}
}
- }
- @Override
- public void close() {
- }
- };
- } else {
- return SpawnCache.NO_RESULT_NO_STORE;
+ @Override
+ public void close() {}
+ };
+ } else {
+ return SpawnCache.NO_RESULT_NO_STORE;
+ }
+ } finally {
+ withMetadata.detach(previous);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index b4d28cd..f4babd7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -45,6 +45,7 @@
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
import com.google.devtools.remoteexecution.v1test.Platform;
+import io.grpc.Context;
import io.grpc.Status.Code;
import java.io.IOException;
import java.io.OutputStream;
@@ -74,6 +75,8 @@
@Nullable private final Reporter cmdlineReporter;
@Nullable private final RemoteActionCache remoteCache;
@Nullable private final GrpcRemoteExecutor remoteExecutor;
+ private final String buildRequestId;
+ private final String commandId;
// Used to ensure that a warning is reported only once.
private final AtomicBoolean warningReported = new AtomicBoolean();
@@ -84,6 +87,8 @@
SpawnRunner fallbackRunner,
boolean verboseFailures,
@Nullable Reporter cmdlineReporter,
+ String buildRequestId,
+ String commandId,
@Nullable RemoteActionCache remoteCache,
@Nullable GrpcRemoteExecutor remoteExecutor) {
this.execRoot = execRoot;
@@ -94,6 +99,8 @@
this.remoteExecutor = remoteExecutor;
this.verboseFailures = verboseFailures;
this.cmdlineReporter = cmdlineReporter;
+ this.buildRequestId = buildRequestId;
+ this.commandId = commandId;
}
@Override
@@ -121,64 +128,68 @@
// Look up action cache, and reuse the action output if it is found.
ActionKey actionKey = Digests.computeActionKey(action);
- boolean acceptCachedResult = options.remoteAcceptCached && Spawns.mayBeCached(spawn);
- boolean uploadLocalResults = options.remoteUploadLocalResults;
-
+ Context withMetadata =
+ TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionKey);
+ Context previous = withMetadata.attach();
try {
- // Try to lookup the action in the action cache.
- ActionResult cachedResult =
- acceptCachedResult
- ? remoteCache.getCachedActionResult(actionKey)
- : null;
- if (cachedResult != null) {
- if (cachedResult.getExitCode() != 0) {
- // The remote cache must never serve a failed action.
- throw new EnvironmentalExecException("The remote cache is in an invalid state as it"
- + " served a failed action. Hash of the action: " + actionKey.getDigest());
+ boolean acceptCachedResult = options.remoteAcceptCached && Spawns.mayBeCached(spawn);
+ boolean uploadLocalResults = options.remoteUploadLocalResults;
+
+ try {
+ // Try to lookup the action in the action cache.
+ ActionResult cachedResult =
+ acceptCachedResult ? remoteCache.getCachedActionResult(actionKey) : null;
+ if (cachedResult != null) {
+ if (cachedResult.getExitCode() != 0) {
+ // The remote cache must never serve a failed action.
+ throw new EnvironmentalExecException(
+ "The remote cache is in an invalid state as it"
+ + " served a failed action. Hash of the action: "
+ + actionKey.getDigest());
+ }
+ try {
+ return downloadRemoteResults(cachedResult, policy.getFileOutErr());
+ } catch (CacheNotFoundException e) {
+ // No cache hit, so we fall through to local or remote execution.
+ // We set acceptCachedResult to false in order to force the action re-execution.
+ acceptCachedResult = false;
+ }
}
- try {
- return downloadRemoteResults(cachedResult, policy.getFileOutErr());
- } catch (CacheNotFoundException e) {
- // No cache hit, so we fall through to local or remote execution.
- // We set acceptCachedResult to false in order to force the action re-execution.
- acceptCachedResult = false;
- }
+ } catch (IOException e) {
+ return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e);
}
- } catch (IOException e) {
- return execLocallyOrFail(
- spawn, policy, inputMap, actionKey, uploadLocalResults, e);
- }
- if (remoteExecutor == null) {
- // Remote execution is disabled and so execute the spawn on the local machine.
- return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey);
- }
+ if (remoteExecutor == null) {
+ // Remote execution is disabled and so execute the spawn on the local machine.
+ return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey);
+ }
- try {
- // Upload the command and all the inputs into the remote cache.
- remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command);
- } catch (IOException e) {
- return execLocallyOrFail(
- spawn, policy, inputMap, actionKey, uploadLocalResults, e);
- }
+ try {
+ // Upload the command and all the inputs into the remote cache.
+ remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command);
+ } catch (IOException e) {
+ return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e);
+ }
- final ActionResult result;
- try {
- result = executeRemotely(action, inputMap.size(), acceptCachedResult);
- } catch (IOException e) {
- return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e);
- }
+ final ActionResult result;
+ try {
+ result = executeRemotely(action, inputMap.size(), acceptCachedResult);
+ } catch (IOException e) {
+ return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e);
+ }
- boolean executionFailed = result.getExitCode() != 0;
- if (options.remoteLocalFallback && executionFailed) {
- return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey);
- }
+ boolean executionFailed = result.getExitCode() != 0;
+ if (options.remoteLocalFallback && executionFailed) {
+ return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey);
+ }
- try {
- return downloadRemoteResults(result, policy.getFileOutErr());
- } catch (IOException e) {
- return execLocallyOrFail(
- spawn, policy, inputMap, actionKey, uploadLocalResults, e);
+ try {
+ return downloadRemoteResults(result, policy.getFileOutErr());
+ } catch (IOException e) {
+ return execLocallyOrFail(spawn, policy, inputMap, actionKey, uploadLocalResults, e);
+ }
+ } finally {
+ withMetadata.detach(previous);
}
}
@@ -191,8 +202,8 @@
.build();
}
- private ActionResult executeRemotely(Action action, int numInputFiles,
- boolean acceptCachedResult) throws IOException, InterruptedException {
+ private ActionResult executeRemotely(Action action, int numInputFiles, boolean acceptCachedResult)
+ throws IOException, InterruptedException {
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
@@ -204,9 +215,13 @@
return reply.getResult();
}
- private SpawnResult execLocallyOrFail(Spawn spawn, SpawnExecutionPolicy policy,
- SortedMap<PathFragment, ActionInput> inputMap, ActionKey actionKey,
- boolean uploadLocalResults, IOException cause)
+ private SpawnResult execLocallyOrFail(
+ Spawn spawn,
+ SpawnExecutionPolicy policy,
+ SortedMap<PathFragment, ActionInput> inputMap,
+ ActionKey actionKey,
+ boolean uploadLocalResults,
+ IOException cause)
throws ExecException, InterruptedException, IOException {
if (options.remoteLocalFallback) {
return execLocally(spawn, policy, inputMap, uploadLocalResults, remoteCache, actionKey);
@@ -313,7 +328,8 @@
SortedMap<PathFragment, ActionInput> inputMap,
boolean uploadToCache,
@Nullable RemoteActionCache remoteCache,
- @Nullable ActionKey actionKey) throws ExecException, IOException, InterruptedException {
+ @Nullable ActionKey actionKey)
+ throws ExecException, IOException, InterruptedException {
if (uploadToCache && Spawns.mayBeCached(spawn) && remoteCache != null && actionKey != null) {
return execLocallyAndUpload(spawn, policy, inputMap, remoteCache, actionKey);
}
@@ -326,7 +342,8 @@
SpawnExecutionPolicy policy,
SortedMap<PathFragment, ActionInput> inputMap,
RemoteActionCache remoteCache,
- ActionKey actionKey) throws ExecException, IOException, InterruptedException {
+ ActionKey actionKey)
+ throws ExecException, IOException, InterruptedException {
Map<Path, Long> ctimesBefore = getInputCtimes(inputMap);
SpawnResult result = fallbackRunner.exec(spawn, policy);
Map<Path, Long> ctimesAfter = getInputCtimes(inputMap);
diff --git a/src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java b/src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java
new file mode 100644
index 0000000..18da622
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/TracingMetadataUtils.java
@@ -0,0 +1,109 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
+import com.google.devtools.remoteexecution.v1test.RequestMetadata;
+import com.google.devtools.remoteexecution.v1test.ToolDetails;
+import io.grpc.ClientInterceptor;
+import io.grpc.Context;
+import io.grpc.Contexts;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.protobuf.ProtoUtils;
+import io.grpc.stub.MetadataUtils;
+
+/** Utility functions to handle Metadata for remote Grpc calls. */
+public class TracingMetadataUtils {
+
+ private TracingMetadataUtils() {}
+
+ private static final Context.Key<RequestMetadata> CONTEXT_KEY =
+ Context.key("remote-grpc-metadata");
+
+ @VisibleForTesting
+ public static final Metadata.Key<RequestMetadata> METADATA_KEY =
+ ProtoUtils.keyForProto(RequestMetadata.getDefaultInstance());
+
+ /**
+ * Returns a new gRPC context derived from the current context, with
+ * {@link RequestMetadata} accessible by the {@link fromCurrentContext()} method.
+ *
+ * <p>The {@link RequestMetadata} is constructed using the provided arguments
+ * and the current tool version.
+ */
+ public static Context contextWithMetadata(
+ String buildRequestId, String commandId, ActionKey actionKey) {
+ RequestMetadata metadata =
+ RequestMetadata.newBuilder()
+ .setCorrelatedInvocationsId(buildRequestId)
+ .setToolInvocationId(commandId)
+ .setActionId(actionKey.getDigest().getHash())
+ .setToolDetails(
+ ToolDetails.newBuilder()
+ .setToolName("bazel")
+ .setToolVersion(BlazeVersionInfo.instance().getVersion()))
+ .build();
+ return Context.current().withValue(CONTEXT_KEY, metadata);
+ }
+
+ /**
+ * Fetches a {@link RequestMetadata} defined on the current context.
+ *
+ * @throws {@link IllegalStateException} when the metadata is not defined in the current context.
+ */
+ public static RequestMetadata fromCurrentContext() {
+ RequestMetadata metadata = CONTEXT_KEY.get();
+ if (metadata == null) {
+ throw new IllegalStateException("RequestMetadata not set in current context.");
+ }
+ return metadata;
+ }
+
+ /**
+ * Creates a {@link Metadata} containing the {@link RequestMetadata} defined on the current
+ * context.
+ *
+ * @throws {@link IllegalStateException} when the metadata is not defined in the current context.
+ */
+ public static Metadata headersFromCurrentContext() {
+ Metadata headers = new Metadata();
+ headers.put(METADATA_KEY, fromCurrentContext());
+ return headers;
+ }
+
+ public static ClientInterceptor attachMetadataFromContextInterceptor() {
+ return MetadataUtils.newAttachHeadersInterceptor(headersFromCurrentContext());
+ }
+
+ /** GRPC interceptor to add logging metadata to the GRPC context. */
+ public static class ServerHeadersInterceptor implements ServerInterceptor {
+ @Override
+ public <ReqT, RespT> Listener<ReqT> interceptCall(
+ ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
+ RequestMetadata meta = headers.get(METADATA_KEY);
+ if (meta == null) {
+ throw new IllegalStateException("RequestMetadata not received from the client.");
+ }
+ Context ctx = Context.current().withValue(CONTEXT_KEY, meta);
+ return Contexts.interceptCall(ctx, call, headers, next);
+ }
+ }
+
+}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java b/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java
index 86fa7be..655b7d6 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/CommandEnvironment.java
@@ -69,6 +69,7 @@
private final BlazeDirectories directories;
private UUID commandId; // Unique identifier for the command being run
+ private UUID buildRequestId; // Unique identifier for the build being run
private final Reporter reporter;
private final EventBus eventBus;
private final BlazeModule.ModuleEnvironment blazeModuleEnvironment;
@@ -124,6 +125,7 @@
this.workspace = workspace;
this.directories = workspace.getDirectories();
this.commandId = null; // Will be set once we get the client environment
+ this.buildRequestId = null; // Will be set once we get the client environment
this.reporter = new Reporter(eventBus);
this.eventBus = eventBus;
this.commandThread = commandThread;
@@ -244,6 +246,21 @@
return Collections.unmodifiableMap(result);
}
+ private UUID getFromEnvOrGenerate(String varName) {
+ // Try to set the clientId from the client environment.
+ String uuidString = clientEnv.getOrDefault(varName, "");
+ if (!uuidString.isEmpty()) {
+ try {
+ return UUID.fromString(uuidString);
+ } catch (IllegalArgumentException e) {
+ // String was malformed, so we will resort to generating a random UUID
+ }
+ }
+ // We have been provided with the client environment, but it didn't contain
+ // the variable; hence generate our own id.
+ return UUID.randomUUID();
+ }
+
private void updateClientEnv(List<Map.Entry<String, String>> clientEnvList) {
Preconditions.checkState(clientEnv.isEmpty());
@@ -251,21 +268,11 @@
for (Map.Entry<String, String> entry : env) {
clientEnv.put(entry.getKey(), entry.getValue());
}
- // Try to set the clientId from the client environment.
if (commandId == null) {
- String uuidString = clientEnv.get("BAZEL_INTERNAL_INVOCATION_ID");
- if (uuidString != null) {
- try {
- commandId = UUID.fromString(uuidString);
- } catch (IllegalArgumentException e) {
- // String was malformed, so we will resort to generating a random UUID
- }
- }
+ commandId = getFromEnvOrGenerate("BAZEL_INTERNAL_INVOCATION_ID");
}
- if (commandId == null) {
- // We have been provided with the client environment, but it didn't contain
- // the invocation id; hence generate our own.
- commandId = UUID.randomUUID();
+ if (buildRequestId == null) {
+ buildRequestId = getFromEnvOrGenerate("BAZEL_INTERNAL_BUILD_REQUEST_ID");
}
setCommandIdInCrashData();
}
@@ -301,14 +308,14 @@
* the build info.
*/
public UUID getCommandId() {
- if (commandId == null) {
- // The commandId should not be requested before the beforeCommand is executed, as the
- // commandId might be set through the client environment. However, to simplify testing,
- // we set the id value before we throw the exception.
- commandId = UUID.randomUUID();
- throw new IllegalArgumentException("Build Id requested before client environment provided");
- }
- return commandId;
+ return Preconditions.checkNotNull(commandId);
+ }
+
+ /**
+ * Returns the UUID that Blaze uses to identify everything logged from the current build request.
+ */
+ public UUID getBuildRequestId() {
+ return Preconditions.checkNotNull(buildRequestId);
}
public SkyframeExecutor getSkyframeExecutor() {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 732afac..e809b74 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -14,22 +14,30 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.fail;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.protobuf.ByteString;
+import io.grpc.BindableService;
import io.grpc.Channel;
+import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.Status.Code;
@@ -76,28 +84,36 @@
private Server server;
private Channel channel;
+ private Context withEmptyMetadata;
@Mock
private Retrier.Backoff mockBackoff;
@Before
- public void init() throws Exception {
+ public final void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
String serverName = "Server for " + this.getClass();
server = InProcessServerBuilder.forName(serverName).fallbackHandlerRegistry(serviceRegistry)
.build().start();
channel = InProcessChannelBuilder.forName(serverName).build();
+ withEmptyMetadata =
+ TracingMetadataUtils.contextWithMetadata(
+ "none", "none", Digests.unsafeActionKeyFromDigest(Digest.getDefaultInstance()));
+ // Needs to be repeated in every test that uses the timeout setting, since the tests run
+ // on different threads than the setUp.
+ withEmptyMetadata.attach();
}
@After
- public void shutdown() {
+ public void tearDown() throws Exception {
server.shutdownNow();
retryService.shutdownNow();
}
@Test(timeout = 10000)
public void singleBlobUploadShouldWork() throws Exception {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
@@ -166,6 +182,7 @@
@Test(timeout = 20000)
public void multipleBlobsUploadShouldWork() throws Exception {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0), (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
@@ -251,10 +268,100 @@
blockUntilInternalStateConsistent(uploader);
}
+ @Test(timeout = 20000)
+ public void contextShouldBePreservedUponRetries() throws Exception {
+ withEmptyMetadata.attach();
+ // We upload blobs with different context, and retry 3 times for each upload.
+ // We verify that the correct metadata is passed to the server with every blob.
+ Retrier retrier = new Retrier(() -> new FixedBackoff(3, 0), (Status s) -> true);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
+
+ List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
+ List<Chunker> builders = new ArrayList<>(toUpload.size());
+ Map<String, Integer> uploadsFailed = new HashMap<>();
+ for (String s : toUpload) {
+ Chunker chunker = new Chunker(s.getBytes(UTF_8), /* chunkSize=*/ 3);
+ builders.add(chunker);
+ uploadsFailed.put(chunker.digest().getHash(), 0);
+ }
+
+ BindableService bsService =
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+ return new StreamObserver<WriteRequest>() {
+
+ private String digestHash;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ String resourceName = writeRequest.getResourceName();
+ if (!resourceName.isEmpty()) {
+ String[] components = resourceName.split("/");
+ assertThat(components).hasLength(6);
+ digestHash = components[4];
+ }
+ assertThat(digestHash).isNotNull();
+ RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
+ assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
+ assertThat(meta.getToolInvocationId()).isEqualTo("command-id");
+ assertThat(meta.getActionId()).isEqualTo(digestHash);
+ assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel");
+ assertThat(meta.getToolDetails().getToolVersion())
+ .isEqualTo(BlazeVersionInfo.instance().getVersion());
+ synchronized (this) {
+ Integer numFailures = uploadsFailed.get(digestHash);
+ if (numFailures < 3) {
+ uploadsFailed.put(digestHash, numFailures + 1);
+ response.onError(Status.INTERNAL.asException());
+ return;
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ response.onNext(WriteResponse.newBuilder().setCommittedSize(10).build());
+ response.onCompleted();
+ }
+ };
+ }
+ };
+ serviceRegistry.addService(
+ ServerInterceptors.intercept(
+ bsService, new TracingMetadataUtils.ServerHeadersInterceptor()));
+
+ List<ListenableFuture<Void>> uploads = new ArrayList<>();
+
+ for (Chunker chunker : builders) {
+ Context ctx =
+ TracingMetadataUtils.contextWithMetadata(
+ "build-req-id", "command-id", Digests.unsafeActionKeyFromDigest(chunker.digest()));
+ ctx.call(
+ () -> {
+ uploads.add(uploader.uploadBlobAsync(chunker));
+ return null;
+ });
+ }
+
+ for (ListenableFuture<Void> upload : uploads) {
+ upload.get();
+ }
+
+ blockUntilInternalStateConsistent(uploader);
+ }
+
@Test(timeout = 10000)
public void sameBlobShouldNotBeUploadedTwice() throws Exception {
// Test that uploading the same file concurrently triggers only one file upload.
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
@@ -313,6 +420,7 @@
@Test(timeout = 10000)
public void errorsShouldBeReported() throws IOException, InterruptedException {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
@@ -339,6 +447,7 @@
@Test(timeout = 10000)
public void shutdownShouldCancelOngoingUploads() throws Exception {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
@@ -390,6 +499,7 @@
@Test(timeout = 10000)
public void failureInRetryExecutorShouldBeHandled() throws Exception {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> new FixedBackoff(1, 10), (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
@@ -420,6 +530,7 @@
@Test(timeout = 10000)
public void resourceNameWithoutInstanceName() throws Exception {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> mockBackoff, (Status s) -> true);
ByteStreamUploader uploader =
new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier, retryService);
@@ -456,6 +567,7 @@
@Test(timeout = 10000)
public void nonRetryableStatusShouldNotBeRetried() throws Exception {
+ withEmptyMetadata.attach();
Retrier retrier = new Retrier(() -> new FixedBackoff(1, 0),
/* No Status is retriable. */ (Status s) -> false);
ByteStreamUploader uploader =
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index fe31aa7..5cc03be 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -54,6 +54,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
+import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
@@ -103,6 +104,10 @@
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
+ Context withEmptyMetadata =
+ TracingMetadataUtils.contextWithMetadata(
+ "none", "none", Digests.unsafeActionKeyFromDigest(Digest.getDefaultInstance()));
+ withEmptyMetadata.attach();
}
@After
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index c5d5b66..de6d50f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -34,6 +34,7 @@
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.ResourceSet;
import com.google.devtools.build.lib.actions.SimpleSpawn;
+import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.authandtls.GrpcUtils;
import com.google.devtools.build.lib.exec.SpawnExecException;
@@ -61,6 +62,7 @@
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
@@ -69,9 +71,15 @@
import com.google.watcher.v1.ChangeBatch;
import com.google.watcher.v1.Request;
import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
+import io.grpc.BindableService;
import io.grpc.CallCredentials;
import io.grpc.Channel;
+import io.grpc.Metadata;
import io.grpc.Server;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
@@ -229,8 +237,17 @@
GrpcUtils.newCallCredentials(Options.getDefaults(AuthAndTLSOptions.class));
GrpcRemoteCache remoteCache =
new GrpcRemoteCache(channel, creds, options, retrier);
- client = new RemoteSpawnRunner(execRoot, options, null, true, /*cmdlineReporter=*/null,
- remoteCache, executor);
+ client =
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ null,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ remoteCache,
+ executor);
inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
}
@@ -364,22 +381,42 @@
};
}
+ /** Capture the request headers from a client. Useful for testing metadata propagation. */
+ private static class RequestHeadersValidator implements ServerInterceptor {
+ @Override
+ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
+ ServerCall<ReqT, RespT> call,
+ Metadata headers,
+ ServerCallHandler<ReqT, RespT> next) {
+ RequestMetadata meta = headers.get(TracingMetadataUtils.METADATA_KEY);
+ assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
+ assertThat(meta.getToolInvocationId()).isEqualTo("command-id");
+ assertThat(meta.getActionId()).isNotEmpty();
+ assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel");
+ assertThat(meta.getToolDetails().getToolVersion())
+ .isEqualTo(BlazeVersionInfo.instance().getVersion());
+ return next.startCall(call, headers);
+ }
+ }
+
@Test
public void remotelyExecute() throws Exception {
- serviceRegistry.addService(
+ BindableService actionCache =
new ActionCacheImplBase() {
@Override
public void getActionResult(
GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
}
- });
+ };
+ serviceRegistry.addService(
+ ServerInterceptors.intercept(actionCache, new RequestHeadersValidator()));
final ActionResult actionResult =
ActionResult.newBuilder()
.setStdoutRaw(ByteString.copyFromUtf8("stdout"))
.setStderrRaw(ByteString.copyFromUtf8("stderr"))
.build();
- serviceRegistry.addService(
+ BindableService execService =
new ExecutionImplBase() {
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
@@ -395,7 +432,9 @@
.build());
responseObserver.onCompleted();
}
- });
+ };
+ serviceRegistry.addService(
+ ServerInterceptors.intercept(execService, new RequestHeadersValidator()));
final Command command =
Command.newBuilder()
.addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
@@ -406,7 +445,7 @@
.build())
.build();
final Digest cmdDigest = Digests.computeDigest(command);
- serviceRegistry.addService(
+ BindableService cas =
new ContentAddressableStorageImplBase() {
@Override
public void findMissingBlobs(
@@ -424,13 +463,15 @@
responseObserver.onNext(b.build());
responseObserver.onCompleted();
}
- });
+ };
+ serviceRegistry.addService(ServerInterceptors.intercept(cas, new RequestHeadersValidator()));
ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
.thenAnswer(blobWriteAnswer(command.toByteArray()))
.thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8)));
- serviceRegistry.addService(mockByteStreamImpl);
+ serviceRegistry.addService(
+ ServerInterceptors.intercept(mockByteStreamImpl, new RequestHeadersValidator()));
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
assertThat(result.setupSuccess()).isTrue();
@@ -563,7 +604,8 @@
})
.when(mockWatcherImpl)
.watch(Mockito.<Request>anyObject(), Mockito.<StreamObserver<ChangeBatch>>anyObject());
- serviceRegistry.addService(mockWatcherImpl);
+ serviceRegistry.addService(
+ ServerInterceptors.intercept(mockWatcherImpl, new RequestHeadersValidator()));
final Command command =
Command.newBuilder()
.addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
index 160c20f..b1e34f5 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java
@@ -166,7 +166,9 @@
Reporter reporter = new Reporter(new EventBus());
eventHandler = new StoredEventHandler();
reporter.addHandler(eventHandler);
- cache = new RemoteSpawnCache(execRoot, options, remoteCache, false, reporter);
+ cache =
+ new RemoteSpawnCache(
+ execRoot, options, remoteCache, "build-req-id", "command-id", false, reporter);
fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
index 940c9d3..bf9dee0 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java
@@ -121,8 +121,16 @@
options.remoteUploadLocalResults = true;
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, executor);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ executor);
ExecuteResponse succeeded = ExecuteResponse.newBuilder().setResult(
ActionResult.newBuilder().setExitCode(0).build()).build();
@@ -169,8 +177,16 @@
options.remoteUploadLocalResults = true;
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, null);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ null);
// Throw an IOException to trigger the local fallback.
when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(IOException.class);
@@ -211,8 +227,17 @@
options.remoteUploadLocalResults = true;
RemoteSpawnRunner runner =
- spy(new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, null));
+ spy(
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ null));
Spawn spawn = newSimpleSpawn();
SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn);
@@ -249,8 +274,17 @@
SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn);
RemoteSpawnRunner runner =
- spy(new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, null));
+ spy(
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ null));
try {
runner.exec(spawn, policy);
@@ -274,7 +308,16 @@
reporter.addHandler(eventHandler);
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, false, reporter, cache, null);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ false,
+ reporter,
+ "build-req-id",
+ "command-id",
+ cache,
+ null);
Spawn spawn = newSimpleSpawn();
SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn);
@@ -315,8 +358,16 @@
options.remoteLocalFallback = true;
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, null);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ null);
Spawn spawn = newSimpleSpawn();
SpawnExecutionPolicy policy = new FakeSpawnExecutionPolicy(spawn);
@@ -343,8 +394,16 @@
RemoteOptions options = Options.getDefaults(RemoteOptions.class);
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, executor);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ executor);
ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(cachedResult);
@@ -375,8 +434,16 @@
options.remoteLocalFallback = false;
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, executor);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ executor);
ActionResult cachedResult = ActionResult.newBuilder().setExitCode(0).build();
when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
@@ -402,8 +469,16 @@
options.remoteLocalFallback = false;
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, executor);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ executor);
when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
when(executor.executeRemotely(any(ExecuteRequest.class))).thenThrow(new IOException());
@@ -429,8 +504,16 @@
options.remoteLocalFallback = false;
RemoteSpawnRunner runner =
- new RemoteSpawnRunner(execRoot, options, localRunner, true, /*cmdlineReporter=*/null,
- cache, executor);
+ new RemoteSpawnRunner(
+ execRoot,
+ options,
+ localRunner,
+ true,
+ /*cmdlineReporter=*/ null,
+ "build-req-id",
+ "command-id",
+ cache,
+ executor);
when(cache.getCachedActionResult(any(ActionKey.class))).thenThrow(new IOException());
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
index af85891..8f36c4d 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
@@ -12,18 +12,13 @@
visibility = ["//src/tools/remote_worker:__subpackages__"],
deps = [
"//src/main/java/com/google/devtools/build/lib:build-base",
- "//src/main/java/com/google/devtools/build/lib:events",
- "//src/main/java/com/google/devtools/build/lib:io",
"//src/main/java/com/google/devtools/build/lib:os_util",
"//src/main/java/com/google/devtools/build/lib:packages-internal",
"//src/main/java/com/google/devtools/build/lib:process_util",
- "//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib:single-line-formatter",
"//src/main/java/com/google/devtools/build/lib:unix",
"//src/main/java/com/google/devtools/build/lib:util",
"//src/main/java/com/google/devtools/build/lib/actions",
- "//src/main/java/com/google/devtools/build/lib/authandtls",
- "//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/shell",
"//src/main/java/com/google/devtools/build/lib/vfs",
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
index 9ed769c..83b003a 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ExecutionServer.java
@@ -28,6 +28,7 @@
import com.google.devtools.build.lib.remote.Digests;
import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
+import com.google.devtools.build.lib.remote.TracingMetadataUtils;
import com.google.devtools.build.lib.shell.AbnormalTerminationException;
import com.google.devtools.build.lib.shell.Command;
import com.google.devtools.build.lib.shell.CommandException;
@@ -41,10 +42,12 @@
import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.devtools.remoteexecution.v1test.RequestMetadata;
import com.google.longrunning.Operation;
import com.google.protobuf.util.Durations;
import com.google.rpc.Code;
import com.google.rpc.Status;
+import io.grpc.Context;
import io.grpc.StatusException;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
@@ -59,7 +62,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -123,12 +125,8 @@
@Override
public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
final String opName = UUID.randomUUID().toString();
- ListenableFuture<ActionResult> future = executorService.submit(new Callable<ActionResult>() {
- @Override
- public ActionResult call() throws Exception {
- return execute(request, opName);
- }
- });
+ ListenableFuture<ActionResult> future =
+ executorService.submit(Context.current().wrap(() -> execute(request, opName)));
operationsCache.put(opName, future);
responseObserver.onNext(Operation.newBuilder().setName(opName).build());
responseObserver.onCompleted();
@@ -137,19 +135,20 @@
private ActionResult execute(ExecuteRequest request, String id)
throws IOException, InterruptedException, StatusException {
Path tempRoot = workPath.getRelative("build-" + id);
+ String workDetails = "";
try {
tempRoot.createDirectory();
- logger.log(
- FINE,
- "Work received has {0} input files and {1} output files.",
- new Object[]{
- request.getTotalInputFileCount(), request.getAction().getOutputFilesCount()
- });
+ RequestMetadata meta = TracingMetadataUtils.fromCurrentContext();
+ workDetails =
+ String.format(
+ "build-request-id: %s command-id: %s action-id: %s",
+ meta.getCorrelatedInvocationsId(), meta.getToolInvocationId(), meta.getActionId());
+ logger.log(FINE, "Received work for: {0}", workDetails);
ActionResult result = execute(request.getAction(), tempRoot);
- logger.log(FINE, "Completed {0}.", id);
+ logger.log(FINE, "Completed {0}.", workDetails);
return result;
} catch (Exception e) {
- logger.log(Level.SEVERE, "Work failed.", e);
+ logger.log(Level.SEVERE, "Work failed: {0} {1}.", new Object[] {workDetails, e});
throw e;
} finally {
if (workerOptions.debug) {
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 8d47d5c..a459cc1 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -27,6 +27,7 @@
import com.google.devtools.build.lib.remote.RemoteOptions;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory;
+import com.google.devtools.build.lib.remote.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.blobstore.ConcurrentMapBlobStore;
import com.google.devtools.build.lib.remote.blobstore.OnDiskBlobStore;
import com.google.devtools.build.lib.remote.blobstore.SimpleBlobStore;
@@ -49,6 +50,8 @@
import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
import com.google.watcher.v1.WatcherGrpc.WatcherImplBase;
import io.grpc.Server;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
import io.grpc.netty.NettyServerBuilder;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -121,15 +124,16 @@
}
public Server startServer() throws IOException {
+ ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor();
NettyServerBuilder b =
NettyServerBuilder.forPort(workerOptions.listenPort)
- .addService(actionCacheServer)
- .addService(bsServer)
- .addService(casServer);
+ .addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor))
+ .addService(ServerInterceptors.intercept(bsServer, headersInterceptor))
+ .addService(ServerInterceptors.intercept(casServer, headersInterceptor));
if (execServer != null) {
- b.addService(execServer);
- b.addService(watchServer);
+ b.addService(ServerInterceptors.intercept(execServer, headersInterceptor));
+ b.addService(ServerInterceptors.intercept(watchServer, headersInterceptor));
} else {
logger.info("Execution disabled, only serving cache requests.");
}