Refactor the build event service transport
- use an internal event wrapper to unify handling
- rewrite the test to be more explicit about event names and ordering
This a part split out of unknown commit.
PiperOrigin-RevId: 200015904
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
index a6c23c3..217fcbd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
@@ -103,19 +103,8 @@
.build();
}
- /**
- * Utility method used to create a PublishBuildToolEventStreamRequest that delimits the end of the
- * stream.
- */
- public PublishBuildToolEventStreamRequest streamFinished() {
- return streamFinished(streamSequenceNumber.getAndIncrement());
- }
-
- /**
- * Utility method used to create a PublishBuildToolEventStreamRequest from an packed bazel event
- */
- public PublishBuildToolEventStreamRequest bazelEvent(Any packedEvent) {
- return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent);
+ public int nextSequenceNumber() {
+ return streamSequenceNumber.getAndIncrement();
}
@VisibleForTesting
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index c19361e..37355cc 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -18,7 +18,6 @@
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.devtools.build.lib.events.EventKind.ERROR;
import static com.google.devtools.build.lib.events.EventKind.INFO;
-import static com.google.devtools.build.v1.BuildEvent.EventCase.COMPONENT_STREAM_FINISHED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
@@ -26,7 +25,6 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListenableFuture;
@@ -34,12 +32,11 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
+import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
-import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEvent.PayloadCase;
-import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildFinished;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.Clock;
@@ -69,6 +66,8 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -99,9 +98,9 @@
private final PathConverter pathConverter;
private final Sleeper sleeper;
/** Contains all pendingAck events that might be retried in case of failures. */
- private ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck;
+ private ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck;
/** Contains all events should be sent ordered by sequence number. */
- private final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend;
+ private final BlockingDeque<InternalOrderedBuildEvent> pendingSend;
/** Holds the result status of the BuildEventStreamProtos BuildFinished event. */
private Result invocationResult;
/** Used to block until all events have been uploaded. */
@@ -120,7 +119,7 @@
* Returns the number of ACKs received since the last time {@link #publishEventStream()} was
* retried due to a failure.
*/
- private volatile int acksReceivedSinceLastRetry;
+ private final AtomicInteger acksReceivedSinceLastRetry = new AtomicInteger();
public BuildEventServiceTransport(
BuildEventServiceClient besClient,
@@ -173,8 +172,9 @@
Sleeper sleeper,
@Nullable String besResultsUrl) {
this.besClient = besClient;
- this.besProtoUtil = new BuildEventServiceProtoUtil(
- buildRequestId, invocationId, projectId, command, clock, keywords);
+ this.besProtoUtil =
+ new BuildEventServiceProtoUtil(
+ buildRequestId, invocationId, projectId, command, clock, keywords);
this.publishLifecycleEvents = publishLifecycleEvents;
this.moduleEnvironment = moduleEnvironment;
this.commandLineReporter = commandLineReporter;
@@ -200,13 +200,13 @@
@Override
public ListenableFuture<Void> close() {
- return close(/*now=*/false);
+ return close(/*now=*/ false);
}
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void closeNow() {
- close(/*now=*/true);
+ close(/*now=*/ true);
}
private synchronized ListenableFuture<Void> close(boolean now) {
@@ -234,7 +234,8 @@
uploaderExecutorService.execute(
() -> {
try {
- sendOrderedBuildEvent(besProtoUtil.streamFinished());
+ sendOrderedBuildEvent(
+ new LastInternalOrderedBuildEvent(besProtoUtil.nextSequenceNumber()));
if (errorsReported) {
// If we encountered errors before and have already reported them, then we should
@@ -244,7 +245,7 @@
report(INFO, "Waiting for Build Event Protocol upload to finish.");
try {
- if (Duration.ZERO.equals(uploadTimeout)) {
+ if (uploadTimeout.isZero()) {
uploadComplete.get();
} else {
uploadComplete.get(uploadTimeout.toMillis(), MILLISECONDS);
@@ -297,33 +298,18 @@
@Override
public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
- BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto(
- new BuildEventContext() {
- @Override
- public PathConverter pathConverter() {
- return pathConverter;
- }
-
- @Override
- public ArtifactGroupNamer artifactGroupNamer() {
- return namer;
- }
-
- @Override
- public BuildEventProtocolOptions getOptions() {
- return protocolOptions;
- }
- });
- if (PayloadCase.FINISHED.equals(eventProto.getPayloadCase())) {
- BuildFinished finished = eventProto.getFinished();
- if (finished.hasExitCode() && finished.getExitCode().getCode() == 0) {
+ if (event instanceof BuildCompletingEvent) {
+ BuildCompletingEvent completingEvent = (BuildCompletingEvent) event;
+ if (completingEvent.getExitCode() != null
+ && completingEvent.getExitCode().getNumericExitCode() == 0) {
invocationResult = COMMAND_SUCCEEDED;
} else {
invocationResult = COMMAND_FAILED;
}
}
- sendOrderedBuildEvent(besProtoUtil.bazelEvent(Any.pack(eventProto)));
+ sendOrderedBuildEvent(
+ new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber()));
}
private String errorMessageFromException(Throwable t) {
@@ -341,10 +327,10 @@
}
return message;
} else if (t instanceof ExecutionException) {
- message = format(UPLOAD_FAILED_MESSAGE,
- t.getCause() != null
- ? besClient.userReadableError(t.getCause())
- : t.getMessage());
+ message =
+ format(
+ UPLOAD_FAILED_MESSAGE,
+ t.getCause() != null ? besClient.userReadableError(t.getCause()) : t.getMessage());
return message;
} else {
message = format(UPLOAD_FAILED_MESSAGE, besClient.userReadableError(t));
@@ -356,8 +342,9 @@
String message = errorMessageFromException(t);
report(ERROR, message);
- moduleEnvironment.exit(new AbruptExitException(
- "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR));
+ moduleEnvironment.exit(
+ new AbruptExitException(
+ "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR));
}
private void maybeReportUploadError() {
@@ -372,14 +359,13 @@
}
}
- private synchronized void sendOrderedBuildEvent(
- PublishBuildToolEventStreamRequest serialisedEvent) {
+ private synchronized void sendOrderedBuildEvent(InternalOrderedBuildEvent evtAndNamer) {
if (uploadComplete != null && uploadComplete.isDone()) {
maybeReportUploadError();
return;
}
- pendingSend.add(serialisedEvent);
+ pendingSend.add(evtAndNamer);
if (uploadComplete == null) {
uploadComplete = uploaderExecutorService.submit(new BuildEventServiceUpload());
}
@@ -389,9 +375,7 @@
return invocationResult;
}
- /**
- * Method responsible for sending all requests to BuildEventService.
- */
+ /** Method responsible for sending all requests to BuildEventService. */
private class BuildEventServiceUpload implements Callable<Void> {
@Override
public Void call() throws Exception {
@@ -468,7 +452,7 @@
*/
private void publishEventStream() throws Exception {
// Reschedule unacked messages if required, keeping its original order.
- PublishBuildToolEventStreamRequest unacked;
+ InternalOrderedBuildEvent unacked;
while ((unacked = pendingAck.pollLast()) != null) {
pendingSend.addFirst(unacked);
}
@@ -478,35 +462,36 @@
/** Method responsible for a single Streaming RPC. */
private void publishEventStream(
- final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck,
- final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend,
+ final ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck,
+ final BlockingDeque<InternalOrderedBuildEvent> pendingSend,
final BuildEventServiceClient besClient)
throws Exception {
logger.log(
Level.INFO,
String.format(
"Starting PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size()));
- ListenableFuture<Status> streamDone = besClient
- .openStream(ackCallback(pendingAck, besClient));
+ ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
logger.log(
Level.INFO,
String.format(
"Started PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size()));
try {
- @Nullable PublishBuildToolEventStreamRequest event;
+ @Nullable InternalOrderedBuildEvent orderedBuildEvent;
do {
- event = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
- if (event != null) {
- pendingAck.add(event);
- besClient.sendOverStream(event);
+ orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
+ if (orderedBuildEvent != null) {
+ pendingAck.add(orderedBuildEvent);
+ besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter));
}
+
checkState(besClient.isStreamActive(), "Stream was closed prematurely.");
- } while (!isLastEvent(event));
+ } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent());
logger.log(
Level.INFO,
String.format(
"Will end publishEventStream() isLastEvent: %s isStreamActive: %s",
- isLastEvent(event), besClient.isStreamActive()));
+ orderedBuildEvent != null && orderedBuildEvent.isLastEvent(),
+ besClient.isStreamActive()));
} catch (InterruptedException e) {
// By convention the interrupted flag should have been cleared,
// but just to be sure clear it.
@@ -539,27 +524,17 @@
throw e;
} catch (TimeoutException e) {
String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages";
- logger
- .log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
+ logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail));
throw e;
}
}
- private static boolean isLastEvent(@Nullable PublishBuildToolEventStreamRequest event) {
- return event != null
- && event.getOrderedBuildEvent().getEvent().getEventCase() == COMPONENT_STREAM_FINISHED;
- }
-
- @SuppressWarnings("NonAtomicVolatileUpdate")
private Function<PublishBuildToolEventStreamResponse, Void> ackCallback(
- final Deque<PublishBuildToolEventStreamRequest> pendingAck,
- final BuildEventServiceClient besClient) {
+ final Deque<InternalOrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) {
return ack -> {
- long pendingSeq =
- pendingAck.isEmpty()
- ? -1
- : pendingAck.peekFirst().getOrderedBuildEvent().getSequenceNumber();
+ Preconditions.checkNotNull(ack);
+ long pendingSeq = pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber();
long ackSeq = ack.getSequenceNumber();
if (pendingSeq != ackSeq) {
besClient.abortStream(
@@ -567,20 +542,17 @@
format("Expected ACK %s but was %s.", pendingSeq, ackSeq)));
return null;
}
- PublishBuildToolEventStreamRequest event = pendingAck.removeFirst();
- if (isLastEvent(event)) {
+ InternalOrderedBuildEvent event = pendingAck.removeFirst();
+ if (event.isLastEvent()) {
logger.log(Level.INFO, "Last ACK received.");
besClient.closeStream();
}
- acksReceivedSinceLastRetry++;
+ acksReceivedSinceLastRetry.incrementAndGet();
return null;
};
}
- /**
- * Executes a {@link Callable} retrying on exception thrown.
- */
- // TODO(eduardocolaco): Implement transient/persistent failures
+ /** Executes a {@link Callable} retrying on exception thrown. */
private void retryOnException(Callable<?> c) throws Exception {
final int maxRetries = 5;
final long initialDelayMillis = 0;
@@ -589,16 +561,17 @@
int tries = 0;
while (tries <= maxRetries) {
try {
- acksReceivedSinceLastRetry = 0;
+ acksReceivedSinceLastRetry.set(0);
c.call();
lastKnownError = null;
return;
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
- if (acksReceivedSinceLastRetry > 0) {
- logger.fine(String.format("ACKs received since last retry %d.",
- acksReceivedSinceLastRetry));
+ if (acksReceivedSinceLastRetry.get() > 0) {
+ logger.fine(
+ String.format(
+ "ACKs received since last retry %d.", acksReceivedSinceLastRetry.get()));
tries = 0;
}
tries++;
@@ -635,4 +608,88 @@
return t;
}
}
+
+ /**
+ * Representation of an {@link com.google.devtools.build.v1.OrderedBuildEvent} internal to the
+ * {@link BuildEventServiceTransport}. This class wraps around the {@link
+ * com.google.devtools.build.v1.OrderedBuildEvent} to simplify the retry logic, so that we don't
+ * have to separately store events before the first send attempt (non-serialized) and after
+ * (serialized).
+ */
+ private interface InternalOrderedBuildEvent {
+ boolean isLastEvent();
+
+ int getSequenceNumber();
+
+ PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter);
+ }
+
+ private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
+ private final BuildEvent event;
+ private final ArtifactGroupNamer artifactGroupNamer;
+ private final int sequenceNumber;
+
+ DefaultInternalOrderedBuildEvent(
+ BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) {
+ this.event = Preconditions.checkNotNull(event);
+ this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer);
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public boolean isLastEvent() {
+ return false;
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+ BuildEventStreamProtos.BuildEvent eventProto =
+ event.asStreamProto(
+ new BuildEventContext() {
+ @Override
+ public PathConverter pathConverter() {
+ return pathConverter;
+ }
+
+ @Override
+ public ArtifactGroupNamer artifactGroupNamer() {
+ return artifactGroupNamer;
+ }
+
+ @Override
+ public BuildEventProtocolOptions getOptions() {
+ return protocolOptions;
+ }
+ });
+ return besProtoUtil.bazelEvent(sequenceNumber, Any.pack(eventProto));
+ }
+ }
+
+ private class LastInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
+ private final int sequenceNumber;
+
+ LastInternalOrderedBuildEvent(int sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public boolean isLastEvent() {
+ return true;
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ @Override
+ public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+ return besProtoUtil.streamFinished(sequenceNumber);
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
index f4f688e..bdaf114 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -14,12 +14,12 @@
package com.google.devtools.build.lib.buildeventservice.client;
-import com.google.common.base.Function;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.Status;
+import java.util.function.Function;
/** Interface used to abstract both gRPC and Stubby BuildEventServiceBackend. */
public interface BuildEventServiceClient {
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 31710a8..3c5dfbc 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -18,7 +18,6 @@
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -36,6 +35,7 @@
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import javax.annotation.Nullable;
/** Implementation of BuildEventServiceClient that uploads data using gRPC. */