blob: 145c2279d8fb6009e9755d65e26c6c1ff1157c8a [file] [log] [blame]
lpino83a1bfd2019-02-14 15:30:18 -08001// Copyright 2019 The Bazel Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package com.google.devtools.build.lib.buildeventservice;
15
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendLastBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.SendRegularBuildEventCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.StreamCompleteCommand;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient.StreamContext;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.devtools.build.lib.util.LoggingUtil;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.v1.BuildStatus.Result;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.protobuf.Any;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
79
80/**
81 * Uploader of Build Events to the Build Event Service (BES).
82 *
83 * <p>The purpose is of this class is to manage the interaction between the BES client and the BES
84 * server. It implements the event loop pattern based on the commands defined by {@link
85 * BuildEventServiceUploaderCommands}.
86 */
87// TODO(lpino): This class should be package-private but there are unit tests that are in the
88// different packages and rely on this.
89@VisibleForTesting
90public final class BuildEventServiceUploader implements Runnable {
91 private static final Logger logger = Logger.getLogger(BuildEventServiceUploader.class.getName());
92
93 /** Configuration knobs related to RPC retries. Values chosen by good judgement. */
94 private static final int MAX_NUM_RETRIES = 4;
95
96 private static final int DELAY_MILLIS = 1000;
97
98 private final BuildEventServiceClient besClient;
99 private final BuildEventArtifactUploader localFileUploader;
100 private final BuildEventServiceProtoUtil besProtoUtil;
101 private final BuildEventProtocolOptions buildEventProtocolOptions;
102 private final boolean publishLifecycleEvents;
103 private final Duration closeTimeout;
lpino83a1bfd2019-02-14 15:30:18 -0800104 private final Sleeper sleeper;
105 private final Clock clock;
lpino83a1bfd2019-02-14 15:30:18 -0800106 private final ArtifactGroupNamer namer;
lpino1d205e12019-02-15 07:09:29 -0800107 private final EventBus eventBus;
lpino7c795762019-04-25 04:31:51 -0700108 private final AtomicBoolean startedClose = new AtomicBoolean(false);
lpino83a1bfd2019-02-14 15:30:18 -0800109
110 /**
111 * The event queue contains two types of events: - Build events, sorted by sequence number, that
112 * should be sent to the server - Command events that are used by {@link #publishBuildEvents()} to
113 * change state.
114 */
115 private final BlockingDeque<EventLoopCommand> eventQueue = new LinkedBlockingDeque<>();
116
117 /**
118 * Computes sequence numbers for build events. As per the BES protocol, sequence numbers must be
119 * consecutive monotonically increasing natural numbers.
120 */
121 private final AtomicLong nextSeqNum = new AtomicLong(1);
122
123 private final Object lock = new Object();
124
125 @GuardedBy("lock")
126 private Result buildStatus = UNKNOWN_STATUS;
127
lpino7c795762019-04-25 04:31:51 -0700128 private final SettableFuture<Void> closeFuture = SettableFuture.create();
felly67641002019-04-23 09:31:25 -0700129 private final SettableFuture<Void> halfCloseFuture = SettableFuture.create();
130
lpino83a1bfd2019-02-14 15:30:18 -0800131 /**
132 * The thread that calls the lifecycle RPCs and does the build event upload. It's started lazily
133 * on the first call to {@link #enqueueEvent(BuildEvent)} or {@link #close()} (which ever comes
134 * first).
135 */
136 @GuardedBy("lock")
137 private Thread uploadThread;
138
139 @GuardedBy("lock")
140 private boolean interruptCausedByTimeout;
141
lpinod143ca42019-04-15 04:27:39 -0700142 @GuardedBy("lock")
143 private boolean interruptCausedByCancel;
144
lpino83a1bfd2019-02-14 15:30:18 -0800145 private StreamContext streamContext;
146
lpino2bf89062019-02-21 03:24:49 -0800147 private BuildEventServiceUploader(
lpino83a1bfd2019-02-14 15:30:18 -0800148 BuildEventServiceClient besClient,
149 BuildEventArtifactUploader localFileUploader,
150 BuildEventServiceProtoUtil besProtoUtil,
151 BuildEventProtocolOptions buildEventProtocolOptions,
152 boolean publishLifecycleEvents,
153 Duration closeTimeout,
lpino83a1bfd2019-02-14 15:30:18 -0800154 Sleeper sleeper,
155 Clock clock,
lpino1d205e12019-02-15 07:09:29 -0800156 ArtifactGroupNamer namer,
157 EventBus eventBus) {
lpino2bf89062019-02-21 03:24:49 -0800158 this.besClient = besClient;
159 this.localFileUploader = localFileUploader;
160 this.besProtoUtil = besProtoUtil;
lpino83a1bfd2019-02-14 15:30:18 -0800161 this.buildEventProtocolOptions = buildEventProtocolOptions;
162 this.publishLifecycleEvents = publishLifecycleEvents;
lpino2bf89062019-02-21 03:24:49 -0800163 this.closeTimeout = closeTimeout;
lpino2bf89062019-02-21 03:24:49 -0800164 this.sleeper = sleeper;
165 this.clock = clock;
lpino83a1bfd2019-02-14 15:30:18 -0800166 this.namer = namer;
lpino1d205e12019-02-15 07:09:29 -0800167 this.eventBus = eventBus;
lpino83a1bfd2019-02-14 15:30:18 -0800168 }
169
170 BuildEventArtifactUploader getLocalFileUploader() {
171 return localFileUploader;
172 }
173
174 /** Enqueues an event for uploading to a BES backend. */
175 void enqueueEvent(BuildEvent event) {
176 // This needs to happen outside a synchronized block as it may trigger
177 // stdout/stderr and lead to a deadlock. See b/109725432
178 ListenableFuture<PathConverter> localFileUploadFuture =
felly916e7362019-04-16 09:10:22 -0700179 localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
lpino83a1bfd2019-02-14 15:30:18 -0800180
lpino7c795762019-04-25 04:31:51 -0700181 if (startedClose.get()) {
182 if (!localFileUploadFuture.isDone()) {
183 localFileUploadFuture.cancel(true);
lpino83a1bfd2019-02-14 15:30:18 -0800184 }
lpino7c795762019-04-25 04:31:51 -0700185 return;
186 }
187 // BuildCompletingEvent marks the end of the build in the BEP event stream.
188 if (event instanceof BuildCompletingEvent) {
189 synchronized (lock) {
lpino83a1bfd2019-02-14 15:30:18 -0800190 this.buildStatus = extractBuildStatus((BuildCompletingEvent) event);
191 }
lpino83a1bfd2019-02-14 15:30:18 -0800192 }
lpino7c795762019-04-25 04:31:51 -0700193 ensureUploadThreadStarted();
194 eventQueue.addLast(
195 new SendRegularBuildEventCommand(
196 event,
197 localFileUploadFuture,
198 nextSeqNum.getAndIncrement(),
199 Timestamps.fromMillis(clock.currentTimeMillis())));
lpino83a1bfd2019-02-14 15:30:18 -0800200 }
201
202 /**
203 * Gracefully stops the BES upload. All events enqueued before the call to close will be uploaded
204 * and events enqueued after the call will be discarded.
205 *
206 * <p>The returned future completes when the upload completes. It's guaranteed to never fail.
207 */
208 public ListenableFuture<Void> close() {
lpino7c795762019-04-25 04:31:51 -0700209 if (startedClose.getAndSet(true)) {
lpino83a1bfd2019-02-14 15:30:18 -0800210 return closeFuture;
211 }
lpino7c795762019-04-25 04:31:51 -0700212
213 ensureUploadThreadStarted();
214
215 // Enqueue the last event which will terminate the upload.
216 eventQueue.addLast(new SendLastBuildEventCommand(nextSeqNum.getAndIncrement(), currentTime()));
217
218 if (!closeTimeout.isZero()) {
219 startCloseTimer(closeFuture, closeTimeout);
220 }
221
222 final SettableFuture<Void> finalCloseFuture = closeFuture;
223 closeFuture.addListener(
224 () -> {
225 // Make sure to cancel any pending uploads if the closing is cancelled.
226 if (finalCloseFuture.isCancelled()) {
227 closeOnCancel();
228 }
229 },
230 MoreExecutors.directExecutor());
231
232 return closeFuture;
lpino83a1bfd2019-02-14 15:30:18 -0800233 }
234
lpino83a1bfd2019-02-14 15:30:18 -0800235 private void closeOnTimeout() {
236 synchronized (lock) {
lpinod143ca42019-04-15 04:27:39 -0700237 interruptCausedByTimeout = true;
238 closeNow();
239 }
240 }
241
lpino7c795762019-04-25 04:31:51 -0700242 private void closeOnCancel() {
lpinod143ca42019-04-15 04:27:39 -0700243 synchronized (lock) {
244 interruptCausedByCancel = true;
245 closeNow();
246 }
247 }
248
249 /** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
250 private void closeNow() {
251 synchronized (lock) {
lpino83a1bfd2019-02-14 15:30:18 -0800252 if (uploadThread != null) {
253 if (uploadThread.isInterrupted()) {
254 return;
255 }
lpino83a1bfd2019-02-14 15:30:18 -0800256 uploadThread.interrupt();
257 }
258 }
259 }
260
lpino2a42e2b2019-03-01 15:11:28 -0800261 private void logAndExitAbruptly(String message, ExitCode exitCode, Throwable cause) {
lpinod143ca42019-04-15 04:27:39 -0700262 checkState(!exitCode.equals(ExitCode.SUCCESS));
lpinof51f14b2019-04-17 14:14:56 -0700263 logger.severe(message);
lpino7c795762019-04-25 04:31:51 -0700264 closeFuture.setException(new AbruptExitException(message, exitCode, cause));
lpino2a42e2b2019-03-01 15:11:28 -0800265 }
266
lpino83a1bfd2019-02-14 15:30:18 -0800267 @Override
268 public void run() {
269 try {
270 if (publishLifecycleEvents) {
271 publishLifecycleEvent(besProtoUtil.buildEnqueued(currentTime()));
272 publishLifecycleEvent(besProtoUtil.invocationStarted(currentTime()));
273 }
274
275 try {
276 publishBuildEvents();
277 } finally {
278 if (publishLifecycleEvents) {
279 Result buildStatus;
280 synchronized (lock) {
281 buildStatus = this.buildStatus;
282 }
283 publishLifecycleEvent(besProtoUtil.invocationFinished(currentTime(), buildStatus));
284 publishLifecycleEvent(besProtoUtil.buildFinished(currentTime(), buildStatus));
285 }
286 }
lpino83a1bfd2019-02-14 15:30:18 -0800287 } catch (InterruptedException e) {
lpinof51f14b2019-04-17 14:14:56 -0700288 logger.info("Aborting the BES upload due to having received an interrupt");
289 synchronized (lock) {
290 Preconditions.checkState(
291 interruptCausedByTimeout || interruptCausedByCancel,
292 "Unexpected interrupt on BES uploader thread");
293 if (interruptCausedByTimeout) {
294 logAndExitAbruptly(
295 "The Build Event Protocol upload timed out",
296 ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
297 e);
lpino83a1bfd2019-02-14 15:30:18 -0800298 }
lpino83a1bfd2019-02-14 15:30:18 -0800299 }
300 } catch (StatusException e) {
lpinof51f14b2019-04-17 14:14:56 -0700301 logAndExitAbruptly(
302 "The Build Event Protocol upload failed: " + besClient.userReadableError(e),
303 shouldRetryStatus(e.getStatus())
304 ? ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR
305 : ExitCode.PERSISTENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
306 e);
lpino83a1bfd2019-02-14 15:30:18 -0800307 } catch (LocalFileUploadException e) {
308 Throwables.throwIfUnchecked(e.getCause());
lpinof51f14b2019-04-17 14:14:56 -0700309 logAndExitAbruptly(
310 "The Build Event Protocol local file upload failed: " + e.getCause().getMessage(),
311 ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR,
312 e.getCause());
lpino83a1bfd2019-02-14 15:30:18 -0800313 } catch (Throwable e) {
lpino7c795762019-04-25 04:31:51 -0700314 closeFuture.setException(e);
lpino83a1bfd2019-02-14 15:30:18 -0800315 logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
316 throw e;
317 } finally {
318 localFileUploader.shutdown();
lpino7c795762019-04-25 04:31:51 -0700319 closeFuture.set(null);
lpino83a1bfd2019-02-14 15:30:18 -0800320 }
321 }
322
323 private BuildEventStreamProtos.BuildEvent createSerializedRegularBuildEvent(
324 PathConverter pathConverter,
325 SendRegularBuildEventCommand buildEvent) {
326 BuildEventContext ctx =
327 new BuildEventContext() {
328 @Override
329 public PathConverter pathConverter() {
330 return pathConverter;
331 }
332
333 @Override
334 public ArtifactGroupNamer artifactGroupNamer() {
335 return namer;
336 }
337
338 @Override
339 public BuildEventProtocolOptions getOptions() {
340 return buildEventProtocolOptions;
341 }
342 };
343 BuildEventStreamProtos.BuildEvent serializedBepEvent =
344 buildEvent.getEvent().asStreamProto(ctx);
lpino1d205e12019-02-15 07:09:29 -0800345
346 // TODO(lpino): Remove this logging once we can make every single event smaller than 1MB
347 // as protobuf recommends.
348 if (serializedBepEvent.getSerializedSize()
349 > LargeBuildEventSerializedEvent.SIZE_OF_LARGE_BUILD_EVENTS_IN_BYTES) {
350 eventBus.post(
351 new LargeBuildEventSerializedEvent(
352 serializedBepEvent.getId().toString(), serializedBepEvent.getSerializedSize()));
353 }
354
lpino83a1bfd2019-02-14 15:30:18 -0800355 return serializedBepEvent;
356 }
357
358 private void publishBuildEvents()
359 throws StatusException, LocalFileUploadException, InterruptedException {
360 eventQueue.addFirst(new OpenStreamCommand());
361
362 // Every build event sent to the server needs to be acknowledged by it. This queue stores
363 // the build events that have been sent and still have to be acknowledged by the server.
364 // The build events are stored in the order they were sent.
felly916e7362019-04-16 09:10:22 -0700365 Deque<SendBuildEventCommand> ackQueue = new ArrayDeque<>();
lpino83a1bfd2019-02-14 15:30:18 -0800366 boolean lastEventSent = false;
367 int acksReceived = 0;
368 int retryAttempt = 0;
369
370 try {
371 // {@link BuildEventServiceUploaderCommands#OPEN_STREAM} is the first event and opens a
372 // bidi streaming RPC for sending build events and receiving ACKs.
373 // {@link BuildEventServiceUploaderCommands#SEND_REGULAR_BUILD_EVENT} sends a build event to
374 // the server. Sending of the Nth build event does
375 // does not wait for the ACK of the N-1th build event to have been received.
376 // {@link BuildEventServiceUploaderCommands#SEND_LAST_BUILD_EVENT} sends the last build event
377 // and half closes the RPC.
378 // {@link BuildEventServiceUploaderCommands#ACK_RECEIVED} is executed for every ACK from
379 // the server and checks that the ACKs are in the correct order.
380 // {@link BuildEventServiceUploaderCommands#STREAM_COMPLETE} checks that all build events
381 // have been sent and all ACKs have been received. If not it invokes a retry logic that may
382 // decide to re-send every build event for which an ACK has not been received. If so, it
383 // adds an OPEN_STREAM event.
384 while (true) {
385 EventLoopCommand event = eventQueue.takeFirst();
386 switch (event.type()) {
387 case OPEN_STREAM:
388 {
389 // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
390 // or SEND_LAST_BUILD_EVENT
391 logger.info(
392 String.format("Starting publishBuildEvents: eventQueue=%d", eventQueue.size()));
393 streamContext =
394 besClient.openStream(
395 (ack) -> eventQueue.addLast(new AckReceivedCommand(ack.getSequenceNumber())));
396 addStreamStatusListener(
397 streamContext.getStatus(),
398 (status) -> eventQueue.addLast(new StreamCompleteCommand(status)));
399 }
400 break;
401
402 case SEND_REGULAR_BUILD_EVENT:
403 {
404 // Invariant: the eventQueue may contain events of any type
405 SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
406 ackQueue.addLast(buildEvent);
407
408 PathConverter pathConverter = waitForLocalFileUploads(buildEvent);
409
410 BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
411 createSerializedRegularBuildEvent(pathConverter, buildEvent);
412
413 PublishBuildToolEventStreamRequest request =
414 besProtoUtil.bazelEvent(
415 buildEvent.getSequenceNumber(),
416 buildEvent.getCreationTime(),
417 Any.pack(serializedRegularBuildEvent));
418
419 streamContext.sendOverStream(request);
420 }
421 break;
422
423 case SEND_LAST_BUILD_EVENT:
424 {
425 // Invariant: the eventQueue may contain events of any type
426 SendBuildEventCommand lastEvent = (SendLastBuildEventCommand) event;
427 ackQueue.addLast(lastEvent);
428 lastEventSent = true;
429 PublishBuildToolEventStreamRequest request =
430 besProtoUtil.streamFinished(
431 lastEvent.getSequenceNumber(), lastEvent.getCreationTime());
432 streamContext.sendOverStream(request);
433 streamContext.halfCloseStream();
felly67641002019-04-23 09:31:25 -0700434 halfCloseFuture.set(null);
lpino83a1bfd2019-02-14 15:30:18 -0800435 }
436 break;
437
438 case ACK_RECEIVED:
439 {
440 // Invariant: the eventQueue may contain events of any type
441 AckReceivedCommand ackEvent = (AckReceivedCommand) event;
442 if (!ackQueue.isEmpty()) {
443 SendBuildEventCommand expected = ackQueue.removeFirst();
444 long actualSeqNum = ackEvent.getSequenceNumber();
445 if (expected.getSequenceNumber() == actualSeqNum) {
446 acksReceived++;
447 } else {
448 ackQueue.addFirst(expected);
449 String message =
450 String.format(
451 "Expected ACK with seqNum=%d but received ACK with seqNum=%d",
452 expected.getSequenceNumber(), actualSeqNum);
453 logger.info(message);
454 streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
felly916e7362019-04-16 09:10:22 -0700455 }
lpino83a1bfd2019-02-14 15:30:18 -0800456 } else {
457 String message =
458 String.format(
459 "Received ACK (seqNum=%d) when no ACK was expected",
460 ackEvent.getSequenceNumber());
461 logger.info(message);
462 streamContext.abortStream(Status.FAILED_PRECONDITION.withDescription(message));
463 }
464 }
465 break;
466
467 case STREAM_COMPLETE:
468 {
469 // Invariant: the eventQueue only contains events of type SEND_REGULAR_BUILD_EVENT
470 // or SEND_LAST_BUILD_EVENT
471 streamContext = null;
472 StreamCompleteCommand completeEvent = (StreamCompleteCommand) event;
473 Status streamStatus = completeEvent.status();
474 if (streamStatus.isOk()) {
475 if (lastEventSent && ackQueue.isEmpty()) {
476 logger.info("publishBuildEvents was successful");
477 // Upload successful. Break out from the while(true) loop.
478 return;
479 } else {
480 throw (lastEventSent
481 ? ackQueueNotEmptyStatus(ackQueue.size())
482 : lastEventNotSentStatus())
483 .asException();
484 }
485 }
486
487 if (!shouldRetryStatus(streamStatus)) {
488 logger.info(
489 String.format("Not retrying publishBuildEvents: status='%s'", streamStatus));
490 throw streamStatus.asException();
491 }
492 if (retryAttempt == MAX_NUM_RETRIES) {
493 logger.info(
494 String.format(
495 "Not retrying publishBuildEvents, no more attempts left: status='%s'",
496 streamStatus));
497 throw streamStatus.asException();
498 }
499
500 // Retry logic
501 // Adds events from the ackQueue to the front of the eventQueue, so that the
502 // events in the eventQueue are sorted by sequence number (ascending).
503 SendBuildEventCommand unacked;
504 while ((unacked = ackQueue.pollLast()) != null) {
505 eventQueue.addFirst(unacked);
506 }
507
508 long sleepMillis = retrySleepMillis(retryAttempt);
509 logger.info(
510 String.format(
511 "Retrying stream: status='%s', sleepMillis=%d", streamStatus, sleepMillis));
512 sleeper.sleepMillis(sleepMillis);
513
514 // If we made progress, meaning the server ACKed events that we sent, then reset
515 // the retry counter to 0.
516 if (acksReceived > 0) {
517 retryAttempt = 0;
518 } else {
519 retryAttempt++;
520 }
521 acksReceived = 0;
522 eventQueue.addFirst(new OpenStreamCommand());
523 }
524 break;
525 }
526 }
527 } catch (InterruptedException | LocalFileUploadException e) {
528 int limit = 30;
529 logger.info(
530 String.format(
531 "Publish interrupt. Showing up to %d items from queues: ack_queue_size: %d, "
532 + "ack_queue: %s, event_queue_size: %d, event_queue: %s",
533 limit,
534 ackQueue.size(),
535 Iterables.limit(ackQueue, limit),
536 eventQueue.size(),
537 Iterables.limit(eventQueue, limit)));
538 if (streamContext != null) {
539 streamContext.abortStream(Status.CANCELLED);
540 }
541 throw e;
542 } finally {
543 // Cancel all local file uploads that may still be running
544 // of events that haven't been uploaded.
545 EventLoopCommand event;
546 while ((event = ackQueue.pollFirst()) != null) {
547 if (event instanceof SendRegularBuildEventCommand) {
548 cancelLocalFileUpload((SendRegularBuildEventCommand) event);
549 }
550 }
551 while ((event = eventQueue.pollFirst()) != null) {
552 if (event instanceof SendRegularBuildEventCommand) {
553 cancelLocalFileUpload((SendRegularBuildEventCommand) event);
554 }
555 }
556 }
557 }
558
559 private void cancelLocalFileUpload(SendRegularBuildEventCommand event) {
560 ListenableFuture<PathConverter> localFileUploaderFuture = event.localFileUploadProgress();
561 if (!localFileUploaderFuture.isDone()) {
562 localFileUploaderFuture.cancel(true);
563 }
564 }
565
566 /** Sends a {@link PublishLifecycleEventRequest} to the BES backend. */
567 private void publishLifecycleEvent(PublishLifecycleEventRequest request)
568 throws StatusException, InterruptedException {
569 int retryAttempt = 0;
570 StatusException cause = null;
571 while (retryAttempt <= MAX_NUM_RETRIES) {
572 try {
573 besClient.publish(request);
574 return;
575 } catch (StatusException e) {
576 if (!shouldRetryStatus(e.getStatus())) {
577 logger.info(
578 String.format(
579 "Not retrying publishLifecycleEvent: status='%s'", e.getStatus().toString()));
580 throw e;
581 }
582
583 cause = e;
584
585 long sleepMillis = retrySleepMillis(retryAttempt);
586 logger.info(
587 String.format(
588 "Retrying publishLifecycleEvent: status='%s', sleepMillis=%d",
589 e.getStatus().toString(), sleepMillis));
590 sleeper.sleepMillis(sleepMillis);
591 retryAttempt++;
592 }
593 }
594
595 // All retry attempts failed
596 throw cause;
597 }
598
lpino83a1bfd2019-02-14 15:30:18 -0800599 private void ensureUploadThreadStarted() {
600 synchronized (lock) {
601 if (uploadThread == null) {
602 uploadThread = new Thread(this, "bes-uploader");
603 uploadThread.start();
604 }
605 }
606 }
607
608 private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeTimeout) {
609 Thread closeTimer =
610 new Thread(
611 () -> {
612 // Call closeOnTimeout() if the future does not complete within closeTimeout
613 try {
614 getUninterruptibly(closeFuture, closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
615 } catch (TimeoutException e) {
616 closeOnTimeout();
617 } catch (ExecutionException e) {
618 if (e.getCause() instanceof TimeoutException) {
619 // This is likely due to an internal timeout doing the local file uploading.
620 closeOnTimeout();
621 } else {
622 // This code only cares about calling closeOnTimeout() if the closeFuture does
623 // not complete within closeTimeout.
624 String failureMsg = "BES close failure";
625 logger.severe(failureMsg);
626 LoggingUtil.logToRemote(Level.SEVERE, failureMsg, e);
627 }
628 }
629 },
630 "bes-uploader-close-timer");
631 closeTimer.start();
632 }
633
lpino83a1bfd2019-02-14 15:30:18 -0800634 private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
635 throws LocalFileUploadException, InterruptedException {
636 try {
637 // Wait for the local file upload to have been completed.
638 return orderedBuildEvent.localFileUploadProgress().get();
639 } catch (ExecutionException e) {
640 logger.log(
641 Level.WARNING,
642 String.format(
643 "Failed to upload local files referenced by build event: %s", e.getMessage()),
644 e);
645 Throwables.throwIfUnchecked(e.getCause());
646 throw new LocalFileUploadException(e.getCause());
647 }
648 }
649
650 private Timestamp currentTime() {
651 return Timestamps.fromMillis(clock.currentTimeMillis());
652 }
653
654 private static Result extractBuildStatus(BuildCompletingEvent event) {
655 if (event.getExitCode() != null && event.getExitCode().getNumericExitCode() == 0) {
656 return COMMAND_SUCCEEDED;
657 } else {
658 return COMMAND_FAILED;
659 }
660 }
661
662 private static Status lastEventNotSentStatus() {
663 return Status.FAILED_PRECONDITION.withDescription(
664 "Server closed stream with status OK but not all events have been sent");
665 }
666
667 private static Status ackQueueNotEmptyStatus(int ackQueueSize) {
668 return Status.FAILED_PRECONDITION.withDescription(
669 String.format(
670 "Server closed stream with status OK but not all ACKs have been"
671 + " received (ackQueue=%d)",
672 ackQueueSize));
673 }
674
675 private static void addStreamStatusListener(
676 ListenableFuture<Status> stream, Consumer<Status> onDone) {
677 Futures.addCallback(
678 stream,
679 new FutureCallback<Status>() {
680 @Override
681 public void onSuccess(Status result) {
682 onDone.accept(result);
683 }
684
685 @Override
686 public void onFailure(Throwable t) {}
687 },
688 MoreExecutors.directExecutor());
689 }
690
691 private static boolean shouldRetryStatus(Status status) {
692 return !status.getCode().equals(Code.INVALID_ARGUMENT)
693 && !status.getCode().equals(Code.FAILED_PRECONDITION);
694 }
695
696 private static long retrySleepMillis(int attempt) {
697 // This somewhat matches the backoff used for gRPC connection backoffs.
698 return (long) (DELAY_MILLIS * Math.pow(1.6, attempt));
699 }
700
lpino83a1bfd2019-02-14 15:30:18 -0800701 /** Thrown when encountered problems while uploading build event artifacts. */
702 private class LocalFileUploadException extends Exception {
703 LocalFileUploadException(Throwable cause) {
704 super(cause);
705 }
706 }
lpino2bf89062019-02-21 03:24:49 -0800707
708 static class Builder {
709 private BuildEventServiceClient besClient;
710 private BuildEventArtifactUploader localFileUploader;
711 private BuildEventServiceProtoUtil besProtoUtil;
712 private BuildEventProtocolOptions bepOptions;
713 private boolean publishLifecycleEvents;
714 private Duration closeTimeout;
lpino2bf89062019-02-21 03:24:49 -0800715 private Sleeper sleeper;
716 private Clock clock;
717 private ArtifactGroupNamer artifactGroupNamer;
718 private EventBus eventBus;
719
720 Builder besClient(BuildEventServiceClient value) {
721 this.besClient = value;
722 return this;
723 }
724
725 Builder localFileUploader(BuildEventArtifactUploader value) {
726 this.localFileUploader = value;
727 return this;
728 }
729
730 Builder besProtoUtil(BuildEventServiceProtoUtil value) {
731 this.besProtoUtil = value;
732 return this;
733 }
734
735 Builder bepOptions(BuildEventProtocolOptions value) {
736 this.bepOptions = value;
737 return this;
738 }
739
740 Builder publishLifecycleEvents(boolean value) {
741 this.publishLifecycleEvents = value;
742 return this;
743 }
744
745 Builder closeTimeout(Duration value) {
746 this.closeTimeout = value;
747 return this;
748 }
749
750 Builder clock(Clock value) {
751 this.clock = value;
752 return this;
753 }
754
lpino2bf89062019-02-21 03:24:49 -0800755 Builder sleeper(Sleeper value) {
756 this.sleeper = value;
757 return this;
758 }
759
760 Builder artifactGroupNamer(ArtifactGroupNamer value) {
761 this.artifactGroupNamer = value;
762 return this;
763 }
764
765 Builder eventBus(EventBus value) {
766 this.eventBus = value;
767 return this;
768 }
769
770 BuildEventServiceUploader build() {
771 return new BuildEventServiceUploader(
772 checkNotNull(besClient),
773 checkNotNull(localFileUploader),
774 checkNotNull(besProtoUtil),
775 checkNotNull(bepOptions),
776 publishLifecycleEvents,
777 checkNotNull(closeTimeout),
lpino2bf89062019-02-21 03:24:49 -0800778 checkNotNull(sleeper),
779 checkNotNull(clock),
780 checkNotNull(artifactGroupNamer),
781 checkNotNull(eventBus));
782 }
783 }
lpino83a1bfd2019-02-14 15:30:18 -0800784}
785