Support streaming upload to BEP backends.
Events may now report remote uploads which do not necessarily correspond to an in-disk file.
PiperOrigin-RevId: 281082666
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 4023912..e03162e 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -29,6 +29,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand;
@@ -64,7 +65,10 @@
import java.util.Deque;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.logging.Level;
@@ -100,6 +104,11 @@
private final EventBus eventBus;
private boolean startedClose = false;
+ private final ScheduledExecutorService timeoutExecutor =
+ MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("bes-uploader-timeout-%d").build()));
+
/**
* The event queue contains two types of events: - Build events, sorted by sequence number, that
* should be sent to the server - Command events that are used by {@link #publishBuildEvents()} to
@@ -309,6 +318,7 @@
throw e;
} finally {
localFileUploader.shutdown();
+ MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS);
closeFuture.set(null);
}
}
@@ -398,7 +408,7 @@
SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
ackQueue.addLast(buildEvent);
- PathConverter pathConverter = waitForLocalFileUploads(buildEvent);
+ PathConverter pathConverter = waitForUploads(buildEvent);
BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
createSerializedRegularBuildEvent(pathConverter, buildEvent);
@@ -599,16 +609,28 @@
}
}
- private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
+ private PathConverter waitForUploads(SendRegularBuildEventCommand orderedBuildEvent)
throws LocalFileUploadException, InterruptedException {
try {
- // Wait for the local file upload to have been completed.
+ // Wait for the local file and pending remote uploads to complete.
+ ListenableFuture<?> remoteUploads =
+ Futures.successfulAsList(orderedBuildEvent.getEvent().remoteUploads());
+
+ if (localFileUploader.timeout() != null) {
+ remoteUploads =
+ Futures.withTimeout(
+ remoteUploads,
+ localFileUploader.timeout().toMillis(),
+ TimeUnit.MILLISECONDS,
+ timeoutExecutor);
+ }
+ // TODO(bazel-team): Consider failing softy if remote upload fails.
+ remoteUploads.get();
return orderedBuildEvent.localFileUploadProgress().get();
} catch (ExecutionException e) {
logger.log(
Level.WARNING,
- String.format(
- "Failed to upload local files referenced by build event: %s", e.getMessage()),
+ String.format("Failed to upload files referenced by build event: %s", e.getMessage()),
e);
Throwables.throwIfUnchecked(e.getCause());
throw new LocalFileUploadException(e.getCause());
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java
index 0eb70ec..c92b637 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java
@@ -18,6 +18,7 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.vfs.Path;
import java.util.Collection;
@@ -115,6 +116,15 @@
}
/**
+ * Returns a collection of URI futures corresponding to in-flight file uploads.
+ *
+ * <p>The files here are considered "remote" in that they may not correspond to on-disk files.
+ */
+ default Collection<ListenableFuture<String>> remoteUploads() {
+ return ImmutableList.of();
+ }
+
+ /**
* Provide a binary representation of the event.
*
* <p>Provide a presentation of the event according to the specified binary format, as appropriate
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
index 872692b..d941f7e 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
@@ -14,11 +14,18 @@
package com.google.devtools.build.lib.buildeventstream;
import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
import com.google.devtools.build.lib.vfs.Path;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
import java.util.Collection;
import java.util.Map;
+import javax.annotation.Nullable;
/** Uploads artifacts referenced by the Build Event Protocol (BEP). */
public interface BuildEventArtifactUploader {
@@ -33,6 +40,40 @@
*/
ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files);
+ /** The timeout on individual file uploads, or null if none. */
+ @Nullable
+ default Duration timeout() {
+ return null;
+ }
+
+ /** The context associated with an in-flight remote upload. */
+ interface UploadContext {
+
+ /** The {@link OutputStream} to stream the file contents to. */
+ OutputStream getOutputStream();
+
+ /** The future URI of the completed upload. */
+ ListenableFuture<String> uriFuture();
+ }
+
+ /** Initiate a streaming upload to the remote storage. */
+ default UploadContext startUpload(LocalFileType type) {
+ return EMPTY_UPLOAD;
+ }
+
+ UploadContext EMPTY_UPLOAD =
+ new UploadContext() {
+ @Override
+ public OutputStream getOutputStream() {
+ return ByteStreams.nullOutputStream();
+ }
+
+ @Override
+ public ListenableFuture<String> uriFuture() {
+ return Futures.immediateFailedFuture(new IOException("No available uploader"));
+ }
+ };
+
/**
* Shutdown any resources associated with the uploader.
*/
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java
index fb8fc2e..dab2dd3 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java
@@ -42,6 +42,16 @@
public String buildEventUploadStrategy;
@Option(
+ name = "experimental_stream_log_file_uploads",
+ defaultValue = "false",
+ documentationCategory = OptionDocumentationCategory.LOGGING,
+ effectTags = {OptionEffectTag.AFFECTS_OUTPUTS},
+ help =
+ "Stream log file uploads directly to the remote storage rather than writing them to"
+ + " disk.")
+ public boolean streamingLogFileUploads;
+
+ @Option(
name = "experimental_build_event_expand_filesets",
defaultValue = "false",
documentationCategory = OptionDocumentationCategory.LOGGING,
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
index a1720d9..9932b15 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
@@ -15,33 +15,38 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileCompression;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/** Event reporting on statistics about the build. */
public class BuildToolLogs implements BuildEventWithOrderConstraint {
/** These values are posted as byte strings to the BEP. */
private final Collection<Pair<String, ByteString>> directValues;
- /**
- * These values are posted as URIs to the BEP; if these reference files, those are not uploaded.
- */
- private final Collection<Pair<String, String>> directUris;
+ /** These values are posted as Future URIs to the BEP. */
+ private final Collection<Pair<String, ListenableFuture<String>>> futureUris;
/**
* These values are local files that are uploaded if required, and turned into URIs as part of the
* process.
*/
private final Collection<LogFileEntry> logFiles;
+ private static final Logger logger = Logger.getLogger(BuildToolLogs.class.getName());
+
public BuildToolLogs(
Collection<Pair<String, ByteString>> directValues,
- Collection<Pair<String, String>> directUris,
+ Collection<Pair<String, ListenableFuture<String>>> futureUris,
Collection<LogFileEntry> logFiles) {
this.directValues = directValues;
- this.directUris = directUris;
+ this.futureUris = futureUris;
this.logFiles = logFiles;
}
@@ -65,6 +70,15 @@
}
@Override
+ public Collection<ListenableFuture<String>> remoteUploads() {
+ ImmutableList.Builder<ListenableFuture<String>> remoteUploads = ImmutableList.builder();
+ for (Pair<String, ListenableFuture<String>> uploadPair : futureUris) {
+ remoteUploads.add(uploadPair.getSecond());
+ }
+ return remoteUploads.build();
+ }
+
+ @Override
public BuildEventStreamProtos.BuildEvent asStreamProto(BuildEventContext converters) {
BuildEventStreamProtos.BuildToolLogs.Builder toolLogs =
BuildEventStreamProtos.BuildToolLogs.newBuilder();
@@ -75,12 +89,18 @@
.setContents(direct.getSecond())
.build());
}
- for (Pair<String, String> direct : directUris) {
- toolLogs.addLog(
- BuildEventStreamProtos.File.newBuilder()
- .setName(direct.getFirst())
- .setUri(direct.getSecond())
- .build());
+ for (Pair<String, ListenableFuture<String>> directFuturePair : futureUris) {
+ String name = directFuturePair.getFirst();
+ ListenableFuture<String> directFuture = directFuturePair.getSecond();
+ try {
+ toolLogs.addLog(
+ BuildEventStreamProtos.File.newBuilder()
+ .setName(name)
+ .setUri(Futures.getDone(directFuture))
+ .build());
+ } catch (ExecutionException e) {
+ logger.log(Level.WARNING, "Skipping build tool log upload " + name, e);
+ }
}
for (LogFileEntry logFile : logFiles) {
String uri = converters.pathConverter().apply(logFile.localPath);
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java
index 1105be6..2beae6a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.buildeventstream;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
/** Wrapper class for a build event marking it as the final event in the protocol. */
@@ -40,6 +41,11 @@
}
@Override
+ public Collection<ListenableFuture<String>> remoteUploads() {
+ return event.remoteUploads();
+ }
+
+ @Override
public BuildEventStreamProtos.BuildEvent asStreamProto(BuildEventContext converters) {
return BuildEventStreamProtos.BuildEvent.newBuilder(event.asStreamProto(converters))
.setLastMessage(true)
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 8dbc050..e317cdf 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -237,14 +237,16 @@
BuildEvent event, ArtifactGroupNamer namer) {
checkNotNull(event);
+ ListenableFuture<PathConverter> converterFuture =
+ uploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
return Futures.transform(
- uploader.uploadReferencedLocalFiles(event.referencedLocalFiles()),
- pathConverter -> {
+ Futures.allAsList(converterFuture, Futures.successfulAsList(event.remoteUploads())),
+ results -> {
BuildEventContext context =
new BuildEventContext() {
@Override
public PathConverter pathConverter() {
- return pathConverter;
+ return Futures.getUnchecked(converterFuture);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java b/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java
index ab393e3..7b13a12 100644
--- a/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java
+++ b/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java
@@ -17,6 +17,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.analysis.ConfiguredTarget;
import com.google.devtools.build.lib.analysis.config.BuildConfigurationCollection;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileCompression;
@@ -294,7 +296,7 @@
*/
public static final class BuildToolLogCollection {
private final List<Pair<String, ByteString>> directValues = new ArrayList<>();
- private final List<Pair<String, String>> directUris = new ArrayList<>();
+ private final List<Pair<String, ListenableFuture<String>>> futureUris = new ArrayList<>();
private final List<LogFileEntry> localFiles = new ArrayList<>();
private boolean frozen;
@@ -316,7 +318,13 @@
public BuildToolLogCollection addUri(String name, String uri) {
Preconditions.checkState(!frozen);
- this.directUris.add(Pair.of(name, uri));
+ this.futureUris.add(Pair.of(name, Futures.immediateFuture(uri)));
+ return this;
+ }
+
+ public BuildToolLogCollection addUriFuture(String name, ListenableFuture<String> uriFuture) {
+ Preconditions.checkState(!frozen);
+ this.futureUris.add(Pair.of(name, uriFuture));
return this;
}
@@ -333,7 +341,7 @@
public BuildToolLogs toEvent() {
Preconditions.checkState(frozen);
- return new BuildToolLogs(directValues, directUris, localFiles);
+ return new BuildToolLogs(directValues, futureUris, localFiles);
}
/** For debugging. */
@@ -341,7 +349,7 @@
public String toString() {
return MoreObjects.toStringHelper(this)
.add("directValues", directValues)
- .add("directUris", directUris)
+ .add("futureUris", futureUris)
.add("localFiles", localFiles)
.toString();
}