bes: Update RPC PublishBuildToolEventStream.

Change the return type for RPC PublishBuildToolEventStream from OrderedBuildEvent to PublishBuildToolEventStreamRequest.

Change-Id: I8421e9d2d8a5e6720d7b8d6de4417bee71c0fa68
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
index 7ecfcb0..85259f8 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceProtoUtil.java
@@ -28,6 +28,7 @@
 import com.google.devtools.build.v1.BuildStatus;
 import com.google.devtools.build.v1.BuildStatus.Result;
 import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
 import com.google.devtools.build.v1.PublishLifecycleEventRequest;
 import com.google.devtools.build.v1.StreamId;
 import com.google.devtools.build.v1.StreamId.BuildComponent;
@@ -56,61 +57,66 @@
 
   public PublishLifecycleEventRequest buildEnqueued() {
     return lifecycleEvent(projectId, 1,
-            com.google.devtools.build.v1.BuildEvent.newBuilder()
-                .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
-                .setBuildEnqueued(BuildEnqueued.newBuilder()))
+        com.google.devtools.build.v1.BuildEvent.newBuilder()
+            .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+            .setBuildEnqueued(BuildEnqueued.newBuilder()))
         .build();
   }
 
   public PublishLifecycleEventRequest buildFinished(Result result) {
     return lifecycleEvent(projectId, 2,
-            com.google.devtools.build.v1.BuildEvent.newBuilder()
-                .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
-                .setBuildFinished(
-                    BuildFinished.newBuilder()
-                        .setStatus(BuildStatus.newBuilder().setResult(result))))
+        com.google.devtools.build.v1.BuildEvent.newBuilder()
+            .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+            .setBuildFinished(
+                BuildFinished.newBuilder()
+                    .setStatus(BuildStatus.newBuilder().setResult(result))))
         .build();
   }
 
   public PublishLifecycleEventRequest invocationStarted() {
     return lifecycleEvent(projectId, 1,
-            com.google.devtools.build.v1.BuildEvent.newBuilder()
-                .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
-                .setInvocationAttemptStarted(
-                    InvocationAttemptStarted.newBuilder().setAttemptNumber(1)))
+        com.google.devtools.build.v1.BuildEvent.newBuilder()
+            .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+            .setInvocationAttemptStarted(
+                InvocationAttemptStarted.newBuilder().setAttemptNumber(1)))
         .build();
   }
 
   public PublishLifecycleEventRequest invocationFinished(Result result) {
     return lifecycleEvent(projectId, 2,
-            com.google.devtools.build.v1.BuildEvent.newBuilder()
-                .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
-                .setInvocationAttemptFinished(
-                    InvocationAttemptFinished.newBuilder()
-                        .setInvocationStatus(BuildStatus.newBuilder().setResult(result))))
+        com.google.devtools.build.v1.BuildEvent.newBuilder()
+            .setEventTime(Timestamps.fromMillis(clock.currentTimeMillis()))
+            .setInvocationAttemptFinished(
+                InvocationAttemptFinished.newBuilder()
+                    .setInvocationStatus(BuildStatus.newBuilder().setResult(result))))
         .build();
   }
 
-  /** Utility method used to create a OrderedBuildEvent that delimits the end of the stream. */
-  public OrderedBuildEvent streamFinished() {
+  /**
+   * Utility method used to create a PublishBuildToolEventStreamRequest that delimits the end of the
+   * stream.
+   */
+  public PublishBuildToolEventStreamRequest streamFinished() {
     return streamFinished(streamSequenceNumber.getAndIncrement());
   }
 
-  /** Utility method used to create a OrderedBuildEvent from an packed bazel event */
-  public OrderedBuildEvent bazelEvent(Any packedEvent) {
+  /**
+   * Utility method used to create a PublishBuildToolEventStreamRequest from an packed bazel event
+   */
+  public PublishBuildToolEventStreamRequest bazelEvent(Any packedEvent) {
     return bazelEvent(streamSequenceNumber.getAndIncrement(), packedEvent);
   }
 
   @VisibleForTesting
-  public OrderedBuildEvent bazelEvent(int sequenceNumber, Any packedEvent) {
-    return orderedBuildEvent(
+  public PublishBuildToolEventStreamRequest bazelEvent(int sequenceNumber, Any packedEvent) {
+    return publishBuildToolEventStreamRequest(
         sequenceNumber,
         com.google.devtools.build.v1.BuildEvent.newBuilder().setBazelEvent(packedEvent));
   }
 
   @VisibleForTesting
-  public OrderedBuildEvent streamFinished(int sequenceNumber) {
-    return orderedBuildEvent(
+  public PublishBuildToolEventStreamRequest streamFinished(int sequenceNumber) {
+    return publishBuildToolEventStreamRequest(
         sequenceNumber,
         BuildEvent.newBuilder()
             .setComponentStreamFinished(
@@ -118,8 +124,9 @@
   }
 
   @VisibleForTesting
-  public OrderedBuildEvent orderedBuildEvent(int sequenceNumber, BuildEvent.Builder besEvent) {
-    return OrderedBuildEvent.newBuilder()
+  public PublishBuildToolEventStreamRequest publishBuildToolEventStreamRequest(
+      int sequenceNumber, BuildEvent.Builder besEvent) {
+    return PublishBuildToolEventStreamRequest.newBuilder()
         .setSequenceNumber(sequenceNumber)
         .setEvent(besEvent.setEventTime(Timestamps.fromMillis(clock.currentTimeMillis())))
         .setStreamId(streamId(besEvent.getEventCase()))
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index b7c8ce7..ce2053c8 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -49,7 +49,7 @@
 import com.google.devtools.build.lib.util.Clock;
 import com.google.devtools.build.lib.util.ExitCode;
 import com.google.devtools.build.v1.BuildStatus.Result;
-import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
 import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
 import com.google.devtools.build.v1.PublishLifecycleEventRequest;
 import com.google.protobuf.Any;
@@ -93,9 +93,9 @@
 
   private final PathConverter pathConverter;
   /** Contains all pendingAck events that might be retried in case of failures. */
-  private ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck;
+  private ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck;
   /** Contains all events should be sent ordered by sequence number. */
-  private final BlockingDeque<OrderedBuildEvent> pendingSend;
+  private final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend;
   /** Holds the result status of the BuildEventStreamProtos BuildFinished event. */
   private Result invocationResult;
   /** Used to block until all events have been uploaded. */
@@ -304,7 +304,8 @@
     }
   }
 
-  private synchronized void sendOrderedBuildEvent(OrderedBuildEvent serialisedEvent) {
+  private synchronized void sendOrderedBuildEvent(
+      PublishBuildToolEventStreamRequest serialisedEvent) {
     if (uploadComplete != null && uploadComplete.isDone()) {
       maybeReportUploadError();
       return;
@@ -399,7 +400,7 @@
    */
   private Status publishEventStream() throws Exception {
     // Reschedule unacked messages if required, keeping its original order.
-    OrderedBuildEvent unacked;
+    PublishBuildToolEventStreamRequest unacked;
     while ((unacked = pendingAck.pollLast()) != null) {
       pendingSend.addFirst(unacked);
     }
@@ -411,11 +412,11 @@
 
   /** Method responsible for a single Streaming RPC. */
   private static ListenableFuture<Status> publishEventStream(
-      final ConcurrentLinkedDeque<OrderedBuildEvent> pendingAck,
-      final BlockingDeque<OrderedBuildEvent> pendingSend,
+      final ConcurrentLinkedDeque<PublishBuildToolEventStreamRequest> pendingAck,
+      final BlockingDeque<PublishBuildToolEventStreamRequest> pendingSend,
       final BuildEventServiceClient besClient)
       throws Exception {
-    OrderedBuildEvent event;
+    PublishBuildToolEventStreamRequest event;
     ListenableFuture<Status> streamDone = besClient.openStream(ackCallback(pendingAck, besClient));
     try {
       do {
@@ -432,12 +433,13 @@
     return streamDone;
   }
 
-  private static boolean isLastEvent(OrderedBuildEvent event) {
+  private static boolean isLastEvent(PublishBuildToolEventStreamRequest event) {
     return event != null && event.getEvent().getEventCase() == COMPONENT_STREAM_FINISHED;
   }
 
   private static Function<PublishBuildToolEventStreamResponse, Void> ackCallback(
-      final Deque<OrderedBuildEvent> pendingAck, final BuildEventServiceClient besClient) {
+      final Deque<PublishBuildToolEventStreamRequest> pendingAck,
+      final BuildEventServiceClient besClient) {
     return ack -> {
       long pendingSeq = pendingAck.isEmpty() ? -1 : pendingAck.peekFirst().getSequenceNumber();
       long ackSeq = ack.getSequenceNumber();
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
index 6feb53c..5721118 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceClient.java
@@ -16,7 +16,7 @@
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.devtools.build.v1.OrderedBuildEvent;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
 import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
 import com.google.devtools.build.v1.PublishLifecycleEventRequest;
 import io.grpc.Status;
@@ -38,7 +38,7 @@
    * guarantee that all callback calls have been received.
    *
    * @param ackCallback Consumer called every time a ack message is received.
-   * @return Listenable future that blocks until the the onDone callback is called.
+   * @return Listenable future that blocks until the onDone callback is called.
    * @throws Exception
    */
   ListenableFuture<Status> openStream(
@@ -50,7 +50,7 @@
    * @param buildEvent Event that should be sent.
    * @throws Exception
    */
-  void sendOverStream(OrderedBuildEvent buildEvent) throws Exception;
+  void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception;
 
   /**
    * Closes the currently opened opened stream. This method does not block. Callers should block on
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
index 86e33e0..5838379 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/client/BuildEventServiceGrpcClient.java
@@ -27,10 +27,10 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import com.google.devtools.build.v1.OrderedBuildEvent;
 import com.google.devtools.build.v1.PublishBuildEventGrpc;
 import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventBlockingStub;
 import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventStub;
+import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
 import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
 import com.google.devtools.build.v1.PublishLifecycleEventRequest;
 import io.grpc.CallCredentials;
@@ -71,7 +71,7 @@
   private final PublishBuildEventStub besAsync;
   private final PublishBuildEventBlockingStub besBlocking;
   private final ManagedChannel channel;
-  private final AtomicReference<StreamObserver<OrderedBuildEvent>> streamReference;
+  private final AtomicReference<StreamObserver<PublishBuildToolEventStreamRequest>> streamReference;
 
   public BuildEventServiceGrpcClient(String serverSpec, boolean tlsEnabled,
       @Nullable String tlsCertificateFile, @Nullable String tlsAuthorityOverride,
@@ -116,7 +116,7 @@
     return streamFinished;
   }
 
-  private StreamObserver<OrderedBuildEvent> createStream(
+  private StreamObserver<PublishBuildToolEventStreamRequest> createStream(
       final Function<PublishBuildToolEventStreamResponse, Void> ack,
       final SettableFuture<Status> streamFinished) {
     return besAsync.publishBuildToolEventStream(
@@ -141,14 +141,14 @@
   }
 
   @Override
-  public void sendOverStream(OrderedBuildEvent buildEvent) throws Exception {
+  public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) throws Exception {
     checkNotNull(streamReference.get(), "Attempting to send over a closed or unopened stream")
         .onNext(buildEvent);
   }
 
   @Override
   public void closeStream() {
-    StreamObserver<OrderedBuildEvent> stream;
+    StreamObserver<PublishBuildToolEventStreamRequest> stream;
     if ((stream = streamReference.getAndSet(null)) != null) {
       stream.onCompleted();
     }
@@ -156,7 +156,7 @@
 
   @Override
   public void abortStream(Status status) {
-    StreamObserver<OrderedBuildEvent> stream;
+    StreamObserver<PublishBuildToolEventStreamRequest> stream;
     if ((stream = streamReference.getAndSet(null)) != null) {
       stream.onError(status.asException());
     }
diff --git a/third_party/googleapis/google/devtools/build/v1/publish_build_event.proto b/third_party/googleapis/google/devtools/build/v1/publish_build_event.proto
index 7d7b1cd..f6e05ee 100644
--- a/third_party/googleapis/google/devtools/build/v1/publish_build_event.proto
+++ b/third_party/googleapis/google/devtools/build/v1/publish_build_event.proto
@@ -60,7 +60,8 @@
 
   // Publish build tool events belonging to the same stream to a backend job
   // using bidirectional streaming.
-  rpc PublishBuildToolEventStream(stream OrderedBuildEvent) returns (stream PublishBuildToolEventStreamResponse) {
+  rpc PublishBuildToolEventStream(stream PublishBuildToolEventStreamRequest)
+      returns (stream PublishBuildToolEventStreamResponse) {
     option (google.api.http) = { post: "/v1/events:publish" body: "*" };
   }
 }