lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 1 | // Copyright 2019 The Bazel Authors. All rights reserved. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | package com.google.devtools.build.lib.buildeventservice; |
| 15 | |
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendLastBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendRegularBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.StreamCompleteCommand;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamContext;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.LoggingUtil;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
| 79 | |
| 80 | /** |
| 81 | * Uploader of Build Events to the Build Event Service (BES). |
| 82 | * |
| 83 | * <p>The purpose of this class is to manage the interaction between the BES client and the BES |
| 84 | * server. It implements the event loop pattern based on the commands defined by {@link |
| 85 | * BuildEventServiceUploaderCommands}. |
| 86 | */ |
| 87 | // TODO(lpino): This class should be package-private but there are unit tests in other |
| 88 | // packages that rely on this. |
| 89 | @VisibleForTesting |
| 90 | public final class BuildEventServiceUploader implements Runnable { |
private static final Logger logger = Logger.getLogger(BuildEventServiceUploader.class.getName());

/**
 * Configuration knobs related to RPC retries. Values chosen by good judgement.
 *
 * <p>Maximum number of retries of a failing RPC. For the build event stream, the retry counter is
 * reset whenever the server acknowledged events on the previous stream attempt.
 */
private static final int MAX_NUM_RETRIES = 4;

// NOTE(review): not referenced in the code visible here; presumably the base delay used by
// retrySleepMillis() — confirm.
private static final int DELAY_MILLIS = 1000;

/** Client used to open the build event stream and to publish lifecycle events. */
private final BuildEventServiceClient besClient;
/** Uploads the local files referenced by build events before those events are sent. */
private final BuildEventArtifactUploader localFileUploader;
/** Builds the lifecycle and stream request protos sent to the backend. */
private final BuildEventServiceProtoUtil besProtoUtil;
private final BuildEventProtocolOptions buildEventProtocolOptions;
/** Whether the build/invocation lifecycle events are published around the event stream. */
private final boolean publishLifecycleEvents;
/**
 * If non-zero, the upload is interrupted when it has not completed within this duration after
 * {@link #close()} was called. Zero disables the close timeout.
 */
private final Duration closeTimeout;
/** Used to wait between retry attempts. */
private final Sleeper sleeper;
/** Source of the timestamps attached to build and lifecycle events. */
private final Clock clock;
private final ArtifactGroupNamer namer;
/** Receives a {@link LargeBuildEventSerializedEvent} for every oversized serialized event. */
private final EventBus eventBus;
/** Set by {@link #close()}; events enqueued afterwards are discarded. */
private final AtomicBoolean startedClose = new AtomicBoolean(false);

/**
 * The event queue contains two types of events:
 *
 * <ul>
 *   <li>Build events, sorted by sequence number, that should be sent to the server.
 *   <li>Command events that are used by {@link #publishBuildEvents()} to change state.
 * </ul>
 */
private final BlockingDeque<EventLoopCommand> eventQueue = new LinkedBlockingDeque<>();

/**
 * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be
 * consecutive monotonically increasing natural numbers.
 */
private final AtomicLong nextSeqNum = new AtomicLong(1);

/** Guards the fields annotated with {@code @GuardedBy("lock")}. */
private final Object lock = new Object();

/** Build result derived from the last {@link BuildCompletingEvent}; sent with finish events. */
@GuardedBy("lock")
private Result buildStatus = UNKNOWN_STATUS;

/** Returned by {@link #close()} and completed by {@link #run()} when the upload ends. */
private final SettableFuture<Void> closeFuture = SettableFuture.create();
/** Set once the last build event has been sent and the stream has been half-closed. */
private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();

/**
 * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
 * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (whichever comes
 * first).
 */
@GuardedBy("lock")
private Thread uploadThread;

/** Set by {@link #closeOnTimeout()} before interrupting the upload thread. */
@GuardedBy("lock")
private boolean interruptCausedByTimeout;

/** Set by {@link #closeOnCancel()} before interrupting the upload thread. */
@GuardedBy("lock")
private boolean interruptCausedByCancel;

/**
 * The currently open stream, or null between stream attempts. In the code visible here it is only
 * accessed from the upload thread (inside {@link #publishBuildEvents()}).
 */
private StreamContext streamContext;
| 146 | |
private BuildEventServiceUploader(
    BuildEventServiceClient besClient,
    BuildEventArtifactUploader localFileUploader,
    BuildEventServiceProtoUtil besProtoUtil,
    BuildEventProtocolOptions buildEventProtocolOptions,
    boolean publishLifecycleEvents,
    Duration closeTimeout,
    Sleeper sleeper,
    Clock clock,
    ArtifactGroupNamer namer,
    EventBus eventBus) {
  // Collaborators that talk to the BES backend and upload referenced local files.
  this.besClient = besClient;
  this.besProtoUtil = besProtoUtil;
  this.localFileUploader = localFileUploader;

  // Settings used when serializing build events.
  this.buildEventProtocolOptions = buildEventProtocolOptions;
  this.namer = namer;

  // Behavioral knobs.
  this.publishLifecycleEvents = publishLifecycleEvents;
  this.closeTimeout = closeTimeout;

  // Time sources and the event sink.
  this.clock = clock;
  this.sleeper = sleeper;
  this.eventBus = eventBus;
}
| 169 | |
| 170 | BuildEventArtifactUploader getLocalFileUploader() { |
| 171 | return localFileUploader; |
| 172 | } |
| 173 | |
| 174 | /** Enqueues an event for uploading to a BES backend. */ |
| 175 | void enqueueEvent(BuildEvent event) { |
| 176 | // This needs to happen outside a synchronized block as it may trigger |
| 177 | // stdout/stderr and lead to a deadlock. See b/109725432 |
| 178 | ListenableFuture<PathConverter> localFileUploadFuture = |
felly | 916e736 | 2019-04-16 09:10:22 -0700 | [diff] [blame] | 179 | localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles()); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 180 | |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 181 | if (startedClose.get()) { |
| 182 | if (!localFileUploadFuture.isDone()) { |
| 183 | localFileUploadFuture.cancel(true); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 184 | } |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 185 | return; |
| 186 | } |
| 187 | // BuildCompletingEvent marks the end of the build in the BEP event stream. |
| 188 | if (event instanceof BuildCompletingEvent) { |
| 189 | synchronized (lock) { |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 190 | this.buildStatus = extractBuildStatus((BuildCompletingEvent) event); |
| 191 | } |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 192 | } |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 193 | ensureUploadThreadStarted(); |
| 194 | eventQueue.addLast( |
| 195 | new SendRegularBuildEventCommand( |
| 196 | event, |
| 197 | localFileUploadFuture, |
| 198 | nextSeqNum.getAndIncrement(), |
| 199 | Timestamps.fromMillis(clock.currentTimeMillis()))); |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 200 | } |
| 201 | |
| 202 | /** |
| 203 | * Gracefully stops the BES upload. All events enqueued before the call to close will be uploaded |
| 204 | * and events enqueued after the call will be discarded. |
| 205 | * |
| 206 | * <p>The returned future completes when the upload completes. It's guaranteed to never fail. |
| 207 | */ |
| 208 | public ListenableFuture<Void> close() { |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 209 | if (startedClose.getAndSet(true)) { |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 210 | return closeFuture; |
| 211 | } |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 212 | |
| 213 | ensureUploadThreadStarted(); |
| 214 | |
| 215 | // Enqueue the last event which will terminate the upload. |
| 216 | eventQueue.addLast(new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime())); |
| 217 | |
| 218 | if (!closeTimeout.isZero()) { |
| 219 | startCloseTimer(closeFuture, closeTimeout); |
| 220 | } |
| 221 | |
| 222 | final SettableFuture<Void> finalCloseFuture = closeFuture; |
| 223 | closeFuture.addListener( |
| 224 | () -> { |
| 225 | // Make sure to cancel any pending uploads if the closing is cancelled. |
| 226 | if (finalCloseFuture.isCancelled()) { |
| 227 | closeOnCancel(); |
| 228 | } |
| 229 | }, |
| 230 | MoreExecutors.directExecutor()); |
| 231 | |
| 232 | return closeFuture; |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 233 | } |
| 234 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 235 | private void closeOnTimeout() { |
| 236 | synchronized (lock) { |
lpino | d143ca4 | 2019-04-15 04:27:39 -0700 | [diff] [blame] | 237 | interruptCausedByTimeout = true; |
| 238 | closeNow(); |
| 239 | } |
| 240 | } |
| 241 | |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 242 | private void closeOnCancel() { |
lpino | d143ca4 | 2019-04-15 04:27:39 -0700 | [diff] [blame] | 243 | synchronized (lock) { |
| 244 | interruptCausedByCancel = true; |
| 245 | closeNow(); |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */ |
| 250 | private void closeNow() { |
| 251 | synchronized (lock) { |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 252 | if (uploadThread != null) { |
| 253 | if (uploadThread.isInterrupted()) { |
| 254 | return; |
| 255 | } |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 256 | uploadThread.interrupt(); |
| 257 | } |
| 258 | } |
| 259 | } |
| 260 | |
lpino | 2a42e2b | 2019-03-01 15:11:28 -0800 | [diff] [blame] | 261 | private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) { |
lpino | d143ca4 | 2019-04-15 04:27:39 -0700 | [diff] [blame] | 262 | checkState(!exitCode.equals(ExitCode.SUCCESS)); |
lpino | f51f14b | 2019-04-17 14:14:56 -0700 | [diff] [blame] | 263 | logger.severe(message); |
lpino | 7c79576 | 2019-04-25 04:31:51 -0700 | [diff] [blame] | 264 | closeFuture.setException(new AbruptExitException(message, exitCode, cause)); |
lpino | 2a42e2b | 2019-03-01 15:11:28 -0800 | [diff] [blame] | 265 | } |
| 266 | |
/**
 * Body of the upload thread started by {@link #ensureUploadThreadStarted()}.
 *
 * <p>Publishes the build/invocation start lifecycle events (if enabled), streams all build
 * events via {@link #publishBuildEvents()}, then publishes the finish lifecycle events (if
 * enabled). Failures complete {@link #closeFuture} exceptionally; otherwise it is completed
 * successfully in the final {@code finally}.
 */
@Override
public void run() {
  try {
    if (publishLifecycleEvents) {
      publishLifecycleEvent(besProtoUtil.buildEnqueued(currentTime()));
      publishLifecycleEvent(besProtoUtil.invocationStarted(currentTime()));
    }

    try {
      publishBuildEvents();
    } finally {
      // NOTE(review): if publishBuildEvents() threw, an exception thrown while publishing the
      // finish lifecycle events below replaces the original exception.
      if (publishLifecycleEvents) {
        Result buildStatus;
        synchronized (lock) {
          buildStatus = this.buildStatus;
        }
        publishLifecycleEvent(besProtoUtil.invocationFinished(currentTime(), buildStatus));
        publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus));
      }
    }
  } catch (InterruptedException e) {
    logger.info("Aborting the BES upload due to having received an interrupt");
    synchronized (lock) {
      // Interrupts are only expected from closeOnTimeout() or closeOnCancel().
      Preconditions.checkState(
          interruptCausedByTimeout || interruptCausedByCancel,
          "Unexpected interrupt on BES uploader thread");
      // On cancellation closeFuture is already cancelled (closeOnCancel() is only called from the
      // cancellation listener), so only the timeout case needs to fail it here.
      if (interruptCausedByTimeout) {
        logAndExitAbruptly(
            "The Build Event Protocol upload timed out",
            ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
            e);
      }
    }
  } catch (StatusException e) {
    logAndExitAbruptly(
        "The Build Event Protocol upload failed: " + besClient.userReadableError(e),
        shouldRetryStatus(e.getStatus())
            ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR
            : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
        e);
  } catch (LocalFileUploadException e) {
    Throwables.throwIfUnchecked(e.getCause());
    logAndExitAbruptly(
        "The Build Event Protocol local file upload failed: " + e.getCause().getMessage(),
        ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
        e.getCause());
  } catch (Throwable e) {
    // NOTE(review): exceptions thrown from the catch blocks above (e.g. the checkState or
    // throwIfUnchecked) are not caught here; in that case the finally below completes
    // closeFuture successfully.
    closeFuture.setException(e);
    logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
    throw e;
  } finally {
    localFileUploader.shutdown();
    // No effect if closeFuture was already failed or cancelled above.
    closeFuture.set(null);
  }
}
| 322 | |
| 323 | private BuildEventStreamProtos.BuildEvent createSerializedRegularBuildEvent( |
| 324 | PathConverter pathConverter, |
| 325 | SendRegularBuildEventCommand buildEvent) { |
| 326 | BuildEventContext ctx = |
| 327 | new BuildEventContext() { |
| 328 | @Override |
| 329 | public PathConverter pathConverter() { |
| 330 | return pathConverter; |
| 331 | } |
| 332 | |
| 333 | @Override |
| 334 | public ArtifactGroupNamer artifactGroupNamer() { |
| 335 | return namer; |
| 336 | } |
| 337 | |
| 338 | @Override |
| 339 | public BuildEventProtocolOptions getOptions() { |
| 340 | return buildEventProtocolOptions; |
| 341 | } |
| 342 | }; |
| 343 | BuildEventStreamProtos.BuildEvent serializedBepEvent = |
| 344 | buildEvent.getEvent().asStreamProto(ctx); |
lpino | 1d205e1 | 2019-02-15 07:09:29 -0800 | [diff] [blame] | 345 | |
| 346 | // TODO(lpino): Remove this logging once we can make every single event smaller than 1MB |
| 347 | // as protobuf recommends. |
| 348 | if (serializedBepEvent.getSerializedSize() |
| 349 | > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) { |
| 350 | eventBus.post( |
| 351 | new LargeBuildEventSerializedEvent( |
| 352 | serializedBepEvent.getId().toString(), serializedBepEvent.getSerializedSize())); |
| 353 | } |
| 354 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 355 | return serializedBepEvent; |
| 356 | } |
| 357 | |
/**
 * Event loop that streams all queued build events to the BES backend.
 *
 * <p>Processes {@link EventLoopCommand}s from {@link #eventQueue} until the stream completes.
 * ACKs must arrive in the order the events were sent. When a stream ends with a retriable status,
 * all unacknowledged events are re-queued and a new stream is opened. The retry budget ({@code
 * MAX_NUM_RETRIES}) is reset whenever the previous stream received at least one ACK.
 *
 * @throws StatusException if the stream fails with a non-retriable status, the retry budget is
 *     exhausted, or the stream completes OK before all events were sent and acknowledged
 * @throws LocalFileUploadException if uploading the local files of an event failed
 * @throws InterruptedException if the upload thread is interrupted
 */
private void publishBuildEvents()
    throws StatusException, LocalFileUploadException, InterruptedException {
  // The first command processed by the loop opens the stream.
  eventQueue.addFirst(new OpenStreamCommand());

  // Every build event sent to the server needs to be acknowledged by it. This queue stores
  // the build events that have been sent and still have to be acknowledged by the server.
  // The build events are stored in the order they were sent.
  Deque<SendBuildEventCommand> ackQueue = new ArrayDeque<>();
  boolean lastEventSent = false;
  // ACKs received on the current stream attempt; decides whether a retry made progress.
  int acksReceived = 0;
  // Consecutive retries without progress.
  int retryAttempt = 0;

  try {
    // {@link BuildEventServiceUploaderCommands#OPEN_STREAM} is the first event and opens a
    // bidi streaming RPC for sending build events and receiving ACKs.
    // {@link BuildEventServiceUploaderCommands#SEND_REGULAR_BUILD_EVENT} sends a build event to
    // the server. Sending of the Nth build event does not wait for the ACK of the N-1th build
    // event to have been received.
    // {@link BuildEventServiceUploaderCommands#SEND_LAST_BUILD_EVENT} sends the last build event
    // and half closes the RPC.
    // {@link BuildEventServiceUploaderCommands#ACK_RECEIVED} is executed for every ACK from
    // the server and checks that the ACKs are in the correct order.
    // {@link BuildEventServiceUploaderCommands#STREAM_COMPLETE} checks that all build events
    // have been sent and all ACKs have been received. If not it invokes a retry logic that may
    // decide to re-send every build event for which an ACK has not been received. If so, it
    // adds an OPEN_STREAM event.
    while (true) {
      EventLoopCommand event = eventQueue.takeFirst();
      switch (event.type()) {
        case OPEN_STREAM:
          {
            // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
            // or SEND_LAST_BUILD_EVENT
            logger.info(
                String.format("Starting publishBuildEvents: eventQueue=%d", eventQueue.size()));
            // ACKs and the final stream status are fed back into this loop as commands, so the
            // loop's state is only modified on this thread.
            streamContext =
                besClient.openStream(
                    (ack) -> eventQueue.addLast(new AckReceivedCommand(ack.getSequenceNumber())));
            addStreamStatusListener(
                streamContext.getStatus(),
                (status) -> eventQueue.addLast(new StreamCompleteCommand(status)));
          }
          break;

        case SEND_REGULAR_BUILD_EVENT:
          {
            // Invariant: the eventQueue may contain events of any type
            SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
            // Track the event as unacknowledged so that a retry re-sends it.
            ackQueue.addLast(buildEvent);

            // Blocks until the files referenced by the event have been uploaded.
            PathConverter pathConverter = waitForLocalFileUploads(buildEvent);

            BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
                createSerializedRegularBuildEvent(pathConverter, buildEvent);

            PublishBuildToolEventStreamRequest request =
                besProtoUtil.bazelEvent(
                    buildEvent.getSequenceNumber(),
                    buildEvent.getCreationTime(),
                    Any.pack(serializedRegularBuildEvent));

            streamContext.sendOverStream(request);
          }
          break;

        case SEND_LAST_BUILD_EVENT:
          {
            // Invariant: the eventQueue may contain events of any type
            SendBuildEventCommand lastEvent = (SendLastBuildEventCommand) event;
            ackQueue.addLast(lastEvent);
            lastEventSent = true;
            PublishBuildToolEventStreamRequest request =
                besProtoUtil.streamFinished(
                    lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
            streamContext.sendOverStream(request);
            streamContext.halfCloseStream();
            // On a retry the last event is sent again; setting an already-set future is a no-op.
            halfCloseFuture.set(null);
          }
          break;

        case ACK_RECEIVED:
          {
            // Invariant: the eventQueue may contain events of any type
            AckReceivedCommand ackEvent = (AckReceivedCommand) event;
            if (!ackQueue.isEmpty()) {
              // ACKs must match the sent events in order.
              SendBuildEventCommand expected = ackQueue.removeFirst();
              long actualSeqNum = ackEvent.getSequenceNumber();
              if (expected.getSequenceNumber() == actualSeqNum) {
                acksReceived++;
              } else {
                // Keep the event unacknowledged and abort the stream. Presumably the abort
                // surfaces as a STREAM_COMPLETE with this status — confirm in StreamContext.
                ackQueue.addFirst(expected);
                String message =
                    String.format(
                        "Expected ACK with seqNum=%d but received ACK with seqNum=%d",
                        expected.getSequenceNumber(), actualSeqNum);
                logger.info(message);
                streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
              }
            } else {
              String message =
                  String.format(
                      "Received ACK (seqNum=%d) when no ACK was expected",
                      ackEvent.getSequenceNumber());
              logger.info(message);
              streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
            }
          }
          break;

        case STREAM_COMPLETE:
          {
            // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
            // or SEND_LAST_BUILD_EVENT
            streamContext = null;
            StreamCompleteCommand completeEvent = (StreamCompleteCommand) event;
            Status streamStatus = completeEvent.status();
            if (streamStatus.isOk()) {
              if (lastEventSent && ackQueue.isEmpty()) {
                logger.info("publishBuildEvents was successful");
                // Upload successful. Break out from the while(true) loop.
                return;
              } else {
                // The server closed the stream OK before everything was sent and acknowledged.
                throw (lastEventSent
                        ? ackQueueNotEmptyStatus(ackQueue.size())
                        : lastEventNotSentStatus())
                    .asException();
              }
            }

            if (!shouldRetryStatus(streamStatus)) {
              logger.info(
                  String.format("Not retrying publishBuildEvents: status='%s'", streamStatus));
              throw streamStatus.asException();
            }
            if (retryAttempt == MAX_NUM_RETRIES) {
              logger.info(
                  String.format(
                      "Not retrying publishBuildEvents, no more attempts left: status='%s'",
                      streamStatus));
              throw streamStatus.asException();
            }

            // Retry logic
            // Adds events from the ackQueue to the front of the eventQueue, so that the
            // events in the eventQueue are sorted by sequence number (ascending).
            SendBuildEventCommand unacked;
            while ((unacked = ackQueue.pollLast()) != null) {
              eventQueue.addFirst(unacked);
            }

            long sleepMillis = retrySleepMillis(retryAttempt);
            logger.info(
                String.format(
                    "Retrying stream: status='%s', sleepMillis=%d", streamStatus, sleepMillis));
            sleeper.sleepMillis(sleepMillis);

            // If we made progress, meaning the server ACKed events that we sent, then reset
            // the retry counter to 0.
            if (acksReceived > 0) {
              retryAttempt = 0;
            } else {
              retryAttempt++;
            }
            acksReceived = 0;
            eventQueue.addFirst(new OpenStreamCommand());
          }
          break;
      }
    }
  } catch (InterruptedException | LocalFileUploadException e) {
    // Log a bounded snapshot of both queues to help diagnose where the upload got stuck.
    int limit = 30;
    logger.info(
        String.format(
            "Publish interrupt. Showing up to %d items from queues: ack_queue_size: %d, "
                + "ack_queue: %s, event_queue_size: %d, event_queue: %s",
            limit,
            ackQueue.size(),
            Iterables.limit(ackQueue, limit),
            eventQueue.size(),
            Iterables.limit(eventQueue, limit)));
    if (streamContext != null) {
      streamContext.abortStream(Status.CANCELLED);
    }
    throw e;
  } finally {
    // Cancel all local file uploads that may still be running
    // of events that haven't been uploaded.
    EventLoopCommand event;
    while ((event = ackQueue.pollFirst()) != null) {
      if (event instanceof SendRegularBuildEventCommand) {
        cancelLocalFileUpload((SendRegularBuildEventCommand) event);
      }
    }
    while ((event = eventQueue.pollFirst()) != null) {
      if (event instanceof SendRegularBuildEventCommand) {
        cancelLocalFileUpload((SendRegularBuildEventCommand) event);
      }
    }
  }
}
| 558 | |
| 559 | private void cancelLocalFileUpload(SendRegularBuildEventCommand event) { |
| 560 | ListenableFuture<PathConverter> localFileUploaderFuture = event.localFileUploadProgress(); |
| 561 | if (!localFileUploaderFuture.isDone()) { |
| 562 | localFileUploaderFuture.cancel(true); |
| 563 | } |
| 564 | } |
| 565 | |
| 566 | /** Sends a {@link PublishLifecycleEventRequest} to the BES backend. */ |
| 567 | private void publishLifecycleEvent(PublishLifecycleEventRequest request) |
| 568 | throws StatusException, InterruptedException { |
| 569 | int retryAttempt = 0; |
| 570 | StatusException cause = null; |
| 571 | while (retryAttempt <= MAX_NUM_RETRIES) { |
| 572 | try { |
| 573 | besClient.publish(request); |
| 574 | return; |
| 575 | } catch (StatusException e) { |
| 576 | if (!shouldRetryStatus(e.getStatus())) { |
| 577 | logger.info( |
| 578 | String.format( |
| 579 | "Not retrying publishLifecycleEvent: status='%s'", e.getStatus().toString())); |
| 580 | throw e; |
| 581 | } |
| 582 | |
| 583 | cause = e; |
| 584 | |
| 585 | long sleepMillis = retrySleepMillis(retryAttempt); |
| 586 | logger.info( |
| 587 | String.format( |
| 588 | "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d", |
| 589 | e.getStatus().toString(), sleepMillis)); |
| 590 | sleeper.sleepMillis(sleepMillis); |
| 591 | retryAttempt++; |
| 592 | } |
| 593 | } |
| 594 | |
| 595 | // All retry attempts failed |
| 596 | throw cause; |
| 597 | } |
| 598 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 599 | private void ensureUploadThreadStarted() { |
| 600 | synchronized (lock) { |
| 601 | if (uploadThread == null) { |
| 602 | uploadThread = new Thread(this, "bes-uploader"); |
| 603 | uploadThread.start(); |
| 604 | } |
| 605 | } |
| 606 | } |
| 607 | |
| 608 | private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeTimeout) { |
| 609 | Thread closeTimer = |
| 610 | new Thread( |
| 611 | () -> { |
| 612 | // Call closeOnTimeout() if the future does not complete within closeTimeout |
| 613 | try { |
| 614 | getUninterruptibly(closeFuture, closeTimeout.toMillis(), TimeUnit.MILLISECONDS); |
| 615 | } catch (TimeoutException e) { |
| 616 | closeOnTimeout(); |
| 617 | } catch (ExecutionException e) { |
| 618 | if (e.getCause() instanceof TimeoutException) { |
| 619 | // This is likely due to an internal timeout doing the local file uploading. |
| 620 | closeOnTimeout(); |
| 621 | } else { |
| 622 | // This code only cares about calling closeOnTimeout() if the closeFuture does |
| 623 | // not complete within closeTimeout. |
| 624 | String failureMsg = "BES close failure"; |
| 625 | logger.severe(failureMsg); |
| 626 | LoggingUtil.logToRemote(Level.SEVERE, failureMsg, e); |
| 627 | } |
| 628 | } |
| 629 | }, |
| 630 | "bes-uploader-close-timer"); |
| 631 | closeTimer.start(); |
| 632 | } |
| 633 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 634 | private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent) |
| 635 | throws LocalFileUploadException, InterruptedException { |
| 636 | try { |
| 637 | // Wait for the local file upload to have been completed. |
| 638 | return orderedBuildEvent.localFileUploadProgress().get(); |
| 639 | } catch (ExecutionException e) { |
| 640 | logger.log( |
| 641 | Level.WARNING, |
| 642 | String.format( |
| 643 | "Failed to upload local files referenced by build event: %s", e.getMessage()), |
| 644 | e); |
| 645 | Throwables.throwIfUnchecked(e.getCause()); |
| 646 | throw new LocalFileUploadException(e.getCause()); |
| 647 | } |
| 648 | } |
| 649 | |
| 650 | private Timestamp currentTime() { |
| 651 | return Timestamps.fromMillis(clock.currentTimeMillis()); |
| 652 | } |
| 653 | |
| 654 | private static Result extractBuildStatus(BuildCompletingEvent event) { |
| 655 | if (event.getExitCode() != null && event.getExitCode().getNumericExitCode() == 0) { |
| 656 | return COMMAND_SUCCEEDED; |
| 657 | } else { |
| 658 | return COMMAND_FAILED; |
| 659 | } |
| 660 | } |
| 661 | |
| 662 | private static Status lastEventNotSentStatus() { |
| 663 | return Status.FAILED_PRECONDITION.withDescription( |
| 664 | "Server closed stream with status OK but not all events have been sent"); |
| 665 | } |
| 666 | |
| 667 | private static Status ackQueueNotEmptyStatus(int ackQueueSize) { |
| 668 | return Status.FAILED_PRECONDITION.withDescription( |
| 669 | String.format( |
| 670 | "Server closed stream with status OK but not all ACKs have been" |
| 671 | + " received (ackQueue=%d)", |
| 672 | ackQueueSize)); |
| 673 | } |
| 674 | |
| 675 | private static void addStreamStatusListener( |
| 676 | ListenableFuture<Status> stream, Consumer<Status> onDone) { |
| 677 | Futures.addCallback( |
| 678 | stream, |
| 679 | new FutureCallback<Status>() { |
| 680 | @Override |
| 681 | public void onSuccess(Status result) { |
| 682 | onDone.accept(result); |
| 683 | } |
| 684 | |
| 685 | @Override |
| 686 | public void onFailure(Throwable t) {} |
| 687 | }, |
| 688 | MoreExecutors.directExecutor()); |
| 689 | } |
| 690 | |
| 691 | private static boolean shouldRetryStatus(Status status) { |
| 692 | return !status.getCode().equals(Code.INVALID_ARGUMENT) |
| 693 | && !status.getCode().equals(Code.FAILED_PRECONDITION); |
| 694 | } |
| 695 | |
| 696 | private static long retrySleepMillis(int attempt) { |
| 697 | // This somewhat matches the backoff used for gRPC connection backoffs. |
| 698 | return (long) (DELAY_MILLIS * Math.pow(1.6, attempt)); |
| 699 | } |
| 700 | |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 701 | /** Thrown when encountered problems while uploading build event artifacts. */ |
| 702 | private class LocalFileUploadException extends Exception { |
| 703 | LocalFileUploadException(Throwable cause) { |
| 704 | super(cause); |
| 705 | } |
| 706 | } |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 707 | |
| 708 | static class Builder { |
| 709 | private BuildEventServiceClient besClient; |
| 710 | private BuildEventArtifactUploader localFileUploader; |
| 711 | private BuildEventServiceProtoUtil besProtoUtil; |
| 712 | private BuildEventProtocolOptions bepOptions; |
| 713 | private boolean publishLifecycleEvents; |
| 714 | private Duration closeTimeout; |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 715 | private Sleeper sleeper; |
| 716 | private Clock clock; |
| 717 | private ArtifactGroupNamer artifactGroupNamer; |
| 718 | private EventBus eventBus; |
| 719 | |
| 720 | Builder besClient(BuildEventServiceClient value) { |
| 721 | this.besClient = value; |
| 722 | return this; |
| 723 | } |
| 724 | |
| 725 | Builder localFileUploader(BuildEventArtifactUploader value) { |
| 726 | this.localFileUploader = value; |
| 727 | return this; |
| 728 | } |
| 729 | |
| 730 | Builder besProtoUtil(BuildEventServiceProtoUtil value) { |
| 731 | this.besProtoUtil = value; |
| 732 | return this; |
| 733 | } |
| 734 | |
| 735 | Builder bepOptions(BuildEventProtocolOptions value) { |
| 736 | this.bepOptions = value; |
| 737 | return this; |
| 738 | } |
| 739 | |
| 740 | Builder publishLifecycleEvents(boolean value) { |
| 741 | this.publishLifecycleEvents = value; |
| 742 | return this; |
| 743 | } |
| 744 | |
| 745 | Builder closeTimeout(Duration value) { |
| 746 | this.closeTimeout = value; |
| 747 | return this; |
| 748 | } |
| 749 | |
| 750 | Builder clock(Clock value) { |
| 751 | this.clock = value; |
| 752 | return this; |
| 753 | } |
| 754 | |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 755 | Builder sleeper(Sleeper value) { |
| 756 | this.sleeper = value; |
| 757 | return this; |
| 758 | } |
| 759 | |
| 760 | Builder artifactGroupNamer(ArtifactGroupNamer value) { |
| 761 | this.artifactGroupNamer = value; |
| 762 | return this; |
| 763 | } |
| 764 | |
| 765 | Builder eventBus(EventBus value) { |
| 766 | this.eventBus = value; |
| 767 | return this; |
| 768 | } |
| 769 | |
| 770 | BuildEventServiceUploader build() { |
| 771 | return new BuildEventServiceUploader( |
| 772 | checkNotNull(besClient), |
| 773 | checkNotNull(localFileUploader), |
| 774 | checkNotNull(besProtoUtil), |
| 775 | checkNotNull(bepOptions), |
| 776 | publishLifecycleEvents, |
| 777 | checkNotNull(closeTimeout), |
lpino | 2bf8906 | 2019-02-21 03:24:49 -0800 | [diff] [blame] | 778 | checkNotNull(sleeper), |
| 779 | checkNotNull(clock), |
| 780 | checkNotNull(artifactGroupNamer), |
| 781 | checkNotNull(eventBus)); |
| 782 | } |
| 783 | } |
lpino | 83a1bfd | 2019-02-14 15:30:18 -0800 | [diff] [blame] | 784 | } |
| 785 | |