BEP artifact upload opens all files upfront causing os to throw 'too many files open' exception
To address this problem, added --bep_maximum_open_remote_upload_files flag so users have control over the max number of files the BEP has open at once during remote cache artifact upload. Users can set the flag to a limit their native os can handle.
PiperOrigin-RevId: 415589490
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index c488f14..2087492 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -27,6 +27,7 @@
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Ascii;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.flogger.GoogleLogger;
@@ -59,6 +60,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@@ -93,6 +95,8 @@
@GuardedBy("lock")
private boolean isShutdown;
+ @Nullable private final Semaphore openedFilePermits;
+
/**
* Creates a new instance.
*
@@ -109,14 +113,15 @@
ReferenceCountedChannel channel,
CallCredentialsProvider callCredentialsProvider,
long callTimeoutSecs,
- RemoteRetrier retrier) {
+ RemoteRetrier retrier,
+ int maximumOpenFiles) {
checkArgument(callTimeoutSecs > 0, "callTimeoutSecs must be gt 0.");
-
this.instanceName = instanceName;
this.channel = channel;
this.callCredentialsProvider = callCredentialsProvider;
this.callTimeoutSecs = callTimeoutSecs;
this.retrier = retrier;
+ this.openedFilePermits = maximumOpenFiles != -1 ? new Semaphore(maximumOpenFiles) : null;
}
@VisibleForTesting
@@ -254,7 +259,6 @@
if (inProgress != null) {
return inProgress;
}
-
ListenableFuture<Void> uploadResult =
Futures.transform(
startAsyncUpload(context, digest, chunker),
@@ -339,6 +343,16 @@
retrier,
resourceName,
chunker);
+ if (openedFilePermits != null) {
+ try {
+ openedFilePermits.acquire();
+ } catch (InterruptedException e) {
+ return Futures.immediateFailedFuture(
+ new InterruptedException(
+ "Unexpected interrupt while acquiring open file permit. Original error message: "
+ + e.getMessage()));
+ }
+ }
ListenableFuture<Void> currUpload = newUpload.start();
currUpload.addListener(
() -> {
@@ -371,7 +385,7 @@
return this;
}
- private static class AsyncUpload {
+ private class AsyncUpload {
private final RemoteActionExecutionContext context;
private final Channel channel;
@@ -420,6 +434,18 @@
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
+ String tooManyOpenFilesError = "Too many open files";
+ if (Ascii.toLowerCase(e.getMessage())
+ .contains(Ascii.toLowerCase(tooManyOpenFilesError))) {
+ String newMessage =
+ "An IOException was thrown because the process opened too many"
+ + " files. We recommend setting"
+ + " --bep_maximum_open_remote_upload_files flag to a number"
+ + " lower than your system default (run 'ulimit -a' for"
+ + " *nix-based operating systems). Original error message: "
+ + e.getMessage();
+ return Futures.immediateFailedFuture(new IOException(newMessage, e));
+ }
return Futures.immediateFailedFuture(e);
}
if (chunker.hasNext()) {
@@ -429,7 +455,9 @@
},
progressiveBackoff),
callCredentialsProvider);
-
+ if (openedFilePermits != null) {
+ callFuture.addListener(openedFilePermits::release, MoreExecutors.directExecutor());
+ }
return Futures.transformAsync(
callFuture,
(result) -> {
@@ -632,4 +660,9 @@
}
}
}
+
+ @VisibleForTesting
+ public Semaphore getOpenedFilePermits() {
+ return openedFilePermits;
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index d1024a3..19c7a0f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -38,12 +38,14 @@
/**
* Splits a data source into one or more {@link Chunk}s of at most {@code chunkSize} bytes.
*
- * <p>After a data source has been fully consumed, that is until {@link #hasNext()} returns
- * {@code false}, the chunker closes the underlying data source (i.e. file) itself. However, in
- * case of error or when a data source does not get fully consumed, a user must call
- * {@link #reset()} manually.
+ * <p>After a data source has been fully consumed, that is until {@link #hasNext()} returns {@code
+ * false}, the chunker closes the underlying data source (i.e. file) itself. However, in case of
+ * error or when a data source does not get fully consumed, a user must call {@link #reset()}
+ * manually.
+ *
+ * <p>This class should not be extended - it's only non-final for testing.
*/
-public final class Chunker {
+public class Chunker {
private static int defaultChunkSize = 1024 * 16;
@@ -104,7 +106,7 @@
private final int chunkSize;
private final Chunk emptyChunk;
- private ChunkerInputStream data;
+ @VisibleForTesting protected ChunkerInputStream data;
private long offset;
private byte[] chunkCache;
@@ -274,7 +276,13 @@
public Builder setInput(byte[] data) {
checkState(inputStream == null);
size = data.length;
- inputStream = () -> new ByteArrayInputStream(data);
+ setInputSupplier(() -> new ByteArrayInputStream(data));
+ return this;
+ }
+
+ @VisibleForTesting
+ protected final Builder setInputSupplier(Supplier<InputStream> inputStream) {
+ this.inputStream = inputStream;
return this;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index de7ed30..4e2db8d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -528,7 +528,8 @@
cacheChannel.retain(),
callCredentialsProvider,
remoteOptions.remoteTimeout.getSeconds(),
- retrier);
+ retrier,
+ remoteOptions.maximumOpenFiles);
cacheChannel.release();
RemoteCacheClient cacheClient =
diff --git a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java
index d7dc2ec..61a53db 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/options/RemoteOptions.java
@@ -565,6 +565,14 @@
+ "that loads objects from the CAS on demand.")
public String remoteDownloadSymlinkTemplate;
+ @Option(
+ name = "bep_maximum_open_remote_upload_files",
+ defaultValue = "-1",
+ documentationCategory = OptionDocumentationCategory.OUTPUT_PARAMETERS,
+ effectTags = {OptionEffectTag.AFFECTS_OUTPUTS},
+ help = "Maximum number of open files allowed during BEP artifact upload.")
+ public int maximumOpenFiles;
+
// The below options are not configurable by users, only tests.
// This is part of the effort to reduce the overall number of flags.
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index a409604..fdf575e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -178,7 +178,55 @@
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
ByteStreamUploader uploader =
new ByteStreamUploader(
- "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier);
+ "instance",
+ refCntChannel,
+ CallCredentialsProvider.NO_CREDENTIALS,
+ 3,
+ retrier,
+ /*maximumOpenFiles=*/ -1);
+ ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
+
+ PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
+ for (Path file : filesToUpload.keySet()) {
+ String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
+ long size = file.getFileSize();
+ String conversion = pathConverter.apply(file);
+ assertThat(conversion)
+ .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
+ }
+
+ artifactUploader.release();
+
+ assertThat(uploader.refCnt()).isEqualTo(0);
+ assertThat(refCntChannel.isShutdown()).isTrue();
+ }
+
+ @Test
+ public void uploadsShouldWork_fewerPermitsThanUploads() throws Exception {
+ int numUploads = 2;
+ Map<HashCode, byte[]> blobsByHash = new HashMap<>();
+ Map<Path, LocalFile> filesToUpload = new HashMap<>();
+ Random rand = new Random();
+ for (int i = 0; i < numUploads; i++) {
+ Path file = fs.getPath("/file" + i);
+ OutputStream out = file.getOutputStream();
+ int blobSize = rand.nextInt(100) + 1;
+ byte[] blob = new byte[blobSize];
+ rand.nextBytes(blob);
+ out.write(blob);
+ out.close();
+ blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
+ filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
+ }
+ serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+ RemoteRetrier retrier =
+ TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+ ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
+ // number of permits is less than number of uploads to affirm permit is released
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier, 1);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
@@ -207,7 +255,12 @@
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
ByteStreamUploader uploader =
new ByteStreamUploader(
- "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier);
+ "instance",
+ refCntChannel,
+ CallCredentialsProvider.NO_CREDENTIALS,
+ 3,
+ retrier,
+ /*maximumOpenFiles=*/ -1);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
@@ -270,7 +323,12 @@
ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory);
ByteStreamUploader uploader =
new ByteStreamUploader(
- "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier);
+ "instance",
+ refCntChannel,
+ CallCredentialsProvider.NO_CREDENTIALS,
+ 3,
+ retrier,
+ /*maximumOpenFiles=*/ -1);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
artifactUploader.upload(filesToUpload).get();
@@ -298,7 +356,12 @@
ByteStreamUploader uploader =
spy(
new ByteStreamUploader(
- "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier));
+ "instance",
+ refCntChannel,
+ CallCredentialsProvider.NO_CREDENTIALS,
+ 3,
+ retrier,
+ /*maximumOpenFiles=*/ -1));
RemoteActionInputFetcher actionInputFetcher = Mockito.mock(RemoteActionInputFetcher.class);
ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader);
@@ -356,7 +419,12 @@
ByteStreamUploader uploader =
spy(
new ByteStreamUploader(
- "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier));
+ "instance",
+ refCntChannel,
+ CallCredentialsProvider.NO_CREDENTIALS,
+ 3,
+ retrier,
+ /*maximumOpenFiles=*/ -1));
doReturn(Futures.immediateFuture(null))
.when(uploader)
.uploadBlobAsync(any(), any(Digest.class), any(), anyBoolean());
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 15cc335..4865af2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;
@@ -80,6 +81,8 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
@@ -162,7 +165,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -170,55 +174,7 @@
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
- serviceRegistry.addService(
- new ByteStreamImplBase() {
- @Override
- public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
- return new StreamObserver<WriteRequest>() {
-
- byte[] receivedData = new byte[blob.length];
- long nextOffset = 0;
-
- @Override
- public void onNext(WriteRequest writeRequest) {
- if (nextOffset == 0) {
- assertThat(writeRequest.getResourceName()).isNotEmpty();
- assertThat(writeRequest.getResourceName()).startsWith(INSTANCE_NAME + "/uploads");
- assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
- } else {
- assertThat(writeRequest.getResourceName()).isEmpty();
- }
-
- assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
-
- ByteString data = writeRequest.getData();
-
- System.arraycopy(
- data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
-
- nextOffset += data.size();
- boolean lastWrite = blob.length == nextOffset;
- assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
- }
-
- @Override
- public void onError(Throwable throwable) {
- fail("onError should never be called.");
- }
-
- @Override
- public void onCompleted() {
- assertThat(nextOffset).isEqualTo(blob.length);
- assertThat(receivedData).isEqualTo(blob);
-
- WriteResponse response =
- WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
- streamObserver.onNext(response);
- streamObserver.onCompleted();
- }
- };
- }
- });
+ serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
uploader.uploadBlob(context, hash, chunker, true);
@@ -239,7 +195,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -355,7 +312,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
300,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -480,7 +438,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
1,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -539,7 +498,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -610,7 +570,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -648,7 +609,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
3,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -703,7 +665,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
300,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -735,7 +698,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
int numUploads = 10;
Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
@@ -759,6 +723,118 @@
}
@Test
+ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exception {
+ RemoteRetrier retrier =
+ TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ INSTANCE_NAME,
+ new ReferenceCountedChannel(channelConnectionFactory),
+ CallCredentialsProvider.NO_CREDENTIALS,
+ /* callTimeoutSecs= */ 60,
+ retrier,
+ /*maximumOpenFiles=*/ -1);
+ byte[] blob = new byte[CHUNK_SIZE];
+ Chunker chunker = Mockito.mock(Chunker.class);
+ HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ AtomicLong committedOffset = new AtomicLong(0);
+ Mockito.doThrow(new IOException("Too many open files"))
+ .when(chunker)
+ .seek(committedOffset.get());
+ Mockito.when(chunker.getSize()).thenReturn(1L);
+
+ try {
+ uploader.uploadBlob(context, hash, chunker, true);
+ fail("Should have thrown an exception.");
+ } catch (IOException e) {
+ String newMessage =
+ "An IOException was thrown because the process opened too many files. We recommend"
+ + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your"
+ + " system default (run 'ulimit -a' for *nix-based operating systems). Original error"
+ + " message: Too many open files";
+ assertThat(newMessage).isEqualTo(e.getMessage());
+ }
+ }
+
+ @Test
+ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAllPermits()
+ throws Exception {
+ RemoteRetrier retrier =
+ TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+ // number of permits is less than number of uploads to affirm permit is released
+ int maximumOpenFiles = 999;
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ INSTANCE_NAME,
+ new ReferenceCountedChannel(channelConnectionFactory),
+ CallCredentialsProvider.NO_CREDENTIALS,
+ /* callTimeoutSecs= */ 60,
+ retrier,
+ maximumOpenFiles);
+
+ assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(999);
+
+ CustomFileTracker customFileTracker = new CustomFileTracker(maximumOpenFiles);
+ int numUploads = 1000;
+ Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+ Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+ Random rand = new Random();
+ for (int i = 0; i < numUploads; i++) {
+ int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
+ byte[] blob = new byte[blobSize];
+ rand.nextBytes(blob);
+ Chunker chunker =
+ TestChunker.builder(customFileTracker).setInput(blob).setChunkSize(CHUNK_SIZE).build();
+ HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ chunkers.put(hash, chunker);
+ blobsByHash.put(hash, blob);
+ }
+
+ serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+ uploader.uploadBlobs(context, chunkers, true);
+
+ blockUntilInternalStateConsistent(uploader);
+
+ assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
+ }
+
+ @Test
+ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {
+ RemoteRetrier retrier =
+ TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService);
+ ByteStreamUploader uploader =
+ new ByteStreamUploader(
+ INSTANCE_NAME,
+ new ReferenceCountedChannel(channelConnectionFactory),
+ CallCredentialsProvider.NO_CREDENTIALS,
+ /* callTimeoutSecs= */ 60,
+ retrier,
+ /*maximumOpenFiles=*/ -1);
+ assertThat(uploader.getOpenedFilePermits()).isNull();
+
+ int numUploads = 10;
+ Map<HashCode, byte[]> blobsByHash = Maps.newHashMap();
+ Map<HashCode, Chunker> chunkers = Maps.newHashMapWithExpectedSize(numUploads);
+ Random rand = new Random();
+ for (int i = 0; i < numUploads; i++) {
+ int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
+ byte[] blob = new byte[blobSize];
+ rand.nextBytes(blob);
+ Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
+ HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
+ chunkers.put(hash, chunker);
+ blobsByHash.put(hash, blob);
+ }
+
+ serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+ uploader.uploadBlobs(context, chunkers, true);
+ blockUntilInternalStateConsistent(uploader);
+ assertThat(uploader.getOpenedFilePermits()).isNull();
+ }
+
+ @Test
public void contextShouldBePreservedUponRetries() throws Exception {
// We upload blobs with different context, and retry 3 times for each upload.
// We verify that the correct metadata is passed to the server with every blob.
@@ -770,7 +846,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
Map<Digest, Chunker> chunkers = Maps.newHashMapWithExpectedSize(toUpload.size());
@@ -901,7 +978,8 @@
}),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -962,7 +1040,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 10];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1028,7 +1107,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
@@ -1061,7 +1141,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
CountDownLatch cancellations = new CountDownLatch(2);
@@ -1125,7 +1206,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1163,7 +1245,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1206,7 +1289,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
AtomicInteger numCalls = new AtomicInteger();
@@ -1242,7 +1326,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1320,7 +1405,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1401,7 +1487,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
callCredentialsProvider,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1458,7 +1545,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
callCredentialsProvider,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1527,7 +1615,8 @@
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
@@ -1736,4 +1825,65 @@
Thread.sleep(1);
}
}
+
+ /* Custom Chunker used to track number of open files */
+ private static class TestChunker extends Chunker {
+
+ TestChunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
+ super(dataSupplier, size, chunkSize, compressed);
+ }
+
+ public static Builder builder(CustomFileTracker customFileTracker) {
+ return new TestChunkerBuilder(customFileTracker);
+ }
+
+ private static class TestChunkerBuilder extends Chunker.Builder {
+ private final CustomFileTracker customFileTracker;
+
+ TestChunkerBuilder(CustomFileTracker customFileTracker) {
+ this.customFileTracker = customFileTracker;
+ }
+
+ @Override
+ public Chunker.Builder setInput(byte[] existingData) {
+ return setInputSupplier(
+ () -> new TestByteArrayInputStream(existingData, customFileTracker));
+ }
+ }
+ }
+
+ private static class TestByteArrayInputStream extends ByteArrayInputStream {
+ private final CustomFileTracker customFileTracker;
+
+ TestByteArrayInputStream(byte[] buf, CustomFileTracker customFileTracker) {
+ super(buf);
+ this.customFileTracker = customFileTracker;
+ customFileTracker.incrementOpenFiles();
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ customFileTracker.decrementOpenFiles();
+ }
+ }
+
+ private static class CustomFileTracker {
+ private final AtomicInteger openFiles = new AtomicInteger(0);
+ private final int maxOpenFiles;
+
+ CustomFileTracker(int maxOpenFiles) {
+ this.maxOpenFiles = maxOpenFiles;
+ }
+
+ private void incrementOpenFiles() {
+ openFiles.getAndIncrement();
+ checkState(openFiles.get() <= maxOpenFiles);
+ }
+
+ private void decrementOpenFiles() {
+ openFiles.getAndDecrement();
+ checkState(openFiles.get() >= 0);
+ }
+ }
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
index 5975f5e..43e38e2 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java
@@ -251,7 +251,8 @@
channel.retain(),
callCredentialsProvider,
remoteOptions.remoteTimeout.getSeconds(),
- retrier);
+ retrier,
+ remoteOptions.maximumOpenFiles);
return new GrpcCacheClient(
channel.retain(), callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL, uploader);
}
@@ -338,7 +339,7 @@
@Test
public void testDownloadBlobSingleChunk() throws Exception {
- final GrpcCacheClient client = newClient();
+ GrpcCacheClient client = newClient();
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -355,7 +356,7 @@
@Test
public void testDownloadBlobMultipleChunks() throws Exception {
- final GrpcCacheClient client = newClient();
+ GrpcCacheClient client = newClient();
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -949,7 +950,7 @@
@Test
public void testGetCachedActionResultWithRetries() throws Exception {
- final GrpcCacheClient client = newClient();
+ GrpcCacheClient client = newClient();
ActionKey actionKey = DIGEST_UTIL.asActionKey(DIGEST_UTIL.computeAsUtf8("key"));
serviceRegistry.addService(
new ActionCacheImplBase() {
@@ -971,8 +972,7 @@
@Test
public void downloadBlobIsRetriedWithProgress() throws IOException, InterruptedException {
Backoff mockBackoff = Mockito.mock(Backoff.class);
- final GrpcCacheClient client =
- newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
+ GrpcCacheClient client = newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1002,8 +1002,7 @@
public void downloadBlobDoesNotRetryZeroLengthRequests()
throws IOException, InterruptedException {
Backoff mockBackoff = Mockito.mock(Backoff.class);
- final GrpcCacheClient client =
- newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
+ GrpcCacheClient client = newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
@@ -1024,8 +1023,7 @@
public void downloadBlobPassesThroughDeadlineExceededWithoutProgress() throws IOException {
Backoff mockBackoff = Mockito.mock(Backoff.class);
Mockito.when(mockBackoff.nextDelayMillis(any(Exception.class))).thenReturn(-1L);
- final GrpcCacheClient client =
- newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
+ GrpcCacheClient client = newClient(Options.getDefaults(RemoteOptions.class), () -> mockBackoff);
final Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
serviceRegistry.addService(
new ByteStreamImplBase() {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
index 817394f..16e0b37 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java
@@ -289,7 +289,8 @@
channel.retain(),
callCredentialsProvider,
remoteOptions.remoteTimeout.getSeconds(),
- retrier);
+ retrier,
+ /*maximumOpenFiles=*/ -1);
GrpcCacheClient cacheProtocol =
new GrpcCacheClient(
channel.retain(),
diff --git a/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java b/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java
index 16f394a..e6425a4 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/options/RemoteOptionsTest.java
@@ -96,4 +96,11 @@
fail(e.getMessage());
}
}
+
+ @Test
+ public void testRemoteMaximumOpenFilesDefault() {
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ int defaultMax = options.maximumOpenFiles;
+ assertThat(defaultMax).isEqualTo(-1);
+ }
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
index 5a003e1..3c1033f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -33,7 +33,10 @@
"//third_party:junit4",
"//third_party:rxjava3",
"//third_party:truth",
+ "//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java b/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java
index 5e50e71..419d24f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/TestUtils.java
@@ -13,12 +13,20 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.devtools.build.lib.remote.RemoteRetrier;
import com.google.devtools.build.lib.remote.Retrier;
import com.google.devtools.build.lib.remote.Retrier.Backoff;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -154,4 +162,53 @@
delegate.execute(command);
}
}
+
+ public static final ByteStreamImplBase newNoErrorByteStreamService(byte[] blob) {
+ return new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+ return new StreamObserver<WriteRequest>() {
+
+ byte[] receivedData = new byte[blob.length];
+ long nextOffset = 0;
+
+ @Override
+ public void onNext(WriteRequest writeRequest) {
+ if (nextOffset == 0) {
+ assertThat(writeRequest.getResourceName()).isNotEmpty();
+ assertThat(writeRequest.getResourceName()).endsWith(String.valueOf(blob.length));
+ } else {
+ assertThat(writeRequest.getResourceName()).isEmpty();
+ }
+
+ assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
+
+ ByteString data = writeRequest.getData();
+
+ System.arraycopy(data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
+
+ nextOffset += data.size();
+ boolean lastWrite = blob.length == nextOffset;
+ assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ fail("onError should never be called.");
+ }
+
+ @Override
+ public void onCompleted() {
+ assertThat(nextOffset).isEqualTo(blob.length);
+ assertThat(receivedData).isEqualTo(blob);
+
+ WriteResponse response =
+ WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+ streamObserver.onNext(response);
+ streamObserver.onCompleted();
+ }
+ };
+ }
+ };
+ }
}