lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 1 | // Copyright 2019 The Bazel Authors. All rights reserved. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | package com.google.devtools.build.lib.buildeventservice; |
| 15 | |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 16 | import static com.google.common.base.Preconditions.checkNotNull; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 17 | import static com.google.common.base.Preconditions.checkState; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 18 | import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED; |
| 19 | import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED; |
| 20 | import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS; |
| 21 | |
| 22 | import com.google.common.annotations.VisibleForTesting; |
| 23 | import com.google.common.base.Preconditions; |
| 24 | import com.google.common.base.Throwables; |
| 25 | import com.google.common.collect.Iterables; |
lpino | 1d205e1 | 2019-02-15 07:09:29 -0800 | [diff] [blame] | 26 | import com.google.common.eventbus.EventBus; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 27 | import com.google.common.util.concurrent.FutureCallback; |
| 28 | import com.google.common.util.concurrent.Futures; |
| 29 | import com.google.common.util.concurrent.ListenableFuture; |
| 30 | import com.google.common.util.concurrent.MoreExecutors; |
| 31 | import com.google.common.util.concurrent.SettableFuture; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 32 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand; |
| 33 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand; |
| 34 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand; |
| 35 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendBuildEventCommand; |
| 36 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendLastBuildEventCommand; |
| 37 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendRegularBuildEventCommand; |
| 38 | import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.StreamCompleteCommand; |
| 39 | import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; |
| 40 | import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamContext; |
| 41 | import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; |
| 42 | import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent; |
| 43 | import com.google.devtools.build.lib.buildeventstream.BuildEvent; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 44 | import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; |
| 45 | import com.google.devtools.build.lib.buildeventstream.BuildEventContext; |
| 46 | import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; |
| 47 | import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; |
lpino | 1d205e1 | 2019-02-15 07:09:29 -0800 | [diff] [blame] | 48 | import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 49 | import com.google.devtools.build.lib.buildeventstream.PathConverter; |
| 50 | import com.google.devtools.build.lib.clock.Clock; |
lpino | 2a42e2b | 2019-03-01 15:11:28 -0800 | [diff] [blame] | 51 | import com.google.devtools.build.lib.util.AbruptExitException; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 52 | import com.google.devtools.build.lib.util.ExitCode; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 53 | import com.google.devtools.build.lib.util.Sleeper; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 54 | import com.google.devtools.build.v1.BuildStatus.Result; |
| 55 | import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest; |
| 56 | import com.google.devtools.build.v1.PublishLifecycleEventRequest; |
| 57 | import com.google.protobuf.Any; |
| 58 | import com.google.protobuf.Timestamp; |
| 59 | import com.google.protobuf.util.Timestamps; |
| 60 | import io.grpc.Status; |
| 61 | import io.grpc.Status.Code; |
| 62 | import io.grpc.StatusException; |
felly | 916e736 | 2019-04-16 09:10:22 -0700 | [diff] [blame] | 63 | import java.util.ArrayDeque; |
| 64 | import java.util.Deque; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 65 | import java.util.concurrent.BlockingDeque; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 66 | import java.util.concurrent.ExecutionException; |
| 67 | import java.util.concurrent.LinkedBlockingDeque; |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 68 | import java.util.concurrent.atomic.AtomicBoolean; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 69 | import java.util.concurrent.atomic.AtomicLong; |
| 70 | import java.util.function.Consumer; |
| 71 | import java.util.logging.Level; |
| 72 | import java.util.logging.Logger; |
| 73 | import javax.annotation.concurrent.GuardedBy; |
| 74 | |
| 75 | /** |
| 76 | * Uploader of Build Events to the Build Event Service (BES). |
| 77 | * |
| 78 | * <p>The purpose of this class is to manage the interaction between the BES client and the BES |
| 79 | * server. It implements the event loop pattern based on the commands defined by {@link |
| 80 | * BuildEventServiceUploaderCommands}. |
| 81 | */ |
| 82 | // TODO(lpino): This class should be package-private but there are unit tests that are in the |
| 83 | // different packages and rely on this. |
| 84 | @VisibleForTesting |
| 85 | public final class BuildEventServiceUploader implements Runnable { |
private static final Logger logger = Logger.getLogger(BuildEventServiceUploader.class.getName());

/**
 * Configuration knobs related to RPC retries. Values chosen by good judgement.
 *
 * <p>Upper bound on the retry counter used by both {@link #publishBuildEvents()} and the
 * lifecycle event publishing loop.
 */
private static final int MAX_NUM_RETRIES = 4;

// NOTE(review): not referenced in the visible part of this file; presumably the base delay used
// by retrySleepMillis -- confirm against its definition.
private static final int DELAY_MILLIS = 1000;

// Collaborators injected through the constructor.
private final BuildEventServiceClient besClient;
private final BuildEventArtifactUploader localFileUploader;
private final BuildEventServiceProtoUtil besProtoUtil;
private final BuildEventProtocolOptions buildEventProtocolOptions;
/** Whether {@link #run()} publishes the lifecycle events around the build event stream. */
private final boolean publishLifecycleEvents;
private final Sleeper sleeper;
private final Clock clock;
private final ArtifactGroupNamer namer;
private final EventBus eventBus;
/**
 * Flipped to true by the first call to {@link #close()}; {@link #enqueueEvent(BuildEvent)}
 * discards events once it is set.
 */
private final AtomicBoolean startedClose = new AtomicBoolean(false);

/**
 * The event queue contains two types of events:
 *
 * <ul>
 *   <li>Build events, sorted by sequence number, that should be sent to the server.
 *   <li>Command events that are used by {@link #publishBuildEvents()} to change state.
 * </ul>
 */
private final BlockingDeque<EventLoopCommand> eventQueue = new LinkedBlockingDeque<>();

/**
 * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be
 * consecutive monotonically increasing natural numbers.
 */
private final AtomicLong nextSeqNum = new AtomicLong(1);

private final Object lock = new Object();

/** Status reported in the final lifecycle events; updated when a build-completing event is seen. */
@GuardedBy("lock")
private Result buildStatus = UNKNOWN_STATUS;

/** Returned by {@link #close()} and completed by {@link #run()} when the upload thread ends. */
private final SettableFuture<Void> closeFuture = SettableFuture.create();
/** Completed by {@link #publishBuildEvents()} right after the stream has been half-closed. */
private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();

/**
 * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
 * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (whichever comes
 * first).
 */
@GuardedBy("lock")
private Thread uploadThread;

/** Set before the upload thread is interrupted on cancellation; checked by {@link #run()}. */
@GuardedBy("lock")
private boolean interruptCausedByCancel;

// The currently open stream: assigned when a stream is opened and reset to null once the stream
// completes. In the visible code it is only touched from publishBuildEvents().
private StreamContext streamContext;
| 137 | |
/**
 * Stores the collaborators used by the uploader. No thread is started and no RPC is issued here;
 * the upload thread is created lazily by {@link #ensureUploadThreadStarted()}.
 */
private BuildEventServiceUploader(
    BuildEventServiceClient besClient,
    BuildEventArtifactUploader localFileUploader,
    BuildEventServiceProtoUtil besProtoUtil,
    BuildEventProtocolOptions buildEventProtocolOptions,
    boolean publishLifecycleEvents,
    Sleeper sleeper,
    Clock clock,
    ArtifactGroupNamer namer,
    EventBus eventBus) {
  // Configuration.
  this.buildEventProtocolOptions = buildEventProtocolOptions;
  this.publishLifecycleEvents = publishLifecycleEvents;

  // BES client side.
  this.besClient = besClient;
  this.besProtoUtil = besProtoUtil;
  this.localFileUploader = localFileUploader;

  // Supporting services.
  this.clock = clock;
  this.sleeper = sleeper;
  this.namer = namer;
  this.eventBus = eventBus;
}
| 158 | |
| 159 | BuildEventArtifactUploader getLocalFileUploader() { |
| 160 | return localFileUploader; |
| 161 | } |
| 162 | |
| 163 | /** Enqueues an event for uploading to a BES backend. */ |
| 164 | void enqueueEvent(BuildEvent event) { |
| 165 | // This needs to happen outside a synchronized block as it may trigger |
| 166 | // stdout/stderr and lead to a deadlock. See b/109725432 |
| 167 | ListenableFuture<PathConverter> localFileUploadFuture = |
felly | 916e736 | 2019-04-16 09:10:22 -0700 | [diff] [blame] | 168 | localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles()); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 169 | |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 170 | if (startedClose.get()) { |
| 171 | if (!localFileUploadFuture.isDone()) { |
| 172 | localFileUploadFuture.cancel(true); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 173 | } |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 174 | return; |
| 175 | } |
lpino | 4b092cd | 2019-04-26 07:20:53 -0700 | [diff] [blame] | 176 | |
| 177 | // The generation of the sequence number and the addition to the {@link #eventQueue} should be |
| 178 | // atomic since BES expects the events in that exact order. |
| 179 | // More details can be found in b/131393380. |
| 180 | // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order. |
| 181 | synchronized (lock) { |
| 182 | // BuildCompletingEvent marks the end of the build in the BEP event stream. |
| 183 | if (event instanceof BuildCompletingEvent) { |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 184 | this.buildStatus = extractBuildStatus((BuildCompletingEvent) event); |
| 185 | } |
lpino | 4b092cd | 2019-04-26 07:20:53 -0700 | [diff] [blame] | 186 | ensureUploadThreadStarted(); |
| 187 | |
| 188 | // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's |
| 189 | // always used under lock. It would be cleaner and more performant to update the sequence |
| 190 | // number when we take the item off the queue. |
| 191 | eventQueue.addLast( |
| 192 | new SendRegularBuildEventCommand( |
| 193 | event, |
| 194 | localFileUploadFuture, |
| 195 | nextSeqNum.getAndIncrement(), |
| 196 | Timestamps.fromMillis(clock.currentTimeMillis()))); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 197 | } |
| 198 | } |
| 199 | |
| 200 | /** |
| 201 | * Gracefully stops the BES upload. All events enqueued before the call to close will be uploaded |
| 202 | * and events enqueued after the call will be discarded. |
| 203 | * |
| 204 | * <p>The returned future completes when the upload completes. It's guaranteed to never fail. |
| 205 | */ |
| 206 | public ListenableFuture<Void> close() { |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 207 | if (startedClose.getAndSet(true)) { |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 208 | return closeFuture; |
| 209 | } |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 210 | |
| 211 | ensureUploadThreadStarted(); |
| 212 | |
lpino | 4b092cd | 2019-04-26 07:20:53 -0700 | [diff] [blame] | 213 | // The generation of the sequence number and the addition to the {@link #eventQueue} should be |
| 214 | // atomic since BES expects the events in that exact order. |
| 215 | // More details can be found in b/131393380. |
| 216 | // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order. |
| 217 | synchronized (lock) { |
| 218 | // Enqueue the last event which will terminate the upload. |
| 219 | // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's |
| 220 | // always used under lock. It would be cleaner and more performant to update the sequence |
| 221 | // number when we take the item off the queue. |
| 222 | eventQueue.addLast( |
| 223 | new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime())); |
| 224 | } |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 225 | |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 226 | final SettableFuture<Void> finalCloseFuture = closeFuture; |
| 227 | closeFuture.addListener( |
| 228 | () -> { |
| 229 | // Make sure to cancel any pending uploads if the closing is cancelled. |
| 230 | if (finalCloseFuture.isCancelled()) { |
| 231 | closeOnCancel(); |
| 232 | } |
| 233 | }, |
| 234 | MoreExecutors.directExecutor()); |
| 235 | |
| 236 | return closeFuture; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 237 | } |
| 238 | |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 239 | private void closeOnCancel() { |
lpino | d143ca4 | 2019-04-15 04:27:39 -0700 | [diff] [blame] | 240 | synchronized (lock) { |
| 241 | interruptCausedByCancel = true; |
| 242 | closeNow(); |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */ |
| 247 | private void closeNow() { |
| 248 | synchronized (lock) { |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 249 | if (uploadThread != null) { |
| 250 | if (uploadThread.isInterrupted()) { |
| 251 | return; |
| 252 | } |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 253 | uploadThread.interrupt(); |
| 254 | } |
| 255 | } |
| 256 | } |
| 257 | |
lpino | 2a42e2b | 2019-03-01 15:11:28 -0800 | [diff] [blame] | 258 | private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) { |
lpino | d143ca4 | 2019-04-15 04:27:39 -0700 | [diff] [blame] | 259 | checkState(!exitCode.equals(ExitCode.SUCCESS)); |
lpino | f51f14b | 2019-04-17 14:14:56 -0700 | [diff] [blame] | 260 | logger.severe(message); |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 261 | closeFuture.setException(new AbruptExitException(message, exitCode, cause)); |
lpino | 2a42e2b | 2019-03-01 15:11:28 -0800 | [diff] [blame] | 262 | } |
| 263 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 264 | @Override |
| 265 | public void run() { |
| 266 | try { |
| 267 | if (publishLifecycleEvents) { |
| 268 | publishLifecycleEvent(besProtoUtil.buildEnqueued(currentTime())); |
| 269 | publishLifecycleEvent(besProtoUtil.invocationStarted(currentTime())); |
| 270 | } |
| 271 | |
| 272 | try { |
| 273 | publishBuildEvents(); |
| 274 | } finally { |
| 275 | if (publishLifecycleEvents) { |
| 276 | Result buildStatus; |
| 277 | synchronized (lock) { |
| 278 | buildStatus = this.buildStatus; |
| 279 | } |
| 280 | publishLifecycleEvent(besProtoUtil.invocationFinished(currentTime(), buildStatus)); |
| 281 | publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus)); |
| 282 | } |
| 283 | } |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 284 | } catch (InterruptedException e) { |
lpino | f51f14b | 2019-04-17 14:14:56 -0700 | [diff] [blame] | 285 | logger.info("Aborting the BES upload due to having received an interrupt"); |
| 286 | synchronized (lock) { |
| 287 | Preconditions.checkState( |
lpino | 82f5090 | 2019-04-25 16:08:49 -0700 | [diff] [blame] | 288 | interruptCausedByCancel, "Unexpected interrupt on BES uploader thread"); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 289 | } |
| 290 | } catch (StatusException e) { |
lpino | f51f14b | 2019-04-17 14:14:56 -0700 | [diff] [blame] | 291 | logAndExitAbruptly( |
| 292 | "The Build Event Protocol upload failed: " + besClient.userReadableError(e), |
| 293 | shouldRetryStatus(e.getStatus()) |
| 294 | ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR |
| 295 | : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR, |
| 296 | e); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 297 | } catch (LocalFileUploadException e) { |
| 298 | Throwables.throwIfUnchecked(e.getCause()); |
lpino | f51f14b | 2019-04-17 14:14:56 -0700 | [diff] [blame] | 299 | logAndExitAbruptly( |
| 300 | "The Build Event Protocol local file upload failed: " + e.getCause().getMessage(), |
| 301 | ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR, |
| 302 | e.getCause()); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 303 | } catch (Throwable e) { |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 304 | closeFuture.setException(e); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 305 | logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug."); |
| 306 | throw e; |
| 307 | } finally { |
| 308 | localFileUploader.shutdown(); |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 309 | closeFuture.set(null); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 310 | } |
| 311 | } |
| 312 | |
| 313 | private BuildEventStreamProtos.BuildEvent createSerializedRegularBuildEvent( |
| 314 | PathConverter pathConverter, |
| 315 | SendRegularBuildEventCommand buildEvent) { |
| 316 | BuildEventContext ctx = |
| 317 | new BuildEventContext() { |
| 318 | @Override |
| 319 | public PathConverter pathConverter() { |
| 320 | return pathConverter; |
| 321 | } |
| 322 | |
| 323 | @Override |
| 324 | public ArtifactGroupNamer artifactGroupNamer() { |
| 325 | return namer; |
| 326 | } |
| 327 | |
| 328 | @Override |
| 329 | public BuildEventProtocolOptions getOptions() { |
| 330 | return buildEventProtocolOptions; |
| 331 | } |
| 332 | }; |
| 333 | BuildEventStreamProtos.BuildEvent serializedBepEvent = |
| 334 | buildEvent.getEvent().asStreamProto(ctx); |
lpino | 1d205e1 | 2019-02-15 07:09:29 -0800 | [diff] [blame] | 335 | |
| 336 | // TODO(lpino): Remove this logging once we can make every single event smaller than 1MB |
| 337 | // as protobuf recommends. |
| 338 | if (serializedBepEvent.getSerializedSize() |
| 339 | > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) { |
| 340 | eventBus.post( |
| 341 | new LargeBuildEventSerializedEvent( |
| 342 | serializedBepEvent.getId().toString(), serializedBepEvent.getSerializedSize())); |
| 343 | } |
| 344 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 345 | return serializedBepEvent; |
| 346 | } |
| 347 | |
/**
 * Runs the event loop that uploads the build events over a bidirectional stream.
 *
 * <p>Commands are taken from the front of {@link #eventQueue} one at a time and dispatched on
 * their type. The method returns normally only when the stream completes with an OK status after
 * the last event has been sent and every sent event has been acknowledged.
 *
 * <p>On the way out, whether the loop succeeded or failed, the local file uploads of all events
 * still sitting in either queue are cancelled.
 *
 * @throws StatusException if the stream completes with OK while events or ACKs are missing, with
 *     a status that should not be retried, or after the retry attempts are used up
 * @throws LocalFileUploadException if waiting for the local file uploads of an event fails
 * @throws InterruptedException if the thread is interrupted while waiting
 */
private void publishBuildEvents()
    throws StatusException, LocalFileUploadException, InterruptedException {
  // The stream has to be opened before anything that is already queued can be sent.
  eventQueue.addFirst(new OpenStreamCommand());

  // Every build event sent to the server needs to be acknowledged by it. This queue stores
  // the build events that have been sent and still have to be acknowledged by the server.
  // The build events are stored in the order they were sent.
  Deque<SendBuildEventCommand> ackQueue = new ArrayDeque<>();
  boolean lastEventSent = false;
  // Number of ACKs received on the current stream; used to detect progress between retries.
  int acksReceived = 0;
  int retryAttempt = 0;

  try {
    // {@link BuildEventServiceUploaderCommands#OPEN_STREAM} is the first event and opens a
    // bidi streaming RPC for sending build events and receiving ACKs.
    // {@link BuildEventServiceUploaderCommands#SEND_REGULAR_BUILD_EVENT} sends a build event to
    // the server. Sending of the Nth build event
    // does not wait for the ACK of the N-1th build event to have been received.
    // {@link BuildEventServiceUploaderCommands#SEND_LAST_BUILD_EVENT} sends the last build event
    // and half closes the RPC.
    // {@link BuildEventServiceUploaderCommands#ACK_RECEIVED} is executed for every ACK from
    // the server and checks that the ACKs are in the correct order.
    // {@link BuildEventServiceUploaderCommands#STREAM_COMPLETE} checks that all build events
    // have been sent and all ACKs have been received. If not it invokes a retry logic that may
    // decide to re-send every build event for which an ACK has not been received. If so, it
    // adds an OPEN_STREAM event.
    while (true) {
      EventLoopCommand event = eventQueue.takeFirst();
      switch (event.type()) {
        case OPEN_STREAM:
          {
            // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
            // or SEND_LAST_BUILD_EVENT
            logger.info(
                String.format("Starting publishBuildEvents: eventQueue=%d", eventQueue.size()));
            // ACKs and the final stream status are fed back into the event queue as commands,
            // so they are processed by this loop like everything else.
            streamContext =
                besClient.openStream(
                    (ack) -> eventQueue.addLast(new AckReceivedCommand(ack.getSequenceNumber())));
            addStreamStatusListener(
                streamContext.getStatus(),
                (status) -> eventQueue.addLast(new StreamCompleteCommand(status)));
          }
          break;

        case SEND_REGULAR_BUILD_EVENT:
          {
            // Invariant: the eventQueue may contain events of any type
            SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
            // Registered as awaiting an ACK before the (possibly failing) wait below, so the
            // cleanup at the end of this method still sees the event.
            ackQueue.addLast(buildEvent);

            PathConverter pathConverter = waitForLocalFileUploads(buildEvent);

            BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
                createSerializedRegularBuildEvent(pathConverter, buildEvent);

            PublishBuildToolEventStreamRequest request =
                besProtoUtil.bazelEvent(
                    buildEvent.getSequenceNumber(),
                    buildEvent.getCreationTime(),
                    Any.pack(serializedRegularBuildEvent));

            streamContext.sendOverStream(request);
          }
          break;

        case SEND_LAST_BUILD_EVENT:
          {
            // Invariant: the eventQueue may contain events of any type
            SendBuildEventCommand lastEvent = (SendLastBuildEventCommand) event;
            ackQueue.addLast(lastEvent);
            lastEventSent = true;
            PublishBuildToolEventStreamRequest request =
                besProtoUtil.streamFinished(
                    lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
            streamContext.sendOverStream(request);
            streamContext.halfCloseStream();
            halfCloseFuture.set(null);
          }
          break;

        case ACK_RECEIVED:
          {
            // Invariant: the eventQueue may contain events of any type
            AckReceivedCommand ackEvent = (AckReceivedCommand) event;
            if (!ackQueue.isEmpty()) {
              SendBuildEventCommand expected = ackQueue.removeFirst();
              long actualSeqNum = ackEvent.getSequenceNumber();
              if (expected.getSequenceNumber() == actualSeqNum) {
                acksReceived++;
              } else {
                // Out-of-order ACK: put the event back so that it is still counted as
                // unacknowledged, then abort the stream.
                ackQueue.addFirst(expected);
                String message =
                    String.format(
                        "Expected ACK with seqNum=%d but received ACK with seqNum=%d",
                        expected.getSequenceNumber(), actualSeqNum);
                logger.info(message);
                streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
              }
            } else {
              String message =
                  String.format(
                      "Received ACK (seqNum=%d) when no ACK was expected",
                      ackEvent.getSequenceNumber());
              logger.info(message);
              streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
            }
          }
          break;

        case STREAM_COMPLETE:
          {
            // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
            // or SEND_LAST_BUILD_EVENT
            streamContext = null;
            StreamCompleteCommand completeEvent = (StreamCompleteCommand) event;
            Status streamStatus = completeEvent.status();
            if (streamStatus.isOk()) {
              if (lastEventSent && ackQueue.isEmpty()) {
                logger.info("publishBuildEvents was successful");
                // Upload successful. Break out from the while(true) loop.
                return;
              } else {
                // The server reported success although something is still outstanding.
                throw (lastEventSent
                        ? ackQueueNotEmptyStatus(ackQueue.size())
                        : lastEventNotSentStatus())
                    .asException();
              }
            }

            if (!shouldRetryStatus(streamStatus)) {
              logger.info(
                  String.format("Not retrying publishBuildEvents: status='%s'", streamStatus));
              throw streamStatus.asException();
            }
            if (retryAttempt == MAX_NUM_RETRIES) {
              logger.info(
                  String.format(
                      "Not retrying publishBuildEvents, no more attempts left: status='%s'",
                      streamStatus));
              throw streamStatus.asException();
            }

            // Retry logic
            // Adds events from the ackQueue to the front of the eventQueue, so that the
            // events in the eventQueue are sorted by sequence number (ascending).
            SendBuildEventCommand unacked;
            while ((unacked = ackQueue.pollLast()) != null) {
              eventQueue.addFirst(unacked);
            }

            long sleepMillis = retrySleepMillis(retryAttempt);
            logger.info(
                String.format(
                    "Retrying stream: status='%s', sleepMillis=%d", streamStatus, sleepMillis));
            sleeper.sleepMillis(sleepMillis);

            // If we made progress, meaning the server ACKed events that we sent, then reset
            // the retry counter to 0.
            if (acksReceived > 0) {
              retryAttempt = 0;
            } else {
              retryAttempt++;
            }
            acksReceived = 0;
            // Added last so that it ends up in front of the re-queued events.
            eventQueue.addFirst(new OpenStreamCommand());
          }
          break;
      }
    }
  } catch (InterruptedException | LocalFileUploadException e) {
    // Log a bounded snapshot of both queues to help debugging, then abort an open stream.
    int limit = 30;
    logger.info(
        String.format(
            "Publish interrupt. Showing up to %d items from queues: ack_queue_size: %d, "
                + "ack_queue: %s, event_queue_size: %d, event_queue: %s",
            limit,
            ackQueue.size(),
            Iterables.limit(ackQueue, limit),
            eventQueue.size(),
            Iterables.limit(eventQueue, limit)));
    if (streamContext != null) {
      streamContext.abortStream(Status.CANCELLED);
    }
    throw e;
  } finally {
    // Cancel all local file uploads that may still be running
    // of events that haven't been uploaded.
    EventLoopCommand event;
    while ((event = ackQueue.pollFirst()) != null) {
      if (event instanceof SendRegularBuildEventCommand) {
        cancelLocalFileUpload((SendRegularBuildEventCommand) event);
      }
    }
    while ((event = eventQueue.pollFirst()) != null) {
      if (event instanceof SendRegularBuildEventCommand) {
        cancelLocalFileUpload((SendRegularBuildEventCommand) event);
      }
    }
  }
}
| 548 | |
| 549 | private void cancelLocalFileUpload(SendRegularBuildEventCommand event) { |
| 550 | ListenableFuture<PathConverter> localFileUploaderFuture = event.localFileUploadProgress(); |
| 551 | if (!localFileUploaderFuture.isDone()) { |
| 552 | localFileUploaderFuture.cancel(true); |
| 553 | } |
| 554 | } |
| 555 | |
| 556 | /** Sends a {@link PublishLifecycleEventRequest} to the BES backend. */ |
| 557 | private void publishLifecycleEvent(PublishLifecycleEventRequest request) |
| 558 | throws StatusException, InterruptedException { |
| 559 | int retryAttempt = 0; |
| 560 | StatusException cause = null; |
| 561 | while (retryAttempt <= MAX_NUM_RETRIES) { |
| 562 | try { |
| 563 | besClient.publish(request); |
| 564 | return; |
| 565 | } catch (StatusException e) { |
| 566 | if (!shouldRetryStatus(e.getStatus())) { |
| 567 | logger.info( |
| 568 | String.format( |
| 569 | "Not retrying publishLifecycleEvent: status='%s'", e.getStatus().toString())); |
| 570 | throw e; |
| 571 | } |
| 572 | |
| 573 | cause = e; |
| 574 | |
| 575 | long sleepMillis = retrySleepMillis(retryAttempt); |
| 576 | logger.info( |
| 577 | String.format( |
| 578 | "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d", |
| 579 | e.getStatus().toString(), sleepMillis)); |
| 580 | sleeper.sleepMillis(sleepMillis); |
| 581 | retryAttempt++; |
| 582 | } |
| 583 | } |
| 584 | |
| 585 | // All retry attempts failed |
| 586 | throw cause; |
| 587 | } |
| 588 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 589 | private void ensureUploadThreadStarted() { |
| 590 | synchronized (lock) { |
| 591 | if (uploadThread == null) { |
| 592 | uploadThread = new Thread(this, "bes-uploader"); |
| 593 | uploadThread.start(); |
| 594 | } |
| 595 | } |
| 596 | } |
| 597 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 598 | private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent) |
| 599 | throws LocalFileUploadException, InterruptedException { |
| 600 | try { |
| 601 | // Wait for the local file upload to have been completed. |
| 602 | return orderedBuildEvent.localFileUploadProgress().get(); |
| 603 | } catch (ExecutionException e) { |
| 604 | logger.log( |
| 605 | Level.WARNING, |
| 606 | String.format( |
| 607 | "Failed to upload local files referenced by build event: %s", e.getMessage()), |
| 608 | e); |
| 609 | Throwables.throwIfUnchecked(e.getCause()); |
| 610 | throw new LocalFileUploadException(e.getCause()); |
| 611 | } |
| 612 | } |
| 613 | |
| 614 | private Timestamp currentTime() { |
| 615 | return Timestamps.fromMillis(clock.currentTimeMillis()); |
| 616 | } |
| 617 | |
| 618 | private static Result extractBuildStatus(BuildCompletingEvent event) { |
| 619 | if (event.getExitCode() != null && event.getExitCode().getNumericExitCode() == 0) { |
| 620 | return COMMAND_SUCCEEDED; |
| 621 | } else { |
| 622 | return COMMAND_FAILED; |
| 623 | } |
| 624 | } |
| 625 | |
| 626 | private static Status lastEventNotSentStatus() { |
| 627 | return Status.FAILED_PRECONDITION.withDescription( |
| 628 | "Server closed stream with status OK but not all events have been sent"); |
| 629 | } |
| 630 | |
| 631 | private static Status ackQueueNotEmptyStatus(int ackQueueSize) { |
| 632 | return Status.FAILED_PRECONDITION.withDescription( |
| 633 | String.format( |
| 634 | "Server closed stream with status OK but not all ACKs have been" |
| 635 | + " received (ackQueue=%d)", |
| 636 | ackQueueSize)); |
| 637 | } |
| 638 | |
| 639 | private static void addStreamStatusListener( |
| 640 | ListenableFuture<Status> stream, Consumer<Status> onDone) { |
| 641 | Futures.addCallback( |
| 642 | stream, |
| 643 | new FutureCallback<Status>() { |
| 644 | @Override |
| 645 | public void onSuccess(Status result) { |
| 646 | onDone.accept(result); |
| 647 | } |
| 648 | |
| 649 | @Override |
| 650 | public void onFailure(Throwable t) {} |
| 651 | }, |
| 652 | MoreExecutors.directExecutor()); |
| 653 | } |
| 654 | |
| 655 | private static boolean shouldRetryStatus(Status status) { |
| 656 | return !status.getCode().equals(Code.INVALID_ARGUMENT) |
| 657 | && !status.getCode().equals(Code.FAILED_PRECONDITION); |
| 658 | } |
| 659 | |
| 660 | private static long retrySleepMillis(int attempt) { |
| 661 | // This somewhat matches the backoff used for gRPC connection backoffs. |
| 662 | return (long) (DELAY_MILLIS * Math.pow(1.6, attempt)); |
| 663 | } |
| 664 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 665 | /** Thrown when encountered problems while uploading build event artifacts. */ |
| 666 | private class LocalFileUploadException extends Exception { |
| 667 | LocalFileUploadException(Throwable cause) { |
| 668 | super(cause); |
| 669 | } |
| 670 | } |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 671 | |
| 672 | static class Builder { |
| 673 | private BuildEventServiceClient besClient; |
| 674 | private BuildEventArtifactUploader localFileUploader; |
| 675 | private BuildEventServiceProtoUtil besProtoUtil; |
| 676 | private BuildEventProtocolOptions bepOptions; |
| 677 | private boolean publishLifecycleEvents; |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 678 | private Sleeper sleeper; |
| 679 | private Clock clock; |
| 680 | private ArtifactGroupNamer artifactGroupNamer; |
| 681 | private EventBus eventBus; |
| 682 | |
| 683 | Builder besClient(BuildEventServiceClient value) { |
| 684 | this.besClient = value; |
| 685 | return this; |
| 686 | } |
| 687 | |
| 688 | Builder localFileUploader(BuildEventArtifactUploader value) { |
| 689 | this.localFileUploader = value; |
| 690 | return this; |
| 691 | } |
| 692 | |
| 693 | Builder besProtoUtil(BuildEventServiceProtoUtil value) { |
| 694 | this.besProtoUtil = value; |
| 695 | return this; |
| 696 | } |
| 697 | |
| 698 | Builder bepOptions(BuildEventProtocolOptions value) { |
| 699 | this.bepOptions = value; |
| 700 | return this; |
| 701 | } |
| 702 | |
| 703 | Builder publishLifecycleEvents(boolean value) { |
| 704 | this.publishLifecycleEvents = value; |
| 705 | return this; |
| 706 | } |
| 707 | |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 708 | Builder clock(Clock value) { |
| 709 | this.clock = value; |
| 710 | return this; |
| 711 | } |
| 712 | |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 713 | Builder sleeper(Sleeper value) { |
| 714 | this.sleeper = value; |
| 715 | return this; |
| 716 | } |
| 717 | |
| 718 | Builder artifactGroupNamer(ArtifactGroupNamer value) { |
| 719 | this.artifactGroupNamer = value; |
| 720 | return this; |
| 721 | } |
| 722 | |
| 723 | Builder eventBus(EventBus value) { |
| 724 | this.eventBus = value; |
| 725 | return this; |
| 726 | } |
| 727 | |
| 728 | BuildEventServiceUploader build() { |
| 729 | return new BuildEventServiceUploader( |
| 730 | checkNotNull(besClient), |
| 731 | checkNotNull(localFileUploader), |
| 732 | checkNotNull(besProtoUtil), |
| 733 | checkNotNull(bepOptions), |
| 734 | publishLifecycleEvents, |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 735 | checkNotNull(sleeper), |
| 736 | checkNotNull(clock), |
| 737 | checkNotNull(artifactGroupNamer), |
| 738 | checkNotNull(eventBus)); |
| 739 | } |
| 740 | } |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 741 | } |
lpino | a902ce7 | 2019-05-03 10:57:50 -0700 | [diff] [blame] | 742 | |