Refactor the build event service transport - use an internal event wrapper to unify handling - rewrite the test to be more explicit about event names and ordering This a part split out of unknown commit. PiperOrigin-RevId: 200015904
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java index a6c23c3..217fcbd 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
@@ -103,19 +103,8 @@ .build(); } - /** - * Utility method used to create a PublishBuildToolEventStreamRequest that delimits the end of the - * stream. - */ - public PublishBuildToolEventStreamRequest streamFinished() { - return streamFinished(streamSequenceNumber.getAndIncrement()); - } - - /** - * Utility method used to create a PublishBuildToolEventStreamRequest from an packed bazel event - */ - public PublishBuildToolEventStreamRequest bazelEvent(Any packedEvent) { - return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent); + public int nextSequenceNumber() { + return streamSequenceNumber.getAndIncrement(); } @VisibleForTesting
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index c19361e..37355cc 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -18,7 +18,6 @@ import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static com.google.devtools.build.lib.events.EventKind.ERROR; import static com.google.devtools.build.lib.events.EventKind.INFO; -import static com.google.devtools.build.v1.BuildEvent.EventCase.COMPONENT_STREAM_FINISHED; import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED; import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED; import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS; @@ -26,7 +25,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.util.concurrent.ListenableFuture; @@ -34,12 +32,11 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; +import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventContext; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; -import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEvent.PayloadCase; -import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildFinished; import com.google.devtools.build.lib.buildeventstream.BuildEventTransport; import com.google.devtools.build.lib.buildeventstream.PathConverter; import com.google.devtools.build.lib.clock.Clock; @@ -69,6 +66,8 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -99,9 +98,9 @@ private final PathConverter pathConverter; private final Sleeper sleeper; /** Contains all pendingAck events that might be retried in case of failures. */ - private ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck; + private ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck; /** Contains all events should be sent ordered by sequence number. */ - private final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend; + private final BlockingDeque<InternalOrderedBuildEvent> pendingSend; /** Holds the result status of the BuildEventStreamProtos BuildFinished event. */ private Result invocationResult; /** Used to block until all events have been uploaded. */ @@ -120,7 +119,7 @@ * Returns the number of ACKs received since the last time {@link #publishEventStream()} was * retried due to a failure. */ - private volatile int acksReceivedSinceLastRetry; + private final AtomicInteger acksReceivedSinceLastRetry = new AtomicInteger(); public BuildEventServiceTransport( BuildEventServiceClient besClient, @@ -173,8 +172,9 @@ Sleeper sleeper, @Nullable String besResultsUrl) { this.besClient = besClient; - this.besProtoUtil = new BuildEventServiceProtoUtil( - buildRequestId, invocationId, projectId, command, clock, keywords); + this.besProtoUtil = + new BuildEventServiceProtoUtil( + buildRequestId, invocationId, projectId, command, clock, keywords); this.publishLifecycleEvents = publishLifecycleEvents; this.moduleEnvironment = moduleEnvironment; this.commandLineReporter = commandLineReporter; @@ -200,13 +200,13 @@ @Override public ListenableFuture<Void> close() { - return close(/*now=*/false); + return close(/*now=*/ false); } @Override @SuppressWarnings("FutureReturnValueIgnored") public void closeNow() { - close(/*now=*/true); + close(/*now=*/ true); } private synchronized ListenableFuture<Void> close(boolean now) { @@ -234,7 +234,8 @@ uploaderExecutorService.execute( () -> { try { - sendOrderedBuildEvent(besProtoUtil.streamFinished()); + sendOrderedBuildEvent( + new LastInternalOrderedBuildEvent(besProtoUtil.nextSequenceNumber())); if (errorsReported) { // If we encountered errors before and have already reported them, then we should @@ -244,7 +245,7 @@ report(INFO, "Waiting for Build Event Protocol upload to finish."); try { - if (Duration.ZERO.equals(uploadTimeout)) { + if (uploadTimeout.isZero()) { uploadComplete.get(); } else { uploadComplete.get(uploadTimeout.toMillis(), MILLISECONDS); @@ -297,33 +298,18 @@ @Override public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) { - BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto( - new BuildEventContext() { - @Override - public PathConverter pathConverter() { - return pathConverter; - } - - @Override - public ArtifactGroupNamer artifactGroupNamer() { - return namer; - } - - @Override - public BuildEventProtocolOptions getOptions() { - return protocolOptions; - } - }); - if (PayloadCase.FINISHED.equals(eventProto.getPayloadCase())) { - BuildFinished finished = eventProto.getFinished(); - if (finished.hasExitCode() && finished.getExitCode().getCode() == 0) { + if (event instanceof BuildCompletingEvent) { + BuildCompletingEvent completingEvent = (BuildCompletingEvent) event; + if (completingEvent.getExitCode() != null + && completingEvent.getExitCode().getNumericExitCode() == 0) { invocationResult = COMMAND_SUCCEEDED; } else { invocationResult = COMMAND_FAILED; } } - sendOrderedBuildEvent(besProtoUtil.bazelEvent(Any.pack(eventProto))); + sendOrderedBuildEvent( + new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber())); } private String errorMessageFromException(Throwable t) { @@ -341,10 +327,10 @@ } return message; } else if (t instanceof ExecutionException) { - message = format(UPLOAD_FAILED_MESSAGE, - t.getCause() != null - ? besClient.userReadableError(t.getCause()) - : t.getMessage()); + message = + format( + UPLOAD_FAILED_MESSAGE, + t.getCause() != null ? besClient.userReadableError(t.getCause()) : t.getMessage()); return message; } else { message = format(UPLOAD_FAILED_MESSAGE, besClient.userReadableError(t)); @@ -356,8 +342,9 @@ String message = errorMessageFromException(t); report(ERROR, message); - moduleEnvironment.exit(new AbruptExitException( - "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR)); + moduleEnvironment.exit( + new AbruptExitException( + "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR)); } private void maybeReportUploadError() { @@ -372,14 +359,13 @@ } } - private synchronized void sendOrderedBuildEvent( - PublishBuildToolEventStreamRequest serialisedEvent) { + private synchronized void sendOrderedBuildEvent(InternalOrderedBuildEvent evtAndNamer) { if (uploadComplete != null && uploadComplete.isDone()) { maybeReportUploadError(); return; } - pendingSend.add(serialisedEvent); + pendingSend.add(evtAndNamer); if (uploadComplete == null) { uploadComplete = uploaderExecutorService.submit(new BuildEventServiceUpload()); } @@ -389,9 +375,7 @@ return invocationResult; } - /** - * Method responsible for sending all requests to BuildEventService. - */ + /** Method responsible for sending all requests to BuildEventService. */ private class BuildEventServiceUpload implements Callable<Void> { @Override public Void call() throws Exception { @@ -468,7 +452,7 @@ */ private void publishEventStream() throws Exception { // Reschedule unacked messages if required, keeping its original order. - PublishBuildToolEventStreamRequest unacked; + InternalOrderedBuildEvent unacked; while ((unacked = pendingAck.pollLast()) != null) { pendingSend.addFirst(unacked); } @@ -478,35 +462,36 @@ /** Method responsible for a single Streaming RPC. */ private void publishEventStream( - final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck, - final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend, + final ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck, + final BlockingDeque<InternalOrderedBuildEvent> pendingSend, final BuildEventServiceClient besClient) throws Exception { logger.log( Level.INFO, String.format( "Starting PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size())); - ListenableFuture<Status> streamDone = besClient - .openStream(ackCallback(pendingAck, besClient)); + ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient)); logger.log( Level.INFO, String.format( "Started PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size())); try { - @Nullable PublishBuildToolEventStreamRequest event; + @Nullable InternalOrderedBuildEvent orderedBuildEvent; do { - event = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS); - if (event != null) { - pendingAck.add(event); - besClient.sendOverStream(event); + orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS); + if (orderedBuildEvent != null) { + pendingAck.add(orderedBuildEvent); + besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter)); } + checkState(besClient.isStreamActive(), "Stream was closed prematurely."); - } while (!isLastEvent(event)); + } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent()); logger.log( Level.INFO, String.format( "Will end publishEventStream() isLastEvent: %s isStreamActive: %s", - isLastEvent(event), besClient.isStreamActive())); + orderedBuildEvent != null && orderedBuildEvent.isLastEvent(), + besClient.isStreamActive())); } catch (InterruptedException e) { // By convention the interrupted flag should have been cleared, // but just to be sure clear it. @@ -539,27 +524,17 @@ throw e; } catch (TimeoutException e) { String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages"; - logger - .log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail); + logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail); besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail)); throw e; } } - private static boolean isLastEvent(@Nullable PublishBuildToolEventStreamRequest event) { - return event != null - && event.getOrderedBuildEvent().getEvent().getEventCase() == COMPONENT_STREAM_FINISHED; - } - - @SuppressWarnings("NonAtomicVolatileUpdate") private Function<PublishBuildToolEventStreamResponse, Void> ackCallback( - final Deque<PublishBuildToolEventStreamRequest> pendingAck, - final BuildEventServiceClient besClient) { + final Deque<InternalOrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) { return ack -> { - long pendingSeq = - pendingAck.isEmpty() - ? -1 - : pendingAck.peekFirst().getOrderedBuildEvent().getSequenceNumber(); + Preconditions.checkNotNull(ack); + long pendingSeq = pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber(); long ackSeq = ack.getSequenceNumber(); if (pendingSeq != ackSeq) { besClient.abortStream( @@ -567,20 +542,17 @@ format("Expected ACK %s but was %s.", pendingSeq, ackSeq))); return null; } - PublishBuildToolEventStreamRequest event = pendingAck.removeFirst(); - if (isLastEvent(event)) { + InternalOrderedBuildEvent event = pendingAck.removeFirst(); + if (event.isLastEvent()) { logger.log(Level.INFO, "Last ACK received."); besClient.closeStream(); } - acksReceivedSinceLastRetry++; + acksReceivedSinceLastRetry.incrementAndGet(); return null; }; } - /** - * Executes a {@link Callable} retrying on exception thrown. - */ - // TODO(eduardocolaco): Implement transient/persistent failures + /** Executes a {@link Callable} retrying on exception thrown. */ private void retryOnException(Callable<?> c) throws Exception { final int maxRetries = 5; final long initialDelayMillis = 0; @@ -589,16 +561,17 @@ int tries = 0; while (tries <= maxRetries) { try { - acksReceivedSinceLastRetry = 0; + acksReceivedSinceLastRetry.set(0); c.call(); lastKnownError = null; return; } catch (InterruptedException e) { throw e; } catch (Exception e) { - if (acksReceivedSinceLastRetry > 0) { - logger.fine(String.format("ACKs received since last retry %d.", - acksReceivedSinceLastRetry)); + if (acksReceivedSinceLastRetry.get() > 0) { + logger.fine( + String.format( + "ACKs received since last retry %d.", acksReceivedSinceLastRetry.get())); tries = 0; } tries++; @@ -635,4 +608,88 @@ return t; } } + + /** + * Representation of an {@link com.google.devtools.build.v1.OrderedBuildEvent} internal to the + * {@link BuildEventServiceTransport}. This class wraps around the {@link + * com.google.devtools.build.v1.OrderedBuildEvent} to simplify the retry logic, so that we don't + * have to separately store events before the first send attempt (non-serialized) and after + * (serialized). + */ + private interface InternalOrderedBuildEvent { + boolean isLastEvent(); + + int getSequenceNumber(); + + PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter); + } + + private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent { + private final BuildEvent event; + private final ArtifactGroupNamer artifactGroupNamer; + private final int sequenceNumber; + + DefaultInternalOrderedBuildEvent( + BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) { + this.event = Preconditions.checkNotNull(event); + this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer); + this.sequenceNumber = sequenceNumber; + } + + @Override + public boolean isLastEvent() { + return false; + } + + @Override + public int getSequenceNumber() { + return sequenceNumber; + } + + @Override + public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) { + BuildEventStreamProtos.BuildEvent eventProto = + event.asStreamProto( + new BuildEventContext() { + @Override + public PathConverter pathConverter() { + return pathConverter; + } + + @Override + public ArtifactGroupNamer artifactGroupNamer() { + return artifactGroupNamer; + } + + @Override + public BuildEventProtocolOptions getOptions() { + return protocolOptions; + } + }); + return besProtoUtil.bazelEvent(sequenceNumber, Any.pack(eventProto)); + } + } + + private class LastInternalOrderedBuildEvent implements InternalOrderedBuildEvent { + private final int sequenceNumber; + + LastInternalOrderedBuildEvent(int sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + @Override + public boolean isLastEvent() { + return true; + } + + @Override + public int getSequenceNumber() { + return sequenceNumber; + } + + @Override + public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) { + return besProtoUtil.streamFinished(sequenceNumber); + } + } }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java index f4f688e..bdaf114 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -14,12 +14,12 @@ package com.google.devtools.build.lib.buildeventservice.client; -import com.google.common.base.Function; import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest; import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse; import com.google.devtools.build.v1.PublishLifecycleEventRequest; import io.grpc.Status; +import java.util.function.Function; /** Interface used to abstract both gRPC and Stubby BuildEventServiceBackend. */ public interface BuildEventServiceClient {
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java index 31710a8..3c5dfbc 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -36,6 +35,7 @@ import io.grpc.stub.StreamObserver; import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import javax.annotation.Nullable; /** Implementation of BuildEventServiceClient that uploads data using gRPC. */