blob: 16b9e9e98a4d29fcdbc4e63ee03dedf57dfa9976 [file] [log] [blame]
// Copyright 2019 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBus;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendLastBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendRegularBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.StreamCompleteCommand;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamContext;
import com.google.devtools.build.lib.buildeventstream.AbortedEvent;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.profiler.GoogleAutoProfilerUtils;
import com.google.devtools.build.lib.server.FailureDetails.BuildProgress;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.DetailedExitCode;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
/**
 * Uploader of Build Events to the Build Event Service (BES).
 *
 * <p>The purpose of this class is to manage the interaction between the BES client and the BES
 * server. It implements the event loop pattern based on the commands defined by {@link
 * BuildEventServiceUploaderCommands}.
 *
 * <p>Build events are handed in via {@link #enqueueEvent(BuildEvent)} and uploaded by a dedicated
 * {@code bes-uploader} thread running {@link #run()}, which is started lazily. {@link #close()}
 * enqueues the final event and returns a future that completes when the upload is done.
 */
// TODO(lpino): This class should be package-private but there are unit tests that are in
// different packages and rely on this.
@VisibleForTesting
public final class BuildEventServiceUploader implements Runnable {
  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

  /**
   * Configuration knobs related to RPC retries. Values chosen by good judgement.
   *
   * <p>The number of retries may be overridden via the {@code BAZEL_BES_NUM_RETRIES_ON_RPC_FAILURE}
   * system property. Negative values are clamped to 0 so that every RPC is attempted at least once
   * and the retry loops always terminate.
   */
  private static final int MAX_NUM_RETRIES =
      Math.max(
          0, Integer.parseInt(System.getProperty("BAZEL_BES_NUM_RETRIES_ON_RPC_FAILURE", "4")));

  /** Base delay of the exponential backoff computed by {@link #retrySleepMillis(int)}. */
  private static final int DELAY_MILLIS = 1000;

  private final BuildEventServiceClient besClient;
  private final BuildEventArtifactUploader buildEventUploader;
  private final BuildEventServiceProtoUtil besProtoUtil;
  private final BuildEventProtocolOptions buildEventProtocolOptions;
  private final boolean publishLifecycleEvents;
  private final Sleeper sleeper;
  private final Clock clock;
  private final ArtifactGroupNamer namer;
  private final EventBus eventBus;
  // `commandStartTime` is an instant in time determined by the build tool's native launcher and
  // matches `BuildStartingEvent.getRequest().getStartTime()`.
  private final Timestamp commandStartTime;
  // `eventStreamStartTime` is an instant *after* `commandStartTime` indicating when the
  // BuildEventServiceUploader was initialized to begin reporting build events. This instant should
  // be *before* the event_time for any BuildEvents uploaded after they are received via
  // `#enqueueEvent(BuildEvent)`.
  private final Timestamp eventStreamStartTime;

  /** Set by the first call to {@link #close()}; events enqueued afterwards are discarded. */
  @GuardedBy("lock")
  private boolean startedClose = false;

  /**
   * Executor handed to {@link BuildEventArtifactUploader#waitForRemoteUploads}. It is shut down
   * when {@link #run()} finishes.
   */
  private final ScheduledExecutorService timeoutExecutor =
      MoreExecutors.listeningDecorator(
          Executors.newSingleThreadScheduledExecutor(
              new ThreadFactoryBuilder().setNameFormat("bes-uploader-timeout-%d").build()));

  /**
   * The event queue contains two types of events:
   *
   * <ul>
   *   <li>Build events, sorted by sequence number, that should be sent to the server.
   *   <li>Command events that are used by {@link #publishBuildEvents()} to change state.
   * </ul>
   */
  private final BlockingDeque<EventLoopCommand> eventQueue = new LinkedBlockingDeque<>();

  /**
   * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be
   * consecutive monotonically increasing natural numbers.
   */
  private final AtomicLong nextSeqNum = new AtomicLong(1);

  private final Object lock = new Object();

  /** Status reported in the finishing lifecycle events; derived from the enqueued events. */
  @GuardedBy("lock")
  private Result buildStatus = UNKNOWN_STATUS;

  /** Completes when the upload finishes; see {@link #close()}. */
  private final SettableFuture<Void> closeFuture = SettableFuture.create();

  /** Completes when the stream is half-closed; see {@link #getHalfCloseFuture()}. */
  private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();

  /**
   * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
   * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (whichever comes
   * first).
   */
  @GuardedBy("lock")
  private Thread uploadThread;

  /** True if an interrupt of {@link #uploadThread} was requested by cancelling the close future. */
  @GuardedBy("lock")
  private boolean interruptCausedByCancel;

  /** The currently open bidi stream; only accessed from the upload thread. */
  private StreamContext streamContext;

  private BuildEventServiceUploader(
      BuildEventServiceClient besClient,
      BuildEventArtifactUploader localFileUploader,
      BuildEventServiceProtoUtil besProtoUtil,
      BuildEventProtocolOptions buildEventProtocolOptions,
      boolean publishLifecycleEvents,
      Sleeper sleeper,
      Clock clock,
      ArtifactGroupNamer namer,
      EventBus eventBus,
      Timestamp commandStartTime) {
    this.besClient = besClient;
    this.buildEventUploader = localFileUploader;
    this.besProtoUtil = besProtoUtil;
    this.buildEventProtocolOptions = buildEventProtocolOptions;
    this.publishLifecycleEvents = publishLifecycleEvents;
    this.sleeper = sleeper;
    this.clock = clock;
    this.namer = namer;
    this.eventBus = eventBus;
    this.commandStartTime = commandStartTime;
    this.eventStreamStartTime = currentTime();
    // Ensure the half-close future is closed once the upload is complete. This is usually a no-op,
    // but makes sure we half-close in case of error / interrupt.
    closeFuture.addListener(
        () -> halfCloseFuture.setFuture(closeFuture), MoreExecutors.directExecutor());
  }

  BuildEventArtifactUploader getBuildEventUploader() {
    return buildEventUploader;
  }

  /**
   * Enqueues an event for uploading to a BES backend.
   *
   * <p>Uploading of the files referenced by the event is started immediately. Events enqueued after
   * {@link #close()} has been called are dropped. A {@link BuildCompletingEvent}, or an {@link
   * AbortedEvent} with a build-finished ID, updates the build status reported in the finishing
   * lifecycle events.
   */
  void enqueueEvent(BuildEvent event) {
    // This needs to happen outside a synchronized block as it may trigger
    // stdout/stderr and lead to a deadlock. See b/109725432
    ListenableFuture<PathConverter> localFileUploadFuture =
        buildEventUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());

    // The generation of the sequence number and the addition to the {@link #eventQueue} should be
    // atomic since BES expects the events in that exact order.
    // More details can be found in b/131393380.
    // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order.
    synchronized (lock) {
      if (startedClose) {
        return;
      }
      // BuildCompletingEvent marks the end of the build in the BEP event stream.
      if (event instanceof BuildCompletingEvent) {
        ExitCode exitCode = ((BuildCompletingEvent) event).getExitCode();
        if (exitCode != null && exitCode.getNumericExitCode() == 0) {
          buildStatus = COMMAND_SUCCEEDED;
        } else {
          buildStatus = COMMAND_FAILED;
        }
      } else if (event instanceof AbortedEvent && event.getEventId().hasBuildFinished()) {
        // An AbortedEvent with a build finished ID means we are crashing.
        buildStatus = COMMAND_FAILED;
      }
      ensureUploadThreadStarted();

      // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's
      // always used under lock. It would be cleaner and more performant to update the sequence
      // number when we take the item off the queue.
      eventQueue.addLast(
          new SendRegularBuildEventCommand(
              event, localFileUploadFuture, nextSeqNum.getAndIncrement(), currentTime()));
    }
  }

  /**
   * Gracefully stops the BES upload. All events enqueued before the call to close will be uploaded
   * and events enqueued after the call will be discarded.
   *
   * <p>The returned future completes when the upload completes. If the upload fails, it fails with
   * an {@link AbruptExitException} (or, for unexpected runtime errors, with the thrown {@link
   * Throwable}). Cancelling the returned future interrupts the upload thread, dropping any events
   * that have not been sent yet. Repeated calls return the same future.
   */
  public ListenableFuture<Void> close() {
    ensureUploadThreadStarted();

    // The generation of the sequence number and the addition to the {@link #eventQueue} should be
    // atomic since BES expects the events in that exact order.
    // More details can be found in b/131393380.
    // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order.
    synchronized (lock) {
      if (startedClose) {
        return closeFuture;
      }
      startedClose = true;
      // Enqueue the last event which will terminate the upload.
      // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's
      // always used under lock. It would be cleaner and more performant to update the sequence
      // number when we take the item off the queue.
      eventQueue.addLast(
          new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
    }

    closeFuture.addListener(
        () -> {
          // Make sure to cancel any pending uploads if the closing is cancelled.
          if (closeFuture.isCancelled()) {
            closeOnCancel();
          }
        },
        MoreExecutors.directExecutor());

    return closeFuture;
  }

  /**
   * Records that the upcoming interrupt is expected (so {@link #run()} does not treat it as a bug)
   * and interrupts the upload thread.
   */
  private void closeOnCancel() {
    synchronized (lock) {
      interruptCausedByCancel = true;
      closeNow();
    }
  }

  /**
   * Stops the upload immediately by interrupting the upload thread, if it has been started and is
   * not already interrupted. Enqueued events that have not been sent yet will be lost.
   */
  private void closeNow() {
    synchronized (lock) {
      if (uploadThread != null) {
        if (uploadThread.isInterrupted()) {
          return;
        }
        uploadThread.interrupt();
      }
    }
  }

  /**
   * Returns a future that completes once the last build event has been sent and the stream
   * half-closed, or, at the latest, with the same outcome as the future returned by {@link
   * #close()}.
   */
  ListenableFuture<Void> getHalfCloseFuture() {
    return halfCloseFuture;
  }

  /**
   * Logs {@code message}, fails the close future with an {@link AbruptExitException} wrapping
   * {@code cause}, and returns the {@link DetailedExitCode} describing the failure.
   */
  private DetailedExitCode logAndSetException(
      String message, BuildProgress.Code bpCode, Throwable cause) {
    logger.atSevere().log("%s", message);
    DetailedExitCode detailedExitCode =
        DetailedExitCode.of(
            FailureDetail.newBuilder()
                .setMessage(message + " " + besClient.userReadableError(cause))
                .setBuildProgress(BuildProgress.newBuilder().setCode(bpCode).build())
                .build());
    closeFuture.setException(new AbruptExitException(detailedExitCode, cause));
    return detailedExitCode;
  }

  /**
   * Body of the upload thread. It publishes the lifecycle events (if enabled) around the build event
   * stream, and reports success or failure on the event bus and through the close future.
   */
  @Override
  public void run() {
    try {
      if (publishLifecycleEvents) {
        publishLifecycleEvent(besProtoUtil.buildEnqueued(commandStartTime));
        publishLifecycleEvent(besProtoUtil.invocationStarted(eventStreamStartTime));
      }

      try {
        publishBuildEvents();
      } finally {
        if (publishLifecycleEvents) {
          Result buildStatus;
          synchronized (lock) {
            buildStatus = this.buildStatus;
          }
          publishLifecycleEvent(besProtoUtil.invocationFinished(currentTime(), buildStatus));
          publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus));
        }
      }
      eventBus.post(BuildEventServiceAvailabilityEvent.ofSuccess());
    } catch (InterruptedException e) {
      // The only legitimate source of an interrupt is cancellation of the close future.
      synchronized (lock) {
        Preconditions.checkState(
            interruptCausedByCancel, "Unexpected interrupt on BES uploader thread");
      }
    } catch (DetailedStatusException e) {
      boolean isTransient = shouldRetryStatus(e.getStatus());
      ExitCode exitCode =
          isTransient
              ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR
              : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR;
      DetailedExitCode detailedExitCode = logAndSetException(e.extendedMessage, e.bpCode, e);
      eventBus.post(
          new BuildEventServiceAvailabilityEvent(exitCode, detailedExitCode.getFailureDetail()));
    } catch (LocalFileUploadException e) {
      Throwables.throwIfUnchecked(e.getCause());
      DetailedExitCode detailedExitCode =
          logAndSetException(
              "The Build Event Protocol local file upload failed:",
              BuildProgress.Code.BES_UPLOAD_LOCAL_FILE_ERROR,
              e.getCause());
      eventBus.post(
          new BuildEventServiceAvailabilityEvent(
              ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
              detailedExitCode.getFailureDetail()));
    } catch (Throwable e) {
      closeFuture.setException(e);
      // Include the throwable so that the root cause of the bug is visible in the log.
      logger.atSevere().withCause(e).log(
          "BES upload failed due to a RuntimeException / Error. This is a bug.");
      throw e;
    } finally {
      buildEventUploader.release();
      MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS);
      // No-op if the future was already completed exceptionally above.
      closeFuture.set(null);
    }
  }

  /**
   * Serializes the BEP event carried by {@code buildEvent} using {@code pathConverter}. If the
   * serialized size exceeds {@link LargeBuildEventSerializedEvent#SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES},
   * a {@link LargeBuildEventSerializedEvent} is posted on the event bus.
   */
  private BuildEventStreamProtos.BuildEvent createSerializedRegularBuildEvent(
      PathConverter pathConverter, SendRegularBuildEventCommand buildEvent)
      throws InterruptedException {
    BuildEventContext ctx =
        new BuildEventContext() {
          @Override
          public PathConverter pathConverter() {
            return pathConverter;
          }

          @Override
          public ArtifactGroupNamer artifactGroupNamer() {
            return namer;
          }

          @Override
          public BuildEventProtocolOptions getOptions() {
            return buildEventProtocolOptions;
          }
        };
    BuildEventStreamProtos.BuildEvent serializedBepEvent = buildEvent.getEvent().asStreamProto(ctx);

    // TODO(lpino): Remove this logging once we can make every single event smaller than 1MB
    // as protobuf recommends.
    if (serializedBepEvent.getSerializedSize()
        > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) {
      eventBus.post(
          new LargeBuildEventSerializedEvent(
              serializedBepEvent.getId().toString(), serializedBepEvent.getSerializedSize()));
    }

    return serializedBepEvent;
  }

  /**
   * Runs the event loop. It opens the build event stream, sends queued events, matches ACKs against
   * sent events, and reopens the stream on retryable failures.
   *
   * <p>Returns normally only after the last event has been sent, every event has been ACKed and the
   * stream completed with an OK status. On exit, local file uploads of events that were not
   * uploaded are cancelled.
   *
   * @throws DetailedStatusException if the stream failed permanently or the retries were exhausted
   * @throws LocalFileUploadException if uploading files referenced by an event failed
   * @throws InterruptedException if the upload thread was interrupted
   */
  private void publishBuildEvents()
      throws DetailedStatusException, LocalFileUploadException, InterruptedException {
    eventQueue.addFirst(new OpenStreamCommand());

    // Every build event sent to the server needs to be acknowledged by it. This queue stores
    // the build events that have been sent and still have to be acknowledged by the server.
    // The build events are stored in the order they were sent.
    Deque<SendBuildEventCommand> ackQueue = new ArrayDeque<>();
    boolean lastEventSent = false;
    int acksReceived = 0;
    int retryAttempt = 0;

    try {
      // {@link BuildEventServiceUploaderCommands#OPEN_STREAM} is the first event and opens a
      // bidi streaming RPC for sending build events and receiving ACKs.
      // {@link BuildEventServiceUploaderCommands#SEND_REGULAR_BUILD_EVENT} sends a build event to
      // the server. Sending of the Nth build event does not wait for the ACK of the N-1th build
      // event to have been received.
      // {@link BuildEventServiceUploaderCommands#SEND_LAST_BUILD_EVENT} sends the last build event
      // and half closes the RPC.
      // {@link BuildEventServiceUploaderCommands#ACK_RECEIVED} is executed for every ACK from
      // the server and checks that the ACKs are in the correct order.
      // {@link BuildEventServiceUploaderCommands#STREAM_COMPLETE} checks that all build events
      // have been sent and all ACKs have been received. If not it invokes a retry logic that may
      // decide to re-send every build event for which an ACK has not been received. If so, it
      // adds an OPEN_STREAM event.
      while (true) {
        EventLoopCommand event = eventQueue.takeFirst();
        switch (event.type()) {
          case OPEN_STREAM:
            {
              // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
              // or SEND_LAST_BUILD_EVENT
              logger.atInfo().log("Starting publishBuildEvents: eventQueue=%d", eventQueue.size());
              streamContext =
                  besClient.openStream(
                      (ack) -> eventQueue.addLast(new AckReceivedCommand(ack.getSequenceNumber())));
              addStreamStatusListener(
                  streamContext.getStatus(),
                  (status) -> eventQueue.addLast(new StreamCompleteCommand(status)));
            }
            break;

          case SEND_REGULAR_BUILD_EVENT:
            {
              // Invariant: the eventQueue may contain events of any type
              SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
              ackQueue.addLast(buildEvent);

              PathConverter pathConverter = waitForUploads(buildEvent);

              BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
                  createSerializedRegularBuildEvent(pathConverter, buildEvent);

              PublishBuildToolEventStreamRequest request =
                  besProtoUtil.bazelEvent(
                      buildEvent.getSequenceNumber(),
                      buildEvent.getCreationTime(),
                      Any.pack(serializedRegularBuildEvent));

              streamContext.sendOverStream(request);
            }
            break;

          case SEND_LAST_BUILD_EVENT:
            {
              // Invariant: the eventQueue may contain events of any type
              SendBuildEventCommand lastEvent = (SendLastBuildEventCommand) event;
              ackQueue.addLast(lastEvent);
              lastEventSent = true;
              PublishBuildToolEventStreamRequest request =
                  besProtoUtil.streamFinished(
                      lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
              streamContext.sendOverStream(request);
              streamContext.halfCloseStream();
              halfCloseFuture.set(null);
              logger.atInfo().log("BES uploader is half-closed");
            }
            break;

          case ACK_RECEIVED:
            {
              // Invariant: the eventQueue may contain events of any type
              AckReceivedCommand ackEvent = (AckReceivedCommand) event;
              if (!ackQueue.isEmpty()) {
                SendBuildEventCommand expected = ackQueue.removeFirst();
                long actualSeqNum = ackEvent.getSequenceNumber();
                if (expected.getSequenceNumber() == actualSeqNum) {
                  acksReceived++;
                } else {
                  // Put the event back so that it is re-sent if the stream is retried.
                  ackQueue.addFirst(expected);
                  String message =
                      String.format(
                          "Expected ACK with seqNum=%d but received ACK with seqNum=%d",
                          expected.getSequenceNumber(), actualSeqNum);
                  logger.atInfo().log("%s", message);
                  streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
                }
              } else {
                String message =
                    String.format(
                        "Received ACK (seqNum=%d) when no ACK was expected",
                        ackEvent.getSequenceNumber());
                logger.atInfo().log("%s", message);
                streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
              }
            }
            break;

          case STREAM_COMPLETE:
            {
              // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
              // or SEND_LAST_BUILD_EVENT
              streamContext = null;
              StreamCompleteCommand completeEvent = (StreamCompleteCommand) event;
              Status streamStatus = completeEvent.status();
              if (streamStatus.isOk()) {
                if (lastEventSent && ackQueue.isEmpty()) {
                  logger.atInfo().log("publishBuildEvents was successful");
                  // Upload successful. Break out from the while(true) loop.
                  return;
                } else {
                  Status status =
                      lastEventSent
                          ? ackQueueNotEmptyStatus(ackQueue.size())
                          : lastEventNotSentStatus();
                  BuildProgress.Code bpCode =
                      lastEventSent
                          ? BuildProgress.Code.BES_STREAM_COMPLETED_WITH_UNACK_EVENTS_ERROR
                          : BuildProgress.Code.BES_STREAM_COMPLETED_WITH_UNSENT_EVENTS_ERROR;
                  throw withFailureDetail(status.asException(), bpCode, status.getDescription());
                }
              } else if (lastEventSent && ackQueue.isEmpty()) {
                throw withFailureDetail(
                    streamStatus.asException(),
                    BuildProgress.Code.BES_STREAM_COMPLETED_WITH_REMOTE_ERROR,
                    streamStatus.getDescription());
              }

              if (!shouldRetryStatus(streamStatus)) {
                String message =
                    String.format("Not retrying publishBuildEvents: status='%s'", streamStatus);
                logger.atInfo().log("%s", message);
                throw withFailureDetail(
                    streamStatus.asException(),
                    BuildProgress.Code.BES_STREAM_NOT_RETRYING_FAILURE,
                    message);
              }
              // `>=` (rather than `==`) keeps this check safe regardless of the configured value.
              if (retryAttempt >= MAX_NUM_RETRIES) {
                String message =
                    String.format(
                        "Not retrying publishBuildEvents, no more attempts left: status='%s'",
                        streamStatus);
                logger.atInfo().log("%s", message);
                throw withFailureDetail(
                    streamStatus.asException(),
                    BuildProgress.Code.BES_UPLOAD_RETRY_LIMIT_EXCEEDED_FAILURE,
                    message);
              }

              // Retry logic
              // Adds events from the ackQueue to the front of the eventQueue, so that the
              // events in the eventQueue are sorted by sequence number (ascending).
              SendBuildEventCommand unacked;
              while ((unacked = ackQueue.pollLast()) != null) {
                eventQueue.addFirst(unacked);
              }

              long sleepMillis = retrySleepMillis(retryAttempt);
              logger.atInfo().log(
                  "Retrying stream: status='%s', sleepMillis=%d", streamStatus, sleepMillis);
              sleeper.sleepMillis(sleepMillis);

              // If we made progress, meaning the server ACKed events that we sent, then reset
              // the retry counter to 0.
              if (acksReceived > 0) {
                retryAttempt = 0;
              } else {
                retryAttempt++;
              }
              acksReceived = 0;
              eventQueue.addFirst(new OpenStreamCommand());
            }
            break;
        }
      }
    } catch (InterruptedException | LocalFileUploadException e) {
      int limit = 30;
      logger.atInfo().log(
          "Publish interrupt. Showing up to %d items from queues: ack_queue_size: %d, "
              + "ack_queue: %s, event_queue_size: %d, event_queue: %s",
          limit,
          ackQueue.size(),
          Iterables.limit(ackQueue, limit),
          eventQueue.size(),
          Iterables.limit(eventQueue, limit));
      if (streamContext != null) {
        streamContext.abortStream(Status.CANCELLED);
      }
      throw e;
    } finally {
      logger.atInfo().log("About to cancel all local file uploads");
      try (AutoProfiler ignored =
          GoogleAutoProfilerUtils.logged("local file upload cancellation")) {
        // Cancel all local file uploads that may still be running
        // of events that haven't been uploaded.
        EventLoopCommand event;
        while ((event = ackQueue.pollFirst()) != null) {
          if (event instanceof SendRegularBuildEventCommand) {
            cancelLocalFileUpload((SendRegularBuildEventCommand) event);
          }
        }
        while ((event = eventQueue.pollFirst()) != null) {
          if (event instanceof SendRegularBuildEventCommand) {
            cancelLocalFileUpload((SendRegularBuildEventCommand) event);
          }
        }
      }
    }
  }

  /** Cancels (with interruption) the local file upload of {@code event} if it has not finished. */
  private void cancelLocalFileUpload(SendRegularBuildEventCommand event) {
    ListenableFuture<PathConverter> localFileUploaderFuture = event.localFileUploadProgress();
    if (!localFileUploaderFuture.isDone()) {
      localFileUploaderFuture.cancel(true);
    }
  }

  /**
   * Sends a {@link PublishLifecycleEventRequest} to the BES backend.
   *
   * <p>Retryable failures (see {@link #shouldRetryStatus(Status)}) are retried up to {@link
   * #MAX_NUM_RETRIES} times with exponential backoff. Once the last attempt has failed, the method
   * gives up immediately instead of sleeping for another backoff period first.
   *
   * @throws DetailedStatusException if the RPC failed with a non-retryable status or all attempts
   *     failed
   * @throws InterruptedException if interrupted while publishing or backing off
   */
  private void publishLifecycleEvent(PublishLifecycleEventRequest request)
      throws DetailedStatusException, InterruptedException {
    // MAX_NUM_RETRIES is clamped to >= 0, so the loop below always makes at least one attempt and
    // always exits via `return` or `throw`.
    for (int retryAttempt = 0; ; retryAttempt++) {
      try {
        besClient.publish(request);
        return;
      } catch (StatusException e) {
        if (!shouldRetryStatus(e.getStatus())) {
          String message =
              String.format("Not retrying publishLifecycleEvent: status='%s'", e.getStatus());
          logger.atInfo().log("%s", message);
          throw withFailureDetail(e, BuildProgress.Code.BES_STREAM_NOT_RETRYING_FAILURE, message);
        }
        if (retryAttempt >= MAX_NUM_RETRIES) {
          // All retry attempts failed
          throw withFailureDetail(
              e,
              BuildProgress.Code.BES_UPLOAD_RETRY_LIMIT_EXCEEDED_FAILURE,
              "All retry attempts failed.");
        }
        long sleepMillis = retrySleepMillis(retryAttempt);
        logger.atInfo().log(
            "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d",
            e.getStatus(), sleepMillis);
        sleeper.sleepMillis(sleepMillis);
      }
    }
  }

  /** Starts the upload thread (which executes {@link #run()}) if it has not been started yet. */
  private void ensureUploadThreadStarted() {
    synchronized (lock) {
      if (uploadThread == null) {
        uploadThread = new Thread(this, "bes-uploader");
        uploadThread.start();
      }
    }
  }

  /**
   * Blocks until the remote uploads and the local file uploads referenced by {@code
   * orderedBuildEvent} have completed, and returns the {@link PathConverter} produced by the local
   * file upload.
   *
   * @throws LocalFileUploadException if one of the uploads failed with a checked exception
   *     (unchecked causes are rethrown as-is)
   */
  @SuppressWarnings("LogAndThrow") // Not confident in BES's error-handling.
  private PathConverter waitForUploads(SendRegularBuildEventCommand orderedBuildEvent)
      throws LocalFileUploadException, InterruptedException {
    try {
      // Wait for the local file and pending remote uploads to complete.
      buildEventUploader
          .waitForRemoteUploads(orderedBuildEvent.getEvent().remoteUploads(), timeoutExecutor)
          .get();
      return orderedBuildEvent.localFileUploadProgress().get();
    } catch (ExecutionException e) {
      logger.atWarning().withCause(e).log(
          "Failed to upload files referenced by build event: %s", e.getMessage());
      Throwables.throwIfUnchecked(e.getCause());
      throw new LocalFileUploadException(e.getCause());
    }
  }

  /** Returns the current time of {@link #clock} as a protobuf {@link Timestamp}. */
  private Timestamp currentTime() {
    return Timestamps.fromMillis(clock.currentTimeMillis());
  }

  private static Status lastEventNotSentStatus() {
    return Status.FAILED_PRECONDITION.withDescription(
        "Server closed stream with status OK but not all events have been sent");
  }

  private static Status ackQueueNotEmptyStatus(int ackQueueSize) {
    return Status.FAILED_PRECONDITION.withDescription(
        String.format(
            "Server closed stream with status OK but not all ACKs have been"
                + " received (ackQueue=%d)",
            ackQueueSize));
  }

  /**
   * Invokes {@code onDone} with the final status of the stream.
   *
   * <p>If the status future itself fails (including cancellation), the failure is converted to a
   * {@link Status} via {@link Status#fromThrowable(Throwable)}. Otherwise the event loop, which
   * waits for a stream-complete command, would block forever.
   */
  private static void addStreamStatusListener(
      ListenableFuture<Status> stream, Consumer<Status> onDone) {
    Futures.addCallback(
        stream,
        new FutureCallback<Status>() {
          @Override
          public void onSuccess(Status result) {
            onDone.accept(result);
          }

          @Override
          public void onFailure(Throwable t) {
            onDone.accept(Status.fromThrowable(t));
          }
        },
        MoreExecutors.directExecutor());
  }

  /**
   * Returns whether an RPC that failed with {@code status} may be retried. {@code INVALID_ARGUMENT}
   * and {@code FAILED_PRECONDITION} are treated as permanent; every other code is retried.
   */
  private static boolean shouldRetryStatus(Status status) {
    return !status.getCode().equals(Code.INVALID_ARGUMENT)
        && !status.getCode().equals(Code.FAILED_PRECONDITION);
  }

  /** Returns the backoff before retry number {@code attempt}: {@code DELAY_MILLIS * 1.6^attempt}. */
  private static long retrySleepMillis(int attempt) {
    // This somewhat matches the backoff used for gRPC connection backoffs.
    return (long) (DELAY_MILLIS * Math.pow(1.6, attempt));
  }

  /**
   * Wraps {@code exception} into a {@link DetailedStatusException} carrying {@code bpCode} and
   * {@code message}, extended with the client's user-readable description of the error.
   */
  private DetailedStatusException withFailureDetail(
      StatusException exception, BuildProgress.Code bpCode, String message) {
    return new DetailedStatusException(
        exception, bpCode, message + " " + besClient.userReadableError(exception));
  }

  /** Thrown when problems are encountered while uploading build event artifacts. */
  private static final class LocalFileUploadException extends Exception {
    LocalFileUploadException(Throwable cause) {
      super(cause);
    }
  }

  /** Builder for {@link BuildEventServiceUploader}; all fields except the boolean are required. */
  static class Builder {
    private BuildEventServiceClient besClient;
    private BuildEventArtifactUploader localFileUploader;
    private BuildEventServiceProtoUtil besProtoUtil;
    private BuildEventProtocolOptions bepOptions;
    private boolean publishLifecycleEvents;
    private Sleeper sleeper;
    private Clock clock;
    private ArtifactGroupNamer artifactGroupNamer;
    private EventBus eventBus;
    private Timestamp commandStartTime;

    Builder besClient(BuildEventServiceClient value) {
      this.besClient = value;
      return this;
    }

    Builder localFileUploader(BuildEventArtifactUploader value) {
      this.localFileUploader = value;
      return this;
    }

    Builder besProtoUtil(BuildEventServiceProtoUtil value) {
      this.besProtoUtil = value;
      return this;
    }

    Builder bepOptions(BuildEventProtocolOptions value) {
      this.bepOptions = value;
      return this;
    }

    Builder publishLifecycleEvents(boolean value) {
      this.publishLifecycleEvents = value;
      return this;
    }

    Builder clock(Clock value) {
      this.clock = value;
      return this;
    }

    Builder sleeper(Sleeper value) {
      this.sleeper = value;
      return this;
    }

    Builder artifactGroupNamer(ArtifactGroupNamer value) {
      this.artifactGroupNamer = value;
      return this;
    }

    Builder eventBus(EventBus value) {
      this.eventBus = value;
      return this;
    }

    public Builder commandStartTime(Timestamp value) {
      this.commandStartTime = value;
      return this;
    }

    /**
     * Builds the uploader.
     *
     * @throws NullPointerException if any required field has not been set
     */
    BuildEventServiceUploader build() {
      return new BuildEventServiceUploader(
          checkNotNull(besClient),
          checkNotNull(localFileUploader),
          checkNotNull(besProtoUtil),
          checkNotNull(bepOptions),
          publishLifecycleEvents,
          checkNotNull(sleeper),
          checkNotNull(clock),
          checkNotNull(artifactGroupNamer),
          checkNotNull(eventBus),
          checkNotNull(commandStartTime));
    }
  }

  /**
   * A wrapper Exception class that contains the {@link StatusException}, the {@link
   * BuildProgress.Code}, and a message.
   */
  static class DetailedStatusException extends StatusException {
    private final BuildProgress.Code bpCode;
    private final String extendedMessage;

    DetailedStatusException(
        StatusException statusException, BuildProgress.Code bpCode, String message) {
      super(statusException.getStatus(), statusException.getTrailers());
      this.bpCode = bpCode;
      this.extendedMessage = "The Build Event Protocol upload failed: " + message;
    }
  }
}