This change adds timestamps (call start and end time) to the remote grpc log.

PiperOrigin-RevId: 193937177
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 2a34533..d9ab87a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -133,7 +133,7 @@
       LoggingInterceptor logger = null;
       if (!remoteOptions.experimentalRemoteGrpcLog.isEmpty()) {
         rpcLogFile = new AsynchronousFileOutputStream(remoteOptions.experimentalRemoteGrpcLog);
-        logger = new LoggingInterceptor(rpcLogFile);
+        logger = new LoggingInterceptor(rpcLogFile, env.getRuntime().getClock());
       }
 
       RemoteRetrier retrier =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
index dc1caa6..0a1d806 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
@@ -12,10 +12,12 @@
     tags = ["bazel"],
     deps = [
         "//src/main/java/com/google/devtools/build/lib:io",
+        "//src/main/java/com/google/devtools/build/lib/clock",
         "//src/main/java/com/google/devtools/build/lib/remote/util",
         "//src/main/protobuf:remote_execution_log_java_proto",
         "//third_party:guava",
         "//third_party/grpc:grpc-jar",
+        "//third_party/protobuf:protobuf_java",
         "@googleapis//:google_bytestream_bytestream_java_grpc",
         "@googleapis//:google_bytestream_bytestream_java_proto",
         "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
index a805eae..bf1b393 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
@@ -15,6 +15,7 @@
 package com.google.devtools.build.lib.remote.logging;
 
 import com.google.bytestream.ByteStreamGrpc;
+import com.google.devtools.build.lib.clock.Clock;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
@@ -22,6 +23,7 @@
 import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
 import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
 import com.google.devtools.remoteexecution.v1test.RequestMetadata;
+import com.google.protobuf.Timestamp;
 import com.google.watcher.v1.WatcherGrpc;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
@@ -32,15 +34,18 @@
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import io.grpc.Status;
+import java.time.Instant;
 import javax.annotation.Nullable;
 
 /** Client interceptor for logging details of certain gRPC calls. */
 public class LoggingInterceptor implements ClientInterceptor {
-  AsynchronousFileOutputStream rpcLogFile;
+  private final AsynchronousFileOutputStream rpcLogFile;
+  private final Clock clock;
 
   /** Constructs a LoggingInterceptor which logs RPC calls to the given file. */
-  public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile) {
+  public LoggingInterceptor(AsynchronousFileOutputStream rpcLogFile, Clock clock) {
     this.rpcLogFile = rpcLogFile;
+    this.clock = clock;
   }
 
   /**
@@ -80,6 +85,15 @@
     }
   }
 
+  /** Get current time as a Timestamp. */
+  private Timestamp getCurrentTimestamp() {
+    Instant time = Instant.ofEpochMilli(clock.currentTimeMillis());
+    return Timestamp.newBuilder()
+        .setSeconds(time.getEpochSecond())
+        .setNanos(time.getNano())
+        .build();
+  }
+
   /**
    * Wraps client call to log call details by building a {@link LogEntry} and writing it to the RPC
    * log file.
@@ -100,6 +114,7 @@
 
     @Override
     public void start(Listener<RespT> responseListener, Metadata headers) {
+      entryBuilder.setStartTime(getCurrentTimestamp());
       RequestMetadata metadata = TracingMetadataUtils.requestMetadataFromHeaders(headers);
       if (metadata != null) {
         entryBuilder.setMetadata(metadata);
@@ -115,6 +130,7 @@
 
             @Override
             public void onClose(Status status, Metadata trailers) {
+              entryBuilder.setEndTime(getCurrentTimestamp());
               entryBuilder.setStatus(makeStatusProto(status));
               entryBuilder.setDetails(handler.getDetails());
               rpcLogFile.write(entryBuilder.build());
diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD
index 684dcb2..85aaa24 100644
--- a/src/main/protobuf/BUILD
+++ b/src/main/protobuf/BUILD
@@ -148,6 +148,7 @@
     name = "remote_execution_log_proto",
     srcs = ["remote_execution_log.proto"],
     deps = [
+        "@com_google_protobuf//:well_known_types_timestamp_proto",
         "@googleapis//:google_bytestream_bytestream_proto",
         "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_proto",
         "@googleapis//:google_longrunning_operations_proto",
diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto
index 1e4fa9f..32d7694 100644
--- a/src/main/protobuf/remote_execution_log.proto
+++ b/src/main/protobuf/remote_execution_log.proto
@@ -16,6 +16,7 @@
 
 package remote_logging;
 
+import "google/protobuf/timestamp.proto";
 import "google/bytestream/bytestream.proto";
 import "google/devtools/remoteexecution/v1test/remote_execution.proto";
 import "google/longrunning/operations.proto";
@@ -39,6 +40,12 @@
 
   // Method specific details for this call.
   RpcCallDetails details = 4;
+
+  // Time the call started.
+  google.protobuf.Timestamp start_time = 5;
+
+  // Time the call closed.
+  google.protobuf.Timestamp end_time = 6;
 }
 
 // Details for a call to
diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
index 4c1c398..c459978 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
@@ -39,6 +39,7 @@
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WatchDetails;
 import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.testutil.ManualClock;
 import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
 import com.google.devtools.remoteexecution.v1test.Action;
 import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
@@ -60,6 +61,7 @@
 import com.google.longrunning.Operation;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
 import com.google.watcher.v1.Change;
 import com.google.watcher.v1.ChangeBatch;
 import com.google.watcher.v1.Request;
@@ -93,12 +95,13 @@
   private Channel loggedChannel;
   private LoggingInterceptor interceptor;
   private AsynchronousFileOutputStream logStream;
+  private ManualClock clock;
 
   // This returns a logging interceptor where all calls are handled by the given handler.
   @SuppressWarnings({"rawtypes", "unchecked"})
   private LoggingInterceptor getInterceptorWithAlwaysThisHandler(
       LoggingHandler handler, AsynchronousFileOutputStream outputFile) {
-    return new LoggingInterceptor(outputFile) {
+    return new LoggingInterceptor(outputFile, clock) {
       @Override
       public <ReqT, RespT> LoggingHandler<ReqT, RespT> selectHandler(
           MethodDescriptor<ReqT, RespT> method) {
@@ -117,7 +120,8 @@
             .build()
             .start();
     logStream = Mockito.mock(AsynchronousFileOutputStream.class);
-    interceptor = new LoggingInterceptor(logStream);
+    clock = new ManualClock();
+    interceptor = new LoggingInterceptor(logStream, clock);
     loggedChannel =
         ClientInterceptors.intercept(
             InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
@@ -139,6 +143,7 @@
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+            clock.advanceMillis(1234);
             responseObserver.onNext(response);
             responseObserver.onCompleted();
           }
@@ -161,8 +166,11 @@
             .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
             .setDetails(details)
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(12).setNanos(300000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(13).setNanos(534000000))
             .build();
 
+    clock.advanceMillis(12300);
     stub.read(request).next();
     verify(handler).handleReq(request);
     verify(handler).handleResp(response);
@@ -181,7 +189,9 @@
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+            clock.advanceMillis(50);
             responseObserver.onNext(response1);
+            clock.advanceMillis(1500);
             responseObserver.onNext(response2);
             responseObserver.onCompleted();
           }
@@ -209,6 +219,8 @@
             .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
             .setDetails(details)
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.getDefaultInstance())
+            .setEndTime(Timestamp.newBuilder().setSeconds(1).setNanos(550000000))
             .build();
 
     verify(handler).handleReq(request);
@@ -264,12 +276,14 @@
             InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
     ByteStreamStub stub = ByteStreamGrpc.newStub(channel);
 
+    clock.advanceMillis(1000);
     @SuppressWarnings("unchecked")
     StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
     // Write both responses.
     StreamObserver<WriteRequest> requester = stub.write(responseObserver);
     requester.onNext(request1);
     requester.onNext(request2);
+    clock.advanceMillis(1000);
     requester.onCompleted();
 
     ArgumentCaptor<WriteRequest> resultCaptor = ArgumentCaptor.forClass(WriteRequest.class);
@@ -279,6 +293,8 @@
             .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
             .setDetails(details)
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(1))
+            .setEndTime(Timestamp.newBuilder().setSeconds(2))
             .build();
 
     verify(handler, times(2)).handleReq(resultCaptor.capture());
@@ -298,6 +314,7 @@
         new ByteStreamImplBase() {
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+            clock.advanceMillis(100);
             responseObserver.onError(error.asRuntimeException());
           }
         });
@@ -314,6 +331,7 @@
             InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), interceptor);
     ByteStreamBlockingStub stub = ByteStreamGrpc.newBlockingStub(channel);
 
+    clock.advanceMillis(1500);
     assertThrows(StatusRuntimeException.class, () -> stub.read(request).next());
 
     LogEntry expectedEntry =
@@ -324,6 +342,8 @@
                 com.google.rpc.Status.newBuilder()
                     .setCode(error.getCode().value())
                     .setMessage(error.getDescription()))
+            .setStartTime(Timestamp.newBuilder().setSeconds(1).setNanos(500000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(1).setNanos(600000000))
             .build();
 
     verify(handler).handleReq(request);
@@ -345,11 +365,13 @@
           @Override
           public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
             responseObserver.onNext(response);
+            clock.advanceMillis(100);
             responseObserver.onCompleted();
           }
         });
 
     ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel);
+    clock.advanceMillis(15000);
     stub.execute(request);
     LogEntry expectedEntry =
         LogEntry.newBuilder()
@@ -359,6 +381,8 @@
                     .setExecute(
                         ExecuteDetails.newBuilder().setRequest(request).setResponse(response)))
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(15))
+            .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -375,11 +399,12 @@
         new ExecutionImplBase() {
           @Override
           public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+            clock.advanceMillis(1100);
             responseObserver.onError(error.asRuntimeException());
           }
         });
     ExecutionBlockingStub stub = ExecutionGrpc.newBlockingStub(loggedChannel);
-
+    clock.advanceMillis(20000000000001L);
     assertThrows(StatusRuntimeException.class, () -> stub.execute(request));
     LogEntry expectedEntry =
         LogEntry.newBuilder()
@@ -391,6 +416,8 @@
                 com.google.rpc.Status.newBuilder()
                     .setCode(error.getCode().value())
                     .setMessage(error.getDescription()))
+            .setStartTime(Timestamp.newBuilder().setSeconds(20000000000L).setNanos(1000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(20000000001L).setNanos(101000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -411,6 +438,7 @@
           public void findMissingBlobs(
               FindMissingBlobsRequest request,
               StreamObserver<FindMissingBlobsResponse> responseObserver) {
+            clock.advanceMillis(200);
             responseObserver.onNext(response);
             responseObserver.onCompleted();
           }
@@ -419,6 +447,7 @@
     ContentAddressableStorageBlockingStub stub =
         ContentAddressableStorageGrpc.newBlockingStub(loggedChannel);
 
+    clock.advanceMillis(14900);
     stub.findMissingBlobs(request);
     LogEntry expectedEntry =
         LogEntry.newBuilder()
@@ -431,6 +460,8 @@
                             .setRequest(request)
                             .setResponse(response)))
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(14).setNanos(900000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(15).setNanos(100000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -454,12 +485,14 @@
           @Override
           public void getActionResult(
               GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+            clock.advanceMillis(22222);
             responseObserver.onNext(response);
             responseObserver.onCompleted();
           }
         });
     ActionCacheBlockingStub stub = ActionCacheGrpc.newBlockingStub(loggedChannel);
 
+    clock.advanceMillis(11111);
     stub.getActionResult(request);
     LogEntry expectedEntry =
         LogEntry.newBuilder()
@@ -471,6 +504,8 @@
                             .setRequest(request)
                             .setResponse(response)))
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(11).setNanos(111000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(33).setNanos(333000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -493,11 +528,14 @@
           @Override
           public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
             responseObserver.onNext(response1);
+            clock.advanceMillis(2200);
             responseObserver.onNext(response2);
+            clock.advanceMillis(1100);
             responseObserver.onCompleted();
           }
         });
 
+    clock.advanceMillis(50000);
     Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request);
 
     // Read both responses.
@@ -516,6 +554,8 @@
                             .addResponses(response1)
                             .addResponses(response2)))
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(50))
+            .setEndTime(Timestamp.newBuilder().setSeconds(53).setNanos(300000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -533,10 +573,14 @@
         new WatcherImplBase() {
           @Override
           public void watch(Request request, StreamObserver<ChangeBatch> responseObserver) {
+            clock.advanceMillis(100);
             responseObserver.onNext(response);
+            clock.advanceMillis(100);
             responseObserver.onError(error.asRuntimeException());
           }
         });
+
+    clock.advanceMillis(2000);
     Iterator<ChangeBatch> replies = WatcherGrpc.newBlockingStub(loggedChannel).watch(request);
     assertThrows(
         StatusRuntimeException.class,
@@ -556,6 +600,8 @@
                 com.google.rpc.Status.newBuilder()
                     .setCode(error.getCode().value())
                     .setMessage(error.getDescription()))
+            .setStartTime(Timestamp.newBuilder().setSeconds(2))
+            .setEndTime(Timestamp.newBuilder().setSeconds(2).setNanos(200000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -574,10 +620,12 @@
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
             responseObserver.onNext(response1);
             responseObserver.onNext(response2);
+            clock.advanceMillis(2000);
             responseObserver.onCompleted();
           }
         });
 
+    clock.advanceMillis(500000);
     Iterator<ReadResponse> replies = ByteStreamGrpc.newBlockingStub(loggedChannel).read(request);
 
     // Read both responses.
@@ -596,6 +644,8 @@
                             .setNumReads(2)
                             .setBytesRead(6)))
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(500))
+            .setEndTime(Timestamp.newBuilder().setSeconds(502))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -612,6 +662,7 @@
           @Override
           public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
             responseObserver.onNext(response1);
+            clock.advanceMillis(100);
             responseObserver.onError(error.asRuntimeException());
           }
         });
@@ -638,6 +689,8 @@
                 com.google.rpc.Status.newBuilder()
                     .setCode(error.getCode().value())
                     .setMessage(error.getDescription()))
+            .setStartTime(Timestamp.getDefaultInstance())
+            .setEndTime(Timestamp.newBuilder().setNanos(100000000))
             .build();
     verify(logStream).write(expectedEntry);
   }
@@ -679,11 +732,15 @@
     @SuppressWarnings("unchecked")
     StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
 
+    clock.advanceMillis(10000);
     // Request three writes, the first identical with the third.
     StreamObserver<WriteRequest> requester = stub.write(responseObserver);
     requester.onNext(request1);
+    clock.advanceMillis(100);
     requester.onNext(request2);
+    clock.advanceMillis(200);
     requester.onNext(request1);
+    clock.advanceMillis(100);
     requester.onCompleted();
 
     LogEntry expectedEntry =
@@ -699,6 +756,8 @@
                             .setBytesSent(9)
                             .setNumWrites(3)))
             .setStatus(com.google.rpc.Status.getDefaultInstance())
+            .setStartTime(Timestamp.newBuilder().setSeconds(10))
+            .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
             .build();
 
     verify(logStream).write(expectedEntry);
@@ -724,10 +783,12 @@
     ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
     @SuppressWarnings("unchecked")
     StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
+    clock.advanceMillis(10000000000L);
 
     // Write both responses.
     StreamObserver<WriteRequest> requester = stub.write(responseObserver);
     requester.onNext(request);
+    clock.advanceMillis(10000000000L);
     requester.onError(error.asRuntimeException());
 
     Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException());
@@ -746,6 +807,8 @@
                             .addResourceNames("test")
                             .setNumWrites(1)
                             .setBytesSent(3)))
+            .setStartTime(Timestamp.newBuilder().setSeconds(10000000))
+            .setEndTime(Timestamp.newBuilder().setSeconds(20000000))
             .build();
 
     verify(logStream).write(expectedEntry);