// blob: 17ee021c73a839d467e3345954787d0d3f0f8ca5 [file] [log] [blame]
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.logging;
import static com.google.common.collect.Iterators.advance;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheBlockingStub;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.CapabilitiesGrpc;
import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesBlockingStub;
import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesImplBase;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
import build.bazel.remote.execution.v2.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.ExecuteRequest;
import build.bazel.remote.execution.v2.ExecutionCapabilities;
import build.bazel.remote.execution.v2.ExecutionGrpc;
import build.bazel.remote.execution.v2.ExecutionGrpc.ExecutionImplBase;
import build.bazel.remote.execution.v2.FindMissingBlobsRequest;
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
import build.bazel.remote.execution.v2.GetCapabilitiesRequest;
import build.bazel.remote.execution.v2.OutputFile;
import build.bazel.remote.execution.v2.ServerCapabilities;
import build.bazel.remote.execution.v2.UpdateActionResultRequest;
import build.bazel.remote.execution.v2.WaitExecutionRequest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
import com.google.bytestream.ByteStreamProto.ReadResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ExecuteDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.FindMissingBlobsDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetCapabilitiesDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.UpdateActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WaitExecutionDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.testutil.ManualClock;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
import com.google.longrunning.Operation;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.util.MutableHandlerRegistry;
import java.util.Iterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
/** Tests for {@link com.google.devtools.build.lib.remote.logging.LoggingInterceptor} */
@RunWith(JUnit4.class)
public class LoggingInterceptorTest {
// Name under which the in-process fake server is registered; channels connect to it by name.
private final String fakeServerName = "fake server for " + getClass();
// Installed as the fake server's fallback handler registry; each test adds the service
// implementation it needs here.
private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
// In-process server started in setUp() and shut down in tearDown().
private Server fakeServer;
// Channel to the fake server with the interceptor below attached; built in setUp().
private Channel loggedChannel;
// Interceptor under test, constructed in setUp() from logStream and clock.
private LoggingInterceptor interceptor;
// Mocked log output; tests verify the entries written to it.
private AsynchronousFileOutputStream logStream;
// Manually advanced clock handed to the interceptor so tests control the logged timestamps.
private ManualClock clock;
/**
 * Builds a {@link LoggingInterceptor} whose handler selection is overridden so that every
 * intercepted call, whatever its method descriptor, is handled by {@code handler}. Log entries
 * go to {@code outputFile} and timestamps come from the test's clock.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private LoggingInterceptor getInterceptorWithAlwaysThisHandler(
    LoggingHandler handler, AsynchronousFileOutputStream outputFile) {
  LoggingInterceptor fixedHandlerInterceptor =
      new LoggingInterceptor(outputFile, clock) {
        @Override
        public <ReqT, RespT> LoggingHandler<ReqT, RespT> selectHandler(
            MethodDescriptor<ReqT, RespT> ignoredMethod) {
          // The descriptor is deliberately not consulted.
          return handler;
        }
      };
  return fixedHandlerInterceptor;
}
@Before
public final void setUp() throws Exception {
  // The server delegates to the mutable registry so that each test case can register the
  // service implementation it needs after the server is already running.
  Server inProcessServer =
      InProcessServerBuilder.forName(fakeServerName)
          .fallbackHandlerRegistry(serviceRegistry)
          .directExecutor()
          .build();
  fakeServer = inProcessServer.start();

  // Collaborators of the interceptor under test: a manual clock and a mocked log stream.
  clock = new ManualClock();
  logStream = Mockito.mock(AsynchronousFileOutputStream.class);
  interceptor = new LoggingInterceptor(logStream, clock);

  // Client channel to the fake server with the interceptor attached.
  Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
  loggedChannel = ClientInterceptors.intercept(rawChannel, interceptor);
}
@After
public void tearDown() throws Exception {
  // Stop the in-process server immediately and block until it has fully terminated.
  fakeServer.shutdownNow().awaitTermination();
}
/** A successful single-response call hits the handler once per event and logs one entry. */
@Test
public void testCallOk() {
  ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test").build();
  ByteString payload = ByteString.copyFromUtf8("abc");
  ReadResponse readResponse = ReadResponse.newBuilder().setData(payload).build();

  // Fake server: lets 1234ms of manual clock time pass, then sends one response and completes.
  ByteStreamImplBase fakeByteStream =
      new ByteStreamImplBase() {
        @Override
        public void read(ReadRequest incoming, StreamObserver<ReadResponse> observer) {
          clock.advanceMillis(1234);
          observer.onNext(readResponse);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeByteStream);

  // Route every call through a mocked handler and a mocked output stream.
  @SuppressWarnings("unchecked")
  LoggingHandler<ReadRequest, ReadResponse> mockHandler = Mockito.mock(LoggingHandler.class);
  RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
  Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);
  AsynchronousFileOutputStream mockOutput = Mockito.mock(AsynchronousFileOutputStream.class);
  LoggingInterceptor fixedInterceptor =
      getInterceptorWithAlwaysThisHandler(mockHandler, mockOutput);
  Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
  Channel interceptedChannel = ClientInterceptors.intercept(rawChannel, fixedInterceptor);
  ByteStreamBlockingStub blockingStub = ByteStreamGrpc.newBlockingStub(interceptedChannel);

  // The call is issued at t=12.3s and the server finishes it 1.234s later.
  clock.advanceMillis(12300);
  blockingStub.read(readRequest).next();

  Timestamp callStart = Timestamp.newBuilder().setSeconds(12).setNanos(300000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(13).setNanos(534000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
          .setDetails(emptyDetails)
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();

  verify(mockHandler).handleReq(readRequest);
  verify(mockHandler).handleResp(readResponse);
  verify(mockHandler).getDetails();
  verify(mockOutput).write(expectedEntry);
}
/** A server-streaming call passes each response to the handler, in order, and logs once. */
@Test
public void testCallOkMultipleResponses() {
  ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test").build();
  ReadResponse firstChunk =
      ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
  ReadResponse secondChunk =
      ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build();

  // Fake server: streams two responses, 50ms and a further 1500ms into the call.
  ByteStreamImplBase fakeByteStream =
      new ByteStreamImplBase() {
        @Override
        public void read(ReadRequest incoming, StreamObserver<ReadResponse> observer) {
          clock.advanceMillis(50);
          observer.onNext(firstChunk);
          clock.advanceMillis(1500);
          observer.onNext(secondChunk);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeByteStream);

  // Route every call through a mocked handler and a mocked output stream.
  @SuppressWarnings("unchecked")
  LoggingHandler<ReadRequest, ReadResponse> mockHandler = Mockito.mock(LoggingHandler.class);
  RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
  Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);
  AsynchronousFileOutputStream mockOutput = Mockito.mock(AsynchronousFileOutputStream.class);
  LoggingInterceptor fixedInterceptor =
      getInterceptorWithAlwaysThisHandler(mockHandler, mockOutput);
  Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
  Channel interceptedChannel = ClientInterceptors.intercept(rawChannel, fixedInterceptor);
  ByteStreamBlockingStub blockingStub = ByteStreamGrpc.newBlockingStub(interceptedChannel);

  // Consume both streamed responses.
  Iterator<ReadResponse> replies = blockingStub.read(readRequest);
  advance(replies, 2);

  // The clock was never advanced before the call, so it starts at the default timestamp and
  // ends 50ms + 1500ms later.
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(1).setNanos(550000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
          .setDetails(emptyDetails)
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(Timestamp.getDefaultInstance())
          .setEndTime(callEnd)
          .build();

  ArgumentCaptor<ReadResponse> seenResponses = ArgumentCaptor.forClass(ReadResponse.class);
  verify(mockHandler).handleReq(readRequest);
  verify(mockHandler, times(2)).handleResp(seenResponses.capture());
  assertThat(seenResponses.getAllValues()).containsExactly(firstChunk, secondChunk).inOrder();
  verify(mockHandler).getDetails();
  verify(mockOutput).write(expectedEntry);
}
/** A client-streaming call passes each request to the handler, in order, and logs once. */
@Test
public void testCallOkMultipleRequests() {
  WriteRequest firstWrite =
      WriteRequest.newBuilder()
          .setResourceName("test")
          .setData(ByteString.copyFromUtf8("abc"))
          .build();
  WriteRequest secondWrite =
      WriteRequest.newBuilder()
          .setResourceName("test")
          .setData(ByteString.copyFromUtf8("def"))
          .build();
  WriteResponse writeResponse = WriteResponse.newBuilder().setCommittedSize(6).build();

  // Fake server: ignores incoming requests and answers only once the client has completed.
  ByteStreamImplBase fakeByteStream =
      new ByteStreamImplBase() {
        @Override
        public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> responseSink) {
          return new StreamObserver<WriteRequest>() {
            @Override
            public void onNext(WriteRequest ignoredRequest) {}

            @Override
            public void onError(Throwable ignoredError) {}

            @Override
            public void onCompleted() {
              responseSink.onNext(writeResponse);
              responseSink.onCompleted();
            }
          };
        }
      };
  serviceRegistry.addService(fakeByteStream);

  // Route every call through a mocked handler and a mocked output stream.
  @SuppressWarnings("unchecked")
  LoggingHandler<WriteRequest, WriteResponse> mockHandler = Mockito.mock(LoggingHandler.class);
  RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
  Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);
  AsynchronousFileOutputStream mockOutput = Mockito.mock(AsynchronousFileOutputStream.class);
  LoggingInterceptor fixedInterceptor =
      getInterceptorWithAlwaysThisHandler(mockHandler, mockOutput);
  Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
  Channel interceptedChannel = ClientInterceptors.intercept(rawChannel, fixedInterceptor);
  ByteStreamStub asyncStub = ByteStreamGrpc.newStub(interceptedChannel);

  clock.advanceMillis(1000);
  @SuppressWarnings("unchecked")
  StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);

  // Send both requests, then complete the stream one second after the call started.
  StreamObserver<WriteRequest> requestStream = asyncStub.write(clientObserver);
  requestStream.onNext(firstWrite);
  requestStream.onNext(secondWrite);
  clock.advanceMillis(1000);
  requestStream.onCompleted();

  Timestamp callStart = Timestamp.newBuilder().setSeconds(1).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(2).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
          .setDetails(emptyDetails)
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();

  ArgumentCaptor<WriteRequest> seenRequests = ArgumentCaptor.forClass(WriteRequest.class);
  verify(mockHandler, times(2)).handleReq(seenRequests.capture());
  assertThat(seenRequests.getAllValues()).containsExactly(firstWrite, secondWrite).inOrder();
  verify(mockHandler).handleResp(writeResponse);
  verify(mockHandler).getDetails();
  verify(mockOutput).write(expectedEntry);
}
/** A failed call never reaches handleResp, and the logged entry carries the error status. */
@Test
public void testCallWithError() {
  ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test").build();
  Status notFound = Status.NOT_FOUND.withDescription("not found");

  // Fake server: fails the call 100ms after receiving it, without sending any response.
  ByteStreamImplBase failingByteStream =
      new ByteStreamImplBase() {
        @Override
        public void read(ReadRequest incoming, StreamObserver<ReadResponse> observer) {
          clock.advanceMillis(100);
          observer.onError(notFound.asRuntimeException());
        }
      };
  serviceRegistry.addService(failingByteStream);

  // Route every call through a mocked handler and a mocked output stream.
  @SuppressWarnings("unchecked")
  LoggingHandler<ReadRequest, ReadResponse> mockHandler = Mockito.mock(LoggingHandler.class);
  RpcCallDetails emptyDetails = RpcCallDetails.getDefaultInstance();
  Mockito.when(mockHandler.getDetails()).thenReturn(emptyDetails);
  AsynchronousFileOutputStream mockOutput = Mockito.mock(AsynchronousFileOutputStream.class);
  LoggingInterceptor fixedInterceptor =
      getInterceptorWithAlwaysThisHandler(mockHandler, mockOutput);
  Channel rawChannel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
  Channel interceptedChannel = ClientInterceptors.intercept(rawChannel, fixedInterceptor);
  ByteStreamBlockingStub blockingStub = ByteStreamGrpc.newBlockingStub(interceptedChannel);

  // The call is issued at t=1.5s and surfaces to the caller as a StatusRuntimeException.
  clock.advanceMillis(1500);
  assertThrows(StatusRuntimeException.class, () -> blockingStub.read(readRequest).next());

  com.google.rpc.Status expectedStatus =
      com.google.rpc.Status.newBuilder()
          .setCode(notFound.getCode().value())
          .setMessage(notFound.getDescription())
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(1).setNanos(500000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(1).setNanos(600000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
          .setDetails(emptyDetails)
          .setStatus(expectedStatus)
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();

  verify(mockHandler).handleReq(readRequest);
  verify(mockHandler, never()).handleResp(any());
  verify(mockHandler).getDetails();
  verify(mockOutput).write(expectedEntry);
}
/** A successful Execute call logs the request together with every streamed operation. */
@Test
public void testExecuteCallOk() {
  Digest actionDigest = DigestUtil.buildDigest("test", 8);
  ExecuteRequest executeRequest =
      ExecuteRequest.newBuilder()
          .setInstanceName("test-instance")
          .setActionDigest(actionDigest)
          .build();
  Operation pendingOp = Operation.newBuilder().setName("test-name").build();
  Operation finishedOp =
      Operation.newBuilder()
          .setName("test-name")
          .setDone(true)
          .setResponse(Any.pack(executeRequest))
          .build();

  // Fake server: two operations 2200ms apart, completing 1100ms after the second.
  ExecutionImplBase fakeExecution =
      new ExecutionImplBase() {
        @Override
        public void execute(ExecuteRequest incoming, StreamObserver<Operation> observer) {
          observer.onNext(pendingOp);
          clock.advanceMillis(2200);
          observer.onNext(finishedOp);
          clock.advanceMillis(1100);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeExecution);

  // Issue the call at t=50s and drain every reply.
  clock.advanceMillis(50000);
  Iterator<Operation> replies =
      ExecutionGrpc.newBlockingStub(loggedChannel).execute(executeRequest);
  replies.forEachRemaining(ignoredReply -> {});

  ExecuteDetails executeDetails =
      ExecuteDetails.newBuilder()
          .setRequest(executeRequest)
          .addResponses(pendingOp)
          .addResponses(finishedOp)
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(50).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(53).setNanos(300000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ExecutionGrpc.getExecuteMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setExecute(executeDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A failed Execute call logs the request, no responses, and the error status. */
@Test
public void testExecuteCallFail() {
  Digest actionDigest = DigestUtil.buildDigest("test", 8);
  ExecuteRequest executeRequest =
      ExecuteRequest.newBuilder()
          .setInstanceName("test-instance")
          .setActionDigest(actionDigest)
          .build();
  Status notFound = Status.NOT_FOUND.withDescription("not found");

  // Fake server: fails the call 1100ms after receiving it.
  ExecutionImplBase failingExecution =
      new ExecutionImplBase() {
        @Override
        public void execute(ExecuteRequest incoming, StreamObserver<Operation> observer) {
          clock.advanceMillis(1100);
          observer.onError(notFound.asRuntimeException());
        }
      };
  serviceRegistry.addService(failingExecution);

  // A very large start offset, in milliseconds, with a 1ms remainder past the whole second.
  clock.advanceMillis(20000000000001L);
  Iterator<Operation> replies =
      ExecutionGrpc.newBlockingStub(loggedChannel).execute(executeRequest);
  assertThrows(StatusRuntimeException.class, replies::hasNext);

  com.google.rpc.Status expectedStatus =
      com.google.rpc.Status.newBuilder()
          .setCode(notFound.getCode().value())
          .setMessage(notFound.getDescription())
          .build();
  ExecuteDetails executeDetails = ExecuteDetails.newBuilder().setRequest(executeRequest).build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(20000000000L).setNanos(1000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(20000000001L).setNanos(101000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ExecutionGrpc.getExecuteMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setExecute(executeDetails))
          .setStatus(expectedStatus)
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A successful FindMissingBlobs call logs both the request and the response. */
@Test
public void testFindMissingBlobsCallOk() {
  Digest blobDigest = DigestUtil.buildDigest("test", 8);
  FindMissingBlobsRequest findRequest =
      FindMissingBlobsRequest.newBuilder()
          .addBlobDigests(blobDigest)
          .setInstanceName("test-instance")
          .build();
  FindMissingBlobsResponse findResponse =
      FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(blobDigest).build();

  // Fake server: replies after 200ms of manual clock time.
  ContentAddressableStorageImplBase fakeCas =
      new ContentAddressableStorageImplBase() {
        @Override
        public void findMissingBlobs(
            FindMissingBlobsRequest incoming,
            StreamObserver<FindMissingBlobsResponse> observer) {
          clock.advanceMillis(200);
          observer.onNext(findResponse);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeCas);

  ContentAddressableStorageBlockingStub casStub =
      ContentAddressableStorageGrpc.newBlockingStub(loggedChannel);
  // Issue the call at t=14.9s.
  clock.advanceMillis(14900);
  casStub.findMissingBlobs(findRequest);

  FindMissingBlobsDetails findDetails =
      FindMissingBlobsDetails.newBuilder()
          .setRequest(findRequest)
          .setResponse(findResponse)
          .build();
  String methodName =
      ContentAddressableStorageGrpc.getFindMissingBlobsMethod().getFullMethodName();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(14).setNanos(900000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(15).setNanos(100000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(methodName)
          .setDetails(RpcCallDetails.newBuilder().setFindMissingBlobs(findDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A successful GetActionResult call logs both the request and the returned result. */
@Test
public void testGetActionResultCallOk() {
  Digest actionDigest = DigestUtil.buildDigest("test", 8);
  GetActionResultRequest getRequest =
      GetActionResultRequest.newBuilder()
          .setActionDigest(actionDigest)
          .setInstanceName("test-instance")
          .build();
  OutputFile outputFile =
      OutputFile.newBuilder().setDigest(actionDigest).setPath("root/test").build();
  ActionResult cachedResult =
      ActionResult.newBuilder().addOutputFiles(outputFile).setExitCode(1).build();

  // Fake server: replies after 22222ms of manual clock time.
  ActionCacheImplBase fakeActionCache =
      new ActionCacheImplBase() {
        @Override
        public void getActionResult(
            GetActionResultRequest incoming, StreamObserver<ActionResult> observer) {
          clock.advanceMillis(22222);
          observer.onNext(cachedResult);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeActionCache);

  ActionCacheBlockingStub cacheStub = ActionCacheGrpc.newBlockingStub(loggedChannel);
  // Issue the call at t=11.111s.
  clock.advanceMillis(11111);
  cacheStub.getActionResult(getRequest);

  GetActionResultDetails getDetails =
      GetActionResultDetails.newBuilder()
          .setRequest(getRequest)
          .setResponse(cachedResult)
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ActionCacheGrpc.getGetActionResultMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setGetActionResult(getDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A successful UpdateActionResult call logs both the request and the echoed result. */
@Test
public void testUpdateActionResultCallOk() {
  Digest actionDigest = DigestUtil.buildDigest("test", 8);
  OutputFile outputFile =
      OutputFile.newBuilder().setDigest(actionDigest).setPath("root/test").build();
  ActionResult uploadedResult =
      ActionResult.newBuilder().addOutputFiles(outputFile).setExitCode(1).build();
  UpdateActionResultRequest updateRequest =
      UpdateActionResultRequest.newBuilder()
          .setActionDigest(actionDigest)
          .setInstanceName("test-instance")
          .setActionResult(uploadedResult)
          .build();

  // Fake server: echoes the action result back after 22222ms of manual clock time.
  ActionCacheImplBase fakeActionCache =
      new ActionCacheImplBase() {
        @Override
        public void updateActionResult(
            UpdateActionResultRequest incoming, StreamObserver<ActionResult> observer) {
          clock.advanceMillis(22222);
          observer.onNext(uploadedResult);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeActionCache);

  ActionCacheBlockingStub cacheStub = ActionCacheGrpc.newBlockingStub(loggedChannel);
  // Issue the call at t=11.111s.
  clock.advanceMillis(11111);
  cacheStub.updateActionResult(updateRequest);

  UpdateActionResultDetails updateDetails =
      UpdateActionResultDetails.newBuilder()
          .setRequest(updateRequest)
          .setResponse(uploadedResult)
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ActionCacheGrpc.getUpdateActionResultMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setUpdateActionResult(updateDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A successful GetCapabilities call logs both the request and the server capabilities. */
@Test
public void testGetCapabilitiesCallOk() {
  GetCapabilitiesRequest capabilitiesRequest =
      GetCapabilitiesRequest.newBuilder().setInstanceName("test-instance").build();
  ExecutionCapabilities executionCapabilities =
      ExecutionCapabilities.newBuilder().setExecEnabled(true).build();
  ServerCapabilities serverCapabilities =
      ServerCapabilities.newBuilder().setExecutionCapabilities(executionCapabilities).build();

  // Fake server: replies after 22222ms of manual clock time.
  CapabilitiesImplBase fakeCapabilities =
      new CapabilitiesImplBase() {
        @Override
        public void getCapabilities(
            GetCapabilitiesRequest incoming, StreamObserver<ServerCapabilities> observer) {
          clock.advanceMillis(22222);
          observer.onNext(serverCapabilities);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeCapabilities);

  CapabilitiesBlockingStub capabilitiesStub = CapabilitiesGrpc.newBlockingStub(loggedChannel);
  // Issue the call at t=11.111s.
  clock.advanceMillis(11111);
  capabilitiesStub.getCapabilities(capabilitiesRequest);

  GetCapabilitiesDetails capabilitiesDetails =
      GetCapabilitiesDetails.newBuilder()
          .setRequest(capabilitiesRequest)
          .setResponse(serverCapabilities)
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(11).setNanos(111000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(33).setNanos(333000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(CapabilitiesGrpc.getGetCapabilitiesMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setGetCapabilities(capabilitiesDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A successful WaitExecution call logs the request together with every streamed operation. */
@Test
public void testWaitExecutionCallOk() {
  WaitExecutionRequest waitRequest =
      WaitExecutionRequest.newBuilder().setName("test-name").build();
  Operation pendingOp = Operation.newBuilder().setName("test-name").build();
  Operation finishedOp =
      Operation.newBuilder()
          .setName("test-name")
          .setDone(true)
          .setResponse(Any.pack(waitRequest))
          .build();

  // Fake server: two operations 2200ms apart, completing 1100ms after the second.
  ExecutionImplBase fakeExecution =
      new ExecutionImplBase() {
        @Override
        public void waitExecution(
            WaitExecutionRequest incoming, StreamObserver<Operation> observer) {
          observer.onNext(pendingOp);
          clock.advanceMillis(2200);
          observer.onNext(finishedOp);
          clock.advanceMillis(1100);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeExecution);

  // Issue the call at t=50s and drain every reply.
  clock.advanceMillis(50000);
  Iterator<Operation> replies =
      ExecutionGrpc.newBlockingStub(loggedChannel).waitExecution(waitRequest);
  replies.forEachRemaining(ignoredReply -> {});

  WaitExecutionDetails waitDetails =
      WaitExecutionDetails.newBuilder()
          .setRequest(waitRequest)
          .addResponses(pendingOp)
          .addResponses(finishedOp)
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(50).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(53).setNanos(300000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ExecutionGrpc.getWaitExecutionMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setWaitExecution(waitDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A WaitExecution call failing mid-stream logs the received response and the error status. */
@Test
public void testWaitExecutionCallFail() {
  WaitExecutionRequest waitRequest =
      WaitExecutionRequest.newBuilder().setName("test-name").build();
  Operation pendingOp = Operation.newBuilder().setName("test-name").build();
  Status timedOut = Status.DEADLINE_EXCEEDED.withDescription("timed out");

  // Fake server: one operation after 100ms, then an error 100ms later.
  ExecutionImplBase failingExecution =
      new ExecutionImplBase() {
        @Override
        public void waitExecution(
            WaitExecutionRequest incoming, StreamObserver<Operation> observer) {
          clock.advanceMillis(100);
          observer.onNext(pendingOp);
          clock.advanceMillis(100);
          observer.onError(timedOut.asRuntimeException());
        }
      };
  serviceRegistry.addService(failingExecution);

  // Issue the call at t=2s: the first reply arrives normally, the next poll surfaces the error.
  clock.advanceMillis(2000);
  Iterator<Operation> replies =
      ExecutionGrpc.newBlockingStub(loggedChannel).waitExecution(waitRequest);
  assertThat(replies.hasNext()).isTrue();
  assertThat(replies.next()).isEqualTo(pendingOp);
  assertThrows(StatusRuntimeException.class, replies::hasNext);

  com.google.rpc.Status expectedStatus =
      com.google.rpc.Status.newBuilder()
          .setCode(timedOut.getCode().value())
          .setMessage(timedOut.getDescription())
          .build();
  WaitExecutionDetails waitDetails =
      WaitExecutionDetails.newBuilder().setRequest(waitRequest).addResponses(pendingOp).build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(2).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(2).setNanos(200000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ExecutionGrpc.getWaitExecutionMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setWaitExecution(waitDetails))
          .setStatus(expectedStatus)
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A successful Read call logs the request plus the number of reads and total bytes read. */
@Test
public void testReadCallOk() {
  ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test-resource").build();
  ReadResponse firstChunk =
      ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
  ReadResponse secondChunk =
      ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build();

  // Fake server: two 3-byte chunks back to back, completing 2000ms later.
  ByteStreamImplBase fakeByteStream =
      new ByteStreamImplBase() {
        @Override
        public void read(ReadRequest incoming, StreamObserver<ReadResponse> observer) {
          observer.onNext(firstChunk);
          observer.onNext(secondChunk);
          clock.advanceMillis(2000);
          observer.onCompleted();
        }
      };
  serviceRegistry.addService(fakeByteStream);

  // Issue the call at t=500s and drain every reply.
  clock.advanceMillis(500000);
  Iterator<ReadResponse> replies =
      ByteStreamGrpc.newBlockingStub(loggedChannel).read(readRequest);
  replies.forEachRemaining(ignoredReply -> {});

  ReadDetails readDetails =
      ReadDetails.newBuilder().setRequest(readRequest).setNumReads(2).setBytesRead(6).build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(500).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(502).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setRead(readDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/** A Read call failing mid-stream logs the reads seen so far and the error status. */
@Test
public void testReadCallFail() {
  ReadRequest readRequest = ReadRequest.newBuilder().setResourceName("test-resource").build();
  ReadResponse onlyChunk =
      ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
  Status timeout = Status.DEADLINE_EXCEEDED.withDescription("timeout");

  // Fake server: one 3-byte chunk, then an error 100ms later.
  ByteStreamImplBase failingByteStream =
      new ByteStreamImplBase() {
        @Override
        public void read(ReadRequest incoming, StreamObserver<ReadResponse> observer) {
          observer.onNext(onlyChunk);
          clock.advanceMillis(100);
          observer.onError(timeout.asRuntimeException());
        }
      };
  serviceRegistry.addService(failingByteStream);

  // The clock is not advanced before the call, so it starts at the default timestamp.
  Iterator<ReadResponse> replies =
      ByteStreamGrpc.newBlockingStub(loggedChannel).read(readRequest);
  assertThat(replies.hasNext()).isTrue();
  assertThat(replies.next()).isEqualTo(onlyChunk);
  assertThrows(StatusRuntimeException.class, replies::hasNext);

  com.google.rpc.Status expectedStatus =
      com.google.rpc.Status.newBuilder()
          .setCode(timeout.getCode().value())
          .setMessage(timeout.getDescription())
          .build();
  ReadDetails readDetails =
      ReadDetails.newBuilder().setRequest(readRequest).setNumReads(1).setBytesRead(3).build();
  Timestamp callEnd = Timestamp.newBuilder().setNanos(100000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setRead(readDetails))
          .setStatus(expectedStatus)
          .setStartTime(Timestamp.getDefaultInstance())
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/**
 * A successful Write call logs the distinct resource names, the response, the total bytes sent
 * and the number of writes.
 */
@Test
public void testWriteCallOk() {
  WriteRequest writeToTest1 =
      WriteRequest.newBuilder()
          .setResourceName("test1")
          .setData(ByteString.copyFromUtf8("abc"))
          .build();
  WriteRequest writeToTest2 =
      WriteRequest.newBuilder()
          .setResourceName("test2")
          .setData(ByteString.copyFromUtf8("def"))
          .build();
  WriteResponse writeResponse = WriteResponse.newBuilder().setCommittedSize(6).build();

  // Fake server: ignores incoming requests and answers only once the client has completed.
  ByteStreamImplBase fakeByteStream =
      new ByteStreamImplBase() {
        @Override
        public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> responseSink) {
          return new StreamObserver<WriteRequest>() {
            @Override
            public void onNext(WriteRequest ignoredRequest) {}

            @Override
            public void onError(Throwable ignoredError) {}

            @Override
            public void onCompleted() {
              responseSink.onNext(writeResponse);
              responseSink.onCompleted();
            }
          };
        }
      };
  serviceRegistry.addService(fakeByteStream);

  ByteStreamStub asyncStub = ByteStreamGrpc.newStub(loggedChannel);
  @SuppressWarnings("unchecked")
  StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);

  // Start the call at t=10s and send three writes (the third repeats the first) over 400ms.
  clock.advanceMillis(10000);
  StreamObserver<WriteRequest> requestStream = asyncStub.write(clientObserver);
  requestStream.onNext(writeToTest1);
  clock.advanceMillis(100);
  requestStream.onNext(writeToTest2);
  clock.advanceMillis(200);
  requestStream.onNext(writeToTest1);
  clock.advanceMillis(100);
  requestStream.onCompleted();

  // Each resource name is expected once even though "test1" was written twice.
  WriteDetails writeDetails =
      WriteDetails.newBuilder()
          .addResourceNames("test1")
          .addResourceNames("test2")
          .setResponse(writeResponse)
          .setBytesSent(9)
          .setNumWrites(3)
          .build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(10).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(10).setNanos(400000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
          .setDetails(RpcCallDetails.newBuilder().setWrite(writeDetails))
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
/**
 * A Write call that the client aborts with an error is logged with a CANCELLED status whose
 * message is the string form of the cause, plus the writes sent before the abort.
 */
@Test
public void testWriteCallFail() {
  WriteRequest writeRequest =
      WriteRequest.newBuilder()
          .setResourceName("test")
          .setData(ByteString.copyFromUtf8("abc"))
          .build();
  Status timeout = Status.DEADLINE_EXCEEDED.withDescription("timeout");

  // Fake server: accepts the stream with a mock observer and never responds.
  ByteStreamImplBase silentByteStream =
      new ByteStreamImplBase() {
        @Override
        @SuppressWarnings("unchecked")
        public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> responseSink) {
          return Mockito.mock(StreamObserver.class);
        }
      };
  serviceRegistry.addService(silentByteStream);

  ByteStreamStub asyncStub = ByteStreamGrpc.newStub(loggedChannel);
  @SuppressWarnings("unchecked")
  StreamObserver<WriteResponse> clientObserver = Mockito.mock(StreamObserver.class);

  // Send a single request, then fail the stream from the client side. Both clock advances are
  // 10,000,000 seconds expressed in milliseconds.
  clock.advanceMillis(10000000000L);
  StreamObserver<WriteRequest> requestStream = asyncStub.write(clientObserver);
  requestStream.onNext(writeRequest);
  clock.advanceMillis(10000000000L);
  requestStream.onError(timeout.asRuntimeException());

  Status expectedCancel = Status.CANCELLED.withCause(timeout.asRuntimeException());
  com.google.rpc.Status expectedStatus =
      com.google.rpc.Status.newBuilder()
          .setCode(expectedCancel.getCode().value())
          .setMessage(expectedCancel.getCause().toString())
          .build();
  WriteDetails writeDetails =
      WriteDetails.newBuilder().addResourceNames("test").setNumWrites(1).setBytesSent(3).build();
  Timestamp callStart = Timestamp.newBuilder().setSeconds(10000000).build();
  Timestamp callEnd = Timestamp.newBuilder().setSeconds(20000000).build();
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
          .setStatus(expectedStatus)
          .setDetails(RpcCallDetails.newBuilder().setWrite(writeDetails))
          .setStartTime(callStart)
          .setEndTime(callEnd)
          .build();
  verify(logStream).write(expectedEntry);
}
}