blob: 115ba2c8b70759fd0edd315d1e62be41b80cd5d4 [file] [log] [blame]
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice.client;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.v1.PublishBuildEventGrpc;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventBlockingStub;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventStub;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import javax.annotation.Nullable;
/** Implementation of BuildEventServiceClient that uploads data using gRPC. */
public abstract class BuildEventServiceGrpcClient implements BuildEventServiceClient {
/** Max wait time for a single non-streaming RPC to finish */
private static final Duration RPC_TIMEOUT = Duration.ofSeconds(15);
private final PublishBuildEventStub besAsync;
private final PublishBuildEventBlockingStub besBlocking;
public BuildEventServiceGrpcClient(Channel channel, @Nullable CallCredentials callCredentials) {
this(
withCallCredentials(PublishBuildEventGrpc.newStub(channel), callCredentials),
withCallCredentials(PublishBuildEventGrpc.newBlockingStub(channel), callCredentials));
}
@VisibleForTesting
protected BuildEventServiceGrpcClient(
PublishBuildEventStub besAsync, PublishBuildEventBlockingStub besBlocking) {
this.besAsync = besAsync;
this.besBlocking = besBlocking;
}
private static <T extends AbstractStub<T>> T withCallCredentials(
T stub, @Nullable CallCredentials callCredentials) {
stub = callCredentials != null ? stub.withCallCredentials(callCredentials) : stub;
return stub;
}
@Override
public void publish(PublishLifecycleEventRequest lifecycleEvent)
throws StatusException, InterruptedException {
throwIfInterrupted();
try {
besBlocking
.withDeadlineAfter(RPC_TIMEOUT.toMillis(), MILLISECONDS)
.publishLifecycleEvent(lifecycleEvent);
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
throw e.getStatus().asException();
}
}
private static class BESGrpcStreamContext implements StreamContext {
private final StreamObserver<PublishBuildToolEventStreamRequest> stream;
private final SettableFuture<Status> streamStatus;
public BESGrpcStreamContext(PublishBuildEventStub besAsync, AckCallback ackCallback) {
this.streamStatus = SettableFuture.create();
this.stream =
besAsync.publishBuildToolEventStream(
new StreamObserver<PublishBuildToolEventStreamResponse>() {
@Override
public void onNext(PublishBuildToolEventStreamResponse response) {
ackCallback.apply(response);
}
@Override
public void onError(Throwable t) {
Status error = Status.fromThrowable(t);
if (error.getCode() == Status.CANCELLED.getCode()
&& error.getCause() != null
&& Status.fromThrowable(error.getCause()).getCode()
!= Status.UNKNOWN.getCode()) {
// gRPC likes to wrap Status(Runtime)Exceptions in StatusRuntimeExceptions. If
// the status is cancelled and has a Status(Runtime)Exception as a cause it
// means the error was generated client side.
error = Status.fromThrowable(error.getCause());
}
streamStatus.set(error);
}
@Override
public void onCompleted() {
streamStatus.set(Status.OK);
}
});
}
@Override
public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent)
throws InterruptedException {
throwIfInterrupted();
try {
stream.onNext(buildEvent);
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
streamStatus.set(Status.fromThrowable(e));
}
}
@Override
public void halfCloseStream() {
stream.onCompleted();
}
@Override
public void abortStream(Status status) {
stream.onError(status.asException());
}
@Override
public ListenableFuture<Status> getStatus() {
return streamStatus;
}
}
@Override
public StreamContext openStream(AckCallback ackCallback) throws InterruptedException {
try {
return new BESGrpcStreamContext(besAsync, ackCallback);
} catch (StatusRuntimeException e) {
Throwables.throwIfInstanceOf(Throwables.getRootCause(e), InterruptedException.class);
ListenableFuture<Status> status = Futures.immediateFuture(Status.fromThrowable(e));
return new StreamContext() {
@Override
public ListenableFuture<Status> getStatus() {
return status;
}
@Override
public void sendOverStream(PublishBuildToolEventStreamRequest buildEvent) {}
@Override
public void halfCloseStream() {}
@Override
public void abortStream(Status status) {}
};
}
}
@Override
public String userReadableError(Throwable t) {
if (t instanceof StatusException) {
Throwable rootCause = Throwables.getRootCause(t);
String message = ((StatusException) t).getStatus().getCode().name();
message += ": " + rootCause.getMessage();
return message;
} else {
return t.getMessage();
}
}
@Override
public abstract void shutdown();
private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
}