blob: 75371197269f01c09507915c7bd8396c2d8d80b3 [file] [log] [blame]
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.buildeventstream.transports;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventContext;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.util.AbruptExitException;
import com.google.devtools.build.lib.util.ExitCode;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
/**
* Non-blocking file transport.
*
* <p>Implementors of this class need to implement {@code #sendBuildEvent(BuildEvent)} which
* serializes the build event and writes it to a file.
*/
abstract class FileTransport implements BuildEventTransport {
private static final Logger logger = Logger.getLogger(FileTransport.class.getName());
private final BuildEventProtocolOptions options;
private final BuildEventArtifactUploader uploader;
private final SequentialWriter writer;
private final ArtifactGroupNamer namer;
private final ScheduledExecutorService timeoutExecutor =
MoreExecutors.listeningDecorator(
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("file-uploader-timeout-%d").build()));
FileTransport(
BufferedOutputStream outputStream,
BuildEventProtocolOptions options,
BuildEventArtifactUploader uploader,
ArtifactGroupNamer namer) {
this.uploader = uploader;
this.options = options;
this.writer =
new SequentialWriter(outputStream, this::serializeEvent, uploader, timeoutExecutor);
this.namer = namer;
}
@ThreadSafe
@VisibleForTesting
static final class SequentialWriter implements Runnable {
private static final Logger logger = Logger.getLogger(SequentialWriter.class.getName());
private static final ListenableFuture<BuildEventStreamProtos.BuildEvent> CLOSE_EVENT_FUTURE =
Futures.immediateFailedFuture(
new IllegalStateException(
"A FileTransport is trying to write CLOSE_EVENT_FUTURE, this is a bug."));
private static final Duration FLUSH_INTERVAL =
Duration.ofMillis(
Long.parseLong(System.getProperty("EXPERIMENTAL_BEP_FILE_FLUSH_MILLIS", "250")));
private final Thread writerThread;
private final BufferedOutputStream out;
private final Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc;
private final BuildEventArtifactUploader uploader;
private final AtomicBoolean isClosed = new AtomicBoolean();
private final SettableFuture<Void> closeFuture = SettableFuture.create();
@VisibleForTesting
final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
new LinkedBlockingDeque<>();
private ScheduledExecutorService timeoutExecutor;
SequentialWriter(
BufferedOutputStream outputStream,
Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc,
BuildEventArtifactUploader uploader,
ScheduledExecutorService timeoutExecutor) {
checkNotNull(uploader);
this.out = checkNotNull(outputStream);
this.writerThread = new Thread(this, "bep-local-writer");
this.serializeFunc = checkNotNull(serializeFunc);
this.uploader = checkNotNull(uploader);
this.timeoutExecutor = checkNotNull(timeoutExecutor);
writerThread.start();
}
@Override
public void run() {
ListenableFuture<BuildEventStreamProtos.BuildEvent> buildEventF;
try {
Instant prevFlush = Instant.now();
while ((buildEventF = pendingWrites.poll(FLUSH_INTERVAL.toMillis(), TimeUnit.MILLISECONDS))
!= CLOSE_EVENT_FUTURE) {
if (buildEventF != null) {
BuildEventStreamProtos.BuildEvent buildEvent = buildEventF.get();
byte[] serialized = serializeFunc.apply(buildEvent);
out.write(serialized);
}
Instant now = Instant.now();
if (buildEventF == null || now.compareTo(prevFlush.plus(FLUSH_INTERVAL)) > 0) {
// Some users, e.g. Tulsi, expect prompt BEP stream flushes for interactive use.
out.flush();
prevFlush = now;
}
}
} catch (ExecutionException e) {
Throwables.throwIfUnchecked(e.getCause());
exitFailure(e);
} catch (IOException | InterruptedException | CancellationException e) {
exitFailure(e);
} finally {
try {
try {
out.flush();
out.close();
} finally {
uploader.shutdown();
timeoutExecutor.shutdown();
}
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to close BEP file output stream.", e);
}
closeFuture.set(null);
}
}
private void exitFailure(Throwable e) {
final String message;
// Print a more useful error message when the upload times out.
// An {@link ExecutionException} may be wrapping a {@link TimeoutException} if the
// Future was created with {@link Futures#withTimeout}.
if (e instanceof ExecutionException
&& e.getCause() instanceof TimeoutException) {
message = "Unable to write all BEP events to file due to timeout";
} else {
message =
String.format("Unable to write all BEP events to file due to '%s'", e.getMessage());
}
closeFuture.setException(
new AbruptExitException(message, ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR, e));
pendingWrites.clear();
logger.log(Level.SEVERE, message, e);
}
private void closeNow() {
if (closeFuture.isDone()) {
return;
}
try {
pendingWrites.clear();
pendingWrites.put(CLOSE_EVENT_FUTURE);
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Failed to immediately close the sequential writer.", e);
}
}
ListenableFuture<Void> close() {
if (isClosed.getAndSet(true)) {
return closeFuture;
} else if (closeFuture.isDone()) {
return closeFuture;
}
// Close abruptly if the closing future is cancelled.
closeFuture.addListener(
() -> {
if (closeFuture.isCancelled()) {
closeNow();
}
},
MoreExecutors.directExecutor());
try {
pendingWrites.put(CLOSE_EVENT_FUTURE);
} catch (InterruptedException e) {
closeNow();
logger.log(Level.SEVERE, "Failed to close the sequential writer.", e);
closeFuture.set(null);
}
return closeFuture;
}
private Duration getFlushInterval() {
return FLUSH_INTERVAL;
}
}
@Override
public void sendBuildEvent(BuildEvent event) {
if (writer.isClosed.get()) {
return;
}
try {
if (!writer.pendingWrites.add(asStreamProto(event, namer))) {
logger.log(Level.SEVERE, "Failed to add BEP event to the write queue");
}
} catch (RejectedExecutionException e) {
// If early shutdown races with this event, log but otherwise ignore.
logger.log(Level.WARNING, "Event upload started after shutdown");
}
}
protected abstract byte[] serializeEvent(BuildEventStreamProtos.BuildEvent buildEvent);
@Override
public ListenableFuture<Void> close() {
return writer.close();
}
/**
* Converts the given event into a proto object; this may trigger uploading of referenced files as
* a side effect. May return {@code null} if there was an interrupt. This method is not
* thread-safe.
*/
private ListenableFuture<BuildEventStreamProtos.BuildEvent> asStreamProto(
BuildEvent event, ArtifactGroupNamer namer) {
checkNotNull(event);
ListenableFuture<PathConverter> converterFuture =
uploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
ListenableFuture<?> remoteUploads =
uploader.waitForRemoteUploads(event.remoteUploads(), timeoutExecutor);
return Futures.transform(
Futures.allAsList(converterFuture, remoteUploads),
results -> {
BuildEventContext context =
new BuildEventContext() {
@Override
public PathConverter pathConverter() {
return Futures.getUnchecked(converterFuture);
}
@Override
public ArtifactGroupNamer artifactGroupNamer() {
return namer;
}
@Override
public BuildEventProtocolOptions getOptions() {
return options;
}
};
return event.asStreamProto(context);
},
MoreExecutors.directExecutor());
}
@Override
public boolean mayBeSlow() {
return uploader.mayBeSlow();
}
@Override
public BuildEventArtifactUploader getUploader() {
return uploader;
}
/** Determines how often the {@link FileTransport} flushes events. */
Duration getFlushInterval() {
return writer.getFlushInterval();
}
}