ByteStreamUploader: Open files at the last possible moment.

Open files to upload only when the network is ready to accept data. Empirically, this greatly reduces the need to set `--bep_maximum_open_remote_upload_files` because the small-to-medium-sized files that are common in real-world builds can be opened, dumped onto the network, and closed in one shot.

Closes #15670.

PiperOrigin-RevId: 455074457
Change-Id: I243c7e6c51b2d415cdc0a00e7e71d06aefadbd2c
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 34d1213..4615397 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -336,36 +336,12 @@
               }
               if (committedSize > lastCommittedOffset) {
                 // We have made progress on this upload in the last request. Reset the backoff so
-                // that
-                // this request has a full deck of retries
+                // that this request has a full deck of retries
                 progressiveBackoff.reset();
               }
             }
             lastCommittedOffset = committedSize;
-            try {
-              chunker.seek(committedSize);
-            } catch (IOException e) {
-              try {
-                chunker.reset();
-              } catch (IOException resetException) {
-                e.addSuppressed(resetException);
-              }
-              String tooManyOpenFilesError = "Too many open files";
-              if (Ascii.toLowerCase(e.getMessage())
-                  .contains(Ascii.toLowerCase(tooManyOpenFilesError))) {
-                String newMessage =
-                    "An IOException was thrown because the process opened too"
-                        + " many files. We recommend setting"
-                        + " --bep_maximum_open_remote_upload_files flag to a"
-                        + " number lower than your system default (run 'ulimit"
-                        + " -a' for *nix-based operating systems). Original"
-                        + " error message: "
-                        + e.getMessage();
-                return Futures.immediateFailedFuture(new IOException(newMessage, e));
-              }
-              return Futures.immediateFailedFuture(e);
-            }
-            return upload();
+            return upload(committedSize);
           },
           MoreExecutors.directExecutor());
     }
@@ -415,12 +391,14 @@
           MoreExecutors.directExecutor());
     }
 
-    private ListenableFuture<Long> upload() {
+    private ListenableFuture<Long> upload(long pos) {
       return channel.withChannelFuture(
           channel -> {
             SettableFuture<Long> uploadResult = SettableFuture.create();
             grpcContext.run(
-                () -> bsAsyncStub(channel).write(new Writer(resourceName, chunker, uploadResult)));
+                () ->
+                    bsAsyncStub(channel)
+                        .write(new Writer(resourceName, chunker, pos, uploadResult)));
             return uploadResult;
           });
     }
@@ -434,15 +412,18 @@
   private static final class Writer
       implements ClientResponseObserver<WriteRequest, WriteResponse>, Runnable {
     private final Chunker chunker;
+    private final long pos;
     private final String resourceName;
     private final SettableFuture<Long> uploadResult;
     private long committedSize = -1;
     private ClientCallStreamObserver<WriteRequest> requestObserver;
     private boolean first = true;
 
-    private Writer(String resourceName, Chunker chunker, SettableFuture<Long> uploadResult) {
+    private Writer(
+        String resourceName, Chunker chunker, long pos, SettableFuture<Long> uploadResult) {
       this.resourceName = resourceName;
       this.chunker = chunker;
+      this.pos = pos;
       this.uploadResult = uploadResult;
     }
 
@@ -459,6 +440,15 @@
         return;
       }
       while (requestObserver.isReady()) {
+        WriteRequest.Builder request = WriteRequest.newBuilder();
+        if (first) {
+          first = false;
+          if (!seekChunker()) {
+            return;
+          }
+          // Resource name only needs to be set on the first write for each file.
+          request.setResourceName(resourceName);
+        }
         Chunker.Chunk chunk;
         try {
           chunk = chunker.next();
@@ -467,17 +457,12 @@
           return;
         }
         boolean isLastChunk = !chunker.hasNext();
-        WriteRequest.Builder request =
-            WriteRequest.newBuilder()
+        requestObserver.onNext(
+            request
                 .setData(chunk.getData())
                 .setWriteOffset(chunk.getOffset())
-                .setFinishWrite(isLastChunk);
-        if (first) {
-          first = false;
-          // Resource name only needs to be set on the first write for each file.
-          request.setResourceName(resourceName);
-        }
-        requestObserver.onNext(request.build());
+                .setFinishWrite(isLastChunk)
+                .build());
         if (isLastChunk) {
           requestObserver.onCompleted();
           return;
@@ -485,6 +470,32 @@
       }
     }
 
+    private boolean seekChunker() {
+      try {
+        chunker.seek(pos);
+      } catch (IOException e) {
+        try {
+          chunker.reset();
+        } catch (IOException resetException) {
+          e.addSuppressed(resetException);
+        }
+        String tooManyOpenFilesError = "Too many open files";
+        if (Ascii.toLowerCase(e.getMessage()).contains(Ascii.toLowerCase(tooManyOpenFilesError))) {
+          String newMessage =
+              "An IOException was thrown because the process opened too many files. We recommend"
+                  + " setting --bep_maximum_open_remote_upload_files flag to a number lower than"
+                  + " your system default (run 'ulimit -a' for *nix-based operating systems)."
+                  + " Original error message: "
+                  + e.getMessage();
+          e = new IOException(newMessage, e);
+        }
+        uploadResult.setException(e);
+        requestObserver.cancel("failed to seek chunk", e);
+        return false;
+      }
+      return true;
+    }
+
     @Override
     public void onNext(WriteResponse response) {
       committedSize = response.getCommittedSize();
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index de2ff4d..ca3175d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -30,6 +30,7 @@
 import com.google.bytestream.ByteStreamProto.WriteRequest;
 import com.google.bytestream.ByteStreamProto.WriteResponse;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.hash.HashCode;
@@ -77,7 +78,6 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.junit.After;
@@ -782,23 +782,18 @@
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Mockito.mock(Chunker.class);
     Digest digest = DIGEST_UTIL.compute(blob);
-    AtomicLong committedOffset = new AtomicLong(0);
-    Mockito.doThrow(new IOException("Too many open files"))
-        .when(chunker)
-        .seek(committedOffset.get());
+    Mockito.doThrow(new IOException("Too many open files")).when(chunker).seek(0);
     Mockito.when(chunker.getSize()).thenReturn(digest.getSizeBytes());
+    serviceRegistry.addService(new MaybeFailOnceUploadService(ImmutableMap.of()));
 
-    try {
-      uploader.uploadBlob(context, digest, chunker);
-      fail("Should have thrown an exception.");
-    } catch (IOException e) {
-      String newMessage =
-          "An IOException was thrown because the process opened too many files. We recommend"
-              + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your"
-              + " system default (run 'ulimit -a' for *nix-based operating systems). Original error"
-              + " message: Too many open files";
-      assertThat(newMessage).isEqualTo(e.getMessage());
-    }
+    String newMessage =
+        "An IOException was thrown because the process opened too many files. We recommend setting"
+            + " --bep_maximum_open_remote_upload_files flag to a number lower than your system"
+            + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:"
+            + " Too many open files";
+    assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)))
+        .hasMessageThat()
+        .isEqualTo(newMessage);
   }
 
   @Test