BEP artifact upload opens all files upfront, causing the OS to throw a 'Too many open files' exception

To address this problem, added the --bep_maximum_open_remote_upload_files flag so users can control the maximum number of files the BEP keeps open at once during remote cache artifact upload. Users can set the flag to a limit their OS can handle.

PiperOrigin-RevId: 415589490
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 15cc335..4865af2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertThrows;
@@ -80,6 +81,8 @@
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.Before;
@@ -162,7 +165,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -170,55 +174,7 @@
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
     HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
 
-    serviceRegistry.addService(
-        new ByteStreamImplBase() {
-          @Override
-          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
-            return new StreamObserver<WriteRequest>() {
-
-              byte[] receivedData = new byte[blob.length];
-              long nextOffset = 0;
-
-              @Override
-              public void onNext(WriteRequest writeRequest) {
-                if (nextOffset == 0) {
-                  assertThat(writeRequest.getResourceName()).isNotEmpty();
-                  assertThat(writeRequest.getResourceName()).startsWith(INSTANCE_NAME + "/uploads");
-                  assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
-                } else {
-                  assertThat(writeRequest.getResourceName()).isEmpty();
-                }
-
-                assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
-
-                ByteString data = writeRequest.getData();
-
-                System.arraycopy(
-                    data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
-
-                nextOffset += data.size();
-                boolean lastWrite = blob.length == nextOffset;
-                assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
-              }
-
-              @Override
-              public void onError(Throwable throwable) {
-                fail("onError should never be called.");
-              }
-
-              @Override
-              public void onCompleted() {
-                assertThat(nextOffset).isEqualTo(blob.length);
-                assertThat(receivedData).isEqualTo(blob);
-
-                WriteResponse response =
-                    WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
-                streamObserver.onNext(response);
-                streamObserver.onCompleted();
-              }
-            };
-          }
-        });
+    serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
 
     uploader.uploadBlob(context, hash, chunker, true);
 
@@ -239,7 +195,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -355,7 +312,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             300,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -480,7 +438,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             1,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -539,7 +498,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -610,7 +570,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -648,7 +609,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             3,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -703,7 +665,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             300,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -735,7 +698,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     int numUploads = 10;
     Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
@@ -759,6 +723,118 @@
   }
 
   @Test
+  public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
+    byte[] blob = new byte[CHUNK_SIZE];
+    Chunker chunker = Mockito.mock(Chunker.class);
+    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    // Simulate hitting the OS open-file limit when the uploader seeks the input to offset 0.
+    AtomicLong committedOffset = new AtomicLong(0);
+    Mockito.doThrow(new IOException("Too many open files"))
+        .when(chunker)
+        .seek(committedOffset.get());
+    Mockito.when(chunker.getSize()).thenReturn(1L);
+
+    IOException thrown =
+        assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
+
+    // The raw error must be rethrown with advice to set the new flag.
+    String expectedMessage =
+        "An IOException was thrown because the process opened too many files. We recommend"
+            + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your"
+            + " system default (run 'ulimit -a' for *nix-based operating systems). Original error"
+            + " message: Too many open files";
+    assertThat(thrown).hasMessageThat().isEqualTo(expectedMessage);
+  }
+
+  @Test
+  public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAllPermits()
+      throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    // Use fewer permits than uploads so that the test exercises permit release and reuse.
+    int numUploads = 1000;
+    int maximumOpenFiles = numUploads - 1;
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            maximumOpenFiles);
+    assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
+
+    // Fails via checkState if more than maximumOpenFiles test input streams are open at once.
+    CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
+    Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      Chunker chunker =
+          TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
+      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+      chunkers.put(hash, chunker);
+      blobsByHash.put(hash, blob);
+    }
+
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    uploader.uploadBlobs(context, chunkers, true);
+    blockUntilInternalStateConsistent(uploader);
+
+    // Every permit acquired for an upload must have been returned.
+    assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
+  }
+
+  @Test
+  public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {
+    RemoteRetrier testRetrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    // With maximumOpenFiles = -1 the uploader should not create an open-file semaphore at all.
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            testRetrier,
+            /*maximumOpenFiles=*/ -1);
+    assertThat(uploader.getOpenedFilePermits()).isNull();
+
+    int uploadCount = 10;
+    Map<HashCode, byte[]> expectedBlobs = Maps.newHashMap();
+    Map<HashCode, Chunker> chunkersByHash = Maps.newHashMapWithExpectedSize(uploadCount);
+    Random random = new Random();
+    for (int n = 0; n < uploadCount; n++) {
+      byte[] data = new byte[CHUNK_SIZE + random.nextInt(CHUNK_SIZE * 10)];
+      random.nextBytes(data);
+      Chunker dataChunker = Chunker.builder().setInput(data).setChunkSize(CHUNK_SIZE).build();
+      HashCode digestHash = HashCode.fromString(DIGEST_UTIL.compute(data).getHash());
+      chunkersByHash.put(digestHash, dataChunker);
+      expectedBlobs.put(digestHash, data);
+    }
+
+    serviceRegistry.addService(new MaybeFailOnceUploadService(expectedBlobs));
+    uploader.uploadBlobs(context, chunkersByHash, true);
+    blockUntilInternalStateConsistent(uploader);
+    // Uploading must not create a semaphore lazily either.
+    assertThat(uploader.getOpenedFilePermits()).isNull();
+  }
+
+  @Test
   public void contextShouldBePreservedUponRetries() throws Exception {
     // We upload blobs with different context, and retry 3 times for each upload.
     // We verify that the correct metadata is passed to the server with every blob.
@@ -770,7 +846,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
     Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(toUpload.size());
@@ -901,7 +978,8 @@
                 }),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -962,7 +1040,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 10];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1028,7 +1107,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1061,7 +1141,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     CountDownLatch cancellations = new CountDownLatch(2);
 
@@ -1125,7 +1206,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1163,7 +1245,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     serviceRegistry.addService(
         new ByteStreamImplBase() {
@@ -1206,7 +1289,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     AtomicInteger numCalls = new AtomicInteger();
 
@@ -1242,7 +1326,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1320,7 +1405,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1401,7 +1487,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             callCredentialsProvider,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1458,7 +1545,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             callCredentialsProvider,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1527,7 +1615,8 @@
             new ReferenceCountedChannel(channelConnectionFactory),
             CallCredentialsProvider.NO_CREDENTIALS,
             /* callTimeoutSecs= */ 60,
-            retrier);
+            retrier,
+            /*maximumOpenFiles=*/ -1);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -1736,4 +1825,65 @@
       Thread.sleep(1);
     }
   }
+
+  /** Builds Chunkers whose input streams are counted by a {@link CustomFileTracker}. */
+  private static class TestChunker extends Chunker {
+    TestChunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
+      super(dataSupplier, size, chunkSize, compressed);
+    }
+
+    public static Builder builder(CustomFileTracker customFileTracker) {
+      return new TestChunkerBuilder(customFileTracker);
+    }
+
+    private static class TestChunkerBuilder extends Chunker.Builder {
+      private final CustomFileTracker customFileTracker;
+
+      TestChunkerBuilder(CustomFileTracker customFileTracker) {
+        this.customFileTracker = customFileTracker;
+      }
+
+      @Override
+      public Chunker.Builder setInput(byte[] existingData) {
+        this.size = existingData.length; // the supplier alone doesn't carry the size
+        return setInputSupplier(
+            () -> new TestByteArrayInputStream(existingData, customFileTracker));
+      }
+    }
+  }
+
+  private static class TestByteArrayInputStream extends ByteArrayInputStream {
+    private final CustomFileTracker tracker;
+
+    TestByteArrayInputStream(byte[] buf, CustomFileTracker customFileTracker) {
+      super(buf);
+      tracker = customFileTracker;
+      tracker.incrementOpenFiles(); // counts as one open file from construction until close()
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      tracker.decrementOpenFiles(); // NOTE(review): a repeated close() would decrement again
+    }
+  }
+
+  /** Counts open test input streams and fails fast once more than the limit are open. */
+  private static class CustomFileTracker {
+    private final AtomicInteger openFiles = new AtomicInteger(0);
+    private final int maxOpenFiles;
+
+    CustomFileTracker(int maxOpenFiles) {
+      this.maxOpenFiles = maxOpenFiles;
+    }
+
+    // Check the value returned by the atomic update; a separate get() may see other threads.
+    private void incrementOpenFiles() {
+      checkState(openFiles.incrementAndGet() <= maxOpenFiles, "limit %s exceeded", maxOpenFiles);
+    }
+
+    private void decrementOpenFiles() {
+      checkState(openFiles.decrementAndGet() >= 0, "more closes than opens");
+    }
+  }
 }