blob: 66ce615e2588e9cfab1270a5627f4fc618703bbe [file] [log] [blame]
lpino83a1bfd2019-02-14 15:30:18 -08001// Copyright 2019 The Bazel Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package com.google.devtools.build.lib.buildeventservice;
15
lpino2bf89062019-02-21 03:24:49 -080016import static com.google.common.base.Preconditions.checkNotNull;
lpino83a1bfd2019-02-14 15:30:18 -080017import static com.google.common.base.Preconditions.checkState;
lpino83a1bfd2019-02-14 15:30:18 -080018import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
19import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
20import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
21
22import com.google.common.annotations.VisibleForTesting;
23import com.google.common.base.Preconditions;
24import com.google.common.base.Throwables;
25import com.google.common.collect.Iterables;
lpino1d205e12019-02-15 07:09:29 -080026import com.google.common.eventbus.EventBus;
lpino83a1bfd2019-02-14 15:30:18 -080027import com.google.common.util.concurrent.FutureCallback;
28import com.google.common.util.concurrent.Futures;
29import com.google.common.util.concurrent.ListenableFuture;
30import com.google.common.util.concurrent.MoreExecutors;
31import com.google.common.util.concurrent.SettableFuture;
lpino83a1bfd2019-02-14 15:30:18 -080032import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand;
33import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand;
34import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand;
35import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendBuildEventCommand;
36import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendLastBuildEventCommand;
37import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendRegularBuildEventCommand;
38import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.StreamCompleteCommand;
39import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
40import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamContext;
41import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
42import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent;
43import com.google.devtools.build.lib.buildeventstream.BuildEvent;
lpino83a1bfd2019-02-14 15:30:18 -080044import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
45import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
46import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
47import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
lpino1d205e12019-02-15 07:09:29 -080048import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
lpino83a1bfd2019-02-14 15:30:18 -080049import com.google.devtools.build.lib.buildeventstream.PathConverter;
50import com.google.devtools.build.lib.clock.Clock;
lpino2a42e2b2019-03-01 15:11:28 -080051import com.google.devtools.build.lib.util.AbruptExitException;
lpino83a1bfd2019-02-14 15:30:18 -080052import com.google.devtools.build.lib.util.ExitCode;
lpino83a1bfd2019-02-14 15:30:18 -080053import com.google.devtools.build.lib.util.Sleeper;
lpino83a1bfd2019-02-14 15:30:18 -080054import com.google.devtools.build.v1.BuildStatus.Result;
55import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
56import com.google.devtools.build.v1.PublishLifecycleEventRequest;
57import com.google.protobuf.Any;
58import com.google.protobuf.Timestamp;
59import com.google.protobuf.util.Timestamps;
60import io.grpc.Status;
61import io.grpc.Status.Code;
62import io.grpc.StatusException;
felly916e7362019-04-16 09:10:22 -070063import java.util.ArrayDeque;
64import java.util.Deque;
lpino83a1bfd2019-02-14 15:30:18 -080065import java.util.concurrent.BlockingDeque;
lpino83a1bfd2019-02-14 15:30:18 -080066import java.util.concurrent.ExecutionException;
67import java.util.concurrent.LinkedBlockingDeque;
lpino7c795762019-04-25 04:31:51 -070068import java.util.concurrent.atomic.AtomicBoolean;
lpino83a1bfd2019-02-14 15:30:18 -080069import java.util.concurrent.atomic.AtomicLong;
70import java.util.function.Consumer;
71import java.util.logging.Level;
72import java.util.logging.Logger;
73import javax.annotation.concurrent.GuardedBy;
74
75/**
76 * Uploader of Build Events to the Build Event Service (BES).
77 *
 * <p>The purpose of this class is to manage the interaction between the BES client and the BES
 * server. It implements the event loop pattern based on the commands defined by {@link
 * BuildEventServiceUploaderCommands}.
81 */
// TODO(lpino): This class should be package-private, but there are unit tests in different
// packages that rely on it being public.
84@VisibleForTesting
85public final class BuildEventServiceUploader implements Runnable {
86 private static final Logger logger = Logger.getLogger(BuildEventServiceUploader.class.getName());
87
88 /** Configuration knobs related to RPC retries. Values chosen by good judgement. */
89 private static final int MAX_NUM_RETRIES = 4;
90
91 private static final int DELAY_MILLIS = 1000;
92
93 private final BuildEventServiceClient besClient;
94 private final BuildEventArtifactUploader localFileUploader;
95 private final BuildEventServiceProtoUtil besProtoUtil;
96 private final BuildEventProtocolOptions buildEventProtocolOptions;
97 private final boolean publishLifecycleEvents;
lpino83a1bfd2019-02-14 15:30:18 -080098 private final Sleeper sleeper;
99 private final Clock clock;
lpino83a1bfd2019-02-14 15:30:18 -0800100 private final ArtifactGroupNamer namer;
lpino1d205e12019-02-15 07:09:29 -0800101 private final EventBus eventBus;
lpino7c795762019-04-25 04:31:51 -0700102 private final AtomicBoolean startedClose = new AtomicBoolean(false);
lpino83a1bfd2019-02-14 15:30:18 -0800103
/**
 * The event queue contains two types of events:
 *
 * <ul>
 *   <li>Build events, sorted by sequence number, that should be sent to the server.
 *   <li>Command events that are used by {@link #publishBuildEvents()} to change state.
 * </ul>
 */
109 private final BlockingDeque<EventLoopCommand> eventQueue = new LinkedBlockingDeque<>();
110
111 /**
112 * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be
113 * consecutive monotonically increasing natural numbers.
114 */
115 private final AtomicLong nextSeqNum = new AtomicLong(1);
116
117 private final Object lock = new Object();
118
119 @GuardedBy("lock")
120 private Result buildStatus = UNKNOWN_STATUS;
121
lpino7c795762019-04-25 04:31:51 -0700122 private final SettableFuture<Void> closeFuture = SettableFuture.create();
felly67641002019-04-23 09:31:25 -0700123 private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();
124
/**
 * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
 * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (whichever comes
 * first).
 */
130 @GuardedBy("lock")
131 private Thread uploadThread;
132
133 @GuardedBy("lock")
lpinod143ca42019-04-15 04:27:39 -0700134 private boolean interruptCausedByCancel;
135
lpino83a1bfd2019-02-14 15:30:18 -0800136 private StreamContext streamContext;
137
/**
 * Creates an uploader from its collaborators. The constructor is private; the nested {@link
 * Builder} is the way to obtain an instance.
 *
 * @param besClient client used to talk to the BES backend
 * @param localFileUploader uploader for the local files referenced by build events
 * @param besProtoUtil factory for the BES request protos
 * @param buildEventProtocolOptions options handed to events when they are serialized
 * @param publishLifecycleEvents whether lifecycle events are published around the event stream
 * @param sleeper used to wait between retries
 * @param clock source of event and lifecycle timestamps
 * @param namer artifact group namer handed to events when they are serialized
 * @param eventBus bus on which oversized serialized events are reported
 */
private BuildEventServiceUploader(
    BuildEventServiceClient besClient,
    BuildEventArtifactUploader localFileUploader,
    BuildEventServiceProtoUtil besProtoUtil,
    BuildEventProtocolOptions buildEventProtocolOptions,
    boolean publishLifecycleEvents,
    Sleeper sleeper,
    Clock clock,
    ArtifactGroupNamer namer,
    EventBus eventBus) {
  // BES communication.
  this.besClient = besClient;
  this.besProtoUtil = besProtoUtil;
  this.publishLifecycleEvents = publishLifecycleEvents;

  // Event serialization.
  this.localFileUploader = localFileUploader;
  this.buildEventProtocolOptions = buildEventProtocolOptions;
  this.namer = namer;
  this.eventBus = eventBus;

  // Time and retry helpers.
  this.clock = clock;
  this.sleeper = sleeper;
}
158
159 BuildEventArtifactUploader getLocalFileUploader() {
160 return localFileUploader;
161 }
162
163 /** Enqueues an event for uploading to a BES backend. */
164 void enqueueEvent(BuildEvent event) {
165 // This needs to happen outside a synchronized block as it may trigger
166 // stdout/stderr and lead to a deadlock. See b/109725432
167 ListenableFuture<PathConverter> localFileUploadFuture =
felly916e7362019-04-16 09:10:22 -0700168 localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
lpino83a1bfd2019-02-14 15:30:18 -0800169
lpino7c795762019-04-25 04:31:51 -0700170 if (startedClose.get()) {
171 if (!localFileUploadFuture.isDone()) {
172 localFileUploadFuture.cancel(true);
lpino83a1bfd2019-02-14 15:30:18 -0800173 }
lpino7c795762019-04-25 04:31:51 -0700174 return;
175 }
lpino4b092cd2019-04-26 07:20:53 -0700176
177 // The generation of the sequence number and the addition to the {@link #eventQueue} should be
178 // atomic since BES expects the events in that exact order.
179 // More details can be found in b/131393380.
180 // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order.
181 synchronized (lock) {
182 // BuildCompletingEvent marks the end of the build in the BEP event stream.
183 if (event instanceof BuildCompletingEvent) {
lpino83a1bfd2019-02-14 15:30:18 -0800184 this.buildStatus = extractBuildStatus((BuildCompletingEvent) event);
185 }
lpino4b092cd2019-04-26 07:20:53 -0700186 ensureUploadThreadStarted();
187
188 // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's
189 // always used under lock. It would be cleaner and more performant to update the sequence
190 // number when we take the item off the queue.
191 eventQueue.addLast(
192 new SendRegularBuildEventCommand(
193 event,
194 localFileUploadFuture,
195 nextSeqNum.getAndIncrement(),
196 Timestamps.fromMillis(clock.currentTimeMillis())));
lpino83a1bfd2019-02-14 15:30:18 -0800197 }
198 }
199
200 /**
201 * Gracefully stops the BES upload. All events enqueued before the call to close will be uploaded
202 * and events enqueued after the call will be discarded.
203 *
204 * <p>The returned future completes when the upload completes. It's guaranteed to never fail.
205 */
206 public ListenableFuture<Void> close() {
lpino7c795762019-04-25 04:31:51 -0700207 if (startedClose.getAndSet(true)) {
lpino83a1bfd2019-02-14 15:30:18 -0800208 return closeFuture;
209 }
lpino7c795762019-04-25 04:31:51 -0700210
211 ensureUploadThreadStarted();
212
lpino4b092cd2019-04-26 07:20:53 -0700213 // The generation of the sequence number and the addition to the {@link #eventQueue} should be
214 // atomic since BES expects the events in that exact order.
215 // More details can be found in b/131393380.
216 // TODO(bazel-team): Consider relaxing this invariant by having a more relaxed order.
217 synchronized (lock) {
218 // Enqueue the last event which will terminate the upload.
219 // TODO(b/131393380): {@link #nextSeqNum} doesn't need to be an AtomicInteger if it's
220 // always used under lock. It would be cleaner and more performant to update the sequence
221 // number when we take the item off the queue.
222 eventQueue.addLast(
223 new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
224 }
lpino7c795762019-04-25 04:31:51 -0700225
lpino7c795762019-04-25 04:31:51 -0700226 final SettableFuture<Void> finalCloseFuture = closeFuture;
227 closeFuture.addListener(
228 () -> {
229 // Make sure to cancel any pending uploads if the closing is cancelled.
230 if (finalCloseFuture.isCancelled()) {
231 closeOnCancel();
232 }
233 },
234 MoreExecutors.directExecutor());
235
236 return closeFuture;
lpino83a1bfd2019-02-14 15:30:18 -0800237 }
238
lpino7c795762019-04-25 04:31:51 -0700239 private void closeOnCancel() {
lpinod143ca42019-04-15 04:27:39 -0700240 synchronized (lock) {
241 interruptCausedByCancel = true;
242 closeNow();
243 }
244 }
245
246 /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
247 private void closeNow() {
248 synchronized (lock) {
lpino83a1bfd2019-02-14 15:30:18 -0800249 if (uploadThread != null) {
250 if (uploadThread.isInterrupted()) {
251 return;
252 }
lpino83a1bfd2019-02-14 15:30:18 -0800253 uploadThread.interrupt();
254 }
255 }
256 }
257
lpino2a42e2b2019-03-01 15:11:28 -0800258 private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) {
lpinod143ca42019-04-15 04:27:39 -0700259 checkState(!exitCode.equals(ExitCode.SUCCESS));
lpinof51f14b2019-04-17 14:14:56 -0700260 logger.severe(message);
lpino7c795762019-04-25 04:31:51 -0700261 closeFuture.setException(new AbruptExitException(message, exitCode, cause));
lpino2a42e2b2019-03-01 15:11:28 -0800262 }
263
lpino83a1bfd2019-02-14 15:30:18 -0800264 @Override
265 public void run() {
266 try {
267 if (publishLifecycleEvents) {
268 publishLifecycleEvent(besProtoUtil.buildEnqueued(currentTime()));
269 publishLifecycleEvent(besProtoUtil.invocationStarted(currentTime()));
270 }
271
272 try {
273 publishBuildEvents();
274 } finally {
275 if (publishLifecycleEvents) {
276 Result buildStatus;
277 synchronized (lock) {
278 buildStatus = this.buildStatus;
279 }
280 publishLifecycleEvent(besProtoUtil.invocationFinished(currentTime(), buildStatus));
281 publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus));
282 }
283 }
lpino83a1bfd2019-02-14 15:30:18 -0800284 } catch (InterruptedException e) {
lpinof51f14b2019-04-17 14:14:56 -0700285 logger.info("Aborting the BES upload due to having received an interrupt");
286 synchronized (lock) {
287 Preconditions.checkState(
lpino82f50902019-04-25 16:08:49 -0700288 interruptCausedByCancel, "Unexpected interrupt on BES uploader thread");
lpino83a1bfd2019-02-14 15:30:18 -0800289 }
290 } catch (StatusException e) {
lpinof51f14b2019-04-17 14:14:56 -0700291 logAndExitAbruptly(
292 "The Build Event Protocol upload failed: " + besClient.userReadableError(e),
293 shouldRetryStatus(e.getStatus())
294 ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR
295 : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
296 e);
lpino83a1bfd2019-02-14 15:30:18 -0800297 } catch (LocalFileUploadException e) {
298 Throwables.throwIfUnchecked(e.getCause());
lpinof51f14b2019-04-17 14:14:56 -0700299 logAndExitAbruptly(
300 "The Build Event Protocol local file upload failed: " + e.getCause().getMessage(),
301 ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
302 e.getCause());
lpino83a1bfd2019-02-14 15:30:18 -0800303 } catch (Throwable e) {
lpino7c795762019-04-25 04:31:51 -0700304 closeFuture.setException(e);
lpino83a1bfd2019-02-14 15:30:18 -0800305 logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
306 throw e;
307 } finally {
308 localFileUploader.shutdown();
lpino7c795762019-04-25 04:31:51 -0700309 closeFuture.set(null);
lpino83a1bfd2019-02-14 15:30:18 -0800310 }
311 }
312
313 private BuildEventStreamProtos.BuildEvent createSerializedRegularBuildEvent(
314 PathConverter pathConverter,
315 SendRegularBuildEventCommand buildEvent) {
316 BuildEventContext ctx =
317 new BuildEventContext() {
318 @Override
319 public PathConverter pathConverter() {
320 return pathConverter;
321 }
322
323 @Override
324 public ArtifactGroupNamer artifactGroupNamer() {
325 return namer;
326 }
327
328 @Override
329 public BuildEventProtocolOptions getOptions() {
330 return buildEventProtocolOptions;
331 }
332 };
333 BuildEventStreamProtos.BuildEvent serializedBepEvent =
334 buildEvent.getEvent().asStreamProto(ctx);
lpino1d205e12019-02-15 07:09:29 -0800335
336 // TODO(lpino): Remove this logging once we can make every single event smaller than 1MB
337 // as protobuf recommends.
338 if (serializedBepEvent.getSerializedSize()
339 > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) {
340 eventBus.post(
341 new LargeBuildEventSerializedEvent(
342 serializedBepEvent.getId().toString(), serializedBepEvent.getSerializedSize()));
343 }
344
lpino83a1bfd2019-02-14 15:30:18 -0800345 return serializedBepEvent;
346 }
347
348 private void publishBuildEvents()
349 throws StatusException, LocalFileUploadException, InterruptedException {
350 eventQueue.addFirst(new OpenStreamCommand());
351
352 // Every build event sent to the server needs to be acknowledged by it. This queue stores
353 // the build events that have been sent and still have to be acknowledged by the server.
354 // The build events are stored in the order they were sent.
felly916e7362019-04-16 09:10:22 -0700355 Deque<SendBuildEventCommand> ackQueue = new ArrayDeque<>();
lpino83a1bfd2019-02-14 15:30:18 -0800356 boolean lastEventSent = false;
357 int acksReceived = 0;
358 int retryAttempt = 0;
359
360 try {
361 // {@link BuildEventServiceUploaderCommands#OPEN_STREAM} is the first event and opens a
362 // bidi streaming RPC for sending build events and receiving ACKs.
363 // {@link BuildEventServiceUploaderCommands#SEND_REGULAR_BUILD_EVENT} sends a build event to
364 // the server. Sending of the Nth build event does
365 // does not wait for the ACK of the N-1th build event to have been received.
366 // {@link BuildEventServiceUploaderCommands#SEND_LAST_BUILD_EVENT} sends the last build event
367 // and half closes the RPC.
368 // {@link BuildEventServiceUploaderCommands#ACK_RECEIVED} is executed for every ACK from
369 // the server and checks that the ACKs are in the correct order.
370 // {@link BuildEventServiceUploaderCommands#STREAM_COMPLETE} checks that all build events
371 // have been sent and all ACKs have been received. If not it invokes a retry logic that may
372 // decide to re-send every build event for which an ACK has not been received. If so, it
373 // adds an OPEN_STREAM event.
374 while (true) {
375 EventLoopCommand event = eventQueue.takeFirst();
376 switch (event.type()) {
377 case OPEN_STREAM:
378 {
379 // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
380 // or SEND_LAST_BUILD_EVENT
381 logger.info(
382 String.format("Starting publishBuildEvents: eventQueue=%d", eventQueue.size()));
383 streamContext =
384 besClient.openStream(
385 (ack) -> eventQueue.addLast(new AckReceivedCommand(ack.getSequenceNumber())));
386 addStreamStatusListener(
387 streamContext.getStatus(),
388 (status) -> eventQueue.addLast(new StreamCompleteCommand(status)));
389 }
390 break;
391
392 case SEND_REGULAR_BUILD_EVENT:
393 {
394 // Invariant: the eventQueue may contain events of any type
395 SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
396 ackQueue.addLast(buildEvent);
397
398 PathConverter pathConverter = waitForLocalFileUploads(buildEvent);
399
400 BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
401 createSerializedRegularBuildEvent(pathConverter, buildEvent);
402
403 PublishBuildToolEventStreamRequest request =
404 besProtoUtil.bazelEvent(
405 buildEvent.getSequenceNumber(),
406 buildEvent.getCreationTime(),
407 Any.pack(serializedRegularBuildEvent));
408
409 streamContext.sendOverStream(request);
410 }
411 break;
412
413 case SEND_LAST_BUILD_EVENT:
414 {
415 // Invariant: the eventQueue may contain events of any type
416 SendBuildEventCommand lastEvent = (SendLastBuildEventCommand) event;
417 ackQueue.addLast(lastEvent);
418 lastEventSent = true;
419 PublishBuildToolEventStreamRequest request =
420 besProtoUtil.streamFinished(
421 lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
422 streamContext.sendOverStream(request);
423 streamContext.halfCloseStream();
felly67641002019-04-23 09:31:25 -0700424 halfCloseFuture.set(null);
lpino83a1bfd2019-02-14 15:30:18 -0800425 }
426 break;
427
428 case ACK_RECEIVED:
429 {
430 // Invariant: the eventQueue may contain events of any type
431 AckReceivedCommand ackEvent = (AckReceivedCommand) event;
432 if (!ackQueue.isEmpty()) {
433 SendBuildEventCommand expected = ackQueue.removeFirst();
434 long actualSeqNum = ackEvent.getSequenceNumber();
435 if (expected.getSequenceNumber() == actualSeqNum) {
436 acksReceived++;
437 } else {
438 ackQueue.addFirst(expected);
439 String message =
440 String.format(
441 "Expected ACK with seqNum=%d but received ACK with seqNum=%d",
442 expected.getSequenceNumber(), actualSeqNum);
443 logger.info(message);
444 streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
felly916e7362019-04-16 09:10:22 -0700445 }
lpino83a1bfd2019-02-14 15:30:18 -0800446 } else {
447 String message =
448 String.format(
449 "Received ACK (seqNum=%d) when no ACK was expected",
450 ackEvent.getSequenceNumber());
451 logger.info(message);
452 streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
453 }
454 }
455 break;
456
457 case STREAM_COMPLETE:
458 {
459 // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
460 // or SEND_LAST_BUILD_EVENT
461 streamContext = null;
462 StreamCompleteCommand completeEvent = (StreamCompleteCommand) event;
463 Status streamStatus = completeEvent.status();
464 if (streamStatus.isOk()) {
465 if (lastEventSent && ackQueue.isEmpty()) {
466 logger.info("publishBuildEvents was successful");
467 // Upload successful. Break out from the while(true) loop.
468 return;
469 } else {
470 throw (lastEventSent
471 ? ackQueueNotEmptyStatus(ackQueue.size())
472 : lastEventNotSentStatus())
473 .asException();
474 }
475 }
476
477 if (!shouldRetryStatus(streamStatus)) {
478 logger.info(
479 String.format("Not retrying publishBuildEvents: status='%s'", streamStatus));
480 throw streamStatus.asException();
481 }
482 if (retryAttempt == MAX_NUM_RETRIES) {
483 logger.info(
484 String.format(
485 "Not retrying publishBuildEvents, no more attempts left: status='%s'",
486 streamStatus));
487 throw streamStatus.asException();
488 }
489
490 // Retry logic
491 // Adds events from the ackQueue to the front of the eventQueue, so that the
492 // events in the eventQueue are sorted by sequence number (ascending).
493 SendBuildEventCommand unacked;
494 while ((unacked = ackQueue.pollLast()) != null) {
495 eventQueue.addFirst(unacked);
496 }
497
498 long sleepMillis = retrySleepMillis(retryAttempt);
499 logger.info(
500 String.format(
501 "Retrying stream: status='%s', sleepMillis=%d", streamStatus, sleepMillis));
502 sleeper.sleepMillis(sleepMillis);
503
504 // If we made progress, meaning the server ACKed events that we sent, then reset
505 // the retry counter to 0.
506 if (acksReceived > 0) {
507 retryAttempt = 0;
508 } else {
509 retryAttempt++;
510 }
511 acksReceived = 0;
512 eventQueue.addFirst(new OpenStreamCommand());
513 }
514 break;
515 }
516 }
517 } catch (InterruptedException | LocalFileUploadException e) {
518 int limit = 30;
519 logger.info(
520 String.format(
521 "Publish interrupt. Showing up to %d items from queues: ack_queue_size: %d, "
522 + "ack_queue: %s, event_queue_size: %d, event_queue: %s",
523 limit,
524 ackQueue.size(),
525 Iterables.limit(ackQueue, limit),
526 eventQueue.size(),
527 Iterables.limit(eventQueue, limit)));
528 if (streamContext != null) {
529 streamContext.abortStream(Status.CANCELLED);
530 }
531 throw e;
532 } finally {
533 // Cancel all local file uploads that may still be running
534 // of events that haven't been uploaded.
535 EventLoopCommand event;
536 while ((event = ackQueue.pollFirst()) != null) {
537 if (event instanceof SendRegularBuildEventCommand) {
538 cancelLocalFileUpload((SendRegularBuildEventCommand) event);
539 }
540 }
541 while ((event = eventQueue.pollFirst()) != null) {
542 if (event instanceof SendRegularBuildEventCommand) {
543 cancelLocalFileUpload((SendRegularBuildEventCommand) event);
544 }
545 }
546 }
547 }
548
549 private void cancelLocalFileUpload(SendRegularBuildEventCommand event) {
550 ListenableFuture<PathConverter> localFileUploaderFuture = event.localFileUploadProgress();
551 if (!localFileUploaderFuture.isDone()) {
552 localFileUploaderFuture.cancel(true);
553 }
554 }
555
556 /** Sends a {@link PublishLifecycleEventRequest} to the BES backend. */
557 private void publishLifecycleEvent(PublishLifecycleEventRequest request)
558 throws StatusException, InterruptedException {
559 int retryAttempt = 0;
560 StatusException cause = null;
561 while (retryAttempt <= MAX_NUM_RETRIES) {
562 try {
563 besClient.publish(request);
564 return;
565 } catch (StatusException e) {
566 if (!shouldRetryStatus(e.getStatus())) {
567 logger.info(
568 String.format(
569 "Not retrying publishLifecycleEvent: status='%s'", e.getStatus().toString()));
570 throw e;
571 }
572
573 cause = e;
574
575 long sleepMillis = retrySleepMillis(retryAttempt);
576 logger.info(
577 String.format(
578 "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d",
579 e.getStatus().toString(), sleepMillis));
580 sleeper.sleepMillis(sleepMillis);
581 retryAttempt++;
582 }
583 }
584
585 // All retry attempts failed
586 throw cause;
587 }
588
lpino83a1bfd2019-02-14 15:30:18 -0800589 private void ensureUploadThreadStarted() {
590 synchronized (lock) {
591 if (uploadThread == null) {
592 uploadThread = new Thread(this, "bes-uploader");
593 uploadThread.start();
594 }
595 }
596 }
597
lpino83a1bfd2019-02-14 15:30:18 -0800598 private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
599 throws LocalFileUploadException, InterruptedException {
600 try {
601 // Wait for the local file upload to have been completed.
602 return orderedBuildEvent.localFileUploadProgress().get();
603 } catch (ExecutionException e) {
604 logger.log(
605 Level.WARNING,
606 String.format(
607 "Failed to upload local files referenced by build event: %s", e.getMessage()),
608 e);
609 Throwables.throwIfUnchecked(e.getCause());
610 throw new LocalFileUploadException(e.getCause());
611 }
612 }
613
614 private Timestamp currentTime() {
615 return Timestamps.fromMillis(clock.currentTimeMillis());
616 }
617
618 private static Result extractBuildStatus(BuildCompletingEvent event) {
619 if (event.getExitCode() != null && event.getExitCode().getNumericExitCode() == 0) {
620 return COMMAND_SUCCEEDED;
621 } else {
622 return COMMAND_FAILED;
623 }
624 }
625
626 private static Status lastEventNotSentStatus() {
627 return Status.FAILED_PRECONDITION.withDescription(
628 "Server closed stream with status OK but not all events have been sent");
629 }
630
631 private static Status ackQueueNotEmptyStatus(int ackQueueSize) {
632 return Status.FAILED_PRECONDITION.withDescription(
633 String.format(
634 "Server closed stream with status OK but not all ACKs have been"
635 + " received (ackQueue=%d)",
636 ackQueueSize));
637 }
638
639 private static void addStreamStatusListener(
640 ListenableFuture<Status> stream, Consumer<Status> onDone) {
641 Futures.addCallback(
642 stream,
643 new FutureCallback<Status>() {
644 @Override
645 public void onSuccess(Status result) {
646 onDone.accept(result);
647 }
648
649 @Override
650 public void onFailure(Throwable t) {}
651 },
652 MoreExecutors.directExecutor());
653 }
654
655 private static boolean shouldRetryStatus(Status status) {
656 return !status.getCode().equals(Code.INVALID_ARGUMENT)
657 && !status.getCode().equals(Code.FAILED_PRECONDITION);
658 }
659
660 private static long retrySleepMillis(int attempt) {
661 // This somewhat matches the backoff used for gRPC connection backoffs.
662 return (long) (DELAY_MILLIS * Math.pow(1.6, attempt));
663 }
664
lpino83a1bfd2019-02-14 15:30:18 -0800665 /** Thrown when encountered problems while uploading build event artifacts. */
666 private class LocalFileUploadException extends Exception {
667 LocalFileUploadException(Throwable cause) {
668 super(cause);
669 }
670 }
lpino2bf89062019-02-21 03:24:49 -0800671
672 static class Builder {
673 private BuildEventServiceClient besClient;
674 private BuildEventArtifactUploader localFileUploader;
675 private BuildEventServiceProtoUtil besProtoUtil;
676 private BuildEventProtocolOptions bepOptions;
677 private boolean publishLifecycleEvents;
lpino2bf89062019-02-21 03:24:49 -0800678 private Sleeper sleeper;
679 private Clock clock;
680 private ArtifactGroupNamer artifactGroupNamer;
681 private EventBus eventBus;
682
683 Builder besClient(BuildEventServiceClient value) {
684 this.besClient = value;
685 return this;
686 }
687
688 Builder localFileUploader(BuildEventArtifactUploader value) {
689 this.localFileUploader = value;
690 return this;
691 }
692
693 Builder besProtoUtil(BuildEventServiceProtoUtil value) {
694 this.besProtoUtil = value;
695 return this;
696 }
697
698 Builder bepOptions(BuildEventProtocolOptions value) {
699 this.bepOptions = value;
700 return this;
701 }
702
703 Builder publishLifecycleEvents(boolean value) {
704 this.publishLifecycleEvents = value;
705 return this;
706 }
707
lpino2bf89062019-02-21 03:24:49 -0800708 Builder clock(Clock value) {
709 this.clock = value;
710 return this;
711 }
712
lpino2bf89062019-02-21 03:24:49 -0800713 Builder sleeper(Sleeper value) {
714 this.sleeper = value;
715 return this;
716 }
717
718 Builder artifactGroupNamer(ArtifactGroupNamer value) {
719 this.artifactGroupNamer = value;
720 return this;
721 }
722
723 Builder eventBus(EventBus value) {
724 this.eventBus = value;
725 return this;
726 }
727
728 BuildEventServiceUploader build() {
729 return new BuildEventServiceUploader(
730 checkNotNull(besClient),
731 checkNotNull(localFileUploader),
732 checkNotNull(besProtoUtil),
733 checkNotNull(bepOptions),
734 publishLifecycleEvents,
lpino2bf89062019-02-21 03:24:49 -0800735 checkNotNull(sleeper),
736 checkNotNull(clock),
737 checkNotNull(artifactGroupNamer),
738 checkNotNull(eventBus));
739 }
740 }
lpino83a1bfd2019-02-14 15:30:18 -0800741}
lpinoa902ce72019-05-03 10:57:50 -0700742