Write/QueryWriteStatus logging refinement/addition

Improve the logging of WriteRequests to include offset and finish_write
information. Offsets are logged for the initial and non-sequential per
successive write request. Each finish_write true request is logged with
the effective size of the resource at the completion of the write
request, including the current offset and payload. Clarified comments
for WriteDetails, and corrected some comment inconsistencies.

Add logging for QueryWriteStatus calls which occur on progressive writes
to determine an offset to begin a write call on a retry.

Closes #12928.

PiperOrigin-RevId: 355545331
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
index 46a9b5f..b9fced4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
@@ -19,6 +19,7 @@
         "//src/main/java/com/google/devtools/build/lib/util/io",
         "//src/main/protobuf:remote_execution_log_java_proto",
         "//third_party:flogger",
+        "//third_party:guava",
         "//third_party:jsr305",
         "//third_party/grpc:grpc-jar",
         "//third_party/protobuf:protobuf_java",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
index c673894..8cffbae 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
@@ -74,6 +74,8 @@
       return new ReadHandler(); // <ReadRequest, ReadResponse>
     } else if (method == ByteStreamGrpc.getWriteMethod()) {
       return new WriteHandler(); // <WriteRequest, WriteResponse>
+    } else if (method == ByteStreamGrpc.getQueryWriteStatusMethod()) {
+      return new QueryWriteStatusHandler(); // <QueryWriteStatusRequest, QueryWriteStatusResponse>
     } else if (method == CapabilitiesGrpc.getGetCapabilitiesMethod()) {
       return new GetCapabilitiesHandler(); // <GetCapabilitiesRequest, ServerCapabilities>
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java
new file mode 100644
index 0000000..26ec94f
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/QueryWriteStatusHandler.java
@@ -0,0 +1,41 @@
+// Copyright 2021 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote.logging;
+
+import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
+import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
+
+/** LoggingHandler for {@link google.bytestream.QueryWriteStatus} gRPC call. */
+public class QueryWriteStatusHandler
+    implements LoggingHandler<QueryWriteStatusRequest, QueryWriteStatusResponse> {
+  private final QueryWriteStatusDetails.Builder builder = QueryWriteStatusDetails.newBuilder();
+
+  @Override
+  public void handleReq(QueryWriteStatusRequest message) {
+    builder.setRequest(message);
+  }
+
+  @Override
+  public void handleResp(QueryWriteStatusResponse message) {
+    builder.setResponse(message);
+  }
+
+  @Override
+  public RpcCallDetails getDetails() {
+    return RpcCallDetails.newBuilder().setQueryWriteStatus(builder).build();
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java
index 67e9e37..b506295 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java
@@ -16,24 +16,40 @@
 
 import com.google.bytestream.ByteStreamProto.WriteRequest;
 import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.common.collect.Iterables;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
+import java.util.ArrayList;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
 
 /** LoggingHandler for {@link google.bytestream.Write} gRPC call. */
 public class WriteHandler implements LoggingHandler<WriteRequest, WriteResponse> {
   private final WriteDetails.Builder builder = WriteDetails.newBuilder();
   private final Set<String> resources = new LinkedHashSet<>();
+  private final List<Long> offsets = new ArrayList<>();
+  private final List<Long> finishWrites = new ArrayList<>();
+  private long bytesSentInSequence = 0;
   private long numWrites = 0;
   private long bytesSent = 0;
 
   @Override
   public void handleReq(WriteRequest message) {
     resources.add(message.getResourceName());
+    long writeOffset = message.getWriteOffset();
+    if (numWrites == 0 || Iterables.getLast(offsets) + bytesSentInSequence != writeOffset) {
+      offsets.add(writeOffset);
+      bytesSentInSequence = 0;
+    }
+    int size = message.getData().size();
+    if (message.getFinishWrite()) {
+      finishWrites.add(writeOffset + size);
+    }
 
     numWrites++;
-    bytesSent += message.getData().size();
+    bytesSent += size;
+    bytesSentInSequence += size;
   }
 
   @Override
@@ -44,6 +60,8 @@
   @Override
   public RpcCallDetails getDetails() {
     builder.addAllResourceNames(resources);
+    builder.addAllOffsets(offsets);
+    builder.addAllFinishWrites(finishWrites);
     builder.setNumWrites(numWrites);
     builder.setBytesSent(bytesSent);
     return RpcCallDetails.newBuilder().setWrite(builder).build();
diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto
index 84a07d6..b3f265a 100644
--- a/src/main/protobuf/remote_execution_log.proto
+++ b/src/main/protobuf/remote_execution_log.proto
@@ -16,11 +16,11 @@
 
 package remote_logging;
 
-import "build/bazel/remote/execution/v2/remote_execution.proto";
-import "google/protobuf/timestamp.proto";
 import "google/bytestream/bytestream.proto";
 import "google/longrunning/operations.proto";
+import "google/protobuf/timestamp.proto";
 import "google/rpc/status.proto";
+import "build/bazel/remote/execution/v2/remote_execution.proto";
 
 option java_package = "com.google.devtools.build.lib.remote.logging";
 
@@ -83,7 +83,7 @@
 // Details for a call to
 // build.bazel.remote.execution.v2.ActionCache.UpdateActionResult.
 message UpdateActionResultDetails {
-  // The build.bazel.remote.execution.v2.GetActionResultRequest sent by
+  // The build.bazel.remote.execution.v2.UpdateActionResultRequest sent by
   // the call.
   build.bazel.remote.execution.v2.UpdateActionResultRequest request = 1;
 
@@ -128,10 +128,24 @@
 message WriteDetails {
   // The names of resources requested to be written to in this call in the order
   // they were first requested in. If the ByteStream protocol is followed
-  // according to specification, this should only contain have a single element,
-  // which is the resource name specified in the first message of the stream.
+  // according to specification, this should contain at most two elements:
+  // The resource name specified in the first message of the stream, and an
+  // empty string specified in each successive request if num_writes > 1.
   repeated string resource_names = 1;
 
+  // The offsets sent for the initial request and any non-sequential offsets
+  // specified over the course of the call. If the ByteStream protocol is
+  // followed according to specification, this should contain a single element
+  // which is the starting point for the write call.
+  repeated int64 offsets = 5;
+
+  // The effective final size for each request sent with finish_write true
+  // specified over the course of the call. If the ByteStream protocol is
+  // followed according to specification, this should contain a single element
+  // which is the total size of the written resource, including the initial
+  // offset.
+  repeated int64 finish_writes = 6;
+
   // The number of writes performed in this call.
   int64 num_writes = 2;
 
@@ -142,6 +156,15 @@
   google.bytestream.WriteResponse response = 4;
 }
 
+// Details for a call to google.bytestream.QueryWriteStatus.
+message QueryWriteStatusDetails {
+  // The google.bytestream.QueryWriteStatusRequest sent by the call.
+  google.bytestream.QueryWriteStatusRequest request = 1;
+
+  // The received google.bytestream.QueryWriteStatusResponse.
+  google.bytestream.QueryWriteStatusResponse response = 2;
+}
+
 // Contains details for specific types of calls.
 message RpcCallDetails {
   reserved 1 to 4, 11;
@@ -152,6 +175,7 @@
     FindMissingBlobsDetails find_missing_blobs = 10;
     ReadDetails read = 5;
     WriteDetails write = 6;
+    QueryWriteStatusDetails query_write_status = 14;
     GetCapabilitiesDetails get_capabilities = 12;
     UpdateActionResultDetails update_action_result = 13;
   }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
index 17ee021..3b1ca9a 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
@@ -49,6 +49,8 @@
 import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
 import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
 import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
+import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
+import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
 import com.google.bytestream.ByteStreamProto.ReadRequest;
 import com.google.bytestream.ByteStreamProto.ReadResponse;
 import com.google.bytestream.ByteStreamProto.WriteRequest;
@@ -58,6 +60,7 @@
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails;
@@ -854,6 +857,10 @@
                         WriteDetails.newBuilder()
                             .addResourceNames("test1")
                             .addResourceNames("test2")
+                            .addOffsets(0)
+                            .addOffsets(0)
+                            .addOffsets(0)
+                            // finish write is empty
                             .setResponse(response)
                             .setBytesSent(9)
                             .setNumWrites(3)))
@@ -866,6 +873,81 @@
   }
 
   @Test
+  public void testWriteCallOffsetAndFinishWriteCompounding() {
+    WriteRequest request1 =
+        WriteRequest.newBuilder()
+            .setResourceName("test1")
+            .setData(ByteString.copyFromUtf8("abc"))
+            .setWriteOffset(10)
+            .build();
+    WriteRequest request2 =
+        WriteRequest.newBuilder()
+            .setData(ByteString.copyFromUtf8("def"))
+            .setWriteOffset(request1.getWriteOffset() + request1.getData().size())
+            .build();
+    WriteResponse response = WriteResponse.newBuilder().setCommittedSize(6).build();
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {}
+
+              @Override
+              public void onError(Throwable throwable) {}
+
+              @Override
+              public void onCompleted() {
+                streamObserver.onNext(response);
+                streamObserver.onCompleted();
+              }
+            };
+          }
+        });
+    ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
+    @SuppressWarnings("unchecked")
+    StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
+
+    clock.advanceMillis(10000);
+    // Request three writes, the first identical with the third, but offset correctly and
+    // finish_writing
+    StreamObserver<WriteRequest> requester = stub.write(responseObserver);
+    requester.onNext(request1);
+    clock.advanceMillis(100);
+    requester.onNext(request2);
+    clock.advanceMillis(200);
+    requester.onNext(
+        request1.toBuilder()
+            .setWriteOffset(request2.getWriteOffset() + request2.getData().size())
+            .setFinishWrite(true)
+            .build());
+    clock.advanceMillis(100);
+    requester.onCompleted();
+
+    LogEntry expectedEntry =
+        LogEntry.newBuilder()
+            .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
+            .setDetails(
+                RpcCallDetails.newBuilder()
+                    .setWrite(
+                        WriteDetails.newBuilder()
+                            .addResourceNames("test1")
+                            .addResourceNames("")
+                            .addOffsets(request1.getWriteOffset())
+                            .addFinishWrites(
+                                10 + request1.getData().size() * 2 + request2.getData().size())
+                            .setResponse(response)
+                            .setBytesSent(9)
+                            .setNumWrites(3)))
+            .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(10))
+            .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
+            .build();
+    verify(logStream).write(expectedEntry);
+  }
+
+  @Test
   public void testWriteCallFail() {
     WriteRequest request =
         WriteRequest.newBuilder()
@@ -881,7 +963,6 @@
             return Mockito.mock(StreamObserver.class);
           }
         });
-
     ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
     @SuppressWarnings("unchecked")
     StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
@@ -894,7 +975,6 @@
     requester.onError(error.asRuntimeException());
 
     Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException());
-
     LogEntry expectedEntry =
         LogEntry.newBuilder()
             .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
@@ -907,12 +987,50 @@
                     .setWrite(
                         WriteDetails.newBuilder()
                             .addResourceNames("test")
+                            .addOffsets(0)
                             .setNumWrites(1)
                             .setBytesSent(3)))
             .setStartTime(Timestamp.newBuilder().setSeconds(10000000))
             .setEndTime(Timestamp.newBuilder().setSeconds(20000000))
             .build();
+    verify(logStream).write(expectedEntry);
+  }
 
+  @Test
+  public void testQueryWriteStatusCallOk() {
+    QueryWriteStatusRequest request =
+        QueryWriteStatusRequest.newBuilder().setResourceName("test").build();
+    QueryWriteStatusResponse response =
+        QueryWriteStatusResponse.newBuilder().setCommittedSize(10).build();
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public void queryWriteStatus(
+              QueryWriteStatusRequest request,
+              StreamObserver<QueryWriteStatusResponse> responseObserver) {
+            clock.advanceMillis(22222);
+            responseObserver.onNext(response);
+            responseObserver.onCompleted();
+          }
+        });
+    ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(loggedChannel);
+
+    clock.advanceMillis(11111);
+    stub.queryWriteStatus(request);
+
+    LogEntry expectedEntry =
+        LogEntry.newBuilder()
+            .setMethodName(ByteStreamGrpc.getQueryWriteStatusMethod().getFullMethodName())
+            .setDetails(
+                RpcCallDetails.newBuilder()
+                    .setQueryWriteStatus(
+                        QueryWriteStatusDetails.newBuilder()
+                            .setRequest(request)
+                            .setResponse(response)))
+            .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000))
+            .build();
     verify(logStream).write(expectedEntry);
   }
 }