BEP artifact upload opens all files up front, causing the OS to throw a 'Too many open files' exception
To address this problem, add the --bep_maximum_open_remote_upload_files flag so users can control the maximum number of files the BEP keeps open at once during remote cache artifact upload. Users can set the flag to a limit their OS can handle.
PiperOrigin-RevId: 415589490
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 15cc335..4865af2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;
@@ -80,6 +81,8 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
@@ -162,7 +165,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -170,55 +174,7 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- serviceRegistry.addService(
- new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
- return new StreamObserver<WriteRequest>() {
-
- byte[] receivedData = new byte[blob.length];
- long nextOffset = 0;
-
- @Override
- public void onNext(WriteRequest writeRequest) {
- if (nextOffset == 0) {
- assertThat(writeRequest.getResourceName()).isNotEmpty();
- assertThat(writeRequest.getResourceName()).startsWith(INSTANCE_NAME + "/uploads");
- assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
- } else {
- assertThat(writeRequest.getResourceName()).isEmpty();
- }
-
- assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
-
- ByteString data = writeRequest.getData();
-
- System.arraycopy(
- data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
-
- nextOffset += data.size();
- boolean lastWrite = blob.length == nextOffset;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
- }
-
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- assertThat(nextOffset).isEqualTo(blob.length);
- assertThat(receivedData).isEqualTo(blob);
-
- WriteResponse response =
- WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
- };
- }
- });
+ serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
uploader.uploadBlob(context, hash, chunker, true);
@@ -239,7 +195,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -355,7 +312,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
300,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -480,7 +438,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
1,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -539,7 +498,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -610,7 +570,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -648,7 +609,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -703,7 +665,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
300,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -735,7 +698,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
int numUploads = 10;
Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
@@ -759,6 +723,118 @@
}
@Test
+  public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
+    byte[] blob = new byte[CHUNK_SIZE];
+    Chunker chunker = Mockito.mock(Chunker.class);
+    HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+    AtomicLong committedOffset = new AtomicLong(0);
+    // Simulate file-descriptor exhaustion: seeking the chunker to offset 0 throws an IOException
+    // carrying the "Too many open files" message.
+    Mockito.doThrow(new IOException("Too many open files"))
+        .when(chunker)
+        .seek(committedOffset.get());
+    Mockito.when(chunker.getSize()).thenReturn(1L);
+
+    // uploadBlob should fail with the original message prefixed by advice about the flag.
+    IOException thrown =
+        assertThrows(IOException.class, () -> uploader.uploadBlob(context, hash, chunker, true));
+    String expectedMessage =
+        "An IOException was thrown because the process opened too many files. We recommend"
+            + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your"
+            + " system default (run 'ulimit -a' for *nix-based operating systems). Original error"
+            + " message: Too many open files";
+    assertThat(thrown).hasMessageThat().isEqualTo(expectedMessage);
+  }
+
+ @Test
+  public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAllPermits()
+      throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    // Fewer permits than uploads, so finishing every upload requires permits to be released.
+    int maximumOpenFiles = 999;
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            maximumOpenFiles);
+
+    assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
+
+    // The tracker's checkState trips if more than maximumOpenFiles test streams are open at once.
+    CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
+    int numUploads = maximumOpenFiles + 1;
+    Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      Chunker chunker =
+          TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
+      HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+      chunkers.put(hash, chunker);
+      blobsByHash.put(hash, blob);
+    }
+
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    uploader.uploadBlobs(context, chunkers, true);
+    blockUntilInternalStateConsistent(uploader);
+
+    assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
+  }
+
+ @Test
+  public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {
+    // With maximumOpenFiles = -1 the uploader is expected to keep no open-file semaphore at all.
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            new ReferenceCountedChannel(channelConnectionFactory),
+            CallCredentialsProvider.NO_CREDENTIALS,
+            /* callTimeoutSecs= */ 60,
+            retrier,
+            /*maximumOpenFiles=*/ -1);
+    assertThat(uploader.getOpenedFilePermits()).isNull();
+
+    int uploadCount = 10;
+    Random random = new Random();
+    Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+    Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(uploadCount);
+    for (int upload = 0; upload < uploadCount; upload++) {
+      byte[] data = new byte[CHUNK_SIZE + random.nextInt(CHUNK_SIZE * 10)];
+      random.nextBytes(data);
+      HashCode digestHash = HashCode.fromString(DIGEST_UTIL.compute(data).getHash());
+      blobsByHash.put(digestHash, data);
+      chunkers.put(digestHash, Chunker.builder().setInput(data).setChunkSize(CHUNK_SIZE).build());
+    }
+
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    // Even after every upload has completed, no semaphore should exist.
+    uploader.uploadBlobs(context, chunkers, true);
+    blockUntilInternalStateConsistent(uploader);
+    assertThat(uploader.getOpenedFilePermits()).isNull();
+  }
+
+ @Test
public void contextShouldBePreservedUponRetries() throws Exception {
// We upload blobs with different context, and retry 3 times for each upload.
// We verify that the correct metadata is passed to the server with every blob.
@@ -770,7 +846,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(toUpload.size());
@@ -901,7 +978,8 @@
}),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -962,7 +1040,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1028,7 +1107,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1061,7 +1141,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
CountDownLatch cancellations = new CountDownLatch(2);
@@ -1125,7 +1206,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1163,7 +1245,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1206,7 +1289,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
AtomicInteger numCalls = new AtomicInteger();
@@ -1242,7 +1326,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1320,7 +1405,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1401,7 +1487,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
callCredentialsProvider,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1458,7 +1545,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
callCredentialsProvider,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1527,7 +1615,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1736,4 +1825,65 @@
Thread.sleep(1);
}
}
+
+  /** Hands out Chunker builders whose input streams are counted by a CustomFileTracker. */
+  private static class TestChunker extends Chunker {
+    // NOTE(review): this constructor is never called here; builder() relies on inherited build().
+    TestChunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
+      super(dataSupplier, size, chunkSize, compressed);
+    }
+
+    public static Builder builder(CustomFileTracker customFileTracker) {
+      return new TestChunkerBuilder(customFileTracker);
+    }
+
+    private static class TestChunkerBuilder extends Chunker.Builder {
+      private final CustomFileTracker customFileTracker;
+
+      TestChunkerBuilder(CustomFileTracker customFileTracker) {
+        this.customFileTracker = customFileTracker;
+      }
+      // NOTE(review): bypasses super.setInput(byte[]); confirm build() still sees the blob size.
+      @Override
+      public Chunker.Builder setInput(byte[] existingData) {
+        return setInputSupplier(
+            () -> new TestByteArrayInputStream(existingData, customFileTracker));
+      }
+    }
+  }
+
+  private static class TestByteArrayInputStream extends ByteArrayInputStream {
+    private final CustomFileTracker tracker; // told about construction and every close()
+
+    TestByteArrayInputStream(byte[] data, CustomFileTracker tracker) {
+      super(data);
+      this.tracker = tracker;
+      this.tracker.incrementOpenFiles();
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      tracker.decrementOpenFiles(); // NOTE(review): runs on every call, including repeat closes
+    }
+  }
+
+  private static class CustomFileTracker {
+    // Number of streams currently open; every change is checked against [0, maxOpenFiles].
+    private final AtomicInteger openFiles = new AtomicInteger(0);
+    private final int maxOpenFiles;
+
+    CustomFileTracker(int maxOpenFiles) {
+      this.maxOpenFiles = maxOpenFiles;
+    }
+
+    // Check the atomic update's own result; a separate get() could see other threads' changes.
+    private void incrementOpenFiles() {
+      checkState(openFiles.incrementAndGet() <= maxOpenFiles, "limit %s exceeded", maxOpenFiles);
+    }
+
+    private void decrementOpenFiles() {
+      checkState(openFiles.decrementAndGet() >= 0, "more closes than opens");
+    }
+  }
}