Refactor the build event service transport

- use an internal event wrapper to unify handling
- rewrite the test to be more explicit about event names and ordering

This a part split out of unknown commit.

PiperOrigin-RevId: 200015904
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
index a6c23c3..217fcbd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
@@ -103,19 +103,8 @@
         .build();
   }
 
-  /**
-   * Utility method used to create a PublishBuildToolEventStreamRequest that delimits the end of the
-   * stream.
-   */
-  public PublishBuildToolEventStreamRequest streamFinished() {
-    return streamFinished(streamSequenceNumber.getAndIncrement());
-  }
-
-  /**
-   * Utility method used to create a PublishBuildToolEventStreamRequest from an packed bazel event
-   */
-  public PublishBuildToolEventStreamRequest bazelEvent(Any packedEvent) {
-    return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent);
+  public int nextSequenceNumber() {
+    return streamSequenceNumber.getAndIncrement();
   }
 
   @VisibleForTesting
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index c19361e..37355cc 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -18,7 +18,6 @@
 import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
 import static com.google.devtools.build.lib.events.EventKind.ERROR;
 import static com.google.devtools.build.lib.events.EventKind.INFO;
-import static com.google.devtools.build.v1.BuildEvent.EventCase.COMPONENT_STREAM_FINISHED;
 import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_FAILED;
 import static com.google.devtools.build.v1.BuildStatus.Result.COMMAND_SUCCEEDED;
 import static com.google.devtools.build.v1.BuildStatus.Result.UNKNOWN_STATUS;
@@ -26,7 +25,6 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -34,12 +32,11 @@
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
 import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
+import com.google.devtools.build.lib.buildeventstream.BuildCompletingEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
 import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
 import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
-import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEvent.PayloadCase;
-import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildFinished;
 import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
 import com.google.devtools.build.lib.buildeventstream.PathConverter;
 import com.google.devtools.build.lib.clock.Clock;
@@ -69,6 +66,8 @@
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
@@ -99,9 +98,9 @@
   private final PathConverter pathConverter;
   private final Sleeper sleeper;
   /** Contains all pendingAck events that might be retried in case of failures. */
-  private ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck;
+  private ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck;
   /** Contains all events should be sent ordered by sequence number. */
-  private final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend;
+  private final BlockingDeque<InternalOrderedBuildEvent> pendingSend;
   /** Holds the result status of the BuildEventStreamProtos BuildFinished event. */
   private Result invocationResult;
   /** Used to block until all events have been uploaded. */
@@ -120,7 +119,7 @@
    * Returns the number of ACKs received since the last time {@link #publishEventStream()} was
    * retried due to a failure.
    */
-  private volatile int acksReceivedSinceLastRetry;
+  private final AtomicInteger acksReceivedSinceLastRetry = new AtomicInteger();
 
   public BuildEventServiceTransport(
       BuildEventServiceClient besClient,
@@ -173,8 +172,9 @@
       Sleeper sleeper,
       @Nullable String besResultsUrl) {
     this.besClient = besClient;
-    this.besProtoUtil = new BuildEventServiceProtoUtil(
-        buildRequestId, invocationId, projectId, command, clock, keywords);
+    this.besProtoUtil =
+        new BuildEventServiceProtoUtil(
+            buildRequestId, invocationId, projectId, command, clock, keywords);
     this.publishLifecycleEvents = publishLifecycleEvents;
     this.moduleEnvironment = moduleEnvironment;
     this.commandLineReporter = commandLineReporter;
@@ -200,13 +200,13 @@
 
   @Override
   public ListenableFuture<Void> close() {
-    return close(/*now=*/false);
+    return close(/*now=*/ false);
   }
 
   @Override
   @SuppressWarnings("FutureReturnValueIgnored")
   public void closeNow() {
-    close(/*now=*/true);
+    close(/*now=*/ true);
   }
 
   private synchronized ListenableFuture<Void> close(boolean now) {
@@ -234,7 +234,8 @@
     uploaderExecutorService.execute(
         () -> {
           try {
-            sendOrderedBuildEvent(besProtoUtil.streamFinished());
+            sendOrderedBuildEvent(
+                new LastInternalOrderedBuildEvent(besProtoUtil.nextSequenceNumber()));
 
             if (errorsReported) {
               // If we encountered errors before and have already reported them, then we should
@@ -244,7 +245,7 @@
 
             report(INFO, "Waiting for Build Event Protocol upload to finish.");
             try {
-              if (Duration.ZERO.equals(uploadTimeout)) {
+              if (uploadTimeout.isZero()) {
                 uploadComplete.get();
               } else {
                 uploadComplete.get(uploadTimeout.toMillis(), MILLISECONDS);
@@ -297,33 +298,18 @@
 
   @Override
   public synchronized void sendBuildEvent(BuildEvent event, final ArtifactGroupNamer namer) {
-    BuildEventStreamProtos.BuildEvent eventProto = event.asStreamProto(
-        new BuildEventContext() {
-          @Override
-          public PathConverter pathConverter() {
-            return pathConverter;
-          }
-
-          @Override
-          public ArtifactGroupNamer artifactGroupNamer() {
-            return namer;
-          }
-
-          @Override
-          public BuildEventProtocolOptions getOptions() {
-            return protocolOptions;
-          }
-        });
-    if (PayloadCase.FINISHED.equals(eventProto.getPayloadCase())) {
-      BuildFinished finished = eventProto.getFinished();
-      if (finished.hasExitCode() && finished.getExitCode().getCode() == 0) {
+    if (event instanceof BuildCompletingEvent) {
+      BuildCompletingEvent completingEvent = (BuildCompletingEvent) event;
+      if (completingEvent.getExitCode() != null
+          && completingEvent.getExitCode().getNumericExitCode() == 0) {
         invocationResult = COMMAND_SUCCEEDED;
       } else {
         invocationResult = COMMAND_FAILED;
       }
     }
 
-    sendOrderedBuildEvent(besProtoUtil.bazelEvent(Any.pack(eventProto)));
+    sendOrderedBuildEvent(
+        new DefaultInternalOrderedBuildEvent(event, namer, besProtoUtil.nextSequenceNumber()));
   }
 
   private String errorMessageFromException(Throwable t) {
@@ -341,10 +327,10 @@
       }
       return message;
     } else if (t instanceof ExecutionException) {
-      message = format(UPLOAD_FAILED_MESSAGE,
-          t.getCause() != null
-              ? besClient.userReadableError(t.getCause())
-              : t.getMessage());
+      message =
+          format(
+              UPLOAD_FAILED_MESSAGE,
+              t.getCause() != null ? besClient.userReadableError(t.getCause()) : t.getMessage());
       return message;
     } else {
       message = format(UPLOAD_FAILED_MESSAGE, besClient.userReadableError(t));
@@ -356,8 +342,9 @@
     String message = errorMessageFromException(t);
 
     report(ERROR, message);
-    moduleEnvironment.exit(new AbruptExitException(
-        "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR));
+    moduleEnvironment.exit(
+        new AbruptExitException(
+            "BuildEventServiceTransport internal error", ExitCode.PUBLISH_ERROR));
   }
 
   private void maybeReportUploadError() {
@@ -372,14 +359,13 @@
     }
   }
 
-  private synchronized void sendOrderedBuildEvent(
-      PublishBuildToolEventStreamRequest serialisedEvent) {
+  private synchronized void sendOrderedBuildEvent(InternalOrderedBuildEvent evtAndNamer) {
     if (uploadComplete != null && uploadComplete.isDone()) {
       maybeReportUploadError();
       return;
     }
 
-    pendingSend.add(serialisedEvent);
+    pendingSend.add(evtAndNamer);
     if (uploadComplete == null) {
       uploadComplete = uploaderExecutorService.submit(new BuildEventServiceUpload());
     }
@@ -389,9 +375,7 @@
     return invocationResult;
   }
 
-  /**
-   * Method responsible for sending all requests to BuildEventService.
-   */
+  /** Method responsible for sending all requests to BuildEventService. */
   private class BuildEventServiceUpload implements Callable<Void> {
     @Override
     public Void call() throws Exception {
@@ -468,7 +452,7 @@
    */
   private void publishEventStream() throws Exception {
     // Reschedule unacked messages if required, keeping its original order.
-    PublishBuildToolEventStreamRequest unacked;
+    InternalOrderedBuildEvent unacked;
     while ((unacked = pendingAck.pollLast()) != null) {
       pendingSend.addFirst(unacked);
     }
@@ -478,35 +462,36 @@
 
   /** Method responsible for a single Streaming RPC. */
   private void publishEventStream(
-      final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck,
-      final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend,
+      final ConcurrentLinkedDeque<InternalOrderedBuildEvent> pendingAck,
+      final BlockingDeque<InternalOrderedBuildEvent> pendingSend,
       final BuildEventServiceClient besClient)
       throws Exception {
     logger.log(
         Level.INFO,
         String.format(
             "Starting PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size()));
-    ListenableFuture<Status> streamDone = besClient
-        .openStream(ackCallback(pendingAck, besClient));
+    ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
     logger.log(
         Level.INFO,
         String.format(
             "Started PublishBuildToolEventStream() RPC pendingSendCount=%s", pendingSend.size()));
     try {
-      @Nullable PublishBuildToolEventStreamRequest event;
+      @Nullable InternalOrderedBuildEvent orderedBuildEvent;
       do {
-        event = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
-        if (event != null) {
-          pendingAck.add(event);
-          besClient.sendOverStream(event);
+        orderedBuildEvent = pendingSend.pollFirst(STREAMING_RPC_POLL_IN_SECS, TimeUnit.SECONDS);
+        if (orderedBuildEvent != null) {
+          pendingAck.add(orderedBuildEvent);
+          besClient.sendOverStream(orderedBuildEvent.serialize(pathConverter));
         }
+
         checkState(besClient.isStreamActive(), "Stream was closed prematurely.");
-      } while (!isLastEvent(event));
+      } while (orderedBuildEvent == null || !orderedBuildEvent.isLastEvent());
       logger.log(
           Level.INFO,
           String.format(
               "Will end publishEventStream() isLastEvent: %s isStreamActive: %s",
-              isLastEvent(event), besClient.isStreamActive()));
+              orderedBuildEvent != null && orderedBuildEvent.isLastEvent(),
+              besClient.isStreamActive()));
     } catch (InterruptedException e) {
       // By convention the interrupted flag should have been cleared,
       // but just to be sure clear it.
@@ -539,27 +524,17 @@
       throw e;
     } catch (TimeoutException e) {
       String additionalDetail = "Build Event Protocol upload timed out waiting for ACK messages";
-      logger
-          .log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
+      logger.log(Level.WARNING, "Cancelling publishBuildToolEventStream RPC: " + additionalDetail);
       besClient.abortStream(Status.CANCELLED.augmentDescription(additionalDetail));
       throw e;
     }
   }
 
-  private static boolean isLastEvent(@Nullable PublishBuildToolEventStreamRequest event) {
-    return event != null
-        && event.getOrderedBuildEvent().getEvent().getEventCase() == COMPONENT_STREAM_FINISHED;
-  }
-
-  @SuppressWarnings("NonAtomicVolatileUpdate")
   private Function<PublishBuildToolEventStreamResponse, Void> ackCallback(
-      final Deque<PublishBuildToolEventStreamRequest> pendingAck,
-      final BuildEventServiceClient besClient) {
+      final Deque<InternalOrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) {
     return ack -> {
-      long pendingSeq =
-          pendingAck.isEmpty()
-              ? -1
-              : pendingAck.peekFirst().getOrderedBuildEvent().getSequenceNumber();
+      Preconditions.checkNotNull(ack);
+      long pendingSeq = pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber();
       long ackSeq = ack.getSequenceNumber();
       if (pendingSeq != ackSeq) {
         besClient.abortStream(
@@ -567,20 +542,17 @@
                 format("Expected ACK %s but was %s.", pendingSeq, ackSeq)));
         return null;
       }
-      PublishBuildToolEventStreamRequest event = pendingAck.removeFirst();
-      if (isLastEvent(event)) {
+      InternalOrderedBuildEvent event = pendingAck.removeFirst();
+      if (event.isLastEvent()) {
         logger.log(Level.INFO, "Last ACK received.");
         besClient.closeStream();
       }
-      acksReceivedSinceLastRetry++;
+      acksReceivedSinceLastRetry.incrementAndGet();
       return null;
     };
   }
 
-  /**
-   * Executes a {@link Callable} retrying on exception thrown.
-   */
-  // TODO(eduardocolaco): Implement transient/persistent failures
+  /** Executes a {@link Callable} retrying on exception thrown. */
   private void retryOnException(Callable<?> c) throws Exception {
     final int maxRetries = 5;
     final long initialDelayMillis = 0;
@@ -589,16 +561,17 @@
     int tries = 0;
     while (tries <= maxRetries) {
       try {
-        acksReceivedSinceLastRetry = 0;
+        acksReceivedSinceLastRetry.set(0);
         c.call();
         lastKnownError = null;
         return;
       } catch (InterruptedException e) {
         throw e;
       } catch (Exception e) {
-        if (acksReceivedSinceLastRetry > 0) {
-          logger.fine(String.format("ACKs received since last retry %d.",
-              acksReceivedSinceLastRetry));
+        if (acksReceivedSinceLastRetry.get() > 0) {
+          logger.fine(
+              String.format(
+                  "ACKs received since last retry %d.", acksReceivedSinceLastRetry.get()));
           tries = 0;
         }
         tries++;
@@ -635,4 +608,88 @@
       return t;
     }
   }
+
+  /**
+   * Representation of an {@link com.google.devtools.build.v1.OrderedBuildEvent} internal to the
+   * {@link BuildEventServiceTransport}. This class wraps around the {@link
+   * com.google.devtools.build.v1.OrderedBuildEvent} to simplify the retry logic, so that we don't
+   * have to separately store events before the first send attempt (non-serialized) and after
+   * (serialized).
+   */
+  private interface InternalOrderedBuildEvent {
+    boolean isLastEvent();
+
+    int getSequenceNumber();
+
+    PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter);
+  }
+
+  private class DefaultInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
+    private final BuildEvent event;
+    private final ArtifactGroupNamer artifactGroupNamer;
+    private final int sequenceNumber;
+
+    DefaultInternalOrderedBuildEvent(
+        BuildEvent event, ArtifactGroupNamer artifactGroupNamer, int sequenceNumber) {
+      this.event = Preconditions.checkNotNull(event);
+      this.artifactGroupNamer = Preconditions.checkNotNull(artifactGroupNamer);
+      this.sequenceNumber = sequenceNumber;
+    }
+
+    @Override
+    public boolean isLastEvent() {
+      return false;
+    }
+
+    @Override
+    public int getSequenceNumber() {
+      return sequenceNumber;
+    }
+
+    @Override
+    public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+      BuildEventStreamProtos.BuildEvent eventProto =
+          event.asStreamProto(
+              new BuildEventContext() {
+                @Override
+                public PathConverter pathConverter() {
+                  return pathConverter;
+                }
+
+                @Override
+                public ArtifactGroupNamer artifactGroupNamer() {
+                  return artifactGroupNamer;
+                }
+
+                @Override
+                public BuildEventProtocolOptions getOptions() {
+                  return protocolOptions;
+                }
+              });
+      return besProtoUtil.bazelEvent(sequenceNumber, Any.pack(eventProto));
+    }
+  }
+
+  private class LastInternalOrderedBuildEvent implements InternalOrderedBuildEvent {
+    private final int sequenceNumber;
+
+    LastInternalOrderedBuildEvent(int sequenceNumber) {
+      this.sequenceNumber = sequenceNumber;
+    }
+
+    @Override
+    public boolean isLastEvent() {
+      return true;
+    }
+
+    @Override
+    public int getSequenceNumber() {
+      return sequenceNumber;
+    }
+
+    @Override
+    public PublishBuildToolEventStreamRequest serialize(PathConverter pathConverter) {
+      return besProtoUtil.streamFinished(sequenceNumber);
+    }
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
index f4f688e..bdaf114 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -14,12 +14,12 @@
 
 package com.google.devtools.build.lib.buildeventservice.client;
 
-import com.google.common.base.Function;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
 import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
 import com.google.devtools.build.v1.PublishLifecycleEventRequest;
 import io.grpc.Status;
+import java.util.function.Function;
 
 /** Interface used to abstract both gRPC and Stubby BuildEventServiceBackend. */
 public interface BuildEventServiceClient {
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 31710a8..3c5dfbc 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -18,7 +18,6 @@
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
@@ -36,6 +35,7 @@
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 
 /** Implementation of BuildEventServiceClient that uploads data using gRPC. */