Push remote upload consolidation into the Uploader when using --experimental_stream_log_file_uploads

PiperOrigin-RevId: 281536757
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 9263c04..58474d7 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -74,7 +74,7 @@
 
   @Override
   public BuildEventArtifactUploader getUploader() {
-    return besUploader.getLocalFileUploader();
+    return besUploader.getBuildEventUploader();
   }
 
   @Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index e03162e..8dacd2b 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -94,7 +94,7 @@
   private static final int DELAY_MILLIS = 1000;
 
   private final BuildEventServiceClient besClient;
-  private final BuildEventArtifactUploader localFileUploader;
+  private final BuildEventArtifactUploader buildEventUploader;
   private final BuildEventServiceProtoUtil besProtoUtil;
   private final BuildEventProtocolOptions buildEventProtocolOptions;
   private final boolean publishLifecycleEvents;
@@ -154,7 +154,7 @@
       ArtifactGroupNamer namer,
       EventBus eventBus) {
     this.besClient = besClient;
-    this.localFileUploader = localFileUploader;
+    this.buildEventUploader = localFileUploader;
     this.besProtoUtil = besProtoUtil;
     this.buildEventProtocolOptions = buildEventProtocolOptions;
     this.publishLifecycleEvents = publishLifecycleEvents;
@@ -168,8 +168,8 @@
         () -> halfCloseFuture.setFuture(closeFuture), MoreExecutors.directExecutor());
   }
 
-  BuildEventArtifactUploader getLocalFileUploader() {
-    return localFileUploader;
+  BuildEventArtifactUploader getBuildEventUploader() {
+    return buildEventUploader;
   }
 
   /** Enqueues an event for uploading to a BES backend. */
@@ -177,7 +177,7 @@
     // This needs to happen outside a synchronized block as it may trigger
     // stdout/stderr and lead to a deadlock. See b/109725432
     ListenableFuture<PathConverter> localFileUploadFuture =
-        localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
+        buildEventUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
 
     // The generation of the sequence number and the addition to the {@link #eventQueue} should be
     // atomic since BES expects the events in that exact order.
@@ -317,7 +317,7 @@
       logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
       throw e;
     } finally {
-      localFileUploader.shutdown();
+      buildEventUploader.shutdown();
       MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS);
       closeFuture.set(null);
     }
@@ -613,19 +613,9 @@
       throws LocalFileUploadException, InterruptedException {
     try {
       // Wait for the local file and pending remote uploads to complete.
-      ListenableFuture<?> remoteUploads =
-          Futures.successfulAsList(orderedBuildEvent.getEvent().remoteUploads());
-
-      if (localFileUploader.timeout() != null) {
-        remoteUploads =
-            Futures.withTimeout(
-                remoteUploads,
-                localFileUploader.timeout().toMillis(),
-                TimeUnit.MILLISECONDS,
-                timeoutExecutor);
-      }
-      // TODO(bazel-team): Consider failing softy if remote upload fails.
-      remoteUploads.get();
+      buildEventUploader
+          .waitForRemoteUploads(orderedBuildEvent.getEvent().remoteUploads(), timeoutExecutor)
+          .get();
       return orderedBuildEvent.localFileUploadProgress().get();
     } catch (ExecutionException e) {
       logger.log(
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
index 3d4b1c6..aeb6afd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
@@ -23,9 +23,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.time.Duration;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
@@ -42,12 +42,6 @@
    */
   ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files);
 
-  /** The timeout on individual file uploads, or null if none. */
-  @Nullable
-  default Duration timeout() {
-    return null;
-  }
-
   /** The context associated with an in-flight remote upload. */
   interface UploadContext {
 
@@ -109,4 +103,14 @@
     }
     return upload(localFileMap);
   }
+
+  /**
+   * Blocks on the completion of pending remote uploads, enforcing the relevant timeout if
+   * applicable.
+   */
+  default ListenableFuture<?> waitForRemoteUploads(
+      Collection<ListenableFuture<String>> remoteUploads,
+      ScheduledExecutorService timeoutExecutor) {
+    return Futures.allAsList(remoteUploads);
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
index 9932b15..b99a1b9 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
@@ -93,11 +93,17 @@
       String name = directFuturePair.getFirst();
       ListenableFuture<String> directFuture = directFuturePair.getSecond();
       try {
-        toolLogs.addLog(
-            BuildEventStreamProtos.File.newBuilder()
-                .setName(name)
-                .setUri(Futures.getDone(directFuture))
-                .build());
+        String uri =
+            directFuture.isDone() && !directFuture.isCancelled()
+                ? Futures.getDone(directFuture)
+                : null;
+        if (uri != null) {
+          toolLogs.addLog(
+              BuildEventStreamProtos.File.newBuilder()
+                  .setName(name)
+                  .setUri(Futures.getDone(directFuture))
+                  .build());
+        }
       } catch (ExecutionException e) {
         logger.log(Level.WARNING, "Skipping build tool log upload " + name, e);
       }
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index e317cdf..efef270 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -23,6 +23,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
 import com.google.devtools.build.lib.buildeventstream.BuildEvent;
 import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
@@ -40,7 +41,9 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +65,11 @@
   private final SequentialWriter writer;
   private final ArtifactGroupNamer namer;
 
+  private final ScheduledExecutorService timeoutExecutor =
+      MoreExecutors.listeningDecorator(
+          Executors.newSingleThreadScheduledExecutor(
+              new ThreadFactoryBuilder().setNameFormat("file-uploader-timeout-%d").build()));
+
   FileTransport(
       BufferedOutputStream outputStream,
       BuildEventProtocolOptions options,
@@ -69,7 +77,8 @@
       ArtifactGroupNamer namer) {
     this.uploader = uploader;
     this.options = options;
-    this.writer = new SequentialWriter(outputStream, this::serializeEvent, uploader);
+    this.writer =
+        new SequentialWriter(outputStream, this::serializeEvent, uploader, timeoutExecutor);
     this.namer = namer;
   }
 
@@ -96,18 +105,20 @@
     final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
         new LinkedBlockingDeque<>();
 
+    private ScheduledExecutorService timeoutExecutor;
+
     SequentialWriter(
         BufferedOutputStream outputStream,
         Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc,
-        BuildEventArtifactUploader uploader) {
-      checkNotNull(outputStream);
-      checkNotNull(serializeFunc);
+        BuildEventArtifactUploader uploader,
+        ScheduledExecutorService timeoutExecutor) {
       checkNotNull(uploader);
 
-      this.out = outputStream;
+      this.out = checkNotNull(outputStream);
       this.writerThread = new Thread(this, "bep-local-writer");
-      this.serializeFunc = serializeFunc;
-      this.uploader = uploader;
+      this.serializeFunc = checkNotNull(serializeFunc);
+      this.uploader = checkNotNull(uploader);
+      this.timeoutExecutor = checkNotNull(timeoutExecutor);
       writerThread.start();
     }
 
@@ -142,6 +153,7 @@
             out.close();
           } finally {
             uploader.shutdown();
+            timeoutExecutor.shutdown();
           }
         } catch (IOException e) {
           logger.log(Level.SEVERE, "Failed to close BEP file output stream.", e);
@@ -239,8 +251,10 @@
 
     ListenableFuture<PathConverter> converterFuture =
         uploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
+    ListenableFuture<?> remoteUploads =
+        uploader.waitForRemoteUploads(event.remoteUploads(), timeoutExecutor);
     return Futures.transform(
-        Futures.allAsList(converterFuture, Futures.successfulAsList(event.remoteUploads())),
+        Futures.allAsList(converterFuture, remoteUploads),
         results -> {
           BuildEventContext context =
               new BuildEventContext() {