blob: 675eba3be6e465cab8329a52ae830f73569bc2a1 [file] [log] [blame]
// Copyright 2025 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice;
import static com.google.common.truth.Truth.assertThat;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.v1.BuildEvent.BuildComponentStreamFinished.FinishType;
import com.google.devtools.build.v1.PublishBuildEventGrpc.PublishBuildEventImplBase;
import com.google.devtools.build.v1.PublishBuildToolEventStreamRequest;
import com.google.devtools.build.v1.PublishBuildToolEventStreamResponse;
import com.google.devtools.build.v1.PublishLifecycleEventRequest;
import com.google.devtools.build.v1.StreamId;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* Trivial implementation of {@link PublishBuildEventImplBase} that can insert sleeps at critical
* junctures.
*/
public class DelayingPublishBuildEventService extends PublishBuildEventImplBase {
/**
 * Delay handed to each newly created delaying stream observer, which sleeps this long right
 * before completing the response stream.
 */
@GuardedBy("this")
private Duration delayBeforeClosingStream = Duration.ZERO;

/**
 * Delay handed to each newly created delaying stream observer, which sleeps this long after
 * receiving the final build event and before acknowledging the outstanding events.
 */
@GuardedBy("this")
private Duration delayBeforeHalfClosingStream = Duration.ZERO;

/** When non-null, newly opened streams are served by an observer that fails with this text. */
@GuardedBy("this")
@Nullable
private String errorMessage = null;

/** Status used together with {@link #errorMessage}; the two are always set as a pair. */
@GuardedBy("this")
@Nullable
private Status errorCode = null;

/** Number of times {@code publishBuildToolEventStream} has been invoked. */
private final AtomicInteger requestsReceived = new AtomicInteger(0);

/**
 * Whether an erroring stream fails as soon as a request arrives instead of waiting for the
 * client to complete the stream. Like the other settings it is only read and written from
 * {@code synchronized} methods, so it is guarded by the same lock.
 */
@GuardedBy("this")
private boolean errorEarlyInStream = false;
/**
 * Acknowledges a lifecycle event after checking that the call carries tracing metadata.
 *
 * <p>This method is deliberately not {@code synchronized}: doing so can lead to deadlocks. It
 * calls into {@link io.grpc.inprocess.InProcessTransport}, which takes a lock on itself, while
 * {@link #publishBuildToolEventStream} is called with the lock on {@link
 * io.grpc.inprocess.InProcessTransport} already held, i.e. in the opposite lock order.
 */
@Override
public void publishLifecycleEvent(
    PublishLifecycleEventRequest request, StreamObserver<Empty> responseObserver) {
  // The client is expected to attach request metadata identifying the invocation and the RPC.
  RequestMetadata requestMetadata = TracingMetadataUtils.fromCurrentContext();
  assertThat(requestMetadata.getToolInvocationId()).isNotEmpty();
  assertThat(requestMetadata.getCorrelatedInvocationsId()).isNotEmpty();
  assertThat(requestMetadata.getActionId()).isEqualTo("publish_lifecycle_event");

  // Reply with a single empty message and finish the unary call.
  responseObserver.onNext(Empty.getDefaultInstance());
  responseObserver.onCompleted();
}
/**
 * Opens a build tool event stream and returns the observer that will receive its requests.
 *
 * <p>Every call is counted and its tracing metadata is verified. If an error message has been
 * configured, the returned observer fails the stream with the configured message and status;
 * otherwise a delaying observer is returned whose acking thread has already been started.
 * The method is {@code synchronized} because it reads the lock-guarded configuration fields.
 */
@Override
public synchronized StreamObserver<PublishBuildToolEventStreamRequest>
    publishBuildToolEventStream(
        StreamObserver<PublishBuildToolEventStreamResponse> responseObserver) {
  requestsReceived.incrementAndGet();

  RequestMetadata requestMetadata = TracingMetadataUtils.fromCurrentContext();
  assertThat(requestMetadata.getToolInvocationId()).isNotEmpty();
  assertThat(requestMetadata.getCorrelatedInvocationsId()).isNotEmpty();
  assertThat(requestMetadata.getActionId()).isEqualTo("publish_build_tool_event_stream");

  if (errorMessage == null) {
    // Normal mode: acknowledge events, sleeping for the configured delays.
    DelayingPublishBuildStreamObserver delayingObserver =
        new DelayingPublishBuildStreamObserver(
            responseObserver, delayBeforeClosingStream, delayBeforeHalfClosingStream);
    delayingObserver.startAckingThread();
    return delayingObserver;
  }

  // Failure mode: the stream will be terminated with the configured error.
  return new ErroringPublishBuildStreamObserver(
      responseObserver, errorMessage, errorCode, errorEarlyInStream);
}
/**
 * Configures the error message used to fail streams, with {@link Status#DATA_LOSS} as the
 * status. The setting is consulted each time a build tool event stream is opened.
 */
synchronized void setErrorMessage(String errorMessage) {
setErrorMessageAndCode(errorMessage, Status.DATA_LOSS);
}
/**
 * Configures both the error message and the status used to fail streams. The pair is stored
 * under this object's lock and consulted each time a build tool event stream is opened.
 */
synchronized void setErrorMessageAndCode(String errorMessage, Status code) {
  this.errorCode = code;
  this.errorMessage = errorMessage;
}
/**
 * Selects when an erroring stream reports its error: as soon as a request arrives on the
 * stream ({@code true}) or only once the client completes the stream ({@code false}). Only
 * relevant when an error message has been configured.
 */
synchronized void setErrorEarlyInStream(boolean errorEarlyInStream) {
this.errorEarlyInStream = errorEarlyInStream;
}
/**
 * Sets how long a delaying stream sleeps right before it completes the response stream. The
 * value is copied into the stream observer when a stream is opened, so it only affects streams
 * opened after this call.
 */
public synchronized void setDelayBeforeClosingStream(Duration delay) {
this.delayBeforeClosingStream = delay;
}
/**
 * Sets how long a delaying stream sleeps after receiving the final build event, before it
 * acknowledges the events that are still outstanding. The value is copied into the stream
 * observer when a stream is opened, so it only affects streams opened after this call.
 */
synchronized void setDelayBeforeHalfClosingStream(Duration delay) {
this.delayBeforeHalfClosingStream = delay;
}
/** Returns how many times {@link #publishBuildToolEventStream} has been invoked so far. */
int getRequestsReceivedCount() {
  final int streamsOpened = requestsReceived.get();
  return streamsOpened;
}
/**
 * A {@link StreamObserver} that simulates a server that terminates the stream with an error,
 * either immediately or when the client closes its end of the stream.
 *
 * <p>The error is written to the response stream at most once; requests that arrive after the
 * stream has been terminated are dropped.
 */
private static final class ErroringPublishBuildStreamObserver
    implements StreamObserver<PublishBuildToolEventStreamRequest> {
  private final StreamObserver<PublishBuildToolEventStreamResponse> responseObserver;
  private final String errorMessage;
  private final Status errorCode;
  private final boolean errorEarlyInStream;

  /**
   * Set once the error has been sent on {@link #responseObserver}. A response stream accepts no
   * further calls after {@code onError}, so this prevents writing to it again.
   *
   * <p>NOTE(review): not synchronized; this relies on gRPC delivering the request callbacks of
   * a single call one at a time -- confirm if this observer is ever driven by hand.
   */
  private boolean terminated = false;

  /**
   * @param responseObserver the response side of the stream to fail
   * @param errorMessage description attached to the failure status
   * @param errorCode status the stream is failed with
   * @param errorEarlyInStream if true, fail on the first request instead of on client completion
   */
  ErroringPublishBuildStreamObserver(
      StreamObserver<PublishBuildToolEventStreamResponse> responseObserver,
      String errorMessage,
      Status errorCode,
      boolean errorEarlyInStream) {
    this.responseObserver = responseObserver;
    this.errorMessage = errorMessage;
    this.errorCode = errorCode;
    this.errorEarlyInStream = errorEarlyInStream;
  }

  /**
   * Fails the stream right away in "early" mode; otherwise acknowledges the request by echoing
   * its stream id and sequence number.
   */
  @Override
  public void onNext(PublishBuildToolEventStreamRequest value) {
    if (terminated) {
      // The error has already been delivered; nothing more may be written to the stream.
      return;
    }
    if (errorEarlyInStream) {
      terminateWithError();
      // Do not fall through: acking on a stream that was just failed is illegal.
      return;
    }
    responseObserver.onNext(
        PublishBuildToolEventStreamResponse.newBuilder()
            .setStreamId(value.getOrderedBuildEventOrBuilder().getStreamId())
            .setSequenceNumber(value.getOrderedBuildEvent().getSequenceNumber())
            .build());
  }

  /** Client-side failures are intentionally ignored by this fake. */
  @Override
  public void onError(Throwable t) {}

  /** Fails the stream when the client closes its end, unless it has already been failed. */
  @Override
  public void onCompleted() {
    terminateWithError();
  }

  /** Sends the configured error on the response stream, at most once. */
  private void terminateWithError() {
    if (terminated) {
      return;
    }
    terminated = true;
    responseObserver.onError(new StatusRuntimeException(errorCode.withDescription(errorMessage)));
  }
}
/**
 * Trivial, in-memory implementation of a PublishBuildToolEventStream handler that can have
 * pre-configured sleeps triggered at critical junctures.
 *
 * <p>Requests are recorded by {@link #onNext}. The sequence numbers to acknowledge are handed
 * over a bounded queue to a dedicated acking thread, which writes the acknowledgements to the
 * response stream and completes it when it dequeues {@link #SENTINEL_VALUE}. If the client
 * reports an error, the acking thread is told to stop without touching the response stream
 * again.
 */
private static class DelayingPublishBuildStreamObserver
    implements StreamObserver<PublishBuildToolEventStreamRequest> {
  /** Queue marker telling the acking thread that there is nothing more to acknowledge. */
  static final Long SENTINEL_VALUE = -1L;

  private final StreamObserver<PublishBuildToolEventStreamResponse> responseObserver;

  /** Sleep performed by the acking thread right before completing the response stream. */
  private final Duration delayBeforeClosingStream;

  /** Sleep performed on receipt of the final event, before acking the outstanding events. */
  private final Duration delayBeforeHalfClosingStream;

  /** Sequence numbers received but not yet handed to the acking thread. */
  @GuardedBy("this")
  private final SortedSet<Long> unackedSequenceNumbers = Sets.newTreeSet();

  /** Hand-off to the acking thread. Bounded, so producers block while it is full. */
  private final BlockingQueue<Long> ackQueue = new ArrayBlockingQueue<>(10);

  @GuardedBy("this")
  private Thread ackingThread = null;

  /** Stream id of the first request seen; echoed in every acknowledgement. */
  @GuardedBy("this")
  private StreamId streamId = null;

  /** Set once the client has completed or failed its side of the stream. */
  @GuardedBy("this")
  private boolean finished = false;

  /** Set once the response stream has been terminated by {@link #onError}. */
  @GuardedBy("this")
  private boolean aborted = false;

  private DelayingPublishBuildStreamObserver(
      StreamObserver<PublishBuildToolEventStreamResponse> responseObserver,
      Duration delayBeforeClosingStream,
      Duration delayBeforeHalfClosingStream) {
    this.responseObserver = responseObserver;
    this.delayBeforeClosingStream = delayBeforeClosingStream;
    this.delayBeforeHalfClosingStream = delayBeforeHalfClosingStream;
  }

  /**
   * Creates the acking thread, safely callable after the constructor finishes.
   *
   * @throws IllegalStateException if called more than once
   */
  synchronized void startAckingThread() {
    Preconditions.checkState(ackingThread == null, "startAckingThread() called twice");
    ackingThread = new Thread(new AckingThread(), "bes-acking-thread");
    // The thread blocks on the queue until it is told to stop, which never happens if the
    // client completes the stream while events are still unacked. Make it a daemon explicitly
    // (instead of inheriting the creator's setting) so it can never keep the JVM alive.
    ackingThread.setDaemon(true);
    ackingThread.start();
  }

  /**
   * Records the request and schedules acknowledgements.
   *
   * <p>A non-final event triggers the ack of the lowest outstanding sequence number, provided
   * more than one is outstanding (i.e. acks run one event behind). The final event -- one
   * whose component-stream-finished type is set -- triggers the half-close delay and then acks
   * everything outstanding. Duplicate sequence numbers are ignored.
   */
  @Override
  public void onNext(PublishBuildToolEventStreamRequest req) {
    List<Long> longsToPut = new ArrayList<>();
    synchronized (this) {
      if (!unackedSequenceNumbers.add(req.getOrderedBuildEvent().getSequenceNumber())) {
        return; // dupe, ignore
      }
      streamId = MoreObjects.firstNonNull(streamId, req.getOrderedBuildEvent().getStreamId());
      if (req.getOrderedBuildEvent().getEvent().getComponentStreamFinished().getType()
          == FinishType.FINISH_TYPE_UNSPECIFIED) {
        // We did not get the final event. Ack the *previous* event, if there is a previous event.
        if (unackedSequenceNumbers.size() > 1) {
          longsToPut.add(ackLowestSequenceNumber());
        }
      } else {
        Uninterruptibles.sleepUninterruptibly(delayBeforeHalfClosingStream);
        // final event. ack everything remaining.
        while (!unackedSequenceNumbers.isEmpty()) {
          longsToPut.add(ackLowestSequenceNumber());
        }
        if (finished) {
          longsToPut.add(SENTINEL_VALUE);
        }
      }
    }
    // Enqueue outside the lock: put() may block on the bounded queue, and the acking thread
    // needs the lock to drain it.
    for (Long seqNum : longsToPut) {
      Uninterruptibles.putUninterruptibly(ackQueue, seqNum);
    }
  }

  /** Removes and returns the lowest outstanding sequence number; the set must not be empty. */
  @GuardedBy("this")
  private Long ackLowestSequenceNumber() {
    Long firstUnacked = unackedSequenceNumbers.first();
    unackedSequenceNumbers.remove(firstUnacked);
    return firstUnacked;
  }

  /**
   * Propagates a client-side failure to the response stream and stops the acking thread.
   *
   * <p>Acknowledgements that are still queued are discarded, since nothing may be written to
   * the response stream after {@code onError}.
   */
  @Override
  public synchronized void onError(Throwable t) {
    finished = true;
    aborted = true;
    // Wake the acking thread if it is idle so that it notices the abort and exits. offer() never
    // blocks; if the queue is full the thread has work pending and will see the flag anyway.
    boolean unused = ackQueue.offer(SENTINEL_VALUE);
    responseObserver.onError(t);
  }

  /**
   * Marks the client side as finished. If every received event has already been scheduled for
   * acknowledgement, tells the acking thread to complete the response stream; otherwise the
   * response stream is not completed here.
   */
  @Override
  public void onCompleted() {
    boolean putSentinel;
    synchronized (this) {
      finished = true;
      putSentinel = unackedSequenceNumbers.isEmpty();
    }
    if (putSentinel) {
      Uninterruptibles.putUninterruptibly(ackQueue, SENTINEL_VALUE);
    }
  }

  /** Drains {@link #ackQueue}, writing one acknowledgement per dequeued sequence number. */
  private class AckingThread implements Runnable {
    @Override
    public void run() {
      while (true) {
        Long firstUnacked = Uninterruptibles.takeUninterruptibly(ackQueue);
        synchronized (DelayingPublishBuildStreamObserver.this) {
          if (aborted) {
            // onError() already terminated the response stream; do not write to it again.
            return;
          }
          if (firstUnacked.equals(SENTINEL_VALUE)) {
            Uninterruptibles.sleepUninterruptibly(delayBeforeClosingStream);
            responseObserver.onCompleted();
            return;
          }
          responseObserver.onNext(
              PublishBuildToolEventStreamResponse.newBuilder()
                  .setStreamId(streamId)
                  .setSequenceNumber(firstUnacked)
                  .build());
        }
      }
    }
  }
}
}