| // Copyright 2019 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.buildeventservice; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| import com.google.common.eventbus.EventBus; |
| import com.google.common.flogger.GoogleLogger; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.JdkFutureAdapters; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.CommandContext; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.InvocationStatus; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.LifecycleEvent; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamContext; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamEvent; |
| import com.google.devtools.build.lib.buildeventstream.AbortedEvent; |
| import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; |
| import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent; |
| import com.google.devtools.build.lib.buildeventstream.BuildEvent; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventContext; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions.OutputGroupFileModes; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; |
| import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent; |
| import com.google.devtools.build.lib.buildeventstream.PathConverter; |
| import com.google.devtools.build.lib.clock.Clock; |
| import com.google.devtools.build.lib.profiler.AutoProfiler; |
| import com.google.devtools.build.lib.profiler.GoogleAutoProfilerUtils; |
| import com.google.devtools.build.lib.server.FailureDetails.BuildProgress; |
| import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; |
| import com.google.devtools.build.lib.util.AbruptExitException; |
| import com.google.devtools.build.lib.util.DetailedExitCode; |
| import com.google.devtools.build.lib.util.ExitCode; |
| import com.google.devtools.build.lib.util.Sleeper; |
| import com.google.errorprone.annotations.CanIgnoreReturnValue; |
| import io.grpc.Status; |
| import io.grpc.Status.Code; |
| import io.grpc.StatusException; |
| import java.time.Instant; |
| import java.util.ArrayDeque; |
| import java.util.Deque; |
| import java.util.EnumSet; |
| import java.util.concurrent.BlockingDeque; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Consumer; |
| import javax.annotation.concurrent.GuardedBy; |
| |
| /** |
| * Uploader of Build Events to the Build Event Service (BES). |
| * |
| * <p>The purpose is of this class is to manage the interaction between the BES client and the BES |
| * server. It implements an event loop pattern based on the commands defined by {@link Command}. |
| */ |
| // TODO(lpino): This class should be package-private but there are unit tests that are in the |
| // different packages and rely on this. |
| @VisibleForTesting |
| public final class BuildEventServiceUploader implements Runnable { |
| |
| /** Commands to drive the event loop. */ |
| private sealed interface Command { |
| /** Tells the event loop to open a new BES stream. */ |
| record OpenStream() implements Command {} |
| |
| /** Tells the event loop that the streaming RPC completed. */ |
| record StreamComplete(Status status) implements Command {} |
| |
| /** Tells the event loop that an ACK was received. */ |
| record AckReceived(long sequenceNumber) implements Command {} |
| |
| /** Tells the event loop to send a build event. */ |
| record SendRegularBuildEvent( |
| long sequenceNumber, |
| Instant creationTime, |
| BuildEvent event, |
| ListenableFuture<PathConverter> localFileUploadProgress) |
| implements Command {} |
| |
| /** Tells the event loop that this is the last event of the stream. */ |
| record SendLastBuildEvent(long sequenceNumber, Instant creationTime) implements Command {} |
| |
| /** Tells the event loop to retransmit a serialized build event. */ |
| record SendSerializedBuildEvent(StreamEvent request) implements Command {} |
| } |
| |
| private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); |
| |
| private static final ImmutableSet<Code> RETRYABLE_STATUS_CODES = |
| Sets.immutableEnumSet( |
| EnumSet.complementOf(EnumSet.of(Code.INVALID_ARGUMENT, Code.PERMISSION_DENIED))); |
| |
| private final BuildEventServiceClient besClient; |
| private final BuildEventArtifactUploader buildEventUploader; |
| private final BuildEventProtocolOptions buildEventProtocolOptions; |
| private final CommandContext commandContext; |
| private final boolean publishLifecycleEvents; |
| private final Sleeper sleeper; |
| private final Clock clock; |
| private final ArtifactGroupNamer namer; |
| private final EventBus eventBus; |
| // `commandStartTime` is an instant in time determined by the build tool's native launcher and |
| // matches `BuildStartingEvent.getRequest().getStartTime()`. |
| private final Instant commandStartTime; |
| // `eventStreamStartTime` is an instant *after* `commandStartTime` indicating when the |
| // BuildEventServiceUploader was initialized to begin reporting build events. This instant should |
| // be *before* the event_time for any BuildEvents uploaded after they are received via |
| // `#enqueueEvent(BuildEvent)`. |
| private final Instant eventStreamStartTime; |
| private boolean startedClose = false; |
| |
| private final ScheduledExecutorService timeoutExecutor = |
| MoreExecutors.listeningDecorator( |
| Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder().setNameFormat("bes-uploader-timeout-%d").build())); |
| |
| /** |
| * The command queue contains two types of commands: |
| * |
| * <ul> |
| * <li>Commands containing build events, sorted by sequence number, to be sent to the server. |
| * <li>Commands that are used by {@link #publishBuildEvents()} to change state. |
| */ |
| private final BlockingDeque<Command> commandQueue = new LinkedBlockingDeque<>(); |
| |
| /** |
| * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be |
| * consecutive monotonically increasing natural numbers. |
| */ |
| private final AtomicLong nextSeqNum = new AtomicLong(1); |
| |
| private final Object lock = new Object(); |
| |
| @GuardedBy("lock") |
| private InvocationStatus invocationStatus = InvocationStatus.UNKNOWN; |
| |
| private final SettableFuture<Void> closeFuture = SettableFuture.create(); |
| private final SettableFuture<Void> halfCloseFuture = SettableFuture.create(); |
| |
| /** |
| * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily |
| * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (which ever comes |
| * first). |
| */ |
| @GuardedBy("lock") |
| private Thread uploadThread; |
| |
| @GuardedBy("lock") |
| private boolean interruptCausedByCancel; |
| |
| private StreamContext streamContext; |
| |
| private BuildEventServiceUploader( |
| BuildEventServiceClient besClient, |
| BuildEventArtifactUploader localFileUploader, |
| BuildEventProtocolOptions buildEventProtocolOptions, |
| boolean publishLifecycleEvents, |
| Sleeper sleeper, |
| Clock clock, |
| ArtifactGroupNamer namer, |
| EventBus eventBus, |
| CommandContext commandContext, |
| Instant commandStartTime) { |
| this.besClient = besClient; |
| this.buildEventUploader = localFileUploader; |
| this.buildEventProtocolOptions = buildEventProtocolOptions; |
| this.publishLifecycleEvents = publishLifecycleEvents; |
| this.sleeper = sleeper; |
| this.clock = clock; |
| this.namer = namer; |
| this.eventBus = eventBus; |
| this.commandContext = commandContext; |
| this.commandStartTime = commandStartTime; |
| this.eventStreamStartTime = clock.now(); |
| // Ensure the half-close future is closed once the upload is complete. This is usually a no-op, |
| // but makes sure we half-close in case of error / interrupt. |
| closeFuture.addListener( |
| () -> halfCloseFuture.setFuture(closeFuture), MoreExecutors.directExecutor()); |
| } |
| |
| BuildEventArtifactUploader getBuildEventUploader() { |
| return buildEventUploader; |
| } |
| |
| /** Enqueues an event for uploading to a BES backend. */ |
| void enqueueEvent(BuildEvent event) { |
| // This needs to happen outside a synchronized block as it may trigger |
| // stdout/stderr and lead to a deadlock. See b/109725432 |
| ListenableFuture<PathConverter> localFileUploadFuture = |
| buildEventUploader.uploadReferencedLocalFiles(event.referencedLocalFiles()); |
| |
| // The generation of the sequence number and the addition to the {@link #commandQueue} should be |
| // atomic since BES expects the events in that exact order. |
| // More details can be found in b/131393380. |
| // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order. |
| synchronized (lock) { |
| if (startedClose) { |
| return; |
| } |
| // BuildCompletingEvent marks the end of the build in the BEP event stream. |
| if (event instanceof BuildCompletingEvent buildCompletingEvent) { |
| ExitCode exitCode = buildCompletingEvent.getExitCode(); |
| if (exitCode != null && exitCode.getNumericExitCode() == 0) { |
| invocationStatus = InvocationStatus.SUCCEEDED; |
| } else { |
| invocationStatus = InvocationStatus.FAILED; |
| } |
| } else if (event instanceof AbortedEvent && event.getEventId().hasBuildFinished()) { |
| // An AbortedEvent with a build finished ID means we are crashing. |
| invocationStatus = InvocationStatus.FAILED; |
| } |
| ensureUploadThreadStarted(); |
| |
| // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's |
| // always used under lock. It would be cleaner and more performant to update the sequence |
| // number when we take the item off the queue. |
| commandQueue.addLast( |
| new Command.SendRegularBuildEvent( |
| nextSeqNum.getAndIncrement(), clock.now(), event, localFileUploadFuture)); |
| } |
| } |
| |
| /** |
| * Gracefully stops the BES upload. All events enqueued before the call to close will be uploaded |
| * and events enqueued after the call will be discarded. |
| * |
| * <p>The returned future completes when the upload completes. It's guaranteed to never fail. |
| */ |
| public ListenableFuture<Void> close() { |
| ensureUploadThreadStarted(); |
| |
| // The generation of the sequence number and the addition to the {@link #commandQueue} should be |
| // atomic since BES expects the events in that exact order. |
| // More details can be found in b/131393380. |
| // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order. |
| synchronized (lock) { |
| if (startedClose) { |
| return closeFuture; |
| } |
| startedClose = true; |
| // Enqueue the last event which will terminate the upload. |
| // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's |
| // always used under lock. It would be cleaner and more performant to update the sequence |
| // number when we take the item off the queue. |
| commandQueue.addLast( |
| new Command.SendLastBuildEvent(nextSeqNum.getAndIncrement(), clock.now())); |
| } |
| |
| final SettableFuture<Void> finalCloseFuture = closeFuture; |
| closeFuture.addListener( |
| () -> { |
| // Make sure to cancel any pending uploads if the closing is cancelled. |
| if (finalCloseFuture.isCancelled()) { |
| closeOnCancel(); |
| } |
| }, |
| MoreExecutors.directExecutor()); |
| |
| return closeFuture; |
| } |
| |
| private void closeOnCancel() { |
| synchronized (lock) { |
| interruptCausedByCancel = true; |
| closeNow(); |
| } |
| } |
| |
| /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */ |
| private void closeNow() { |
| synchronized (lock) { |
| if (uploadThread != null) { |
| if (uploadThread.isInterrupted()) { |
| return; |
| } |
| uploadThread.interrupt(); |
| } |
| } |
| } |
| |
| ListenableFuture<Void> getHalfCloseFuture() { |
| return halfCloseFuture; |
| } |
| |
| private DetailedExitCode logAndSetException( |
| String message, BuildProgress.Code bpCode, Throwable cause) { |
| logger.atSevere().log("%s", message); |
| DetailedExitCode detailedExitCode = |
| DetailedExitCode.of( |
| FailureDetail.newBuilder() |
| .setMessage(message + " " + besClient.userReadableError(cause)) |
| .setBuildProgress(BuildProgress.newBuilder().setCode(bpCode).build()) |
| .build()); |
| closeFuture.setException(new AbruptExitException(detailedExitCode, cause)); |
| return detailedExitCode; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| if (publishLifecycleEvents) { |
| publishLifecycleEvent(new LifecycleEvent.BuildEnqueued(commandStartTime)); |
| publishLifecycleEvent(new LifecycleEvent.InvocationStarted(eventStreamStartTime)); |
| } |
| |
| try { |
| publishBuildEvents(); |
| } finally { |
| if (publishLifecycleEvents) { |
| InvocationStatus invocationStatus; |
| synchronized (lock) { |
| invocationStatus = this.invocationStatus; |
| } |
| Instant now = clock.now(); |
| publishLifecycleEvent(new LifecycleEvent.InvocationFinished(now, invocationStatus)); |
| publishLifecycleEvent(new LifecycleEvent.BuildFinished(now, invocationStatus)); |
| } |
| } |
| eventBus.post(BuildEventServiceAvailabilityEvent.ofSuccess()); |
| } catch (InterruptedException e) { |
| synchronized (lock) { |
| Preconditions.checkState( |
| interruptCausedByCancel, "Unexpected interrupt on BES uploader thread"); |
| } |
| } catch (DetailedStatusException e) { |
| boolean isTransient = shouldRetryStatus(e.getStatus()); |
| ExitCode exitCode = |
| isTransient |
| ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR |
| : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR; |
| DetailedExitCode detailedExitCode = logAndSetException(e.extendedMessage, e.bpCode, e); |
| eventBus.post( |
| new BuildEventServiceAvailabilityEvent(exitCode, detailedExitCode.getFailureDetail())); |
| } catch (LocalFileUploadException e) { |
| Throwables.throwIfUnchecked(e.getCause()); |
| DetailedExitCode detailedExitCode = |
| logAndSetException( |
| "The Build Event Protocol local file upload failed:", |
| BuildProgress.Code.BES_UPLOAD_LOCAL_FILE_ERROR, |
| e.getCause()); |
| eventBus.post( |
| new BuildEventServiceAvailabilityEvent( |
| ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR, |
| detailedExitCode.getFailureDetail())); |
| } catch (Throwable e) { |
| closeFuture.setException(e); |
| logger.atSevere().log("BES upload failed due to a RuntimeException / Error. This is a bug."); |
| throw e; |
| } finally { |
| buildEventUploader.release(); |
| MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS); |
| closeFuture.set(null); |
| } |
| } |
| |
| private BuildEventStreamProtos.BuildEvent createSerializedRegularBuildEvent( |
| PathConverter pathConverter, Command.SendRegularBuildEvent cmd) throws InterruptedException { |
| |
| BuildEventContext ctx = |
| new BuildEventContext() { |
| private final OutputGroupFileModes outputGroupModes = |
| buildEventProtocolOptions.getOutputGroupFileModesMapping(); |
| |
| @Override |
| public PathConverter pathConverter() { |
| return pathConverter; |
| } |
| |
| @Override |
| public ArtifactGroupNamer artifactGroupNamer() { |
| return namer; |
| } |
| |
| @Override |
| public BuildEventProtocolOptions getOptions() { |
| return buildEventProtocolOptions; |
| } |
| |
| @Override |
| public OutputGroupFileMode getFileModeForOutputGroup(String outputGroup) { |
| return outputGroupModes.getMode(outputGroup); |
| } |
| }; |
| BuildEventStreamProtos.BuildEvent serializedBepEvent = cmd.event().asStreamProto(ctx); |
| |
| // TODO(lpino): Remove this logging once we can make every single event smaller than 1MB |
| // as protobuf recommends. |
| if (serializedBepEvent.getSerializedSize() |
| > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) { |
| eventBus.post( |
| new LargeBuildEventSerializedEvent( |
| serializedBepEvent.getId().toString(), serializedBepEvent.getSerializedSize())); |
| } |
| |
| return serializedBepEvent; |
| } |
| |
| private void publishBuildEvents() |
| throws DetailedStatusException, LocalFileUploadException, InterruptedException { |
| commandQueue.addFirst(new Command.OpenStream()); |
| |
| // Every build event sent to the server needs to be acknowledged by it. This queue stores |
| // the build events that have been sent and still have to be acknowledged by the server. |
| // The build events are stored in the order they were sent. |
| Deque<Command.SendSerializedBuildEvent> ackQueue = new ArrayDeque<>(); |
| boolean lastEventSent = false; |
| int acksReceived = 0; |
| int retryAttempt = 0; |
| Command cmd = null; |
| try { |
| // {@link Command.OpenStream} is the first command and opens a bidirectional streaming RPC for |
| // sending build events and receiving ACKs. |
| // {@link Command.SendRegularBuildEvent} sends a build event to the server. Sending a build |
| // event does does not wait for the previous build event to have been ACKed. |
| // {@link Command.SendLastBuildEvent} sends the last build event and half closes the RPC. |
| // {@link Command.AckReceived} is executed for every ACK from the server and checks that the |
| // ACKs are in the correct order. |
| // {@link Command.StreamComplete} checks that all build events have been sent and all ACKs |
| // have been received. If not, it invokes a retry logic that may decide to re-send every build |
| // event that have not been ACKed. If so, it enqueues a {@link Command.OpenStream} command. |
| while (true) { |
| cmd = commandQueue.takeFirst(); |
| switch (cmd) { |
| case Command.OpenStream openStreamEventCmd -> { |
| // Invariant: commandQueue only contains commands of type SendRegularBuildEvent or |
| // SendLastBuildEvent |
| logger.atInfo().log( |
| "Starting publishBuildEvents: commandQueue=%d", commandQueue.size()); |
| streamContext = |
| besClient.openStream( |
| commandContext, (ack) -> commandQueue.addLast(new Command.AckReceived(ack))); |
| addStreamStatusListener( |
| streamContext.getStatus(), |
| (status) -> commandQueue.addLast(new Command.StreamComplete(status))); |
| } |
| case Command.SendRegularBuildEvent sendRegularBuildEventCmd -> { |
| // Invariant: commandQueue may contain commands of any type |
| |
| PathConverter pathConverter = waitForUploads(sendRegularBuildEventCmd); |
| |
| BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent = |
| createSerializedRegularBuildEvent(pathConverter, sendRegularBuildEventCmd); |
| |
| var bazelEvent = |
| new StreamEvent.BazelEvent( |
| sendRegularBuildEventCmd.creationTime(), |
| sendRegularBuildEventCmd.sequenceNumber(), |
| serializedRegularBuildEvent.toByteArray()); |
| ackQueue.addLast(new Command.SendSerializedBuildEvent(bazelEvent)); |
| streamContext.sendOverStream(bazelEvent); |
| } |
| case Command.SendSerializedBuildEvent sendSerializedBuildEvent -> { |
| ackQueue.addLast(sendSerializedBuildEvent); |
| streamContext.sendOverStream(sendSerializedBuildEvent.request); |
| // Re-close the stream if we are re-sending the last event. |
| if (sendSerializedBuildEvent.request instanceof StreamEvent.StreamFinished) { |
| halfCloseEventUploadingStream(); |
| } |
| } |
| case Command.SendLastBuildEvent sendLastBuildEventCmd -> { |
| // Invariant: the commandQueue may contain commands of any type |
| lastEventSent = true; |
| var streamFinishedEvent = |
| new StreamEvent.StreamFinished( |
| sendLastBuildEventCmd.creationTime(), sendLastBuildEventCmd.sequenceNumber()); |
| ackQueue.addLast(new Command.SendSerializedBuildEvent(streamFinishedEvent)); |
| streamContext.sendOverStream(streamFinishedEvent); |
| halfCloseEventUploadingStream(); |
| } |
| case Command.AckReceived ackReceivedCmd -> { |
| // Invariant: the commandQueue may contain commands of any type |
| if (!ackQueue.isEmpty()) { |
| Command.SendSerializedBuildEvent expected = ackQueue.removeFirst(); |
| long actualSeqNum = ackReceivedCmd.sequenceNumber(); |
| if (expected.request.sequenceNumber() == actualSeqNum) { |
| acksReceived++; |
| } else { |
| ackQueue.addFirst(expected); |
| String message = |
| String.format( |
| "Expected ACK with seqNum=%d but received ACK with seqNum=%d", |
| expected.request.sequenceNumber(), actualSeqNum); |
| logger.atInfo().log("%s", message); |
| streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message)); |
| } |
| } else { |
| String message = |
| String.format( |
| "Received ACK (seqNum=%d) when no ACK was expected", |
| ackReceivedCmd.sequenceNumber()); |
| logger.atInfo().log("%s", message); |
| streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message)); |
| } |
| } |
| case Command.StreamComplete streamCompleteCmd -> { |
| // Invariant: the commandQueue only contains commands of type SendRegularBuildEvent or |
| // SendLastBuildEvent. |
| streamContext = null; |
| Status streamStatus = streamCompleteCmd.status(); |
| if (streamStatus.isOk()) { |
| if (lastEventSent && ackQueue.isEmpty()) { |
| logger.atInfo().log("publishBuildEvents was successful"); |
| // Upload successful. Break out from the while(true) loop. |
| return; |
| } else { |
| Status status = |
| lastEventSent |
| ? ackQueueNotEmptyStatus(ackQueue.size()) |
| : lastEventNotSentStatus(); |
| BuildProgress.Code bpCode = |
| lastEventSent |
| ? BuildProgress.Code.BES_STREAM_COMPLETED_WITH_UNACK_EVENTS_ERROR |
| : BuildProgress.Code.BES_STREAM_COMPLETED_WITH_UNSENT_EVENTS_ERROR; |
| throw withFailureDetail(status.asException(), bpCode, status.getDescription()); |
| } |
| } else if (lastEventSent && ackQueue.isEmpty()) { |
| throw withFailureDetail( |
| streamStatus.asException(), |
| BuildProgress.Code.BES_STREAM_COMPLETED_WITH_REMOTE_ERROR, |
| streamStatus.getDescription()); |
| } |
| |
| if (!shouldRetryStatus(streamStatus) || shouldStartNewInvocation(streamStatus)) { |
| String message = |
| String.format("Not retrying publishBuildEvents: status='%s'", streamStatus); |
| logger.atInfo().log("%s", message); |
| BuildProgress.Code detailedCode = |
| shouldStartNewInvocation(streamStatus) |
| ? BuildProgress.Code.BES_UPLOAD_TIMEOUT_ERROR |
| : BuildProgress.Code.BES_STREAM_NOT_RETRYING_FAILURE; |
| throw withFailureDetail(streamStatus.asException(), detailedCode, message); |
| } |
| if (retryAttempt == buildEventProtocolOptions.besUploadMaxRetries) { |
| String message = |
| String.format( |
| "Not retrying publishBuildEvents, no more attempts left: status='%s'", |
| streamStatus); |
| logger.atInfo().log("%s", message); |
| throw withFailureDetail( |
| streamStatus.asException(), |
| BuildProgress.Code.BES_UPLOAD_RETRY_LIMIT_EXCEEDED_FAILURE, |
| message); |
| } |
| |
| // Retry logic |
| // Adds build event commands from the ackQueue to the front of the commandQueue, so that |
| // the commands in the commandQueue are sorted by sequence number (ascending). |
| Command.SendSerializedBuildEvent unacked; |
| while ((unacked = ackQueue.pollLast()) != null) { |
| commandQueue.addFirst(unacked); |
| } |
| |
| long sleepMillis = retrySleepMillis(retryAttempt); |
| logger.atInfo().log( |
| "Retrying stream: status='%s', sleepMillis=%d", streamStatus, sleepMillis); |
| sleeper.sleepMillis(sleepMillis); |
| |
| // If we made progress, meaning the server ACKed events that we sent, then reset |
| // the retry counter to 0. |
| if (acksReceived > 0) { |
| retryAttempt = 0; |
| } else { |
| retryAttempt++; |
| } |
| acksReceived = 0; |
| commandQueue.addFirst(new Command.OpenStream()); |
| } |
| } |
| } |
| } catch (InterruptedException | LocalFileUploadException e) { |
| int limit = 30; |
| logger.atInfo().log( |
| "Publish interrupt. Showing up to %d items from queues: ack_queue_size: %d, " |
| + "ack_queue: %s, command_queue_size: %d, command_queue: %s", |
| limit, |
| ackQueue.size(), |
| Iterables.limit(ackQueue, limit), |
| commandQueue.size(), |
| Iterables.limit(commandQueue, limit)); |
| if (streamContext != null) { |
| streamContext.abortStream(Status.CANCELLED); |
| } |
| throw e; |
| } finally { |
| logger.atInfo().log("About to cancel all local file uploads"); |
| try (AutoProfiler ignored = |
| GoogleAutoProfilerUtils.logged("local file upload cancellation")) { |
| // If we failed in the middle of an event with uploads, cancel those. |
| if (cmd instanceof Command.SendRegularBuildEvent sendRegularBuildEventCmd) { |
| cancelLocalFileUpload(sendRegularBuildEventCmd); |
| } |
| // Drain ackQueue and commandQueue, cancelling all pending local file uploads. |
| ackQueue.clear(); |
| Command queuedCmd; |
| while ((queuedCmd = commandQueue.pollFirst()) != null) { |
| if (queuedCmd instanceof Command.SendRegularBuildEvent sendRegularBuildEventCmd) { |
| cancelLocalFileUpload(sendRegularBuildEventCmd); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Half-closes the uploading stream, which can happen when we send the final event or when we |
| * re-send the final event. |
| */ |
| private void halfCloseEventUploadingStream() { |
| streamContext.halfCloseStream(); |
| halfCloseFuture.set(null); |
| logger.atInfo().log("BES uploader is half-closed"); |
| } |
| |
| private void cancelLocalFileUpload(Command.SendRegularBuildEvent cmd) { |
| ListenableFuture<PathConverter> localFileUploaderFuture = cmd.localFileUploadProgress(); |
| if (!localFileUploaderFuture.isDone()) { |
| localFileUploaderFuture.cancel(true); |
| } |
| } |
| |
| /** Sends a {@link LifecycleEvent} to the BES backend. */ |
| private void publishLifecycleEvent(LifecycleEvent lifecycleEvent) |
| throws DetailedStatusException, InterruptedException { |
| int retryAttempt = 0; |
| StatusException cause = null; |
| while (retryAttempt <= this.buildEventProtocolOptions.besUploadMaxRetries) { |
| try { |
| besClient.publish(commandContext, lifecycleEvent); |
| return; |
| } catch (StatusException e) { |
| if (!shouldRetryStatus(e.getStatus()) || shouldStartNewInvocation(e.getStatus())) { |
| String message = |
| String.format("Not retrying publishLifecycleEvent: status='%s'", e.getStatus()); |
| logger.atInfo().log("%s", message); |
| throw withFailureDetail(e, BuildProgress.Code.BES_STREAM_NOT_RETRYING_FAILURE, message); |
| } |
| |
| cause = e; |
| |
| long sleepMillis = retrySleepMillis(retryAttempt); |
| logger.atInfo().log( |
| "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d", |
| e.getStatus(), sleepMillis); |
| sleeper.sleepMillis(sleepMillis); |
| retryAttempt++; |
| } |
| } |
| |
| // All retry attempts failed |
| throw withFailureDetail( |
| cause, |
| BuildProgress.Code.BES_UPLOAD_RETRY_LIMIT_EXCEEDED_FAILURE, |
| String.format("All %d retry attempts failed.", retryAttempt - 1)); |
| } |
| |
| private void ensureUploadThreadStarted() { |
| synchronized (lock) { |
| if (uploadThread == null) { |
| uploadThread = new Thread(this, "bes-uploader"); |
| uploadThread.start(); |
| } |
| } |
| } |
| |
| @SuppressWarnings("LogAndThrow") // Not confident in BES's error-handling. |
| private PathConverter waitForUploads(Command.SendRegularBuildEvent sendRegularBuildEventCmd) |
| throws LocalFileUploadException, InterruptedException { |
| try { |
| // Wait for the local file and pending remote uploads to complete. |
| buildEventUploader |
| .waitForRemoteUploads(sendRegularBuildEventCmd.event().remoteUploads(), timeoutExecutor) |
| .get(); |
| return sendRegularBuildEventCmd.localFileUploadProgress().get(); |
| } catch (ExecutionException e) { |
| logger.atWarning().withCause(e).log( |
| "Failed to upload files referenced by build event: %s", e.getMessage()); |
| Throwables.throwIfUnchecked(e.getCause()); |
| throw new LocalFileUploadException(e.getCause()); |
| } |
| } |
| |
| private static Status lastEventNotSentStatus() { |
| return Status.FAILED_PRECONDITION.withDescription( |
| "Server closed stream with status OK but not all events have been sent"); |
| } |
| |
| private static Status ackQueueNotEmptyStatus(int ackQueueSize) { |
| return Status.FAILED_PRECONDITION.withDescription( |
| String.format( |
| "Server closed stream with status OK but not all ACKs have been" |
| + " received (ackQueue=%d)", |
| ackQueueSize)); |
| } |
| |
| private static void addStreamStatusListener(Future<Status> stream, Consumer<Status> onDone) { |
| Futures.addCallback( |
| JdkFutureAdapters.listenInPoolThread(stream), |
| new FutureCallback<Status>() { |
| @Override |
| public void onSuccess(Status result) { |
| onDone.accept(result); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) {} |
| }, |
| MoreExecutors.directExecutor()); |
| } |
| |
| private static boolean shouldRetryStatus(Status status) { |
| return RETRYABLE_STATUS_CODES.contains(status.getCode()); |
| } |
| |
| private static boolean shouldStartNewInvocation(Status status) { |
| return status.getCode().equals(Code.FAILED_PRECONDITION); |
| } |
| |
| private long retrySleepMillis(int attempt) { |
| Preconditions.checkArgument(attempt >= 0, "attempt must be nonnegative: %s", attempt); |
| // This somewhat matches the backoff used for gRPC connection backoffs. |
| return (long) |
| (this.buildEventProtocolOptions.besUploadRetryInitialDelay.toMillis() |
| * Math.pow(1.6, attempt)); |
| } |
| |
| private DetailedStatusException withFailureDetail( |
| StatusException exception, BuildProgress.Code bpCode, String message) { |
| return new DetailedStatusException( |
| exception, bpCode, message + " " + besClient.userReadableError(exception)); |
| } |
| |
| /** Thrown when encountered problems while uploading build event artifacts. */ |
| private class LocalFileUploadException extends Exception { |
| LocalFileUploadException(Throwable cause) { |
| super(cause); |
| } |
| } |
| |
| static class Builder { |
| private BuildEventServiceClient besClient; |
| private BuildEventArtifactUploader localFileUploader; |
| private BuildEventProtocolOptions bepOptions; |
| private boolean publishLifecycleEvents; |
| private Sleeper sleeper; |
| private Clock clock; |
| private ArtifactGroupNamer artifactGroupNamer; |
| private EventBus eventBus; |
| private CommandContext commandContext; |
| private Instant commandStartTime; |
| |
| @CanIgnoreReturnValue |
| Builder besClient(BuildEventServiceClient value) { |
| this.besClient = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder localFileUploader(BuildEventArtifactUploader value) { |
| this.localFileUploader = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder bepOptions(BuildEventProtocolOptions value) { |
| this.bepOptions = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder publishLifecycleEvents(boolean value) { |
| this.publishLifecycleEvents = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder clock(Clock value) { |
| this.clock = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder sleeper(Sleeper value) { |
| this.sleeper = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder artifactGroupNamer(ArtifactGroupNamer value) { |
| this.artifactGroupNamer = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder eventBus(EventBus value) { |
| this.eventBus = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| Builder commandContext(CommandContext value) { |
| this.commandContext = value; |
| return this; |
| } |
| |
| @CanIgnoreReturnValue |
| public Builder commandStartTime(Instant value) { |
| this.commandStartTime = value; |
| return this; |
| } |
| |
| BuildEventServiceUploader build() { |
| return new BuildEventServiceUploader( |
| checkNotNull(besClient), |
| checkNotNull(localFileUploader), |
| checkNotNull(bepOptions), |
| publishLifecycleEvents, |
| checkNotNull(sleeper), |
| checkNotNull(clock), |
| checkNotNull(artifactGroupNamer), |
| checkNotNull(eventBus), |
| checkNotNull(commandContext), |
| checkNotNull(commandStartTime)); |
| } |
| } |
| |
| /** |
| * A wrapper Exception class that contains the {@link StatusException}, the {@link |
| * BuildProgress.Code}, and a message. |
| */ |
| static class DetailedStatusException extends StatusException { |
| private final BuildProgress.Code bpCode; |
| private final String extendedMessage; |
| |
| DetailedStatusException( |
| StatusException statusException, BuildProgress.Code bpCode, String message) { |
| super(statusException.getStatus(), statusException.getTrailers()); |
| this.bpCode = bpCode; |
| this.extendedMessage = "The Build Event Protocol upload failed: " + message; |
| } |
| } |
| } |