Support streaming upload to BEP backends.

Events may now report remote uploads which do not necessarily correspond to an in-disk file.

PiperOrigin-RevId: 281082666
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index 4023912..e03162e 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -29,6 +29,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.AckReceivedCommand;
 import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.EventLoopCommand;
 import com.google.devtools.build.lib.buildeventservice.BuildEventServiceUploaderCommands.OpenStreamCommand;
@@ -64,7 +65,10 @@
 import java.util.Deque;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.logging.Level;
@@ -100,6 +104,11 @@
   private final EventBus eventBus;
   private boolean startedClose = false;
 
+  private final ScheduledExecutorService timeoutExecutor =
+      MoreExecutors.listeningDecorator(
+          Executors.newSingleThreadScheduledExecutor(
+              new ThreadFactoryBuilder().setNameFormat("bes-uploader-timeout-%d").build()));
+
   /**
    * The event queue contains two types of events: - Build events, sorted by sequence number, that
    * should be sent to the server - Command events that are used by {@link #publishBuildEvents()} to
@@ -309,6 +318,7 @@
       throw e;
     } finally {
       localFileUploader.shutdown();
+      MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS);
       closeFuture.set(null);
     }
   }
@@ -398,7 +408,7 @@
               SendRegularBuildEventCommand buildEvent = (SendRegularBuildEventCommand) event;
               ackQueue.addLast(buildEvent);
 
-              PathConverter pathConverter = waitForLocalFileUploads(buildEvent);
+              PathConverter pathConverter = waitForUploads(buildEvent);
 
               BuildEventStreamProtos.BuildEvent serializedRegularBuildEvent =
                   createSerializedRegularBuildEvent(pathConverter, buildEvent);
@@ -599,16 +609,28 @@
     }
   }
 
-  private PathConverter waitForLocalFileUploads(SendRegularBuildEventCommand orderedBuildEvent)
+  private PathConverter waitForUploads(SendRegularBuildEventCommand orderedBuildEvent)
       throws LocalFileUploadException, InterruptedException {
     try {
-      // Wait for the local file upload to have been completed.
+      // Wait for the local file and pending remote uploads to complete.
+      ListenableFuture<?> remoteUploads =
+          Futures.successfulAsList(orderedBuildEvent.getEvent().remoteUploads());
+
+      if (localFileUploader.timeout() != null) {
+        remoteUploads =
+            Futures.withTimeout(
+                remoteUploads,
+                localFileUploader.timeout().toMillis(),
+                TimeUnit.MILLISECONDS,
+                timeoutExecutor);
+      }
+      // TODO(bazel-team): Consider failing softy if remote upload fails.
+      remoteUploads.get();
       return orderedBuildEvent.localFileUploadProgress().get();
     } catch (ExecutionException e) {
       logger.log(
           Level.WARNING,
-          String.format(
-              "Failed to upload local files referenced by build event: %s", e.getMessage()),
+          String.format("Failed to upload files referenced by build event: %s", e.getMessage()),
           e);
       Throwables.throwIfUnchecked(e.getCause());
       throw new LocalFileUploadException(e.getCause());
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java
index 0eb70ec..c92b637 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEvent.java
@@ -18,6 +18,7 @@
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.vfs.Path;
 import java.util.Collection;
@@ -115,6 +116,15 @@
   }
 
   /**
+   * Returns a collection of URI futures corresponding to in-flight file uploads.
+   *
+   * <p>The files here are considered "remote" in that they may not correspond to on-disk files.
+   */
+  default Collection<ListenableFuture<String>> remoteUploads() {
+    return ImmutableList.of();
+  }
+
+  /**
    * Provide a binary representation of the event.
    *
    * <p>Provide a presentation of the event according to the specified binary format, as appropriate
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
index 872692b..d941f7e 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
@@ -14,11 +14,18 @@
 package com.google.devtools.build.lib.buildeventstream;
 
 import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
 import com.google.devtools.build.lib.vfs.Path;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import javax.annotation.Nullable;
 
 /** Uploads artifacts referenced by the Build Event Protocol (BEP). */
 public interface BuildEventArtifactUploader {
@@ -33,6 +40,40 @@
    */
   ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files);
 
+  /** The timeout on individual file uploads, or null if none. */
+  @Nullable
+  default Duration timeout() {
+    return null;
+  }
+
+  /** The context associated with an in-flight remote upload. */
+  interface UploadContext {
+
+    /** The {@link OutputStream} to stream the file contents to. */
+    OutputStream getOutputStream();
+
+    /** The future URI of the completed upload. */
+    ListenableFuture<String> uriFuture();
+  }
+
+  /** Initiate a streaming upload to the remote storage. */
+  default UploadContext startUpload(LocalFileType type) {
+    return EMPTY_UPLOAD;
+  }
+
+  UploadContext EMPTY_UPLOAD =
+      new UploadContext() {
+        @Override
+        public OutputStream getOutputStream() {
+          return ByteStreams.nullOutputStream();
+        }
+
+        @Override
+        public ListenableFuture<String> uriFuture() {
+          return Futures.immediateFailedFuture(new IOException("No available uploader"));
+        }
+      };
+
   /**
    * Shutdown any resources associated with the uploader.
    */
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java
index fb8fc2e..dab2dd3 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventProtocolOptions.java
@@ -42,6 +42,16 @@
   public String buildEventUploadStrategy;
 
   @Option(
+      name = "experimental_stream_log_file_uploads",
+      defaultValue = "false",
+      documentationCategory = OptionDocumentationCategory.LOGGING,
+      effectTags = {OptionEffectTag.AFFECTS_OUTPUTS},
+      help =
+          "Stream log file uploads directly to the remote storage rather than writing them to"
+              + " disk.")
+  public boolean streamingLogFileUploads;
+
+  @Option(
       name = "experimental_build_event_expand_filesets",
       defaultValue = "false",
       documentationCategory = OptionDocumentationCategory.LOGGING,
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
index a1720d9..9932b15 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
@@ -15,33 +15,38 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileCompression;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
 import com.google.devtools.build.lib.util.Pair;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
 import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /** Event reporting on statistics about the build. */
 public class BuildToolLogs implements BuildEventWithOrderConstraint {
   /** These values are posted as byte strings to the BEP. */
   private final Collection<Pair<String, ByteString>> directValues;
-  /**
-   * These values are posted as URIs to the BEP; if these reference files, those are not uploaded.
-   */
-  private final Collection<Pair<String, String>> directUris;
+  /** These values are posted as Future URIs to the BEP. */
+  private final Collection<Pair<String, ListenableFuture<String>>> futureUris;
   /**
    * These values are local files that are uploaded if required, and turned into URIs as part of the
    * process.
    */
   private final Collection<LogFileEntry> logFiles;
 
+  private static final Logger logger = Logger.getLogger(BuildToolLogs.class.getName());
+
   public BuildToolLogs(
       Collection<Pair<String, ByteString>> directValues,
-      Collection<Pair<String, String>> directUris,
+      Collection<Pair<String, ListenableFuture<String>>> futureUris,
       Collection<LogFileEntry> logFiles) {
     this.directValues = directValues;
-    this.directUris = directUris;
+    this.futureUris = futureUris;
     this.logFiles = logFiles;
   }
 
@@ -65,6 +70,15 @@
   }
 
   @Override
+  public Collection<ListenableFuture<String>> remoteUploads() {
+    ImmutableList.Builder<ListenableFuture<String>> remoteUploads = ImmutableList.builder();
+    for (Pair<String, ListenableFuture<String>> uploadPair : futureUris) {
+      remoteUploads.add(uploadPair.getSecond());
+    }
+    return remoteUploads.build();
+  }
+
+  @Override
   public BuildEventStreamProtos.BuildEvent asStreamProto(BuildEventContext converters) {
     BuildEventStreamProtos.BuildToolLogs.Builder toolLogs =
         BuildEventStreamProtos.BuildToolLogs.newBuilder();
@@ -75,12 +89,18 @@
               .setContents(direct.getSecond())
               .build());
     }
-    for (Pair<String, String> direct : directUris) {
-      toolLogs.addLog(
-          BuildEventStreamProtos.File.newBuilder()
-              .setName(direct.getFirst())
-              .setUri(direct.getSecond())
-              .build());
+    for (Pair<String, ListenableFuture<String>> directFuturePair : futureUris) {
+      String name = directFuturePair.getFirst();
+      ListenableFuture<String> directFuture = directFuturePair.getSecond();
+      try {
+        toolLogs.addLog(
+            BuildEventStreamProtos.File.newBuilder()
+                .setName(name)
+                .setUri(Futures.getDone(directFuture))
+                .build());
+      } catch (ExecutionException e) {
+        logger.log(Level.WARNING, "Skipping build tool log upload " + name, e);
+      }
     }
     for (LogFileEntry logFile : logFiles) {
       String uri = converters.pathConverter().apply(logFile.localPath);
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java
index 1105be6..2beae6a 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/LastBuildEvent.java
@@ -14,6 +14,7 @@
 
 package com.google.devtools.build.lib.buildeventstream;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.Collection;
 
 /** Wrapper class for a build event marking it as the final event in the protocol. */
@@ -40,6 +41,11 @@
   }
 
   @Override
+  public Collection<ListenableFuture<String>> remoteUploads() {
+    return event.remoteUploads();
+  }
+
+  @Override
   public BuildEventStreamProtos.BuildEvent asStreamProto(BuildEventContext converters) {
     return BuildEventStreamProtos.BuildEvent.newBuilder(event.asStreamProto(converters))
         .setLastMessage(true)
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index 8dbc050..e317cdf 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -237,14 +237,16 @@
       BuildEvent event, ArtifactGroupNamer namer) {
     checkNotNull(event);
 
+    ListenableFuture<PathConverter> converterFuture =
+        uploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
     return Futures.transform(
-        uploader.uploadReferencedLocalFiles(event.referencedLocalFiles()),
-        pathConverter -> {
+        Futures.allAsList(converterFuture, Futures.successfulAsList(event.remoteUploads())),
+        results -> {
           BuildEventContext context =
               new BuildEventContext() {
                 @Override
                 public PathConverter pathConverter() {
-                  return pathConverter;
+                  return Futures.getUnchecked(converterFuture);
                 }
 
                 @Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java b/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java
index ab393e3..7b13a12 100644
--- a/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java
+++ b/src/main/java/com/google/devtools/build/lib/buildtool/BuildResult.java
@@ -17,6 +17,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.analysis.ConfiguredTarget;
 import com.google.devtools.build.lib.analysis.config.BuildConfigurationCollection;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileCompression;
@@ -294,7 +296,7 @@
    */
   public static final class BuildToolLogCollection {
     private final List<Pair<String, ByteString>> directValues = new ArrayList<>();
-    private final List<Pair<String, String>> directUris = new ArrayList<>();
+    private final List<Pair<String, ListenableFuture<String>>> futureUris = new ArrayList<>();
     private final List<LogFileEntry> localFiles = new ArrayList<>();
     private boolean frozen;
 
@@ -316,7 +318,13 @@
 
     public BuildToolLogCollection addUri(String name, String uri) {
       Preconditions.checkState(!frozen);
-      this.directUris.add(Pair.of(name, uri));
+      this.futureUris.add(Pair.of(name, Futures.immediateFuture(uri)));
+      return this;
+    }
+
+    public BuildToolLogCollection addUriFuture(String name, ListenableFuture<String> uriFuture) {
+      Preconditions.checkState(!frozen);
+      this.futureUris.add(Pair.of(name, uriFuture));
       return this;
     }
 
@@ -333,7 +341,7 @@
 
     public BuildToolLogs toEvent() {
       Preconditions.checkState(frozen);
-      return new BuildToolLogs(directValues, directUris, localFiles);
+      return new BuildToolLogs(directValues, futureUris, localFiles);
     }
 
     /** For debugging. */
@@ -341,7 +349,7 @@
     public String toString() {
       return MoreObjects.toStringHelper(this)
           .add("directValues", directValues)
-          .add("directUris", directUris)
+          .add("futureUris", futureUris)
           .add("localFiles", localFiles)
           .toString();
     }