Rewrite the BuildEventServiceTransport
The code of the BuildEventServiceTransport has become sufficiently
complex, difficult to reason about and hard to test that we found
it necessary to rewrite it before launching this in production.
The new implementation uses an eventloop style concurrency model
for build event uploads. Additionally, the BuildEventServiceClient
implementations where updated to only report errors via the Future
returned by BESC.openStream(...). The BuildEventServiceProtoUtil
was immutable and is no longer responsible for assigning sequence
numbers.
RELNOTES: None
PiperOrigin-RevId: 211784727
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
index 1b78cf8..d702984 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java
@@ -25,11 +25,15 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.buildeventservice.BuildEventServiceTransport.BuildEventLogger;
+import com.google.devtools.build.lib.buildeventservice.BuildEventServiceTransport.ExitFunction;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploaderFactoryMap;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
+import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
+import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.transports.BuildEventStreamOptions;
import com.google.devtools.build.lib.buildeventstream.transports.BuildEventTransportFactory;
import com.google.devtools.build.lib.clock.Clock;
@@ -104,7 +108,7 @@
commandEnvironment.getRuntime().getClock(),
commandEnvironment.getRuntime().getBuildEventArtifactUploaderFactoryMap(),
commandEnvironment.getReporter(),
- commandEnvironment.getBuildRequestId().toString(),
+ commandEnvironment.getBuildRequestId(),
commandEnvironment.getCommandId().toString(),
commandEnvironment.getCommandName(),
commandEnvironment.getEventBus());
@@ -159,7 +163,7 @@
String buildRequestId,
String invocationId,
String commandName,
- EventBus internalEventBus) {
+ EventBus eventbus) {
Preconditions.checkNotNull(buildEventArtifactUploaderFactoryMap);
try {
@@ -176,7 +180,7 @@
commandLineReporter,
startupOptionsProvider,
optionsProvider,
- internalEventBus);
+ eventbus);
} catch (Exception e) {
reportError(
commandLineReporter,
@@ -222,7 +226,7 @@
EventHandler commandLineReporter,
OptionsParsingResult startupOptionsProvider,
OptionsParsingResult optionsProvider,
- EventBus internalEventBus)
+ EventBus eventbus)
throws IOException, OptionsParsingException {
T besOptions =
checkNotNull(
@@ -268,24 +272,36 @@
.select(protocolOptions.buildEventUploadStrategy)
.create(optionsProvider);
- BuildEventTransport besTransport =
- new BuildEventServiceTransport(
- client,
- besOptions.besTimeout,
- besOptions.besLifecycleEvents,
+ BuildEventLogger buildEventLogger =
+ (BuildEventStreamProtos.BuildEvent bepEvent) -> {
+ if (bepEvent.getSerializedSize()
+ > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) {
+ eventbus.post(
+ new LargeBuildEventSerializedEvent(
+ bepEvent.getId().toString(), bepEvent.getSerializedSize()));
+ }
+ };
+
+ BuildEventServiceProtoUtil besProtoUtil =
+ new BuildEventServiceProtoUtil(
buildRequestId,
invocationId,
- commandName,
- moduleEnvironment,
- clock,
- protocolOptions,
- commandLineReporter,
besOptions.projectId,
- keywords(besOptions, startupOptionsProvider),
- besResultsUrl,
- artifactUploader,
- errorsShouldFailTheBuild(),
- internalEventBus);
+ commandName,
+ keywords(besOptions, startupOptionsProvider));
+
+ BuildEventTransport besTransport =
+ new BuildEventServiceTransport.Builder()
+ .closeTimeout(besOptions.besTimeout)
+ .publishLifecycleEvents(besOptions.besLifecycleEvents)
+ .buildEventLogger(buildEventLogger)
+ .build(
+ client,
+ artifactUploader,
+ protocolOptions,
+ besProtoUtil,
+ clock,
+ bazelExitFunction(commandLineReporter, moduleEnvironment, besResultsUrl));
logger.fine("BuildEventServiceTransport was created successfully");
return besTransport;
}
@@ -308,10 +324,35 @@
protected Set<String> keywords(
T besOptions, @Nullable OptionsParsingResult startupOptionsProvider) {
- return besOptions
- .besKeywords
- .stream()
+ return besOptions.besKeywords.stream()
.map(keyword -> "user_keyword=" + keyword)
.collect(ImmutableSet.toImmutableSet());
}
+
+ private ExitFunction bazelExitFunction(
+ EventHandler commandLineReporter, ModuleEnvironment moduleEnvironment, String besResultsUrl) {
+ return (String message, Exception cause) -> {
+ if (cause == null) {
+ commandLineReporter.handle(Event.info("Build Event Protocol upload finished successfully"));
+ if (besResultsUrl != null) {
+ commandLineReporter.handle(
+ Event.info("Build Event Protocol results available at " + besResultsUrl));
+ }
+ } else {
+ if (errorsShouldFailTheBuild()) {
+ commandLineReporter.handle(Event.error(message));
+ moduleEnvironment.exit(new AbruptExitException(ExitCode.PUBLISH_ERROR, cause));
+ } else {
+ commandLineReporter.handle(Event.warn(message));
+ }
+ if (besResultsUrl != null) {
+ if (!Strings.isNullOrEmpty(besResultsUrl)) {
+ commandLineReporter.handle(
+ Event.info(
+ "Partial Build Event Protocol results may be available at " + besResultsUrl));
+ }
+ }
+ }
+ };
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
index 4c0973f..7e36e31 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
@@ -18,7 +18,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
-import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.v1.BuildEvent;
import com.google.devtools.build.v1.BuildEvent.BuildComponentStreamFinished;
import com.google.devtools.build.v1.BuildEvent.BuildEnqueued;
@@ -36,16 +35,14 @@
import com.google.protobuf.Any;
import com.google.protobuf.Timestamp;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
-/** Utility class used to build protobuffs requests that are meant to be sent over BES. */
+/** Utility class with convenience methods for building a {@link BuildEventServiceTransport}. */
public final class BuildEventServiceProtoUtil {
private final String buildRequestId;
private final String buildInvocationId;
private final String projectId;
- private final AtomicInteger streamSequenceNumber;
private final String commandName;
private final Set<String> additionalKeywords;
@@ -54,14 +51,12 @@
String buildInvocationId,
@Nullable String projectId,
String commandName,
- Clock clock,
Set<String> additionalKeywords) {
this.buildRequestId = buildRequestId;
this.buildInvocationId = buildInvocationId;
this.projectId = projectId;
this.commandName = commandName;
- this.additionalKeywords = additionalKeywords;
- this.streamSequenceNumber = new AtomicInteger(1);
+ this.additionalKeywords = ImmutableSet.copyOf(additionalKeywords);
}
public PublishLifecycleEventRequest buildEnqueued(Timestamp timestamp) {
@@ -109,13 +104,9 @@
.build();
}
- public int nextSequenceNumber() {
- return streamSequenceNumber.getAndIncrement();
- }
-
/** Creates a PublishBuildToolEventStreamRequest from a packed bazel event. */
public PublishBuildToolEventStreamRequest bazelEvent(
- int sequenceNumber, Timestamp timestamp, Any packedEvent) {
+ long sequenceNumber, Timestamp timestamp, Any packedEvent) {
return publishBuildToolEventStreamRequest(
sequenceNumber,
timestamp,
@@ -123,7 +114,7 @@
}
public PublishBuildToolEventStreamRequest streamFinished(
- int sequenceNumber, Timestamp timestamp) {
+ long sequenceNumber, Timestamp timestamp) {
return publishBuildToolEventStreamRequest(
sequenceNumber,
timestamp,
@@ -134,7 +125,7 @@
@VisibleForTesting
public PublishBuildToolEventStreamRequest publishBuildToolEventStreamRequest(
- int sequenceNumber, Timestamp timestamp, BuildEvent.Builder besEvent) {
+ long sequenceNumber, Timestamp timestamp, BuildEvent.Builder besEvent) {
PublishBuildToolEventStreamRequest.Builder builder =
PublishBuildToolEventStreamRequest.newBuilder()
.setOrderedBuildEvent(
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index b56f16f..ea874f4 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -14,22 +14,17 @@
package com.google.devtools.build.lib.buildeventservice;
-import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
-import static com.google.devtools.build.lib.events.EventKind.INFO;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
-import static java.lang.String.format;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
-import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
@@ -41,21 +36,13 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
-import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.Clock;
-import com.google.devtools.build.lib.events.Event;
-import com.google.devtools.build.lib.events.EventHandler;
-import com.google.devtools.build.lib.events.EventKind;
-import com.google.devtools.build.lib.runtime.BlazeModule.ModuleEnvironment;
-import com.google.devtools.build.lib.util.AbruptExitException;
-import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.JavaSleeper;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
-import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import com.google.protobuf.Timestamp;
@@ -65,738 +52,881 @@
import io.grpc.StatusException;
import java.time.Duration;
import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.Immutable;
/** A {@link BuildEventTransport} that streams {@link BuildEvent}s to BuildEventService. */
public class BuildEventServiceTransport implements BuildEventTransport {
- static final String UPLOAD_FAILED_MESSAGE = "Build Event Protocol upload failed: %s";
- static final String UPLOAD_SUCCEEDED_MESSAGE =
- "Build Event Protocol upload finished successfully.";
+ private final BuildEventServiceUploader besUploader;
- static final Set<Code> CODES_NOT_TO_RETRY =
- Sets.newHashSet(Code.INVALID_ARGUMENT, Code.FAILED_PRECONDITION);
+ /** A builder for {@link BuildEventServiceTransport}. */
+ public static class Builder {
+ private boolean publishLifecycleEvents;
+ private Duration closeTimeout;
+ private Sleeper sleeper;
+ private BuildEventLogger buildEventLogger;
- private static final Logger logger = Logger.getLogger(BuildEventServiceTransport.class.getName());
+ /** Whether to publish lifecycle events. */
+ public Builder publishLifecycleEvents(boolean publishLifecycleEvents) {
+ this.publishLifecycleEvents = publishLifecycleEvents;
+ return this;
+ }
- /** Max wait time until for the Streaming RPC to finish after all events were sent. */
- private static final Duration PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT = Duration.ofSeconds(30);
- /** Max wait time between isStreamActive checks of the PublishBuildToolEventStream RPC. */
- private static final int STREAMING_RPC_POLL_IN_SECS = 1;
+ /** The time to wait for the build event upload after the build has completed. */
+ public Builder closeTimeout(Duration closeTimeout) {
+ this.closeTimeout = closeTimeout;
+ return this;
+ }
- private final String besResultsUrl;
- private final ListeningExecutorService uploaderExecutorService;
- private final Duration uploadTimeout;
- private final boolean publishLifecycleEvents;
- private final BuildEventServiceClient besClient;
- private final BuildEventServiceProtoUtil besProtoUtil;
- private final ModuleEnvironment moduleEnvironment;
- private final EventHandler commandLineReporter;
- private final BuildEventProtocolOptions protocolOptions;
- private final BuildEventArtifactUploader artifactUploader;
- private final Sleeper sleeper;
- private final boolean errorsShouldFailTheBuild;
- private final Clock clock;
- /** Contains all pendingAck events that might be retried in case of failures. */
- private ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck;
- /** Contains all events should be sent ordered by sequence number. */
- private final BlockingDeque<InternalOrderedBuildEvent> pendingSend;
- /** Holds the result status of the BuildEventStreamProtos BuildFinished event. */
- private volatile Result invocationResult;
- /** Used to block until all events have been uploaded. */
- private ListenableFuture<?> uploadComplete;
- /** Used to ensure that the close logic is only invoked once. */
- private SettableFuture<Void> shutdownFuture;
- /**
- * If the call before the current call threw an exception, this field points to it. If the
- * previous call was successful, this field is null. This is useful for error reporting, when an
- * upload times out due to having had to retry several times.
- */
- private volatile StatusException lastRetryError;
- /** Returns true if we already reported a warning or error to UI. */
- private volatile boolean errorsReported;
- /**
- * Returns the number of ACKs received since the last time {@link #publishEventStream()} was
- * retried due to a failure.
- */
- private final AtomicInteger acksReceivedSinceLastRetry = new AtomicInteger();
+ public Builder buildEventLogger(BuildEventLogger buildEventLogger) {
+ this.buildEventLogger = buildEventLogger;
+ return this;
+ }
- private final EventBus internalEventBus;
+ @VisibleForTesting
+ public Builder sleeper(Sleeper sleeper) {
+ this.sleeper = sleeper;
+ return this;
+ }
- public BuildEventServiceTransport(
- BuildEventServiceClient besClient,
- Duration uploadTimeout,
- boolean publishLifecycleEvents,
- String buildRequestId,
- String invocationId,
- String command,
- ModuleEnvironment moduleEnvironment,
- Clock clock,
- BuildEventProtocolOptions protocolOptions,
- EventHandler commandLineReporter,
- @Nullable String projectId,
- Set<String> keywords,
- @Nullable String besResultsUrl,
- BuildEventArtifactUploader artifactUploader,
- boolean errorsShouldFailTheBuild,
- EventBus internalEventBus) {
- this(
- besClient,
- uploadTimeout,
- publishLifecycleEvents,
- buildRequestId,
- invocationId,
- command,
- moduleEnvironment,
- clock,
- protocolOptions,
- commandLineReporter,
- projectId,
- keywords,
- besResultsUrl,
- artifactUploader,
- new JavaSleeper(),
- errorsShouldFailTheBuild,
- internalEventBus);
+ public BuildEventServiceTransport build(
+ BuildEventServiceClient besClient,
+ BuildEventArtifactUploader localFileUploader,
+ BuildEventProtocolOptions bepOptions,
+ BuildEventServiceProtoUtil besProtoUtil,
+ Clock clock,
+ ExitFunction exitFunction) {
+
+ return new BuildEventServiceTransport(
+ besClient,
+ localFileUploader,
+ bepOptions,
+ besProtoUtil,
+ clock,
+ exitFunction,
+ publishLifecycleEvents,
+ closeTimeout != null ? closeTimeout : Duration.ZERO,
+ sleeper != null ? sleeper : new JavaSleeper(),
+ buildEventLogger != null ? buildEventLogger : (e) -> {});
+ }
}
- @VisibleForTesting
- public BuildEventServiceTransport(
+ private BuildEventServiceTransport(
BuildEventServiceClient besClient,
- Duration uploadTimeout,
- boolean publishLifecycleEvents,
- String buildRequestId,
- String invocationId,
- String command,
- ModuleEnvironment moduleEnvironment,
+ BuildEventArtifactUploader localFileUploader,
+ BuildEventProtocolOptions bepOptions,
+ BuildEventServiceProtoUtil besProtoUtil,
Clock clock,
- BuildEventProtocolOptions protocolOptions,
- EventHandler commandLineReporter,
- @Nullable String projectId,
- Set<String> keywords,
- @Nullable String besResultsUrl,
- BuildEventArtifactUploader artifactUploader,
+ ExitFunction exitFunc,
+ boolean publishLifecycleEvents,
+ Duration closeTimeout,
Sleeper sleeper,
- boolean errorsShouldFailTheBuild,
- EventBus internalEventBus) {
- this.besClient = besClient;
- this.besProtoUtil =
- new BuildEventServiceProtoUtil(
- buildRequestId, invocationId, projectId, command, clock, keywords);
- this.publishLifecycleEvents = publishLifecycleEvents;
- this.moduleEnvironment = moduleEnvironment;
- this.commandLineReporter = commandLineReporter;
- this.pendingAck = new ConcurrentLinkedDeque<>();
- this.pendingSend = new LinkedBlockingDeque<>();
- // Setting the thread count to 2 instead of 1 is a hack, but necessary as publishEventStream
- // blocks one thread permanently and thus we can't do any other work on the executor. A proper
- // fix would be to remove the spinning loop from publishEventStream and instead implement the
- // loop by publishEventStream re-submitting itself to the executor.
- // TODO(buchgr): Fix it.
- this.uploaderExecutorService =
- listeningDecorator(
- Executors.newFixedThreadPool(
- 2,
- new ThreadFactory() {
-
- private final AtomicInteger count = new AtomicInteger();
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "bes-uploader-" + count.incrementAndGet());
- }
- }));
- this.protocolOptions = protocolOptions;
- this.invocationResult = UNKNOWN_STATUS;
- this.uploadTimeout = uploadTimeout;
- this.artifactUploader = artifactUploader;
- this.sleeper = sleeper;
- this.besResultsUrl = besResultsUrl;
- this.errorsShouldFailTheBuild = errorsShouldFailTheBuild;
- this.clock = clock;
- this.internalEventBus = internalEventBus;
- }
-
- public boolean isStreaming() {
- return besClient.isStreamActive();
+ BuildEventLogger buildEventLogger) {
+ this.besUploader =
+ new BuildEventServiceUploader(
+ besClient,
+ localFileUploader,
+ besProtoUtil,
+ bepOptions,
+ publishLifecycleEvents,
+ closeTimeout,
+ exitFunc,
+ sleeper,
+ clock,
+ buildEventLogger);
}
@Override
public ListenableFuture<Void> close() {
- return close(/*now=*/ false);
+ // This future completes once the upload has finished. As
+ // per API contract it is expected to never fail.
+ SettableFuture<Void> closeFuture = SettableFuture.create();
+ ListenableFuture<Void> uploaderCloseFuture = besUploader.close();
+ uploaderCloseFuture.addListener(() -> closeFuture.set(null), MoreExecutors.directExecutor());
+ return closeFuture;
}
@Override
- @SuppressWarnings("FutureReturnValueIgnored")
public void closeNow() {
- close(/*now=*/ true);
- }
-
- private synchronized ListenableFuture<Void> close(boolean now) {
- if (shutdownFuture != null) {
- if (now) {
- cancelUpload();
- if (!shutdownFuture.isDone()) {
- shutdownFuture.set(null);
- }
- }
- return shutdownFuture;
- }
-
- logger.log(Level.INFO, "Closing the build event service transport.");
-
- // The future is completed once the close succeeded or failed.
- shutdownFuture = SettableFuture.create();
-
- if (now) {
- cancelUpload();
- shutdownFuture.set(null);
- return shutdownFuture;
- }
-
- Timestamp lastEventTimestamp = timestamp();
- uploaderExecutorService.execute(
- () -> {
- try {
- sendOrderedBuildEvent(
- new LastInternalOrderedBuildEvent(
- besProtoUtil.nextSequenceNumber(), lastEventTimestamp));
-
- if (errorsReported) {
- // If we encountered errors before and have already reported them, then we should
- // not report them a second time.
- return;
- }
-
- report(INFO, "Waiting for Build Event Protocol upload to finish.");
- try {
- if (uploadTimeout.isZero()) {
- uploadComplete.get();
- } else {
- uploadComplete.get(uploadTimeout.toMillis(), MILLISECONDS);
- }
- report(INFO, UPLOAD_SUCCEEDED_MESSAGE);
- if (!Strings.isNullOrEmpty(besResultsUrl)) {
- report(INFO, "Build Event Protocol results available at " + besResultsUrl);
- }
-
- } catch (Exception e) {
- uploadComplete.cancel(true);
- reportErrorAndFailBuild(e);
- if (!Strings.isNullOrEmpty(besResultsUrl)) {
- report(
- INFO,
- "Partial Build Event Protocol results may be available at " + besResultsUrl);
- }
- }
- } finally {
- shutdownFuture.set(null);
- uploaderExecutorService.shutdown();
- }
- });
-
- return shutdownFuture;
- }
-
- private void cancelUpload() {
- if (!uploaderExecutorService.isShutdown()) {
- logger.log(Level.INFO, "Forcefully closing the build event service transport.");
- // This will interrupt the thread doing the BES upload.
- if (uploadComplete != null) {
- uploadComplete.cancel(true);
- }
- uploaderExecutorService.shutdownNow();
- try {
- uploaderExecutorService.awaitTermination(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // Ignore this exception. We are shutting down independently no matter what the BES
- // upload does.
- }
- }
+ besUploader.closeNow(/*causedByTimeout=*/ false);
}
@Override
public String name() {
- // TODO(buchgr): Also display the hostname / IP.
return "Build Event Service";
}
@Override
public void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
- if (event instanceof BuildCompletingEvent) {
- BuildCompletingEvent completingEvent = (BuildCompletingEvent) event;
- if (completingEvent.getExitCode() != null
- && completingEvent.getExitCode().getNumericExitCode() == 0) {
- invocationResult = COMMAND_SUCCEEDED;
- } else {
- invocationResult = COMMAND_FAILED;
- }
- }
-
- Collection<LocalFile> localFiles = event.referencedLocalFiles();
- Map<Path, LocalFile> localFileMap = new HashMap<>(localFiles.size());
- for (LocalFile localFile : localFiles) {
- // It is possible for targets to have duplicate artifacts (same path but different owners)
- // in their output groups. Since they didn't trigger an artifact conflict they are the
- // same file, so just skip either one
- localFileMap.putIfAbsent(localFile.path, localFile);
- }
- ListenableFuture<PathConverter> upload = artifactUploader.upload(localFileMap);
- InternalOrderedBuildEvent buildEvent =
- new DefaultInternalOrderedBuildEvent(
- event, namer, upload, besProtoUtil.nextSequenceNumber(), timestamp());
- sendOrderedBuildEvent(buildEvent);
+ besUploader.enqueueEvent(event, namer);
}
- private Timestamp timestamp() {
- return Timestamps.fromMillis(clock.currentTimeMillis());
+ /** BuildEventLogger can be used to log build event (stats). */
+ @FunctionalInterface
+ public interface BuildEventLogger {
+ void log(BuildEventStreamProtos.BuildEvent buildEvent);
}
- private String errorMessageFromException(Throwable t) {
- String message;
- if (t instanceof TimeoutException) {
- message = "Build Event Protocol upload timed out.";
- StatusException lastRetryError0 = lastRetryError;
- if (lastRetryError0 != null) {
- // We may at times get a timeout exception due to an underlying error that was retried
- // several times. If such an error exists, report it.
- message += " Transport errors caused the upload to be retried.";
- message += " Last known reason for retry: ";
- message += besClient.userReadableError(lastRetryError0);
- return message;
- }
- return message;
- } else if (t instanceof ExecutionException) {
- message =
- format(
- UPLOAD_FAILED_MESSAGE,
- t.getCause() != null ? besClient.userReadableError(t.getCause()) : t.getMessage());
- return message;
- } else {
- message = format(UPLOAD_FAILED_MESSAGE, besClient.userReadableError(t));
- return message;
- }
+ /**
+ * Called by the {@link BuildEventServiceUploader} in case of error to asynchronously notify Bazel
+ * of an error.
+ */
+ @FunctionalInterface
+ public interface ExitFunction {
+ void accept(String message, Exception cause);
}
- protected void reportErrorAndFailBuild(Throwable t) {
- String message = errorMessageFromException(t);
- if (errorsShouldFailTheBuild) {
- commandLineReporter.handle(Event.error(message));
- moduleEnvironment.exit(
- new AbruptExitException(
- "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR));
- } else {
- commandLineReporter.handle(Event.warn(message));
- }
- }
-
- private void maybeReportUploadError() {
- if (errorsReported) {
- return;
- }
-
- Throwable uploadError = fromFuture(uploadComplete);
- if (uploadError != null) {
- errorsReported = true;
- reportErrorAndFailBuild(uploadError);
- }
- }
-
- private synchronized void sendOrderedBuildEvent(InternalOrderedBuildEvent evtAndNamer) {
- if (uploadComplete != null && uploadComplete.isDone()) {
- maybeReportUploadError();
- return;
- }
-
- pendingSend.add(evtAndNamer);
- if (uploadComplete == null) {
- uploadComplete = uploaderExecutorService.submit(new BuildEventServiceUpload());
- }
- }
-
- private Result getInvocationResult() {
- return invocationResult;
- }
-
- /** Class responsible for sending lifecycle and build events. */
- private class BuildEventServiceUpload implements Callable<Void> {
- @Override
- public Void call() throws Exception {
+ /**
+ * This method is only used in tests. Once TODO(b/113035235) is fixed the close future will also
+ * carry error messages.
+ */
+ @VisibleForTesting
+ public void throwUploaderError() throws Exception {
+ synchronized (besUploader.lock) {
+ checkState(besUploader.closeFuture != null && besUploader.closeFuture.isDone());
try {
- publishLifecycleEvent(besProtoUtil.buildEnqueued(timestamp()));
- publishLifecycleEvent(besProtoUtil.invocationStarted(timestamp()));
- try {
- retryOnException(BuildEventServiceTransport.this::publishEventStream);
- } finally {
- Result result = getInvocationResult();
- publishLifecycleEvent(besProtoUtil.invocationFinished(timestamp(), result));
- publishLifecycleEvent(besProtoUtil.buildFinished(timestamp(), result));
+ besUploader.closeFuture.get();
+ } catch (ExecutionException e) {
+ throw (Exception) e.getCause();
+ }
+ }
+ }
+
+ /** Implements the BES upload which includes uploading the lifecycle and build events. */
+ private static class BuildEventServiceUploader implements Runnable {
+
+ private static final Logger logger =
+ Logger.getLogger(BuildEventServiceUploader.class.getName());
+
+ /** Configuration knobs related to RPC retries. Values chosen by good judgement. */
+ private static final int MAX_NUM_RETRIES = 4;
+
+ private static final int DELAY_MILLIS = 1000;
+
+ private final BuildEventServiceClient besClient;
+ private final BuildEventArtifactUploader localFileUploader;
+ private final BuildEventServiceProtoUtil besProtoUtil;
+ private final BuildEventProtocolOptions protocolOptions;
+ private final boolean publishLifecycleEvents;
+ private final Duration closeTimeout;
+ private final ExitFunction exitFunc;
+ private final Sleeper sleeper;
+ private final Clock clock;
+ private final BuildEventLogger buildEventLogger;
+
+ /**
+ * The event queue contains two types of events: - Build events, sorted by sequence number, that
+ * should be sent to the server - Command events that are used by {@link #publishBuildEvents()}
+ * to change state.
+ */
+ private final BlockingDeque<EventLoopCommand> eventQueue = new LinkedBlockingDeque<>();
+
+ /**
+ * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be
+ * consecutive monotonically increasing natural numbers.
+ */
+ private final AtomicLong nextSeqNum = new AtomicLong(1);
+
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private Result buildStatus = UNKNOWN_STATUS;
+
+ /**
+ * Initialized only after the first call to {@link #close()} or if the upload fails before that.
+ * The {@code null} state is used throughout the code to make multiple calls to {@link #close()}
+ * idempotent.
+ */
+ @GuardedBy("lock")
+ private SettableFuture<Void> closeFuture;
+
+ /**
+ * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
+ * on the first call to {@link #enqueueEvent(BuildEvent, ArtifactGroupNamer)} or {@link
+ * #close()} (which ever comes first).
+ */
+ @GuardedBy("lock")
+ private Thread uploadThread;
+
+ @GuardedBy("lock")
+ private boolean interruptCausedByTimeout;
+
+ public BuildEventServiceUploader(
+ BuildEventServiceClient besClient,
+ BuildEventArtifactUploader localFileUploader,
+ BuildEventServiceProtoUtil besProtoUtil,
+ BuildEventProtocolOptions protocolOptions,
+ boolean publishLifecycleEvents,
+ Duration closeTimeout,
+ ExitFunction exitFunc,
+ Sleeper sleeper,
+ Clock clock,
+ BuildEventLogger buildEventLogger) {
+ this.besClient = Preconditions.checkNotNull(besClient);
+ this.localFileUploader = Preconditions.checkNotNull(localFileUploader);
+ this.besProtoUtil = Preconditions.checkNotNull(besProtoUtil);
+ this.protocolOptions = Preconditions.checkNotNull(protocolOptions);
+ this.publishLifecycleEvents = publishLifecycleEvents;
+ this.closeTimeout = Preconditions.checkNotNull(closeTimeout);
+ this.exitFunc = Preconditions.checkNotNull(exitFunc);
+ this.sleeper = Preconditions.checkNotNull(sleeper);
+ this.clock = Preconditions.checkNotNull(clock);
+ this.buildEventLogger = Preconditions.checkNotNull(buildEventLogger);
+ }
+
+ /** Enqueues an event for uploading to a BES backend. */
+ public void enqueueEvent(BuildEvent event, ArtifactGroupNamer namer) {
+ // This needs to happen outside a synchronized block as it may trigger
+ // stdout/stderr and lead to a deadlock. See b/109725432
+ ListenableFuture<PathConverter> localFileUploadFuture =
+ uploadReferencedLocalFiles(event.referencedLocalFiles());
+
+ synchronized (lock) {
+ if (closeFuture != null) {
+ // Close has been called and thus we silently ignore any further events and cancel
+ // any pending file uploads
+ closeFuture.addListener(
+ () -> {
+ if (!localFileUploadFuture.isDone()) {
+ localFileUploadFuture.cancel(true);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return;
}
+ // BuildCompletingEvent marks the end of the build in the BEP event stream.
+ if (event instanceof BuildCompletingEvent) {
+ this.buildStatus = extractBuildStatus((BuildCompletingEvent) event);
+ }
+ ensureUploadThreadStarted();
+ eventQueue.addLast(
+ new SendRegularBuildEventCommand(
+ event, namer, localFileUploadFuture, nextSeqNum.getAndIncrement(), currentTime()));
+ }
+ }
+
+ /**
+ * Gracefully stops the BES upload. All events enqueued before the call to close will be
+ * uploaded and events enqueued after the call will be discarded.
+ *
+ * <p>The returned future completes when the upload completes. It's guaranteed to never fail.
+ */
+ public ListenableFuture<Void> close() {
+ synchronized (lock) {
+ if (closeFuture != null) {
+ return closeFuture;
+ }
+ ensureUploadThreadStarted();
+
+ closeFuture = SettableFuture.create();
+
+ // Enqueue the last event which will terminate the upload.
+ eventQueue.addLast(
+ new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
+
+ if (!closeTimeout.isZero()) {
+ startCloseTimer(closeFuture, closeTimeout);
+ }
+ return closeFuture;
+ }
+ }
+
+ /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
+ public void closeNow(boolean causedByTimeout) {
+ synchronized (lock) {
+ if (uploadThread != null) {
+ if (uploadThread.isInterrupted()) {
+ return;
+ }
+
+ interruptCausedByTimeout = causedByTimeout;
+ uploadThread.interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (publishLifecycleEvents) {
+ publishLifecycleEvent(besProtoUtil.buildEnqueued(currentTime()));
+ publishLifecycleEvent(besProtoUtil.invocationStarted(currentTime()));
+ }
+
+ try {
+ publishBuildEvents();
+ } finally {
+ if (publishLifecycleEvents) {
+ Result buildStatus;
+ synchronized (lock) {
+ buildStatus = this.buildStatus;
+ }
+ publishLifecycleEvent(besProtoUtil.invocationFinished(currentTime(), buildStatus));
+ publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus));
+ }
+ }
+ exitFunc.accept("The Build Event Protocol upload finished successfully", null);
+ synchronized (lock) {
+ // Invariant: closeFuture is not null.
+ // publishBuildEvents() only terminates successfully after SendLastBuildEventCommand
+ // has been sent successfully and that event is only added to the eventQueue during a
+ // call to close() which initializes the closeFuture.
+ closeFuture.set(null);
+ }
+ } catch (InterruptedException e) {
+ try {
+ logInfo(e, "Aborting the BES upload due to having received an interrupt");
+ synchronized (lock) {
+ if (interruptCausedByTimeout) {
+ exitFunc.accept("The Build Event Protocol upload timed out", e);
+ }
+ }
+ } finally {
+ // TODO(buchgr): Due to b/113035235 exitFunc needs to be called before the close future
+ // completes.
+ failCloseFuture(e);
+ }
+ } catch (StatusException e) {
+ try {
+ String message =
+ "The Build Event Protocol upload failed: " + besClient.userReadableError(e);
+ logInfo(e, message);
+ exitFunc.accept(message, e);
+ } finally {
+ failCloseFuture(e);
+ }
+ } catch (LocalFileUploadException e) {
+ try {
+ String message =
+ "The Build Event Protocol local file upload failed: " + e.getCause().getMessage();
+ logInfo((Exception) e.getCause(), message);
+ exitFunc.accept(message, (Exception) e.getCause());
+ } finally {
+ failCloseFuture((Exception) e.getCause());
+ }
+ } catch (RuntimeException e) {
+ failCloseFuture((Exception) e.getCause());
+ logError(e, "BES upload failed due to a RuntimeException. This is a bug.");
+ throw e;
} finally {
try {
besClient.shutdown();
} finally {
- artifactUploader.shutdown();
+ localFileUploader.shutdown();
}
}
- return null;
}
- private void publishLifecycleEvent(PublishLifecycleEventRequest request) throws Exception {
- if (publishLifecycleEvents) {
- retryOnException(() -> besClient.publish(request));
- }
- }
- }
+ private void publishBuildEvents()
+ throws StatusException, LocalFileUploadException, InterruptedException {
+ eventQueue.addFirst(new OpenStreamCommand());
- /**
- * Used as method reference, responsible for the entire Streaming RPC. Safe to retry. This method
- * carries over the state between consecutive calls (pendingAck messages will be added to the head
- * of the pendingSend queue), but that is intended behavior.
- */
- private void publishEventStream()
- throws StatusException, LocalFileUploadException, InterruptedException {
- // Reschedule unacked messages if required, keeping its original order.
- InternalOrderedBuildEvent unacked;
- while ((unacked = pendingAck.pollLast()) != null) {
- pendingSend.addFirst(unacked);
- }
- pendingAck = new ConcurrentLinkedDeque<>();
- publishEventStream(pendingAck, pendingSend, besClient);
- }
+ // Every build event sent to the server needs to be acknowledged by it. This queue stores
+ // the build events that have been sent and still have to be acknowledged by the server.
+ // The build events are stored in the order they were sent.
+ ConcurrentLinkedDeque<SendBuildEventCommand> ackQueue = new ConcurrentLinkedDeque<>();
+ boolean lastEventSent = false;
+ int acksReceived = 0;
+ int retryAttempt = 0;
- /** Method responsible for a single Streaming RPC. */
- private void publishEventStream(
- final ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck,
- final BlockingDeque<InternalOrderedBuildEvent> pendingSend,
- final BuildEventServiceClient besClient)
- throws StatusException, LocalFileUploadException, InterruptedException {
- ListenableFuture<Status> stream = besClient.openStream(ackCallback(pendingAck, besClient));
- logger.log(
- Level.INFO,
- String.format(
- "Started PublishBuildToolEventStream RPC (pendingSendCount=%s)", pendingSend.size()));
- try {
- @Nullable InternalOrderedBuildEvent orderedBuildEvent;
- do {
- orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
- if (orderedBuildEvent != null) {
- pendingAck.add(orderedBuildEvent);
- PathConverter pathConverter = waitForLocalFileUploads(orderedBuildEvent);
- besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter));
- }
- Status streamStatus = getFromStreamFuture(stream);
- if (streamStatus != null) {
- throw streamStatus.augmentDescription("Stream closed prematurely").asException();
- }
- } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent());
- logger.log(
- Level.INFO,
- String.format(
- "Will end publishEventStream() isLastEvent: %s isStreamActive: %s",
- orderedBuildEvent.isLastEvent(), besClient.isStreamActive()));
- } catch (InterruptedException e) {
- // By convention the interrupted flag should have been cleared,
- // but just to be sure clear it.
- Thread.interrupted();
- besClient.abortStream(
- Status.CANCELLED.augmentDescription("The build event upload was interrupted."));
- throw e;
- } catch (StatusException e) {
- besClient.abortStream(e.getStatus());
- throw e;
- } catch (LocalFileUploadException e) {
- besClient.abortStream(Status.INTERNAL.augmentDescription("Local file upload failed."));
- throw e;
- }
-
- try {
- Status status =
- stream.get(PUBLISH_EVENT_STREAM_FINISHED_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
- logger.log(Level.INFO, "Done with publishEventStream(). Status: " + status);
- if (!status.isOk()) {
- throw status.asException();
- }
- } catch (InterruptedException e) {
- // By convention the interrupted flag should have been cleared,
- // but just to be sure clear it.
- Thread.interrupted();
- String additionalDetails = "Waiting for ACK messages.";
- besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetails));
- throw e;
- } catch (TimeoutException e) {
- String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages";
- logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
- besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail));
- throw Status.DEADLINE_EXCEEDED.augmentDescription(additionalDetail).asException();
- } catch (ExecutionException e) {
- throw new IllegalStateException(
- "The stream future is expected to never fail per API contract", e);
- }
- }
-
- @Nullable
- private Status getFromStreamFuture(ListenableFuture<Status> stream) throws InterruptedException {
- if (stream.isDone()) {
try {
- return stream.get();
+ // OPEN_STREAM is the first event and opens a bidi streaming RPC for sending build events
+ // and receiving ACKs.
+ // SEND_BUILD_EVENT sends a build event to the server. Sending of the Nth build event does
+ // does not wait for the ACK of the N-1th build event to have been received.
+ // SEND_LAST_EVENT sends the last build event and half closes the RPC.
+ // ACK_RECEIVED is executed for every ACK from the server and checks that the ACKs are in
+ // the correct order.
+ // STREAM_COMPLETE checks that all build events have been sent and all ACKs have been
+ // received. If not it invokes a retry logic that may decide to re-send every build event
+ // for which an ACK has not been received. If so, it adds an OPEN_STREAM event.
+ while (true) {
+ EventLoopCommand event = eventQueue.takeFirst();
+ switch (event.type()) {
+ case OPEN_STREAM:
+ {
+ // Invariant: the eventQueue only contains events of type SEND_BUILD_EVENT
+ // or SEND_LAST_EVENT
+ logInfo("Starting publishBuildEvents: eventQueue=%d", eventQueue.size());
+ ListenableFuture<Status> streamFuture =
+ besClient.openStream(
+ (ack) ->
+ eventQueue.addLast(new AckReceivedCommand(ack.getSequenceNumber())));
+ addStreamStatusListener(
+ streamFuture,
+ (status) -> eventQueue.addLast(new StreamCompleteCommand(status)));
+ }
+ break;
+
+ case SEND_BUILD_EVENT:
+ {
+ // Invariant: the eventQueue may contain events of any type
+ SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
+ ackQueue.addLast(buildEvent);
+ PathConverter pathConverter = waitForLocalFileUploads(buildEvent);
+ besClient.sendOverStream(buildEvent.serialize(pathConverter));
+ }
+ break;
+
+ case SEND_LAST_EVENT:
+ {
+ // Invariant: the eventQueue may contain events of any type
+ SendLastBuildEventCommand lastEvent = (SendLastBuildEventCommand) event;
+ ackQueue.addLast(lastEvent);
+ lastEventSent = true;
+ besClient.sendOverStream(lastEvent.serialize());
+ besClient.halfCloseStream();
+ }
+ break;
+
+ case ACK_RECEIVED:
+ {
+ // Invariant: the eventQueue may contain events of any type
+ AckReceivedCommand ackEvent = (AckReceivedCommand) event;
+ if (!ackQueue.isEmpty()) {
+ SendBuildEventCommand expected = ackQueue.removeFirst();
+ long actualSeqNum = ackEvent.getSequenceNumber();
+ if (expected.getSequenceNumber() == actualSeqNum) {
+ acksReceived++;
+ } else {
+ ackQueue.addFirst(expected);
+ String message =
+ String.format(
+ "Expected ACK with seqNum=%d but received ACK with seqNum=%d",
+ expected.getSequenceNumber(), actualSeqNum);
+ logInfo(message);
+ besClient.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
+ }
+ } else {
+ String message =
+ String.format(
+ "Received ACK (seqNum=%d) when no ACK was expected",
+ ackEvent.getSequenceNumber());
+ logInfo(message);
+ besClient.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
+ }
+ }
+ break;
+
+ case STREAM_COMPLETE:
+ {
+ // Invariant: the eventQueue only contains events of type SEND_BUILD_EVENT
+ // or SEND_LAST_EVENT
+ StreamCompleteCommand completeEvent = (StreamCompleteCommand) event;
+ Status streamStatus = completeEvent.status();
+ if (streamStatus.isOk()) {
+ if (lastEventSent && ackQueue.isEmpty()) {
+ logInfo("publishBuildEvents was successful");
+ // Upload successful. Break out from the while(true) loop.
+ return;
+ } else {
+ throw (lastEventSent
+ ? ackQueueNotEmptyStatus(ackQueue.size())
+ : lastEventNotSentStatus())
+ .asException();
+ }
+ }
+
+ if (!shouldRetryStatus(streamStatus)) {
+ logInfo("Not retrying publishBuildEvents: status='%s'", streamStatus);
+ throw streamStatus.asException();
+ }
+ if (retryAttempt == MAX_NUM_RETRIES) {
+ logInfo(
+ "Not retrying publishBuildEvents, no more attempts left: status='%s'",
+ streamStatus);
+ throw streamStatus.asException();
+ }
+
+ // Retry logic
+ // Adds events from the ackQueue to the front of the eventQueue, so that the
+ // events in the eventQueue are sorted by sequence number (ascending).
+ SendBuildEventCommand unacked;
+ while ((unacked = ackQueue.pollLast()) != null) {
+ eventQueue.addFirst(unacked);
+ }
+
+ long sleepMillis = retrySleepMillis(retryAttempt);
+ logInfo(
+ "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d",
+ streamStatus, sleepMillis);
+ sleeper.sleepMillis(sleepMillis);
+
+ // If we made progress, meaning the server ACKed events that we sent, then reset
+ // the retry counter to 0.
+ if (acksReceived > 0) {
+ retryAttempt = 0;
+ } else {
+ retryAttempt++;
+ }
+ acksReceived = 0;
+ eventQueue.addFirst(new OpenStreamCommand());
+ }
+ break;
+ }
+ }
+ } catch (InterruptedException | LocalFileUploadException e) {
+ besClient.abortStream(Status.CANCELLED);
+ throw e;
+ } finally {
+ // Cancel all local file uploads that may still be running
+ // of events that haven't been uploaded.
+ EventLoopCommand event;
+ while ((event = ackQueue.pollFirst()) != null) {
+ if (event instanceof SendRegularBuildEventCommand) {
+ cancelLocalFileUpload((SendRegularBuildEventCommand) event);
+ }
+ }
+ while ((event = eventQueue.pollFirst()) != null) {
+ if (event instanceof SendRegularBuildEventCommand) {
+ cancelLocalFileUpload((SendRegularBuildEventCommand) event);
+ }
+ }
+ }
+ }
+
+ private void cancelLocalFileUpload(SendRegularBuildEventCommand event) {
+ ListenableFuture<PathConverter> localFileUploaderFuture = event.localFileUploadProgress();
+ if (!localFileUploaderFuture.isDone()) {
+ localFileUploaderFuture.cancel(true);
+ }
+ }
+
+ /** Sends a {@link PublishLifecycleEventRequest} to the BES backend. */
+ private void publishLifecycleEvent(PublishLifecycleEventRequest request)
+ throws StatusException, InterruptedException {
+ int retryAttempt = 0;
+ StatusException cause = null;
+ while (retryAttempt <= MAX_NUM_RETRIES) {
+ try {
+ besClient.publish(request);
+ return;
+ } catch (StatusException e) {
+ if (!shouldRetryStatus(e.getStatus())) {
+ logInfo("Not retrying publishLifecycleEvent: status='%s'", e.getStatus().toString());
+ throw e;
+ }
+
+ cause = e;
+
+ long sleepMillis = retrySleepMillis(retryAttempt);
+ logInfo(
+ "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d",
+ e.getStatus().toString(), sleepMillis);
+ sleeper.sleepMillis(sleepMillis);
+ retryAttempt++;
+ }
+ }
+
+ // All retry attempts failed
+ throw cause;
+ }
+
+ private ListenableFuture<PathConverter> uploadReferencedLocalFiles(
+ Collection<LocalFile> localFiles) {
+ Map<Path, LocalFile> localFileMap = new TreeMap<>();
+ for (LocalFile localFile : localFiles) {
+ // It is possible for targets to have duplicate artifacts (same path but different owners)
+ // in their output groups. Since they didn't trigger an artifact conflict they are the
+ // same file, so just skip either one
+ localFileMap.putIfAbsent(localFile.path, localFile);
+ }
+ return localFileUploader.upload(localFileMap);
+ }
+
+ private void ensureUploadThreadStarted() {
+ synchronized (lock) {
+ if (uploadThread == null) {
+ uploadThread = new Thread(this, "bes-uploader");
+ uploadThread.start();
+ }
+ }
+ }
+
+ private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeTimeout) {
+ Thread closeTimer =
+ new Thread(
+ () -> {
+ // Call closeNow() if the future does not complete within closeTimeout
+ try {
+ closeFuture.get(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ closeNow(/*causedByTimeout=*/ true);
+ } catch (ExecutionException e) {
+ // Intentionally left empty, because this code only cares about
+ // calling closeNow() if the closeFuture does not complete within
+ // closeTimeout.
+ }
+ },
+ "bes-uploader-close-timer");
+ closeTimer.start();
+ }
+
+ private void failCloseFuture(Exception cause) {
+ synchronized (lock) {
+ if (closeFuture == null) {
+ closeFuture = SettableFuture.create();
+ }
+ closeFuture.setException(cause);
+ }
+ }
+
+ private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
+ throws LocalFileUploadException, InterruptedException {
+ try {
+ // Wait for the local file upload to have been completed.
+ return orderedBuildEvent.localFileUploadProgress().get();
} catch (ExecutionException e) {
- throw new IllegalStateException(
- "The stream future is expected to never fail per API contract", e);
+ logger.log(
+ Level.WARNING,
+ String.format(
+ "Failed to upload local files referenced by build event: %s", e.getMessage()),
+ e);
+ throw new LocalFileUploadException((Exception) e.getCause());
}
}
- return null;
- }
- private PathConverter waitForLocalFileUploads(InternalOrderedBuildEvent orderedBuildEvent)
- throws LocalFileUploadException, InterruptedException {
- try {
- // Wait for the local file upload to have been completed.
- return orderedBuildEvent.localFileUploadProgress().get();
- } catch (ExecutionException e) {
- logger.log(
- Level.WARNING,
+ private Timestamp currentTime() {
+ return Timestamps.fromMillis(clock.currentTimeMillis());
+ }
+
+ private static Result extractBuildStatus(BuildCompletingEvent event) {
+ if (event.getExitCode() != null && event.getExitCode().getNumericExitCode() == 0) {
+ return COMMAND_SUCCEEDED;
+ } else {
+ return COMMAND_FAILED;
+ }
+ }
+
+ private static Status lastEventNotSentStatus() {
+ return Status.FAILED_PRECONDITION.withDescription(
+ "Server closed stream with status OK but not all events have been sent");
+ }
+
+ private static Status ackQueueNotEmptyStatus(int ackQueueSize) {
+ return Status.FAILED_PRECONDITION.withDescription(
String.format(
- "Failed to upload local files referenced by build event: %s", e.getMessage()),
- e);
- throw new LocalFileUploadException(e.getCause());
+ "Server closed stream with status OK but not all ACKs have been"
+ + " received (ackQueue=%d)",
+ ackQueueSize));
}
- }
- private static class LocalFileUploadException extends Exception {
+ private static void addStreamStatusListener(
+ ListenableFuture<Status> stream, Consumer<Status> onDone) {
+ Futures.addCallback(
+ stream,
+ new FutureCallback<Status>() {
+ @Override
+ public void onSuccess(Status result) {
+ onDone.accept(result);
+ }
- public LocalFileUploadException(Throwable cause) {
- super(cause);
+ @Override
+ public void onFailure(Throwable t) {}
+ },
+ MoreExecutors.directExecutor());
}
- }
- private Function<PublishBuildToolEventStreamResponse, Void> ackCallback(
- final Deque<InternalOrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) {
- return ack -> {
- Preconditions.checkNotNull(ack);
- long pendingSeq = pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber();
- long ackSeq = ack.getSequenceNumber();
- if (pendingSeq != ackSeq) {
- besClient.abortStream(
- Status.INTERNAL.augmentDescription(
- format("Expected ACK %s but was %s.", pendingSeq, ackSeq)));
- return null;
+ private static boolean shouldRetryStatus(Status status) {
+ return !status.getCode().equals(Code.INVALID_ARGUMENT)
+ && !status.getCode().equals(Code.FAILED_PRECONDITION);
+ }
+
+ private static long retrySleepMillis(int attempt) {
+ // This somewhat matches the backoff used for gRPC connection backoffs.
+ return (long) (DELAY_MILLIS * Math.pow(1.6, attempt));
+ }
+
+ private static void logInfo(String message, Object... args) {
+ logger.log(Level.INFO, String.format(message, args));
+ }
+
+ private static void logInfo(Exception cause, String message, Object... args) {
+ logger.log(Level.INFO, String.format(message, args), cause);
+ }
+
+ private static void logError(Exception cause, String message, Object... args) {
+ logger.log(Level.SEVERE, String.format(message, args), cause);
+ }
+
+ /** A command that may be added to the {@code eventQueue}. */
+ private interface EventLoopCommand {
+
+ /**
+ * The event types are used to switch between states in the event loop in {@link
+ * #publishBuildEvents()}
+ */
+ enum Type {
+ /** Tells the event loop to open a new BES stream */
+ OPEN_STREAM,
+ /** Tells the event loop to send the build event */
+ SEND_BUILD_EVENT,
+ /** Tells the event loop that an ACK was received */
+ ACK_RECEIVED,
+ /** Tells the event loop that this is the last event of the stream */
+ SEND_LAST_EVENT,
+ /** Tells the event loop that the streaming RPC completed */
+ STREAM_COMPLETE
}
- InternalOrderedBuildEvent event = pendingAck.removeFirst();
- if (event.isLastEvent()) {
- logger.log(Level.INFO, "Last ACK received.");
- besClient.closeStream();
- }
- acksReceivedSinceLastRetry.incrementAndGet();
- return null;
- };
- }
- /** Executes a {@link Callable} retrying on exception thrown. */
- private void retryOnException(EventUploadCallable c) throws Exception {
- final int maxRetries = 5;
- final long initialDelayMillis = 0;
- final long delayMillis = 1000;
+ Type type();
+ }
- int tries = 0;
- while (tries <= maxRetries) {
- try {
- acksReceivedSinceLastRetry.set(0);
- c.call();
- lastRetryError = null;
- return;
- } catch (LocalFileUploadException e) {
- throw (Exception) e.getCause();
- } catch (StatusException e) {
- if (CODES_NOT_TO_RETRY.contains(e.getStatus().getCode())) {
- throw e;
- }
+ @Immutable
+ private static final class OpenStreamCommand implements EventLoopCommand {
- if (acksReceivedSinceLastRetry.get() > 0) {
- logger.fine(
- String.format(
- "ACKs received since last retry %d.", acksReceivedSinceLastRetry.get()));
- tries = 0;
- }
- tries++;
- lastRetryError = e;
- long sleepMillis;
- if (tries == 1) {
- sleepMillis = initialDelayMillis;
- } else {
- // This roughly matches the gRPC connection backoff.
- sleepMillis = (long) (delayMillis * Math.pow(1.6, tries));
- }
- String message = String.format("Retrying RPC to BES. Backoff %s ms.", sleepMillis);
- logger.log(Level.INFO, message, lastRetryError);
- sleeper.sleepMillis(sleepMillis);
+ @Override
+ public Type type() {
+ return Type.OPEN_STREAM;
}
}
- Preconditions.checkNotNull(lastRetryError);
- throw lastRetryError;
- }
- private void report(EventKind eventKind, String msg, Object... parameters) {
- commandLineReporter.handle(Event.of(eventKind, null, format(msg, parameters)));
- }
+ @Immutable
+ private static final class StreamCompleteCommand implements EventLoopCommand {
- @Nullable
- private static Throwable fromFuture(Future<?> f) {
- if (!f.isDone()) {
- return null;
- }
- try {
- f.get();
- return null;
- } catch (Throwable t) {
- return t;
- }
- }
+ private final Status status;
- /**
- * Representation of an {@link com.google.devtools.build.v1.OrderedBuildEvent} internal to the
- * {@link BuildEventServiceTransport}. This class wraps around the {@link
- * com.google.devtools.build.v1.OrderedBuildEvent} to simplify the retry logic, so that we don't
- * have to separately store events before the first send attempt (non-serialized) and after
- * (serialized).
- */
- private interface InternalOrderedBuildEvent {
-
- /** Returns whether this is the last event. */
- boolean isLastEvent();
-
- /** Returns the immutable sequence number for this event. */
- int getSequenceNumber();
-
- /** Returns the immutable Timestamp for this event. */
- Timestamp getTimestamp();
-
- ListenableFuture<PathConverter> localFileUploadProgress();
-
- PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter);
- }
-
- private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
- private final BuildEvent event;
- private final ArtifactGroupNamer artifactGroupNamer;
- private final ListenableFuture<PathConverter> artifactUpload;
- private final int sequenceNumber;
- private final Timestamp timestamp;
-
- DefaultInternalOrderedBuildEvent(
- BuildEvent event,
- ArtifactGroupNamer artifactGroupNamer,
- ListenableFuture<PathConverter> artifactUpload,
- int sequenceNumber,
- Timestamp timestamp) {
- this.event = Preconditions.checkNotNull(event);
- this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer);
- this.artifactUpload = artifactUpload;
- this.sequenceNumber = sequenceNumber;
- this.timestamp = timestamp;
- }
-
- @Override
- public boolean isLastEvent() {
- return false;
- }
-
- @Override
- public int getSequenceNumber() {
- return sequenceNumber;
- }
-
- public Timestamp getTimestamp() {
- return timestamp;
- }
-
- @Override
- public ListenableFuture<PathConverter> localFileUploadProgress() {
- return artifactUpload;
- }
-
- @Override
- public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
- BuildEventStreamProtos.BuildEvent eventProto =
- event.asStreamProto(
- new BuildEventContext() {
- @Override
- public PathConverter pathConverter() {
- return pathConverter;
- }
-
- @Override
- public ArtifactGroupNamer artifactGroupNamer() {
- return artifactGroupNamer;
- }
-
- @Override
- public BuildEventProtocolOptions getOptions() {
- return protocolOptions;
- }
- });
- if (eventProto.getSerializedSize()
- > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) {
- internalEventBus.post(new LargeBuildEventSerializedEvent(
- event.getEventId().toString(), eventProto.getSerializedSize()));
+ public StreamCompleteCommand(Status status) {
+ this.status = status;
}
- return besProtoUtil.bazelEvent(getSequenceNumber(), getTimestamp(), Any.pack(eventProto));
- }
- }
- private class LastInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
- private final int sequenceNumber;
- private final Timestamp timestamp;
+ public Status status() {
+ return status;
+ }
- LastInternalOrderedBuildEvent(int sequenceNumber, Timestamp timestamp) {
- this.sequenceNumber = sequenceNumber;
- this.timestamp = timestamp;
+ @Override
+ public Type type() {
+ return Type.STREAM_COMPLETE;
+ }
}
- @Override
- public boolean isLastEvent() {
- return true;
+ @Immutable
+ private static final class AckReceivedCommand implements EventLoopCommand {
+
+ private final long sequenceNumber;
+
+ public AckReceivedCommand(long sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public Type type() {
+ return Type.ACK_RECEIVED;
+ }
}
- @Override
- public int getSequenceNumber() {
- return sequenceNumber;
+ private interface SendBuildEventCommand extends EventLoopCommand {
+
+ long getSequenceNumber();
}
- public Timestamp getTimestamp() {
- return timestamp;
+ private final class SendRegularBuildEventCommand implements SendBuildEventCommand {
+
+ private final BuildEvent event;
+ private final ArtifactGroupNamer namer;
+ private final ListenableFuture<PathConverter> localFileUpload;
+ private final long sequenceNumber;
+ private final Timestamp creationTime;
+
+ private SendRegularBuildEventCommand(
+ BuildEvent event,
+ ArtifactGroupNamer namer,
+ ListenableFuture<PathConverter> localFileUpload,
+ long sequenceNumber,
+ Timestamp creationTime) {
+ this.event = event;
+ this.namer = namer;
+ this.localFileUpload = localFileUpload;
+ this.sequenceNumber = sequenceNumber;
+ this.creationTime = creationTime;
+ }
+
+ public ListenableFuture<PathConverter> localFileUploadProgress() {
+ return localFileUpload;
+ }
+
+ public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+ BuildEventContext ctx =
+ new BuildEventContext() {
+ @Override
+ public PathConverter pathConverter() {
+ return pathConverter;
+ }
+
+ @Override
+ public ArtifactGroupNamer artifactGroupNamer() {
+ return namer;
+ }
+
+ @Override
+ public BuildEventProtocolOptions getOptions() {
+ return protocolOptions;
+ }
+ };
+ BuildEventStreamProtos.BuildEvent serializedBepEvent = event.asStreamProto(ctx);
+ buildEventLogger.log(serializedBepEvent);
+ return besProtoUtil.bazelEvent(sequenceNumber, creationTime, Any.pack(serializedBepEvent));
+ }
+
+ @Override
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public Type type() {
+ return Type.SEND_BUILD_EVENT;
+ }
}
- @Override
- public ListenableFuture<PathConverter> localFileUploadProgress() {
- return Futures.immediateFuture(PathConverter.NO_CONVERSION);
+ @Immutable
+ private final class SendLastBuildEventCommand implements SendBuildEventCommand {
+
+ private final long sequenceNumber;
+ private final Timestamp creationTime;
+
+ SendLastBuildEventCommand(long sequenceNumber, Timestamp creationTime) {
+ this.sequenceNumber = sequenceNumber;
+ this.creationTime = creationTime;
+ }
+
+ public PublishBuildToolEventStreamRequest serialize() {
+ return besProtoUtil.streamFinished(sequenceNumber, creationTime);
+ }
+
+ @Override
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public Type type() {
+ return Type.SEND_LAST_EVENT;
+ }
}
- @Override
- public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
- return besProtoUtil.streamFinished(getSequenceNumber(), getTimestamp());
- }
- }
+ private static class LocalFileUploadException extends Exception {
- private interface EventUploadCallable {
- void call() throws StatusException, LocalFileUploadException, InterruptedException;
+ public LocalFileUploadException(Exception cause) {
+ super(cause);
+ }
+ }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
index 2f571dd..3308295 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -25,6 +25,17 @@
/** Interface used to abstract the Stubby and gRPC client implementations. */
public interface BuildEventServiceClient {
+ /** Callback for ACKed build events. */
+ @FunctionalInterface
+ interface AckCallback {
+
+ /**
+ * Called whenever an ACK from the BES server is received. ACKs are expected to be received in
+ * sequence. Implementations need to be thread-safe.
+ */
+ void apply(PublishBuildToolEventStreamResponse ack);
+ }
+
/** Makes a blocking RPC call that publishes a {@code lifecycleEvent}. */
void publish(PublishLifecycleEventRequest lifecycleEvent)
throws StatusException, InterruptedException;
@@ -34,22 +45,23 @@
* future in order to guarantee that all callback calls have been received. The returned future
* will never fail, but in case of error will contain a corresponding status.
*/
- ListenableFuture<Status> openStream(
- Function<PublishBuildToolEventStreamResponse, Void> ackCallback)
- throws StatusException, InterruptedException;
+ ListenableFuture<Status> openStream(AckCallback callback) throws InterruptedException;
/**
- * Sends an event over the currently open stream. This method may block due to flow control.
+ * Sends an event over the currently open stream. In case of error, this method will fail silently
+ * and report the error via the {@link ListenableFuture} returned by {@link
+ * #openStream(AckCallback)}.
+ *
+ * <p>This method may block due to flow control.
*/
- void sendOverStream(PublishBuildToolEventStreamRequest buildEvent)
- throws StatusException, InterruptedException;
+ void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws InterruptedException;
/**
- * Closes the currently opened stream. This method does not block. Callers should block on
- * the future returned by {@link #openStream(Function)} in order to make sure that all
- * {@code ackCallback} calls have been received.
+ * Half closes the currently opened stream. This method does not block. Callers should block on
+ * the future returned by {@link #openStream(Function)} in order to make sure that all {@code
+ * ackCallback} calls have been received.
*/
- void closeStream();
+ void halfCloseStream();
/**
* Closes the currently opened stream with error. This method does not block. Callers should block
@@ -59,19 +71,10 @@
void abortStream(Status status);
/**
- * Checks if there is a currently an active stream.
- *
- * @return {@code true} if the current stream is active, false otherwise.
- */
- boolean isStreamActive();
-
- /**
* Called once to dispose resources that this client might be holding (such as thread pools). This
* should be the last method called on this object.
- *
- * @throws InterruptedException
*/
- void shutdown() throws InterruptedException;
+ void shutdown();
/**
* If possible, returns a user readable error message for a given {@link Throwable}.
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 4bc0914..e7e5eaf 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.buildeventservice.client;
-import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.base.Preconditions;
@@ -35,7 +34,6 @@
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
-import java.util.function.Function;
import javax.annotation.Nullable;
/** Implementation of BuildEventServiceClient that uploads data using gRPC. */
@@ -46,6 +44,7 @@
private final PublishBuildEventStub besAsync;
private final PublishBuildEventBlockingStub besBlocking;
private volatile StreamObserver<PublishBuildToolEventStreamRequest> stream;
+ private volatile SettableFuture<Status> streamStatus;
public BuildEventServiceGrpcClient(Channel channel, @Nullable CallCredentials callCredentials) {
this.besAsync = withCallCredentials(PublishBuildEventGrpc.newStub(channel), callCredentials);
@@ -62,6 +61,7 @@
@Override
public void publish(PublishLifecycleEventRequest lifecycleEvent)
throws StatusException, InterruptedException {
+ throwIfInterrupted();
try {
besBlocking
.withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
@@ -73,61 +73,67 @@
}
@Override
- public ListenableFuture<Status> openStream(
- Function<PublishBuildToolEventStreamResponse, Void> ackCallback)
- throws StatusException, InterruptedException {
+ public ListenableFuture<Status> openStream(AckCallback ackCallback) throws InterruptedException {
Preconditions.checkState(
stream == null, "Starting a new stream without closing the previous one");
- SettableFuture<Status> streamFinished = SettableFuture.create();
- stream = createStream(ackCallback, streamFinished);
- return streamFinished;
- }
-
- private StreamObserver<PublishBuildToolEventStreamRequest> createStream(
- final Function<PublishBuildToolEventStreamResponse, Void> ackCallback,
- final SettableFuture<Status> streamFinished)
- throws StatusException, InterruptedException {
+ streamStatus = SettableFuture.create();
try {
- return besAsync.publishBuildToolEventStream(
- new StreamObserver<PublishBuildToolEventStreamResponse>() {
- @Override
- public void onNext(PublishBuildToolEventStreamResponse response) {
- ackCallback.apply(response);
- }
+ stream =
+ besAsync.publishBuildToolEventStream(
+ new StreamObserver<PublishBuildToolEventStreamResponse>() {
+ @Override
+ public void onNext(PublishBuildToolEventStreamResponse response) {
+ ackCallback.apply(response);
+ }
- @Override
- public void onError(Throwable t) {
- stream = null;
- streamFinished.set(Status.fromThrowable(t));
- }
+ @Override
+ public void onError(Throwable t) {
+ Status error = Status.fromThrowable(t);
+ if (error.getCode() == Status.CANCELLED.getCode()
+ && error.getCause() != null
+ && Status.fromThrowable(error.getCause()).getCode()
+ != Status.UNKNOWN.getCode()) {
+ // gRPC likes to wrap Status(Runtime)Exceptions in StatusRuntimeExceptions. If
+ // the status is cancelled and has a Status(Runtime)Exception as a cause it
+ // means the error was generated client side.
+ error = Status.fromThrowable(error.getCause());
+ }
+ stream = null;
+ streamStatus.set(error);
+ streamStatus = null;
+ }
- @Override
- public void onCompleted() {
- stream = null;
- streamFinished.set(Status.OK);
- }
- });
+ @Override
+ public void onCompleted() {
+ stream = null;
+ streamStatus.set(Status.OK);
+ streamStatus = null;
+ }
+ });
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
- throw e.getStatus().asException();
+ setStreamStatus(Status.fromThrowable(e));
}
+ return streamStatus;
}
@Override
public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent)
- throws StatusException, InterruptedException {
+ throws InterruptedException {
+ throwIfInterrupted();
StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
- checkState(stream0 != null, "Attempting to send over a closed stream");
try {
- stream0.onNext(buildEvent);
+ if (stream0 != null) {
+ stream0.onNext(buildEvent);
+ }
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
- throw e.getStatus().asException();
+ setStreamStatus(Status.fromThrowable(e));
}
}
@Override
- public void closeStream() {
+ public void halfCloseStream() {
StreamObserver<PublishBuildToolEventStreamRequest> stream0 = stream;
if (stream0 != null) {
stream0.onCompleted();
@@ -143,11 +149,6 @@
}
@Override
- public boolean isStreamActive() {
- return stream != null;
- }
-
- @Override
public String userReadableError(Throwable t) {
if (t instanceof StatusException) {
Throwable rootCause = Throwables.getRootCause(t);
@@ -160,5 +161,18 @@
}
@Override
- public abstract void shutdown() throws InterruptedException;
+ public abstract void shutdown();
+
+ private void setStreamStatus(Status status) {
+ SettableFuture<Status> streamStatus0 = streamStatus;
+ if (streamStatus0 != null) {
+ streamStatus0.set(status);
+ }
+ }
+
+ private static void throwIfInterrupted() throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/ManagedBuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/ManagedBuildEventServiceGrpcClient.java
index f075215..dc43a8a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/ManagedBuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/ManagedBuildEventServiceGrpcClient.java
@@ -31,7 +31,7 @@
}
@Override
- public void shutdown() throws InterruptedException {
+ public void shutdown() {
channel.shutdown();
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/UnmanagedBuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/UnmanagedBuildEventServiceGrpcClient.java
index 2c0a551..9f8b934 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/UnmanagedBuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/UnmanagedBuildEventServiceGrpcClient.java
@@ -28,7 +28,7 @@
}
@Override
- public void shutdown() throws InterruptedException {
+ public void shutdown() {
// Nothing to do. We handle an unmanaged channel so it's not our responsibility to shut it down.
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
index 5292976..87ed5e4 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventStreamer.java
@@ -79,6 +79,8 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
/**
* Listens for {@link BuildEvent}s and streams them to the provided {@link BuildEventTransport}s.
@@ -136,8 +138,12 @@
String getErr();
}
+ @ThreadSafe
private static class CountingArtifactGroupNamer implements ArtifactGroupNamer {
+ @GuardedBy("this")
private final Map<Object, Long> reportedArtifactNames = new HashMap<>();
+
+ @GuardedBy("this")
private long nextArtifactName;
@Override
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java b/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java
index d1078fc..78fcaff 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java
@@ -146,7 +146,7 @@
/* buildRequestId= */ "foo",
/* invocationId= */ "bar",
commandName,
- /*internalEventBus =*/new EventBus());
+ /*eventbus=*/ new EventBus());
}
@Test
diff --git a/src/test/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtilTest.java b/src/test/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtilTest.java
index c97ac5a..7e089cd 100644
--- a/src/test/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtilTest.java
+++ b/src/test/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtilTest.java
@@ -59,7 +59,6 @@
BUILD_INVOCATION_ID,
PROJECT_ID,
COMMAND_NAME,
- clock,
ImmutableSet.of(ADDITIONAL_KEYWORD));
@Test