// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote.logging;

import static com.google.common.collect.Iterators.advance;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.CapabilitiesGrpc;
import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesBlockingStub;
import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesImplBase;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.ExecuteRequest;
import build.bazel.remote.execution.v2.ExecutionCapabilities;
import build.bazel.remote.execution.v2.ExecutionGrpc;
import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
import build.bazel.remote.execution.v2.GetCapabilitiesRequest;
import build.bazel.remote.execution.v2.OutputFile;
import build.bazel.remote.execution.v2.ServerCapabilities;
import build.bazel.remote.execution.v2.UpdateActionResultRequest;
import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ExecuteDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.FindMissingBlobsDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.QueryWriteStatusDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WaitExecutionDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.testutil.ManualClock;
import com.google.devtools.build.lib.util.io.AsynchronousMessageOutputStream;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.util.Iterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/** Tests for {@link com.google.devtools.build.lib.remote.logging.LoggingInterceptor} */
@RunWith(JUnit4.class)
public class LoggingInterceptorTest {
  // Name under which the in-process fake server is registered; derived from the test class.
  private final String fakeServerName = "fake server for " + getClass();
  // Mutable registry used as the server's fallback, so each test can add its own service impl.
  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
  // In-process server started in setUp() and shut down in tearDown().
  private Server fakeServer;
  // Channel to the fake server with the default `interceptor` attached (built in setUp()).
  private Channel loggedChannel;
  // Interceptor under test, writing to `logStream` and reading time from `clock`.
  private LoggingInterceptor interceptor;
  // Manually advanced clock; tests move it with advanceMillis() to pin start/end timestamps.
  private ManualClock clock;

  // Mockito JUnit rule; the @Mock field below relies on it.
  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
  // Mocked sink for log entries; tests verify what gets written to it.
  @Mock private AsynchronousMessageOutputStream<LogEntry> logStream;

  /**
   * Builds a {@link LoggingInterceptor} writing to {@code outputFile} whose {@code selectHandler}
   * ignores the method descriptor and hands back {@code handler} for every call.
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  private LoggingInterceptor getInterceptorWithAlwaysThisHandler(
      LoggingHandler handler, AsynchronousMessageOutputStream<LogEntry> outputFile) {
    LoggingInterceptor fixedHandlerInterceptor =
        new LoggingInterceptor(outputFile, clock) {
          @Override
          public <ReqT, RespT> LoggingHandler<ReqT, RespT> selectHandler(
              MethodDescriptor<ReqT, RespT> ignoredMethod) {
            // Raw-typed on purpose: the same handler is returned whatever the method's types are.
            return handler;
          }
        };
    return fixedHandlerInterceptor;
  }

  /**
   * Starts the in-process fake server and creates the clock, the interceptor under test and a
   * channel to the fake server that routes every call through that interceptor.
   */
  @Before
  public final void setUp() throws Exception {
    // The registry is the server's fallback, so each test case can register its own service
    // implementation after the server has already been started.
    Server unstartedServer =
        InProcessServerBuilder.forName(fakeServerName)
            .fallbackHandlerRegistry(serviceRegistry)
            .directExecutor()
            .build();
    fakeServer = unstartedServer.start();

    clock = new ManualClock();
    interceptor = new LoggingInterceptor(logStream, clock);

    Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
    loggedChannel = ClientInterceptors.intercept(rawChannel, interceptor);
  }

  /** Forcefully stops the fake server and blocks until it has fully terminated. */
  @After
  public void tearDown() throws Exception {
    // shutdownNow() hands back the server, so termination can be awaited on the same chain.
    fakeServer.shutdownNow().awaitTermination();
  }

  /**
   * Successful read with a single response: the handler is shown the request and the response,
   * is asked for its details once, and one entry with OK status and clock-derived start/end
   * times is written to the log stream.
   */
  @Test
  public void testCallOk() {
    ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test").build();
    ReadResponse readResponse =
        ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();

    // The fake service lets 1234ms pass on the manual clock before it answers.
    ByteStreamImplBase fakeByteStream =
        new ByteStreamImplBase() {
          @Override
          public void read(ReadRequest incoming, StreamObserver<ReadResponse> replySink) {
            clock.advanceMillis(1234);
            replySink.onNext(readResponse);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeByteStream);

    @SuppressWarnings("unchecked")
    LoggingHandler<ReadRequest, ReadResponse> mockHandler = Mockito.mock(LoggingHandler.class);
    RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
    Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);

    Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
    Channel interceptedChannel =
        ClientInterceptors.intercept(
            rawChannel, getInterceptorWithAlwaysThisHandler(mockHandler, logStream));
    ByteStreamBlockingStub blockingStub = ByteStreamGrpc.newBlockingStub(interceptedChannel);

    // Start the call at 12.3s; the server-side delay moves the end to 13.534s.
    clock.advanceMillis(12300);
    blockingStub.read(readRequest).next();

    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(12).setNanos(300000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(13).setNanos(534000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
            .setDetails(emptyDetails)
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();

    verify(mockHandler).handleReq(readRequest);
    verify(mockHandler).handleResp(readResponse);
    verify(mockHandler).getDetails();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful read streaming two responses: the handler is shown both responses in order, and a
   * single log entry is written whose end time reflects the clock after the second response.
   */
  @Test
  public void testCallOkMultipleResponses() {
    ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test").build();
    ReadResponse firstChunk =
        ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
    ReadResponse secondChunk =
        ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build();

    // 50ms pass before the first chunk and another 1500ms before the second.
    ByteStreamImplBase fakeByteStream =
        new ByteStreamImplBase() {
          @Override
          public void read(ReadRequest incoming, StreamObserver<ReadResponse> replySink) {
            clock.advanceMillis(50);
            replySink.onNext(firstChunk);
            clock.advanceMillis(1500);
            replySink.onNext(secondChunk);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeByteStream);

    @SuppressWarnings("unchecked")
    LoggingHandler<ReadRequest, ReadResponse> mockHandler = Mockito.mock(LoggingHandler.class);
    RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
    Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);

    Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
    Channel interceptedChannel =
        ClientInterceptors.intercept(
            rawChannel, getInterceptorWithAlwaysThisHandler(mockHandler, logStream));
    ByteStreamBlockingStub blockingStub = ByteStreamGrpc.newBlockingStub(interceptedChannel);

    // Consume exactly the two streamed responses.
    advance(blockingStub.read(readRequest), 2);

    // The clock was never advanced before the call, so the start time is the default timestamp.
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(1).setNanos(550000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
            .setDetails(emptyDetails)
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(Timestamp.getDefaultInstance())
            .setEndTime(expectedEnd)
            .build();

    ArgumentCaptor<ReadResponse> seenResponses = ArgumentCaptor.forClass(ReadResponse.class);
    verify(mockHandler).handleReq(readRequest);
    verify(mockHandler, times(2)).handleResp(seenResponses.capture());
    assertThat(seenResponses.getAllValues()).containsExactly(firstChunk, secondChunk).inOrder();
    verify(mockHandler).getDetails();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful client-streaming write with two requests: the handler is shown both requests in
   * order and the single response, and one log entry spanning 1s to 2s is written.
   */
  @Test
  public void testCallOkMultipleRequests() {
    WriteRequest firstWrite =
        WriteRequest.newBuilder()
            .setResourceName("test")
            .setData(ByteString.copyFromUtf8("abc"))
            .build();
    WriteRequest secondWrite =
        WriteRequest.newBuilder()
            .setResourceName("test")
            .setData(ByteString.copyFromUtf8("def"))
            .build();
    WriteResponse writeResponse = WriteResponse.newBuilder().setCommittedSize(6).build();

    // The fake service ignores incoming requests and replies only once the client completes.
    ByteStreamImplBase fakeByteStream =
        new ByteStreamImplBase() {
          @Override
          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> replySink) {
            return new StreamObserver<WriteRequest>() {
              @Override
              public void onNext(WriteRequest ignoredRequest) {}

              @Override
              public void onError(Throwable ignoredError) {}

              @Override
              public void onCompleted() {
                replySink.onNext(writeResponse);
                replySink.onCompleted();
              }
            };
          }
        };
    serviceRegistry.addService(fakeByteStream);

    @SuppressWarnings("unchecked")
    LoggingHandler<WriteRequest, WriteResponse> mockHandler = Mockito.mock(LoggingHandler.class);
    RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
    Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);

    Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
    Channel interceptedChannel =
        ClientInterceptors.intercept(
            rawChannel, getInterceptorWithAlwaysThisHandler(mockHandler, logStream));
    ByteStreamStub asyncStub = ByteStreamGrpc.newStub(interceptedChannel);

    @SuppressWarnings("unchecked")
    StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);

    // Open the call at 1s, send both requests, then complete the stream at 2s.
    clock.advanceMillis(1000);
    StreamObserver<WriteRequest> requestSink = asyncStub.write(clientObserver);
    requestSink.onNext(firstWrite);
    requestSink.onNext(secondWrite);
    clock.advanceMillis(1000);
    requestSink.onCompleted();

    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(1).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(2).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
            .setDetails(emptyDetails)
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();

    ArgumentCaptor<WriteRequest> seenRequests = ArgumentCaptor.forClass(WriteRequest.class);
    verify(mockHandler, times(2)).handleReq(seenRequests.capture());
    assertThat(seenRequests.getAllValues()).containsExactly(firstWrite, secondWrite).inOrder();
    verify(mockHandler).handleResp(writeResponse);
    verify(mockHandler).getDetails();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Read that fails with NOT_FOUND: the handler is shown the request but no response, and the
   * logged entry carries the error's code and description as its status.
   */
  @Test
  public void testCallWithError() {
    ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test").build();
    Status notFound = Status.NOT_FOUND.withDescription("not found");

    // The fake service fails the call after 100ms of manual clock time.
    ByteStreamImplBase fakeByteStream =
        new ByteStreamImplBase() {
          @Override
          public void read(ReadRequest incoming, StreamObserver<ReadResponse> replySink) {
            clock.advanceMillis(100);
            replySink.onError(notFound.asRuntimeException());
          }
        };
    serviceRegistry.addService(fakeByteStream);

    @SuppressWarnings("unchecked")
    LoggingHandler<ReadRequest, ReadResponse> mockHandler = Mockito.mock(LoggingHandler.class);
    RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
    Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);

    Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
    Channel interceptedChannel =
        ClientInterceptors.intercept(
            rawChannel, getInterceptorWithAlwaysThisHandler(mockHandler, logStream));
    ByteStreamBlockingStub blockingStub = ByteStreamGrpc.newBlockingStub(interceptedChannel);

    // Call starts at 1.5s and fails at 1.6s.
    clock.advanceMillis(1500);
    assertThrows(StatusRuntimeException.class, () -> blockingStub.read(readRequest).next());

    com.google.rpc.Status expectedStatus =
        com.google.rpc.Status.newBuilder()
            .setCode(notFound.getCode().value())
            .setMessage(notFound.getDescription())
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(1).setNanos(500000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(1).setNanos(600000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
            .setDetails(emptyDetails)
            .setStatus(expectedStatus)
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();

    verify(mockHandler).handleReq(readRequest);
    verify(mockHandler, never()).handleResp(any());
    verify(mockHandler).getDetails();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful Execute call through the default interceptor: the logged entry holds the request
   * and both streamed operations, with times spanning 50s to 53.3s.
   */
  @Test
  public void testExecuteCallOk() {
    ExecuteRequest executeRequest =
        ExecuteRequest.newBuilder()
            .setInstanceName("test-instance")
            .setActionDigest(DigestUtil.buildDigest("test", 8))
            .build();
    Operation pendingOp = Operation.newBuilder().setName("test-name").build();
    Operation finishedOp =
        Operation.newBuilder()
            .setName("test-name")
            .setDone(true)
            .setResponse(Any.pack(executeRequest))
            .build();

    // 2200ms elapse between the two operations and 1100ms more before completion.
    ExecutionImplBase fakeExecution =
        new ExecutionImplBase() {
          @Override
          public void execute(ExecuteRequest incoming, StreamObserver<Operation> replySink) {
            replySink.onNext(pendingOp);
            clock.advanceMillis(2200);
            replySink.onNext(finishedOp);
            clock.advanceMillis(1100);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeExecution);

    clock.advanceMillis(50000);
    Iterator<Operation> operations =
        ExecutionGrpc.newBlockingStub(loggedChannel).execute(executeRequest);
    // Drain the stream so the call runs to completion.
    operations.forEachRemaining(ignoredOp -> {});

    ExecuteDetails expectedExecute =
        ExecuteDetails.newBuilder()
            .setRequest(executeRequest)
            .addResponses(pendingOp)
            .addResponses(finishedOp)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(50).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(53).setNanos(300000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ExecutionGrpc.getExecuteMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setExecute(expectedExecute))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Execute call that fails with NOT_FOUND before any operation is sent: the logged entry holds
   * only the request and the error status. Uses a very large clock value for the start time.
   */
  @Test
  public void testExecuteCallFail() {
    ExecuteRequest executeRequest =
        ExecuteRequest.newBuilder()
            .setInstanceName("test-instance")
            .setActionDigest(DigestUtil.buildDigest("test", 8))
            .build();
    Status notFound = Status.NOT_FOUND.withDescription("not found");

    // The fake service fails 1100ms after the call arrives.
    ExecutionImplBase fakeExecution =
        new ExecutionImplBase() {
          @Override
          public void execute(ExecuteRequest incoming, StreamObserver<Operation> replySink) {
            clock.advanceMillis(1100);
            replySink.onError(notFound.asRuntimeException());
          }
        };
    serviceRegistry.addService(fakeExecution);

    clock.advanceMillis(20000000000001L);
    Iterator<Operation> operations =
        ExecutionGrpc.newBlockingStub(loggedChannel).execute(executeRequest);
    assertThrows(StatusRuntimeException.class, operations::hasNext);

    com.google.rpc.Status expectedStatus =
        com.google.rpc.Status.newBuilder()
            .setCode(notFound.getCode().value())
            .setMessage(notFound.getDescription())
            .build();
    Timestamp expectedStart =
        Timestamp.newBuilder().setSeconds(20000000000L).setNanos(1000000).build();
    Timestamp expectedEnd =
        Timestamp.newBuilder().setSeconds(20000000001L).setNanos(101000000).build();
    ExecuteDetails expectedExecute = ExecuteDetails.newBuilder().setRequest(executeRequest).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ExecutionGrpc.getExecuteMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setExecute(expectedExecute))
            .setStatus(expectedStatus)
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful FindMissingBlobs call through the default interceptor: the logged entry holds the
   * request and the response, with times spanning 14.9s to 15.1s.
   */
  @Test
  public void testFindMissingBlobsCallOk() {
    Digest blobDigest = DigestUtil.buildDigest("test", 8);
    FindMissingBlobsRequest findRequest =
        FindMissingBlobsRequest.newBuilder()
            .addBlobDigests(blobDigest)
            .setInstanceName("test-instance")
            .build();
    FindMissingBlobsResponse findResponse =
        FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(blobDigest).build();

    // The fake service answers after 200ms of manual clock time.
    ContentAddressableStorageImplBase fakeCas =
        new ContentAddressableStorageImplBase() {
          @Override
          public void findMissingBlobs(
              FindMissingBlobsRequest incoming,
              StreamObserver<FindMissingBlobsResponse> replySink) {
            clock.advanceMillis(200);
            replySink.onNext(findResponse);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeCas);

    ContentAddressableStorageBlockingStub casStub =
        ContentAddressableStorageGrpc.newBlockingStub(loggedChannel);

    clock.advanceMillis(14900);
    casStub.findMissingBlobs(findRequest);

    FindMissingBlobsDetails expectedDetails =
        FindMissingBlobsDetails.newBuilder()
            .setRequest(findRequest)
            .setResponse(findResponse)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(14).setNanos(900000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(15).setNanos(100000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(
                ContentAddressableStorageGrpc.getFindMissingBlobsMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setFindMissingBlobs(expectedDetails))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful GetActionResult call through the default interceptor: the logged entry holds the
   * request and the returned action result, with times spanning 11.111s to 33.333s.
   */
  @Test
  public void testGetActionResultCallOk() {
    Digest actionDigest = DigestUtil.buildDigest("test", 8);
    GetActionResultRequest getRequest =
        GetActionResultRequest.newBuilder()
            .setActionDigest(actionDigest)
            .setInstanceName("test-instance")
            .build();
    ActionResult cachedResult =
        ActionResult.newBuilder()
            .addOutputFiles(OutputFile.newBuilder().setDigest(actionDigest).setPath("root/test"))
            .setExitCode(1)
            .build();

    // The fake service answers after 22222ms of manual clock time.
    ActionCacheImplBase fakeActionCache =
        new ActionCacheImplBase() {
          @Override
          public void getActionResult(
              GetActionResultRequest incoming, StreamObserver<ActionResult> replySink) {
            clock.advanceMillis(22222);
            replySink.onNext(cachedResult);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeActionCache);
    ActionCacheBlockingStub cacheStub = ActionCacheGrpc.newBlockingStub(loggedChannel);

    clock.advanceMillis(11111);
    cacheStub.getActionResult(getRequest);

    GetActionResultDetails expectedDetails =
        GetActionResultDetails.newBuilder()
            .setRequest(getRequest)
            .setResponse(cachedResult)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ActionCacheGrpc.getGetActionResultMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setGetActionResult(expectedDetails))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful UpdateActionResult call through the default interceptor: the logged entry holds
   * the request and the echoed action result, with times spanning 11.111s to 33.333s.
   */
  @Test
  public void testUpdateActionResultCallOk() {
    Digest actionDigest = DigestUtil.buildDigest("test", 8);
    ActionResult uploadedResult =
        ActionResult.newBuilder()
            .addOutputFiles(OutputFile.newBuilder().setDigest(actionDigest).setPath("root/test"))
            .setExitCode(1)
            .build();
    UpdateActionResultRequest updateRequest =
        UpdateActionResultRequest.newBuilder()
            .setActionDigest(actionDigest)
            .setInstanceName("test-instance")
            .setActionResult(uploadedResult)
            .build();

    // The fake service replies with the same action result after 22222ms of manual clock time.
    ActionCacheImplBase fakeActionCache =
        new ActionCacheImplBase() {
          @Override
          public void updateActionResult(
              UpdateActionResultRequest incoming, StreamObserver<ActionResult> replySink) {
            clock.advanceMillis(22222);
            replySink.onNext(uploadedResult);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeActionCache);
    ActionCacheBlockingStub cacheStub = ActionCacheGrpc.newBlockingStub(loggedChannel);

    clock.advanceMillis(11111);
    cacheStub.updateActionResult(updateRequest);

    UpdateActionResultDetails expectedDetails =
        UpdateActionResultDetails.newBuilder()
            .setRequest(updateRequest)
            .setResponse(uploadedResult)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ActionCacheGrpc.getUpdateActionResultMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setUpdateActionResult(expectedDetails))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful GetCapabilities call through the default interceptor: the logged entry holds the
   * request and the server capabilities, with times spanning 11.111s to 33.333s.
   */
  @Test
  public void testGetCapabilitiesCallOk() {
    GetCapabilitiesRequest capabilitiesRequest =
        GetCapabilitiesRequest.newBuilder().setInstanceName("test-instance").build();
    ExecutionCapabilities execCapabilities =
        ExecutionCapabilities.newBuilder().setExecEnabled(true).build();
    ServerCapabilities serverCapabilities =
        ServerCapabilities.newBuilder().setExecutionCapabilities(execCapabilities).build();

    // The fake service answers after 22222ms of manual clock time.
    CapabilitiesImplBase fakeCapabilities =
        new CapabilitiesImplBase() {
          @Override
          public void getCapabilities(
              GetCapabilitiesRequest incoming, StreamObserver<ServerCapabilities> replySink) {
            clock.advanceMillis(22222);
            replySink.onNext(serverCapabilities);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeCapabilities);
    CapabilitiesBlockingStub capabilitiesStub = CapabilitiesGrpc.newBlockingStub(loggedChannel);

    clock.advanceMillis(11111);
    capabilitiesStub.getCapabilities(capabilitiesRequest);

    GetCapabilitiesDetails expectedDetails =
        GetCapabilitiesDetails.newBuilder()
            .setRequest(capabilitiesRequest)
            .setResponse(serverCapabilities)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(CapabilitiesGrpc.getGetCapabilitiesMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setGetCapabilities(expectedDetails))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful WaitExecution call through the default interceptor: the logged entry holds the
   * request and both streamed operations, with times spanning 50s to 53.3s.
   */
  @Test
  public void testWaitExecutionCallOk() {
    WaitExecutionRequest waitRequest =
        WaitExecutionRequest.newBuilder().setName("test-name").build();
    Operation pendingOp = Operation.newBuilder().setName("test-name").build();
    Operation finishedOp =
        Operation.newBuilder()
            .setName("test-name")
            .setDone(true)
            .setResponse(Any.pack(waitRequest))
            .build();

    // 2200ms elapse between the two operations and 1100ms more before completion.
    ExecutionImplBase fakeExecution =
        new ExecutionImplBase() {
          @Override
          public void waitExecution(
              WaitExecutionRequest incoming, StreamObserver<Operation> replySink) {
            replySink.onNext(pendingOp);
            clock.advanceMillis(2200);
            replySink.onNext(finishedOp);
            clock.advanceMillis(1100);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeExecution);

    clock.advanceMillis(50000);
    Iterator<Operation> operations =
        ExecutionGrpc.newBlockingStub(loggedChannel).waitExecution(waitRequest);
    // Drain the stream so the call runs to completion.
    operations.forEachRemaining(ignoredOp -> {});

    WaitExecutionDetails expectedWait =
        WaitExecutionDetails.newBuilder()
            .setRequest(waitRequest)
            .addResponses(pendingOp)
            .addResponses(finishedOp)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(50).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(53).setNanos(300000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ExecutionGrpc.getWaitExecutionMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setWaitExecution(expectedWait))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * WaitExecution call that delivers one operation and then fails with DEADLINE_EXCEEDED: the
   * logged entry holds the request, the one operation received, and the error status.
   */
  @Test
  public void testWaitExecutionCallFail() {
    WaitExecutionRequest waitRequest =
        WaitExecutionRequest.newBuilder().setName("test-name").build();
    Operation onlyOp = Operation.newBuilder().setName("test-name").build();
    Status timedOut = Status.DEADLINE_EXCEEDED.withDescription("timed out");

    // 100ms pass before the operation is sent and another 100ms before the failure.
    ExecutionImplBase fakeExecution =
        new ExecutionImplBase() {
          @Override
          public void waitExecution(
              WaitExecutionRequest incoming, StreamObserver<Operation> replySink) {
            clock.advanceMillis(100);
            replySink.onNext(onlyOp);
            clock.advanceMillis(100);
            replySink.onError(timedOut.asRuntimeException());
          }
        };
    serviceRegistry.addService(fakeExecution);

    clock.advanceMillis(2000);
    Iterator<Operation> operations =
        ExecutionGrpc.newBlockingStub(loggedChannel).waitExecution(waitRequest);
    // The first operation arrives normally; the error surfaces on the following hasNext().
    assertThat(operations.hasNext()).isTrue();
    assertThat(operations.next()).isEqualTo(onlyOp);
    assertThrows(StatusRuntimeException.class, operations::hasNext);

    WaitExecutionDetails expectedWait =
        WaitExecutionDetails.newBuilder().setRequest(waitRequest).addResponses(onlyOp).build();
    com.google.rpc.Status expectedStatus =
        com.google.rpc.Status.newBuilder()
            .setCode(timedOut.getCode().value())
            .setMessage(timedOut.getDescription())
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(2).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(2).setNanos(200000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ExecutionGrpc.getWaitExecutionMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setWaitExecution(expectedWait))
            .setStatus(expectedStatus)
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * Successful ByteStream read through the default interceptor: the logged entry records the
   * request plus the number of reads (2) and total bytes read (6), spanning 500s to 502s.
   */
  @Test
  public void testReadCallOk() {
    ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test-resource").build();
    ReadResponse firstChunk =
        ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
    ReadResponse secondChunk =
        ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build();

    // Both chunks are sent back to back; 2000ms pass before the stream is completed.
    ByteStreamImplBase fakeByteStream =
        new ByteStreamImplBase() {
          @Override
          public void read(ReadRequest incoming, StreamObserver<ReadResponse> replySink) {
            replySink.onNext(firstChunk);
            replySink.onNext(secondChunk);
            clock.advanceMillis(2000);
            replySink.onCompleted();
          }
        };
    serviceRegistry.addService(fakeByteStream);

    clock.advanceMillis(500000);
    Iterator<ReadResponse> chunks =
        ByteStreamGrpc.newBlockingStub(loggedChannel).read(readRequest);
    // Drain the stream so the call runs to completion.
    chunks.forEachRemaining(ignoredChunk -> {});

    ReadDetails expectedRead =
        ReadDetails.newBuilder().setRequest(readRequest).setNumReads(2).setBytesRead(6).build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(500).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(502).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setRead(expectedRead))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  /**
   * ByteStream read that delivers one chunk and then fails with DEADLINE_EXCEEDED: the logged
   * entry records one read of three bytes and the error status, spanning 0s to 0.1s.
   */
  @Test
  public void testReadCallFail() {
    ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test-resource").build();
    ReadResponse onlyChunk =
        ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
    Status timedOut = Status.DEADLINE_EXCEEDED.withDescription("timeout");

    // The fake service sends one chunk, lets 100ms pass, then fails the call.
    ByteStreamImplBase fakeByteStream =
        new ByteStreamImplBase() {
          @Override
          public void read(ReadRequest incoming, StreamObserver<ReadResponse> replySink) {
            replySink.onNext(onlyChunk);
            clock.advanceMillis(100);
            replySink.onError(timedOut.asRuntimeException());
          }
        };
    serviceRegistry.addService(fakeByteStream);

    Iterator<ReadResponse> chunks =
        ByteStreamGrpc.newBlockingStub(loggedChannel).read(readRequest);
    // The chunk arrives normally; the error surfaces on the following hasNext().
    assertThat(chunks.hasNext()).isTrue();
    assertThat(chunks.next()).isEqualTo(onlyChunk);
    assertThrows(StatusRuntimeException.class, chunks::hasNext);

    ReadDetails expectedRead =
        ReadDetails.newBuilder().setRequest(readRequest).setNumReads(1).setBytesRead(3).build();
    com.google.rpc.Status expectedStatus =
        com.google.rpc.Status.newBuilder()
            .setCode(timedOut.getCode().value())
            .setMessage(timedOut.getDescription())
            .build();
    // The clock is not advanced before the call, so the start time is the default timestamp.
    Timestamp expectedEnd = Timestamp.newBuilder().setNanos(100000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setRead(expectedRead))
            .setStatus(expectedStatus)
            .setStartTime(Timestamp.getDefaultInstance())
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }

  @Test
  public void testWriteCallOk() {
    // Three writes that all use the default offset, followed by a successful server response,
    // should be summarized in a single log entry.
    WriteRequest firstWrite =
        WriteRequest.newBuilder()
            .setResourceName("test1")
            .setData(ByteString.copyFromUtf8("abc"))
            .build();
    WriteRequest secondWrite =
        WriteRequest.newBuilder()
            .setResourceName("test2")
            .setData(ByteString.copyFromUtf8("def"))
            .build();
    WriteResponse committed = WriteResponse.newBuilder().setCommittedSize(6).build();

    // Fake server: ignores incoming data and replies only once the client completes the stream.
    serviceRegistry.addService(
        new ByteStreamImplBase() {
          @Override
          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> replyObserver) {
            return new StreamObserver<WriteRequest>() {
              @Override
              public void onNext(WriteRequest ignoredRequest) {}

              @Override
              public void onError(Throwable ignoredError) {}

              @Override
              public void onCompleted() {
                replyObserver.onNext(committed);
                replyObserver.onCompleted();
              }
            };
          }
        });

    @SuppressWarnings("unchecked")
    StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);
    ByteStreamStub asyncStub = ByteStreamGrpc.newStub(loggedChannel);

    clock.advanceMillis(10000);
    // Issue three writes; the third repeats the first. The clock moves 400ms in total before the
    // stream is completed.
    StreamObserver<WriteRequest> writer = asyncStub.write(clientObserver);
    writer.onNext(firstWrite);
    clock.advanceMillis(100);
    writer.onNext(secondWrite);
    clock.advanceMillis(200);
    writer.onNext(firstWrite);
    clock.advanceMillis(100);
    writer.onCompleted();

    WriteDetails expectedWrite =
        WriteDetails.newBuilder()
            .addResourceNames("test1")
            .addResourceNames("test2")
            .addOffsets(0)
            .addOffsets(0)
            .addOffsets(0)
            // No finish_writes entries are expected: no request set finish_write.
            .setResponse(committed)
            .setBytesSent(9)
            .setNumWrites(3)
            .build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setWrite(expectedWrite))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(Timestamp.newBuilder().setSeconds(10))
            .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
            .build();

    verify(logStream).write(expectedEntry);
  }

  @Test
  public void testWriteCallOffsetAndFinishWriteCompounding() {
    // Three writes whose offsets follow on from one another, the last one flagged finish_write,
    // are expected to be logged with a single starting offset and a single finish offset.
    WriteRequest firstWrite =
        WriteRequest.newBuilder()
            .setResourceName("test1")
            .setData(ByteString.copyFromUtf8("abc"))
            .setWriteOffset(10)
            .build();
    // Has no resource name; starts exactly where the first write ends.
    WriteRequest secondWrite =
        WriteRequest.newBuilder()
            .setData(ByteString.copyFromUtf8("def"))
            .setWriteOffset(firstWrite.getWriteOffset() + firstWrite.getData().size())
            .build();
    // Same payload and resource name as the first write, placed after the second one and marked
    // as the final write.
    WriteRequest finishingWrite =
        firstWrite.toBuilder()
            .setWriteOffset(secondWrite.getWriteOffset() + secondWrite.getData().size())
            .setFinishWrite(true)
            .build();
    WriteResponse committed = WriteResponse.newBuilder().setCommittedSize(6).build();

    // Fake server: ignores incoming data and replies only once the client completes the stream.
    serviceRegistry.addService(
        new ByteStreamImplBase() {
          @Override
          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> replyObserver) {
            return new StreamObserver<WriteRequest>() {
              @Override
              public void onNext(WriteRequest ignoredRequest) {}

              @Override
              public void onError(Throwable ignoredError) {}

              @Override
              public void onCompleted() {
                replyObserver.onNext(committed);
                replyObserver.onCompleted();
              }
            };
          }
        });

    @SuppressWarnings("unchecked")
    StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);
    ByteStreamStub asyncStub = ByteStreamGrpc.newStub(loggedChannel);

    clock.advanceMillis(10000);
    // Send the three writes, moving the clock 400ms in total before completing the stream.
    StreamObserver<WriteRequest> writer = asyncStub.write(clientObserver);
    writer.onNext(firstWrite);
    clock.advanceMillis(100);
    writer.onNext(secondWrite);
    clock.advanceMillis(200);
    writer.onNext(finishingWrite);
    clock.advanceMillis(100);
    writer.onCompleted();

    WriteDetails expectedWrite =
        WriteDetails.newBuilder()
            .addResourceNames("test1")
            .addResourceNames("")
            .addOffsets(firstWrite.getWriteOffset())
            // Initial offset plus the sizes of all three payloads.
            .addFinishWrites(
                10 + firstWrite.getData().size() * 2 + secondWrite.getData().size())
            .setResponse(committed)
            .setBytesSent(9)
            .setNumWrites(3)
            .build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setWrite(expectedWrite))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(Timestamp.newBuilder().setSeconds(10))
            .setEndTime(Timestamp.newBuilder().setSeconds(10).setNanos(400000000))
            .build();
    verify(logStream).write(expectedEntry);
  }

  @Test
  public void testWriteCallFail() {
    // A Write call that the client aborts with an error after one request is expected to be
    // logged as CANCELLED, with the string form of the client's exception as the message.
    WriteRequest onlyWrite =
        WriteRequest.newBuilder()
            .setResourceName("test")
            .setData(ByteString.copyFromUtf8("abc"))
            .build();
    Status failure = Status.DEADLINE_EXCEEDED.withDescription("timeout");

    // Fake server: hands back a mock request observer, so it never sends a response itself.
    serviceRegistry.addService(
        new ByteStreamImplBase() {
          @Override
          @SuppressWarnings("unchecked")
          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> replyObserver) {
            return Mockito.mock(StreamObserver.class);
          }
        });

    @SuppressWarnings("unchecked")
    StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);
    ByteStreamStub asyncStub = ByteStreamGrpc.newStub(loggedChannel);
    clock.advanceMillis(10000000000L);

    // Send a single write, let the same amount of time pass again, then fail the call from the
    // client side.
    StreamObserver<WriteRequest> writer = asyncStub.write(clientObserver);
    writer.onNext(onlyWrite);
    clock.advanceMillis(10000000000L);
    writer.onError(failure.asRuntimeException());

    Status expectedCancel = Status.CANCELLED.withCause(failure.asRuntimeException());
    com.google.rpc.Status expectedStatus =
        com.google.rpc.Status.newBuilder()
            .setCode(expectedCancel.getCode().value())
            .setMessage(expectedCancel.getCause().toString())
            .build();
    WriteDetails expectedWrite =
        WriteDetails.newBuilder()
            .addResourceNames("test")
            .addOffsets(0)
            .setNumWrites(1)
            .setBytesSent(3)
            .build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
            .setStatus(expectedStatus)
            .setDetails(RpcCallDetails.newBuilder().setWrite(expectedWrite))
            .setStartTime(Timestamp.newBuilder().setSeconds(10000000))
            .setEndTime(Timestamp.newBuilder().setSeconds(20000000))
            .build();
    verify(logStream).write(expectedEntry);
  }

  @Test
  public void testQueryWriteStatusCallOk() {
    // A successful unary QueryWriteStatus call should be logged with its request, its response
    // and the start/end times taken from the test clock.
    QueryWriteStatusRequest statusRequest =
        QueryWriteStatusRequest.newBuilder().setResourceName("test").build();
    QueryWriteStatusResponse statusResponse =
        QueryWriteStatusResponse.newBuilder().setCommittedSize(10).build();

    // Fake server: moves the clock forward by 22.222 seconds before answering.
    serviceRegistry.addService(
        new ByteStreamImplBase() {
          @Override
          public void queryWriteStatus(
              QueryWriteStatusRequest incoming,
              StreamObserver<QueryWriteStatusResponse> observer) {
            clock.advanceMillis(22222);
            observer.onNext(statusResponse);
            observer.onCompleted();
          }
        });

    // The call is issued 11.111 seconds after the clock's initial value.
    clock.advanceMillis(11111);
    ByteStreamGrpc.newBlockingStub(loggedChannel).queryWriteStatus(statusRequest);

    QueryWriteStatusDetails expectedQuery =
        QueryWriteStatusDetails.newBuilder()
            .setRequest(statusRequest)
            .setResponse(statusResponse)
            .build();
    Timestamp expectedStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
    Timestamp expectedEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
    LogEntry expectedEntry =
        LogEntry.newBuilder()
            .setMethodName(ByteStreamGrpc.getQueryWriteStatusMethod().getFullMethodName())
            .setDetails(RpcCallDetails.newBuilder().setQueryWriteStatus(expectedQuery))
            .setStatus(com.google.rpc.Status.getDefaultInstance())
            .setStartTime(expectedStart)
            .setEndTime(expectedEnd)
            .build();
    verify(logStream).write(expectedEntry);
  }
}
