BEP artifact upload opens all files upfront, causing the OS to throw a 'Too many open files' exception

To address this problem, this change adds a --bep_maximum_open_remote_upload_files flag that gives users control over the maximum number of files the BEP holds open at once during remote cache artifact upload. Users can set the flag to a limit their operating system can handle (for example, below the open-file limit reported by 'ulimit -a' on *nix systems); the default of -1 imposes no limit.
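
The limit is enforced in ByteStreamUploader with a java.util.concurrent.Semaphore: when the flag is set, a permit is acquired before each upload starts and is released through a listener once the upload future completes, whether it succeeds, fails, or is cancelled; when the flag is left at -1, no semaphore is created and uploads are unthrottled. A minimal standalone sketch of that gating pattern (illustrative only; UploadGate and UploadStarter are simplified stand-ins for the actual ByteStreamUploader code in the diff below):

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.Semaphore;
    import javax.annotation.Nullable;

    class UploadGate {
      // -1 means "no limit": no semaphore is created and uploads are not gated.
      @Nullable private final Semaphore openedFilePermits;

      UploadGate(int maximumOpenFiles) {
        this.openedFilePermits =
            maximumOpenFiles != -1 ? new Semaphore(maximumOpenFiles) : null;
      }

      <T> ListenableFuture<T> gate(UploadStarter<T> starter) throws InterruptedException {
        if (openedFilePermits != null) {
          // Blocks until one of the maximumOpenFiles slots is free, so at most
          // that many uploads (and hence open files) are in flight at once.
          openedFilePermits.acquire();
        }
        ListenableFuture<T> upload = starter.start();
        if (openedFilePermits != null) {
          // Release the slot on any completion: success, failure, or cancellation.
          upload.addListener(openedFilePermits::release, MoreExecutors.directExecutor());
        }
        return upload;
      }

      interface UploadStarter<T> {
        ListenableFuture<T> start();
      }
    }

Because the semaphore is null on the default path, existing behavior is unchanged unless the flag is set.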

PiperOrigin-RevId: 415589490
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index c488f14..2087492 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -27,6 +27,7 @@
 import com.google.bytestream.ByteStreamProto.WriteRequest;
 import com.google.bytestream.ByteStreamProto.WriteResponse;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Ascii;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.flogger.GoogleLogger;
@@ -59,6 +60,7 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -93,6 +95,8 @@
   @GuardedBy("lock")
   private boolean isShutdown;
 
+  @Nullable private final Semaphore openedFilePermits;
+
   /**
    * Creates a new instance.
    *
@@ -109,14 +113,15 @@
       ReferenceCountedChannel channel,
       CallCredentialsProvider callCredentialsProvider,
       long callTimeoutSecs,
-      RemoteRetrier retrier) {
+      RemoteRetrier retrier,
+      int maximumOpenFiles) {
     checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
-
     this.instanceName = instanceName;
     this.channel = channel;
     this.callCredentialsProvider = callCredentialsProvider;
     this.callTimeoutSecs = callTimeoutSecs;
     this.retrier = retrier;
+    this.openedFilePermits = maximumOpenFiles != -1 ? new Semaphore(maximumOpenFiles) : null;
   }
 
   @VisibleForTesting
@@ -254,7 +259,6 @@
       if (inProgress != null) {
         return inProgress;
       }
-
       ListenableFuture<Void> uploadResult =
           Futures.transform(
               startAsyncUpload(context, digest, chunker),
@@ -339,6 +343,16 @@
             retrier,
             resourceName,
             chunker);
+    if (openedFilePermits != null) {
+      try {
+        openedFilePermits.acquire();
+      } catch (InterruptedException e) {
+        return Futures.immediateFailedFuture(
+            new InterruptedException(
+                "Unexpected interrupt while acquiring open file permit. Original error message: "
+                    + e.getMessage()));
+      }
+    }
     ListenableFuture<Void> currUpload = newUpload.start();
     currUpload.addListener(
         () -> {
@@ -371,7 +385,7 @@
     return this;
   }
 
-  private static class AsyncUpload {
+  private class AsyncUpload {
 
     private final RemoteActionExecutionContext context;
     private final Channel channel;
@@ -420,6 +434,18 @@
                           } catch (IOException resetException) {
                             e.addSuppressed(resetException);
                           }
+                          String tooManyOpenFilesError = "Too many open files";
+                          if (Ascii.toLowerCase(e.getMessage())
+                              .contains(Ascii.toLowerCase(tooManyOpenFilesError))) {
+                            String newMessage =
+                                "An IOException was thrown because the process opened too many"
+                                    + " files. We recommend setting"
+                                    + " --bep_maximum_open_remote_upload_files flag to a number"
+                                    + " lower than your system default (run 'ulimit -a' for"
+                                    + " *nix-based operating systems). Original error message: "
+                                    + e.getMessage();
+                            return Futures.immediateFailedFuture(new IOException(newMessage, e));
+                          }
                           return Futures.immediateFailedFuture(e);
                         }
                         if (chunker.hasNext()) {
@@ -429,7 +455,9 @@
                       },
                       progressiveBackoff),
               callCredentialsProvider);
-
+      if (openedFilePermits != null) {
+        callFuture.addListener(openedFilePermits::release, MoreExecutors.directExecutor());
+      }
       return Futures.transformAsync(
           callFuture,
           (result) -> {
@@ -632,4 +660,9 @@
       }
     }
   }
+
+  @VisibleForTesting
+  public Semaphore getOpenedFilePermits() {
+    return openedFilePermits;
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index d1024a3..19c7a0f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -38,12 +38,14 @@
 /**
  * Splits a data source into one or more {@link Chunk}s of at most {@code chunkSize} bytes.
  *
- * <p>After a data source has been fully consumed, that is until {@link #hasNext()} returns
- * {@code false}, the chunker closes the underlying data source (i.e. file) itself. However, in
- * case of error or when a data source does not get fully consumed, a user must call
- * {@link #reset()} manually.
+ * <p>After a data source has been fully consumed, that is until {@link #hasNext()} returns {@code
+ * false}, the chunker closes the underlying data source (i.e. file) itself. However, in case of
+ * error or when a data source does not get fully consumed, a user must call {@link #reset()}
+ * manually.
+ *
+ * <p>This class should not be extended - it's only non-final for testing.
  */
-public final class Chunker {
+public class Chunker {
 
   private static int defaultChunkSize = 1024 * 16;
 
@@ -104,7 +106,7 @@
   private final int chunkSize;
   private final Chunk emptyChunk;
 
-  private ChunkerInputStream data;
+  @VisibleForTesting protected ChunkerInputStream data;
   private long offset;
   private byte[] chunkCache;
 
@@ -274,7 +276,13 @@
     public Builder setInput(byte[] data) {
       checkState(inputStream == null);
       size = data.length;
-      inputStream = () -> new ByteArrayInputStream(data);
+      setInputSupplier(() -> new ByteArrayInputStream(data));
+      return this;
+    }
+
+    @VisibleForTesting
+    protected final Builder setInputSupplier(Supplier<InputStream> inputStream) {
+      this.inputStream = inputStream;
       return this;
     }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index de7ed30..4e2db8d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -528,7 +528,8 @@
             cacheChannel.retain(),
             callCredentialsProvider,
             remoteOptions.remoteTimeout.getSeconds(),
-            retrier);
+            retrier,
+            remoteOptions.maximumOpenFiles);
 
     cacheChannel.release();
     RemoteCacheClient cacheClient =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java
index d7dc2ec..61a53db 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java
@@ -565,6 +565,14 @@
               + "that loads objects from the CAS on demand.")
   public String remoteDownloadSymlinkTemplate;
 
+  @Option(
+      name = "bep_maximum_open_remote_upload_files",
+      defaultValue = "-1",
+      documentationCategory = OptionDocumentationCategory.OUTPUT_PARAMETERS,
+      effectTags = {OptionEffectTag.AFFECTS_OUTPUTS},
+      help = "Maximum number of open files allowed during BEP artifact upload.")
+  public int maximumOpenFiles;
+
   // The below options are not configurable by users, only tests.
   // This is part of the effort to reduce the overall number of flags.
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index a409604..fdf575e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -178,7 +178,55 @@
     ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
-            "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier);
+            "instance",
+            refCntChannel,
+            CallCredentialsProvider.NO_CREDENTIALS,
+            3,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
+    ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
+
+    PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
+    for (Path file : filesToUpload.keySet()) {
+      String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
+      long size = file.getFileSize();
+      String conversion = pathConverter.apply(file);
+      assertThat(conversion)
+          .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
+    }
+
+    artifactUploader.release();
+
+    assertThat(uploader.refCnt()).isEqualTo(0);
+    assertThat(refCntChannel.isShutdown()).isTrue();
+  }
+
+  @Test
+  public void uploadsShouldWork_fewerPermitsThanUploads() throws Exception {
+    int numUploads = 2;
+    Map<HashCode, byte[]> blobsByHash = new HashMap<>();
+    Map<Path, LocalFile> filesToUpload = new HashMap<>();
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      Path file = fs.getPath("/file" + i);
+      OutputStream out = file.getOutputStream();
+      int blobSize = rand.nextInt(100) + 1;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      out.write(blob);
+      out.close();
+      blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
+      filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
+    }
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
+    // number of permits is less than number of uploads to affirm permit is released
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier, 1);
     ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
 
     PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
@@ -207,7 +255,12 @@
     ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
-            "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier);
+            "instance",
+            refCntChannel,
+            CallCredentialsProvider.NO_CREDENTIALS,
+            3,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
     ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
 
     PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
@@ -270,7 +323,12 @@
     ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
-            "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier);
+            "instance",
+            refCntChannel,
+            CallCredentialsProvider.NO_CREDENTIALS,
+            3,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
     ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
 
     artifactUploader.upload(filesToUpload).get();
@@ -298,7 +356,12 @@
     ByteStreamUploader uploader =
         spy(
             new ByteStreamUploader(
-                "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier));
+                "instance",
+                refCntChannel,
+                CallCredentialsProvider.NO_CREDENTIALS,
+                3,
+                retrier,
+                /*maximumOpenFiles=*/ -1));
     RemoteActionInputFetcher actionInputFetcher = Mockito.mock(RemoteActionInputFetcher.class);
     ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
 
@@ -356,7 +419,12 @@
     ByteStreamUploader uploader =
         spy(
             new ByteStreamUploader(
-                "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier));
+                "instance",
+                refCntChannel,
+                CallCredentialsProvider.NO_CREDENTIALS,
+                3,
+                retrier,
+                /*maximumOpenFiles=*/ -1));
     doReturn(Futures.immediateFuture(null))
         .when(uploader)
         .uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean());
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 15cc335..4865af2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertThrows;
@@ -80,6 +81,8 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Before;
@@ -162,7 +165,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -170,55 +174,7 @@
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    serviceRegistry.addService(
-        new ByteStreamImplBase() {
-          @Override
-          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
-            return new StreamObserver<WriteRequest>() {
-
-              byte[] receivedData = new byte[blob.length];
-              long nextOffset = 0;
-
-              @Override
-              public void onNext(WriteRequest writeRequest) {
-                if (nextOffset == 0) {
-                  assertThat(writeRequest.getResourceName()).isNotEmpty();
-                  assertThat(writeRequest.getResourceName()).startsWith(INSTANCE_NAME + "/uploads");
-                  assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
-                } else {
-                  assertThat(writeRequest.getResourceName()).isEmpty();
-                }
-
-                assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
-
-                ByteString data = writeRequest.getData();
-
-                System.arraycopy(
-                    data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
-
-                nextOffset += data.size();
-                boolean lastWrite = blob.length == nextOffset;
-                assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
-              }
-
-              @Override
-              public void onError(Throwable throwable) {
-                fail("onError should never be called.");
-              }
-
-              @Override
-              public void onCompleted() {
-                assertThat(nextOffset).isEqualTo(blob.length);
-                assertThat(receivedData).isEqualTo(blob);
-
-                WriteResponse response =
-                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
-                streamObserver.onNext(response);
-                streamObserver.onCompleted();
-              }
-            };
-          }
-        });
+    serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
 
     uploader.uploadBlob(context, hash, chunker, true);
 
@@ -239,7 +195,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -355,7 +312,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             300,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -480,7 +438,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             1,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -539,7 +498,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -610,7 +570,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -648,7 +609,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -703,7 +665,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             300,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -735,7 +698,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     int numUploads = 10;
     Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
@@ -759,6 +723,118 @@
   }
 
   @Test
+  public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
+    byte[] blob = new byte[CHUNK_SIZE];
+    Chunker chunker = Mockito.mock(Chunker.class);
+    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    AtomicLong committedOffset = new AtomicLong(0);
+    Mockito.doThrow(new IOException("Too many open files"))
+        .when(chunker)
+        .seek(committedOffset.get());
+    Mockito.when(chunker.getSize()).thenReturn(1L);
+
+    try {
+      uploader.uploadBlob(context, hash, chunker, true);
+      fail("Should have thrown an exception.");
+    } catch (IOException e) {
+      String newMessage =
+          "An IOException was thrown because the process opened too many files. We recommend"
+              + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your"
+              + " system default (run 'ulimit -a' for *nix-based operating systems). Original error"
+              + " message: Too many open files";
+      assertThat(newMessage).isEqualTo(e.getMessage());
+    }
+  }
+
+  @Test
+  public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAllPermits()
+      throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    // number of permits is less than number of uploads to affirm permit is released
+    int maximumOpenFiles = 999;
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            maximumOpenFiles);
+
+    assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(999);
+
+    CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
+    int numUploads = 1000;
+    Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      Chunker chunker =
+          TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
+      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+      chunkers.put(hash, chunker);
+      blobsByHash.put(hash, blob);
+    }
+
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    uploader.uploadBlobs(context, chunkers, true);
+
+    blockUntilInternalStateConsistent(uploader);
+
+    assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
+  }
+
+  @Test
+  public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
+    assertThat(uploader.getOpenedFilePermits()).isNull();
+
+    int numUploads = 10;
+    Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+      chunkers.put(hash, chunker);
+      blobsByHash.put(hash, blob);
+    }
+
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    uploader.uploadBlobs(context, chunkers, true);
+    blockUntilInternalStateConsistent(uploader);
+    assertThat(uploader.getOpenedFilePermits()).isNull();
+  }
+
+  @Test
   public void contextShouldBePreservedUponRetries() throws Exception {
     // We upload blobs with different context, and retry 3 times for each upload.
     // We verify that the correct metadata is passed to the server with every blob.
@@ -770,7 +846,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
     Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(toUpload.size());
@@ -901,7 +978,8 @@
                 }),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -962,7 +1040,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 10];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1028,7 +1107,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1061,7 +1141,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     CountDownLatch cancellations = new CountDownLatch(2);
 
@@ -1125,7 +1206,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1163,7 +1245,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1206,7 +1289,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     AtomicInteger numCalls = new AtomicInteger();
 
@@ -1242,7 +1326,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1320,7 +1405,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1401,7 +1487,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             callCredentialsProvider,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1458,7 +1545,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             callCredentialsProvider,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1527,7 +1615,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1736,4 +1825,65 @@
       Thread.sleep(1);
     }
   }
+
+  /* Custom Chunker used to track number of open files */
+  private static class TestChunker extends Chunker {
+
+    TestChunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
+      super(dataSupplier, size, chunkSize, compressed);
+    }
+
+    public static Builder builder(CustomFileTracker customFileTracker) {
+      return new TestChunkerBuilder(customFileTracker);
+    }
+
+    private static class TestChunkerBuilder extends Chunker.Builder {
+      private final CustomFileTracker customFileTracker;
+
+      TestChunkerBuilder(CustomFileTracker customFileTracker) {
+        this.customFileTracker = customFileTracker;
+      }
+
+      @Override
+      public Chunker.Builder setInput(byte[] existingData) {
+        return setInputSupplier(
+            () -> new TestByteArrayInputStream(existingData, customFileTracker));
+      }
+    }
+  }
+
+  private static class TestByteArrayInputStream extends ByteArrayInputStream {
+    private final CustomFileTracker customFileTracker;
+
+    TestByteArrayInputStream(byte[] buf, CustomFileTracker customFileTracker) {
+      super(buf);
+      this.customFileTracker = customFileTracker;
+      customFileTracker.incrementOpenFiles();
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      customFileTracker.decrementOpenFiles();
+    }
+  }
+
+  private static class CustomFileTracker {
+    private final AtomicInteger openFiles = new AtomicInteger(0);
+    private final int maxOpenFiles;
+
+    CustomFileTracker(int maxOpenFiles) {
+      this.maxOpenFiles = maxOpenFiles;
+    }
+
+    private void incrementOpenFiles() {
+      openFiles.getAndIncrement();
+      checkState(openFiles.get() <= maxOpenFiles);
+    }
+
+    private void decrementOpenFiles() {
+      openFiles.getAndDecrement();
+      checkState(openFiles.get() >= 0);
+    }
+  }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index 5975f5e..43e38e2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -251,7 +251,8 @@
             channel.retain(),
             callCredentialsProvider,
             remoteOptions.remoteTimeout.getSeconds(),
-            retrier);
+            retrier,
+            remoteOptions.maximumOpenFiles);
     return new GrpcCacheClient(
         channel.retain(), callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL, uploader);
   }
@@ -338,7 +339,7 @@
 
   @Test
   public void testDownloadBlobSingleChunk() throws Exception {
-    final GrpcCacheClient client = newClient();
+    GrpcCacheClient client = newClient();
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -355,7 +356,7 @@
 
   @Test
   public void testDownloadBlobMultipleChunks() throws Exception {
-    final GrpcCacheClient client = newClient();
+    GrpcCacheClient client = newClient();
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -949,7 +950,7 @@
 
   @Test
   public void testGetCachedActionResultWithRetries() throws Exception {
-    final GrpcCacheClient client = newClient();
+    GrpcCacheClient client = newClient();
     ActionKey actionKey = DIGEST_UTIL.asActionKey(DIGEST_UTIL.computeAsUtf8("key"));
     serviceRegistry.addService(
         new ActionCacheImplBase() {
@@ -971,8 +972,7 @@
   @Test
   public void downloadBlobIsRetriedWithProgress() throws IOException, InterruptedException {
     Backoff mockBackoff = Mockito.mock(Backoff.class);
-    final GrpcCacheClient client =
-        newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
+    GrpcCacheClient client = newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1002,8 +1002,7 @@
   public void downloadBlobDoesNotRetryZeroLengthRequests()
       throws IOException, InterruptedException {
     Backoff mockBackoff = Mockito.mock(Backoff.class);
-    final GrpcCacheClient client =
-        newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
+    GrpcCacheClient client = newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1024,8 +1023,7 @@
   public void downloadBlobPassesThroughDeadlineExceededWithoutProgress() throws IOException {
     Backoff mockBackoff = Mockito.mock(Backoff.class);
     Mockito.when(mockBackoff.nextDelayMillis(any(Exception.class))).thenReturn(-1L);
-    final GrpcCacheClient client =
-        newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
+    GrpcCacheClient client = newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
     final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
     serviceRegistry.addService(
         new ByteStreamImplBase() {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
index 817394f..16e0b37 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
@@ -289,7 +289,8 @@
             channel.retain(),
             callCredentialsProvider,
             remoteOptions.remoteTimeout.getSeconds(),
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
     GrpcCacheClient cacheProtocol =
         new GrpcCacheClient(
             channel.retain(),
diff --git a/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java b/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java
index 16f394a..e6425a4 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java
@@ -96,4 +96,11 @@
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testRemoteMaximumOpenFilesDefault() {
+    RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+    int defaultMax = options.maximumOpenFiles;
+    assertThat(defaultMax).isEqualTo(-1);
+  }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
index 5a003e1..3c1033f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -33,7 +33,10 @@
         "//third_party:junit4",
         "//third_party:rxjava3",
         "//third_party:truth",
+        "//third_party/grpc:grpc-jar",
         "//third_party/protobuf:protobuf_java",
+        "@googleapis//:google_bytestream_bytestream_java_grpc",
+        "@googleapis//:google_bytestream_bytestream_java_proto",
         "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
     ],
 )
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java b/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java
index 5e50e71..419d24f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java
@@ -13,12 +13,20 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote.util;
 
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.devtools.build.lib.remote.RemoteRetrier;
 import com.google.devtools.build.lib.remote.Retrier;
 import com.google.devtools.build.lib.remote.Retrier.Backoff;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -154,4 +162,53 @@
       delegate.execute(command);
     }
   }
+
+  public static final ByteStreamImplBase newNoErrorByteStreamService(byte[] blob) {
+    return new ByteStreamImplBase() {
+      @Override
+      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+        return new StreamObserver<WriteRequest>() {
+
+          byte[] receivedData = new byte[blob.length];
+          long nextOffset = 0;
+
+          @Override
+          public void onNext(WriteRequest writeRequest) {
+            if (nextOffset == 0) {
+              assertThat(writeRequest.getResourceName()).isNotEmpty();
+              assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
+            } else {
+              assertThat(writeRequest.getResourceName()).isEmpty();
+            }
+
+            assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
+
+            ByteString data = writeRequest.getData();
+
+            System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
+
+            nextOffset += data.size();
+            boolean lastWrite = blob.length == nextOffset;
+            assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+          }
+
+          @Override
+          public void onError(Throwable throwable) {
+            fail("onError should never be called.");
+          }
+
+          @Override
+          public void onCompleted() {
+            assertThat(nextOffset).isEqualTo(blob.length);
+            assertThat(receivedData).isEqualTo(blob);
+
+            WriteResponse response =
+                WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+            streamObserver.onNext(response);
+            streamObserver.onCompleted();
+          }
+        };
+      }
+    };
+  }
 }