This CL adds log entry protos and logging handlers for bytestream Read and Write so that they are logged. I'm open to suggestions for the logging format for these calls, since we don't want to log the actual contents of reads/writes because of their size.
PiperOrigin-RevId: 193047886
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
index d2998ae..dc1caa6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/BUILD
@@ -16,6 +16,8 @@
"//src/main/protobuf:remote_execution_log_java_proto",
"//third_party:guava",
"//third_party/grpc:grpc-jar",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
"@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
"@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
"@googleapis//:google_longrunning_operations_java_proto",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
index fa657ba..a805eae 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptor.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote.logging;
+import com.google.bytestream.ByteStreamGrpc;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
@@ -59,6 +60,10 @@
return new GetActionResultHandler();
} else if (method == ContentAddressableStorageGrpc.getFindMissingBlobsMethod()) {
return new FindMissingBlobsHandler();
+ } else if (method == ByteStreamGrpc.getReadMethod()) {
+ return new ReadHandler();
+ } else if (method == ByteStreamGrpc.getWriteMethod()) {
+ return new WriteHandler();
}
return null;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/ReadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/ReadHandler.java
new file mode 100644
index 0000000..8c43f40
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/ReadHandler.java
@@ -0,0 +1,45 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote.logging;
+
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
+
+/** LoggingHandler for {@link google.bytestream.Read} gRPC call. */
+public class ReadHandler implements LoggingHandler<ReadRequest, ReadResponse> {
+ private final ReadDetails.Builder builder = ReadDetails.newBuilder();
+ private long numReads = 0;
+ private long bytesRead = 0;
+
+ @Override
+ public void handleReq(ReadRequest message) {
+ builder.setRequest(message);
+ }
+
+ @Override
+ public void handleResp(ReadResponse message) {
+ numReads++;
+ bytesRead += message.getData().size();
+ }
+
+ @Override
+ public RpcCallDetails getDetails() {
+ builder.setNumReads(numReads);
+ builder.setBytesRead(bytesRead);
+ return RpcCallDetails.newBuilder().setRead(builder).build();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java
new file mode 100644
index 0000000..67e9e37
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/logging/WriteHandler.java
@@ -0,0 +1,51 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote.logging;
+
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** LoggingHandler for {@link google.bytestream.Write} gRPC call. */
+public class WriteHandler implements LoggingHandler<WriteRequest, WriteResponse> {
+ private final WriteDetails.Builder builder = WriteDetails.newBuilder();
+ private final Set<String> resources = new LinkedHashSet<>();
+ private long numWrites = 0;
+ private long bytesSent = 0;
+
+ @Override
+ public void handleReq(WriteRequest message) {
+ resources.add(message.getResourceName());
+
+ numWrites++;
+ bytesSent += message.getData().size();
+ }
+
+ @Override
+ public void handleResp(WriteResponse message) {
+ builder.setResponse(message);
+ }
+
+ @Override
+ public RpcCallDetails getDetails() {
+ builder.addAllResourceNames(resources);
+ builder.setNumWrites(numWrites);
+ builder.setBytesSent(bytesSent);
+ return RpcCallDetails.newBuilder().setWrite(builder).build();
+ }
+}
diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD
index 02cc158..cb44e63 100644
--- a/src/main/protobuf/BUILD
+++ b/src/main/protobuf/BUILD
@@ -147,6 +147,7 @@
name = "remote_execution_log_proto",
srcs = ["remote_execution_log.proto"],
deps = [
+ "@googleapis//:google_bytestream_bytestream_proto",
"@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_proto",
"@googleapis//:google_longrunning_operations_proto",
"@googleapis//:google_rpc_status_proto",
diff --git a/src/main/protobuf/remote_execution_log.proto b/src/main/protobuf/remote_execution_log.proto
index 7bf9814..1e4fa9f 100644
--- a/src/main/protobuf/remote_execution_log.proto
+++ b/src/main/protobuf/remote_execution_log.proto
@@ -16,6 +16,7 @@
package remote_logging;
+import "google/bytestream/bytestream.proto";
import "google/devtools/remoteexecution/v1test/remote_execution.proto";
import "google/longrunning/operations.proto";
import "google/rpc/status.proto";
@@ -84,6 +85,36 @@
google.devtools.remoteexecution.v1test.FindMissingBlobsResponse response = 2;
}
+// Details for a call to google.bytestream.Read.
+message ReadDetails {
+ // The google.bytestream.ReadRequest sent.
+ google.bytestream.ReadRequest request = 1;
+
+ // The number of reads performed in this call.
+ int64 num_reads = 2;
+
+ // The total number of bytes read totalled over all stream responses.
+ int64 bytes_read = 3;
+}
+
+// Details for a call to google.bytestream.Write.
+message WriteDetails {
+ // The names of resources requested to be written to in this call in the order
+ // they were first requested in. If the ByteStream protocol is followed
+ // according to specification, this should only contain have a single element,
+ // which is the resource name specified in the first message of the stream.
+ repeated string resource_names = 1;
+
+ // The number of writes performed in this call.
+ int64 num_writes = 2;
+
+ // The total number of bytes sent over the stream.
+ int64 bytes_sent = 3;
+
+ // The received google.bytestream.WriteResponse.
+ google.bytestream.WriteResponse response = 4;
+}
+
// Contains details for specific types of calls.
message RpcCallDetails {
oneof details {
@@ -91,5 +122,7 @@
GetActionResultDetails get_action_result = 2;
WatchDetails watch = 3;
FindMissingBlobsDetails find_missing_blobs = 4;
+ ReadDetails read = 5;
+ WriteDetails write = 6;
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
index 0519b6e..4c1c398 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/logging/LoggingInterceptorTest.java
@@ -34,8 +34,10 @@
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.FindMissingBlobsDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.GetActionResultDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.LogEntry;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.ReadDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.RpcCallDetails;
import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WatchDetails;
+import com.google.devtools.build.lib.remote.logging.RemoteExecutionLog.WriteDetails;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.util.io.AsynchronousFileOutputStream;
import com.google.devtools.remoteexecution.v1test.Action;
@@ -557,4 +559,195 @@
.build();
verify(logStream).write(expectedEntry);
}
+
+ @Test
+ public void testReadCallOk() {
+ ReadRequest request = ReadRequest.newBuilder().setResourceName("test-resource").build();
+ ReadResponse response1 =
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
+ ReadResponse response2 =
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build();
+
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ responseObserver.onNext(response1);
+ responseObserver.onNext(response2);
+ responseObserver.onCompleted();
+ }
+ });
+
+ Iterator<ReadResponse> replies = ByteStreamGrpc.newBlockingStub(loggedChannel).read(request);
+
+ // Read both responses.
+ while (replies.hasNext()) {
+ replies.next();
+ }
+
+ LogEntry expectedEntry =
+ LogEntry.newBuilder()
+ .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
+ .setDetails(
+ RpcCallDetails.newBuilder()
+ .setRead(
+ ReadDetails.newBuilder()
+ .setRequest(request)
+ .setNumReads(2)
+ .setBytesRead(6)))
+ .setStatus(com.google.rpc.Status.getDefaultInstance())
+ .build();
+ verify(logStream).write(expectedEntry);
+ }
+
+ @Test
+ public void testReadCallFail() {
+ ReadRequest request = ReadRequest.newBuilder().setResourceName("test-resource").build();
+ ReadResponse response1 =
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build();
+ Status error = Status.DEADLINE_EXCEEDED.withDescription("timeout");
+
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ responseObserver.onNext(response1);
+ responseObserver.onError(error.asRuntimeException());
+ }
+ });
+ Iterator<ReadResponse> replies = ByteStreamGrpc.newBlockingStub(loggedChannel).read(request);
+ assertThrows(
+ StatusRuntimeException.class,
+ () -> {
+ while (replies.hasNext()) {
+ replies.next();
+ }
+ });
+
+ LogEntry expectedEntry =
+ LogEntry.newBuilder()
+ .setMethodName(ByteStreamGrpc.getReadMethod().getFullMethodName())
+ .setDetails(
+ RpcCallDetails.newBuilder()
+ .setRead(
+ ReadDetails.newBuilder()
+ .setRequest(request)
+ .setNumReads(1)
+ .setBytesRead(3)))
+ .setStatus(
+ com.google.rpc.Status.newBuilder()
+ .setCode(error.getCode().value())
+ .setMessage(error.getDescription()))
+ .build();
+ verify(logStream).write(expectedEntry);
+ }
+
+ @Test
+ public void testWriteCallOk() {
+ WriteRequest request1 =
+ WriteRequest.newBuilder()
+ .setResourceName("test1")
+ .setData(ByteString.copyFromUtf8("abc"))
+ .build();
+ WriteRequest request2 =
+ WriteRequest.newBuilder()
+ .setResourceName("test2")
+ .setData(ByteString.copyFromUtf8("def"))
+ .build();
+ WriteResponse response = WriteResponse.newBuilder().setCommittedSize(6).build();
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest writeRequest) {}
+
+ @Override
+ public void onError(Throwable throwable) {}
+
+ @Override
+ public void onCompleted() {
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+ };
+ }
+ });
+
+ ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
+ @SuppressWarnings("unchecked")
+ StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
+
+ // Request three writes, the first identical with the third.
+ StreamObserver<WriteRequest> requester = stub.write(responseObserver);
+ requester.onNext(request1);
+ requester.onNext(request2);
+ requester.onNext(request1);
+ requester.onCompleted();
+
+ LogEntry expectedEntry =
+ LogEntry.newBuilder()
+ .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
+ .setDetails(
+ RpcCallDetails.newBuilder()
+ .setWrite(
+ WriteDetails.newBuilder()
+ .addResourceNames("test1")
+ .addResourceNames("test2")
+ .setResponse(response)
+ .setBytesSent(9)
+ .setNumWrites(3)))
+ .setStatus(com.google.rpc.Status.getDefaultInstance())
+ .build();
+
+ verify(logStream).write(expectedEntry);
+ }
+
+ @Test
+ public void testWriteCallFail() {
+ WriteRequest request =
+ WriteRequest.newBuilder()
+ .setResourceName("test")
+ .setData(ByteString.copyFromUtf8("abc"))
+ .build();
+ Status error = Status.DEADLINE_EXCEEDED.withDescription("timeout");
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ return Mockito.mock(StreamObserver.class);
+ }
+ });
+
+ ByteStreamStub stub = ByteStreamGrpc.newStub(loggedChannel);
+ @SuppressWarnings("unchecked")
+ StreamObserver<WriteResponse> responseObserver = Mockito.mock(StreamObserver.class);
+
+ // Write both responses.
+ StreamObserver<WriteRequest> requester = stub.write(responseObserver);
+ requester.onNext(request);
+ requester.onError(error.asRuntimeException());
+
+ Status expectedCancel = Status.CANCELLED.withCause(error.asRuntimeException());
+
+ LogEntry expectedEntry =
+ LogEntry.newBuilder()
+ .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
+ .setStatus(
+ com.google.rpc.Status.newBuilder()
+ .setCode(expectedCancel.getCode().value())
+ .setMessage(expectedCancel.getCause().toString()))
+ .setDetails(
+ RpcCallDetails.newBuilder()
+ .setWrite(
+ WriteDetails.newBuilder()
+ .addResourceNames("test")
+ .setNumWrites(1)
+ .setBytesSent(3)))
+ .build();
+
+ verify(logStream).write(expectedEntry);
+ }
}