ByteStreamUploader: Open files at the last possible moment.
Open files to upload only when the network is ready to accept data. Empirically, this greatly reduces the need to set `--bep_maximum_open_remote_upload_files` because the small-to-medium-sized files that are common in real-world builds can be opened, dumped onto the network, and closed in one shot.
Closes #15670.
PiperOrigin-RevId: 455074457
Change-Id: I243c7e6c51b2d415cdc0a00e7e71d06aefadbd2c
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 34d1213..4615397 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -336,36 +336,12 @@
}
if (committedSize > lastCommittedOffset) {
// We have made progress on this upload in the last request. Reset the backoff so
- // that
- // this request has a full deck of retries
+ // that this request has a full deck of retries
progressiveBackoff.reset();
}
}
lastCommittedOffset = committedSize;
- try {
- chunker.seek(committedSize);
- } catch (IOException e) {
- try {
- chunker.reset();
- } catch (IOException resetException) {
- e.addSuppressed(resetException);
- }
- String tooManyOpenFilesError = "Too many open files";
- if (Ascii.toLowerCase(e.getMessage())
- .contains(Ascii.toLowerCase(tooManyOpenFilesError))) {
- String newMessage =
- "An IOException was thrown because the process opened too"
- + " many files. We recommend setting"
- + " --bep_maximum_open_remote_upload_files flag to a"
- + " number lower than your system default (run 'ulimit"
- + " -a' for *nix-based operating systems). Original"
- + " error message: "
- + e.getMessage();
- return Futures.immediateFailedFuture(new IOException(newMessage, e));
- }
- return Futures.immediateFailedFuture(e);
- }
- return upload();
+ return upload(committedSize);
},
MoreExecutors.directExecutor());
}
@@ -415,12 +391,14 @@
MoreExecutors.directExecutor());
}
- private ListenableFuture<Long> upload() {
+ private ListenableFuture<Long> upload(long pos) {
return channel.withChannelFuture(
channel -> {
SettableFuture<Long> uploadResult = SettableFuture.create();
grpcContext.run(
- () -> bsAsyncStub(channel).write(new Writer(resourceName, chunker, uploadResult)));
+ () ->
+ bsAsyncStub(channel)
+ .write(new Writer(resourceName, chunker, pos, uploadResult)));
return uploadResult;
});
}
@@ -434,15 +412,18 @@
private static final class Writer
implements ClientResponseObserver<WriteRequest, WriteResponse>, Runnable {
private final Chunker chunker;
+ private final long pos;
private final String resourceName;
private final SettableFuture<Long> uploadResult;
private long committedSize = -1;
private ClientCallStreamObserver<WriteRequest> requestObserver;
private boolean first = true;
- private Writer(String resourceName, Chunker chunker, SettableFuture<Long> uploadResult) {
+ private Writer(
+ String resourceName, Chunker chunker, long pos, SettableFuture<Long> uploadResult) {
this.resourceName = resourceName;
this.chunker = chunker;
+ this.pos = pos;
this.uploadResult = uploadResult;
}
@@ -459,6 +440,15 @@
return;
}
while (requestObserver.isReady()) {
+ WriteRequest.Builder request = WriteRequest.newBuilder();
+ if (first) {
+ first = false;
+ if (!seekChunker()) {
+ return;
+ }
+ // Resource name only needs to be set on the first write for each file.
+ request.setResourceName(resourceName);
+ }
Chunker.Chunk chunk;
try {
chunk = chunker.next();
@@ -467,17 +457,12 @@
return;
}
boolean isLastChunk = !chunker.hasNext();
- WriteRequest.Builder request =
- WriteRequest.newBuilder()
+ requestObserver.onNext(
+ request
.setData(chunk.getData())
.setWriteOffset(chunk.getOffset())
- .setFinishWrite(isLastChunk);
- if (first) {
- first = false;
- // Resource name only needs to be set on the first write for each file.
- request.setResourceName(resourceName);
- }
- requestObserver.onNext(request.build());
+ .setFinishWrite(isLastChunk)
+ .build());
if (isLastChunk) {
requestObserver.onCompleted();
return;
@@ -485,6 +470,32 @@
}
}
+ private boolean seekChunker() {
+ try {
+ chunker.seek(pos);
+ } catch (IOException e) {
+ try {
+ chunker.reset();
+ } catch (IOException resetException) {
+ e.addSuppressed(resetException);
+ }
+ String tooManyOpenFilesError = "Too many open files";
+ if (Ascii.toLowerCase(e.getMessage()).contains(Ascii.toLowerCase(tooManyOpenFilesError))) {
+ String newMessage =
+ "An IOException was thrown because the process opened too many files. We recommend"
+ + " setting --bep_maximum_open_remote_upload_files flag to a number lower than"
+ + " your system default (run 'ulimit -a' for *nix-based operating systems)."
+ + " Original error message: "
+ + e.getMessage();
+ e = new IOException(newMessage, e);
+ }
+ uploadResult.setException(e);
+ requestObserver.cancel("failed to seek chunk", e);
+ return false;
+ }
+ return true;
+ }
+
@Override
public void onNext(WriteResponse response) {
committedSize = response.getCommittedSize();
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index de2ff4d..ca3175d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -30,6 +30,7 @@
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashCode;
@@ -77,7 +78,6 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.junit.After;
@@ -782,23 +782,18 @@
byte[] blob = new byte[CHUNK_SIZE];
Chunker chunker = Mockito.mock(Chunker.class);
Digest digest = DIGEST_UTIL.compute(blob);
- AtomicLong committedOffset = new AtomicLong(0);
- Mockito.doThrow(new IOException("Too many open files"))
- .when(chunker)
- .seek(committedOffset.get());
+ Mockito.doThrow(new IOException("Too many open files")).when(chunker).seek(0);
Mockito.when(chunker.getSize()).thenReturn(digest.getSizeBytes());
+ serviceRegistry.addService(new MaybeFailOnceUploadService(ImmutableMap.of()));
- try {
- uploader.uploadBlob(context, digest, chunker);
- fail("Should have thrown an exception.");
- } catch (IOException e) {
- String newMessage =
- "An IOException was thrown because the process opened too many files. We recommend"
- + " setting --bep_maximum_open_remote_upload_files flag to a number lower than your"
- + " system default (run 'ulimit -a' for *nix-based operating systems). Original error"
- + " message: Too many open files";
- assertThat(newMessage).isEqualTo(e.getMessage());
- }
+ String newMessage =
+ "An IOException was thrown because the process opened too many files. We recommend setting"
+ + " --bep_maximum_open_remote_upload_files flag to a number lower than your system"
+ + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:"
+ + " Too many open files";
+ assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)))
+ .hasMessageThat()
+ .isEqualTo(newMessage);
}
@Test