Remote: Display download progress when actions are downloading outputs from remote cache.
Normally, when executing an action with remote cache/execution, the UI only displays the "remote"/"remote-cache" strategy:
```
[500 / 1000] 500 actions, 3 running
[Sched] Executing genrule //:test-1;
Executing genrule //:test-2; 2s remote
Executing genrule //:test-3; 3s remote ...
```
However, this doesn't tell users what is happening under the hood. #13555 fixed the confusion where the UI displayed the action as scheduling while it was actually downloading the outputs.
With this change, Bazel will display the downloads if an action is downloading outputs, e.g.
```
[500 / 1000] 500 actions, 3 running
[Sched] Executing genrule //:test-1; 1s remote
Executing genrule //:test-2; Downloading 2.out, 20.1 KiB / 100 KiB; 2s remote
Executing genrule //:test-3; 3s remote ...
```
Add a generic `ActionProgressEvent` which can be reported within action execution to display detailed execution progress for that action.
Closes #13557.
PiperOrigin-RevId: 383224334
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
index 2ac8029..d978ad6 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
@@ -15,6 +15,8 @@
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static com.google.devtools.build.lib.remote.common.ProgressStatusListener.NO_ACTION;
+import static com.google.devtools.build.lib.remote.util.Utils.bytesCountToDisplayString;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import build.bazel.remote.execution.v2.Action;
@@ -50,6 +52,7 @@
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.actions.cache.MetadataInjector;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
+import com.google.devtools.build.lib.exec.SpawnProgressEvent;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
@@ -58,6 +61,7 @@
import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.SymlinkMetadata;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
+import com.google.devtools.build.lib.remote.common.ProgressStatusListener;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionFileArtifactValue;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
@@ -91,6 +95,9 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -314,6 +321,50 @@
return actualPath.getParentDirectory().getRelative(actualPath.getBaseName() + ".tmp");
}
+ static class DownloadProgressReporter { // Reports the progress of one file download to a ProgressStatusListener.
+ private static final Pattern PATTERN = Pattern.compile("^bazel-out/[^/]+/[^/]+/"); // Leading "bazel-out/<segment>/<segment>/" prefix that is removed from the displayed name.
+ private final ProgressStatusListener listener; // Receives every progress event built by reportProgress.
+ private final String id; // The file string exactly as passed in; used as the progress event id.
+ private final String file; // The file string with the PATTERN prefix removed; shown in progress messages.
+ private final String totalSize; // Total size, already formatted for display.
+ private final AtomicLong downloadedBytes = new AtomicLong(0); // Running sum of the counts passed to downloadedBytes(int).
+
+ DownloadProgressReporter(ProgressStatusListener listener, String file, long totalSize) {
+ this.listener = listener;
+ this.id = file;
+ this.totalSize = bytesCountToDisplayString(totalSize); // Formatted once here and reused in every report.
+
+ Matcher matcher = PATTERN.matcher(file);
+ this.file = matcher.replaceFirst(""); // Input is left unchanged when the prefix does not match.
+ }
+
+ void started() { // Reports "Downloading <file>" without byte counts; not marked finished.
+ reportProgress(false, false);
+ }
+
+ void downloadedBytes(int count) { // Adds count to the running total, then reports progress including byte counts.
+ downloadedBytes.addAndGet(count);
+ reportProgress(true, false);
+ }
+
+ void finished() { // Reports progress including byte counts with the finished flag set.
+ reportProgress(true, true);
+ }
+
+ private void reportProgress(boolean includeBytes, boolean finished) { // Builds the status text and hands it to the listener.
+ String progress;
+ if (includeBytes) {
+ progress =
+ String.format(
+ "Downloading %s, %s / %s",
+ file, bytesCountToDisplayString(downloadedBytes.get()), totalSize);
+ } else {
+ progress = String.format("Downloading %s", file);
+ }
+ listener.onProgressStatus(SpawnProgressEvent.create(id, progress, finished)); // The event is keyed by id, not by the shortened display name.
+ }
+ }
+
/**
* Download the output files and directory trees of a remotely executed action to the local
* machine, as well stdin / stdout to the given files.
@@ -330,7 +381,8 @@
RemotePathResolver remotePathResolver,
ActionResult result,
FileOutErr origOutErr,
- OutputFilesLocker outputFilesLocker)
+ OutputFilesLocker outputFilesLocker,
+ ProgressStatusListener progressStatusListener)
throws ExecException, IOException, InterruptedException {
ActionResultMetadata metadata = parseActionResultMetadata(context, remotePathResolver, result);
@@ -347,7 +399,11 @@
context,
remotePathResolver.localPathToOutputPath(file.path()),
toTmpDownloadPath(file.path()),
- file.digest());
+ file.digest(),
+ new DownloadProgressReporter(
+ progressStatusListener,
+ remotePathResolver.localPathToOutputPath(file.path()),
+ file.digest().getSizeBytes()));
return Futures.transform(download, (d) -> file, directExecutor());
} catch (IOException e) {
return Futures.<FileMetadata>immediateFailedFuture(e);
@@ -499,10 +555,14 @@
}
public ListenableFuture<Void> downloadFile(
- RemoteActionExecutionContext context, String outputPath, Path localPath, Digest digest)
+ RemoteActionExecutionContext context,
+ String outputPath,
+ Path localPath,
+ Digest digest,
+ DownloadProgressReporter reporter)
throws IOException {
SettableFuture<Void> outerF = SettableFuture.create();
- ListenableFuture<Void> f = downloadFile(context, localPath, digest);
+ ListenableFuture<Void> f = downloadFile(context, localPath, digest, reporter);
Futures.addCallback(
f,
new FutureCallback<Void>() {
@@ -529,6 +589,16 @@
/** Downloads a file (that is not a directory). The content is fetched from the digest. */
public ListenableFuture<Void> downloadFile(
RemoteActionExecutionContext context, Path path, Digest digest) throws IOException {
+ return downloadFile(context, path, digest, new DownloadProgressReporter(NO_ACTION, "", 0));
+ }
+
+ /** Downloads a file (that is not a directory). The content is fetched from the digest. */
+ public ListenableFuture<Void> downloadFile(
+ RemoteActionExecutionContext context,
+ Path path,
+ Digest digest,
+ DownloadProgressReporter reporter)
+ throws IOException {
Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
@@ -549,7 +619,9 @@
return COMPLETED_SUCCESS;
}
- OutputStream out = new LazyFileOutputStream(path);
+ reporter.started();
+ OutputStream out = new ReportingOutputStream(new LazyFileOutputStream(path), reporter);
+
SettableFuture<Void> outerF = SettableFuture.create();
ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
Futures.addCallback(
@@ -560,6 +632,7 @@
try {
out.close();
outerF.set(null);
+ reporter.finished();
} catch (IOException e) {
outerF.setException(e);
} catch (RuntimeException e) {
@@ -572,6 +645,7 @@
public void onFailure(Throwable t) {
try {
out.close();
+ reporter.finished();
} catch (IOException e) {
if (t != e) {
t.addSuppressed(e);
@@ -1100,6 +1174,49 @@
.build();
}
+ /**
+ * An {@link OutputStream} that forwards every call to a delegate stream and, after each write,
+ * reports the number of bytes written to a {@link DownloadProgressReporter}.
+ */
+ private static class ReportingOutputStream extends OutputStream {
+
+ private final OutputStream out; // Delegate that receives all writes, flushes and the close.
+ private final DownloadProgressReporter reporter; // Notified with the byte count after each write.
+
+ ReportingOutputStream(OutputStream out, DownloadProgressReporter reporter) {
+ this.out = out;
+ this.reporter = reporter;
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ reporter.downloadedBytes(b.length); // Reached only if the delegate write did not throw.
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ reporter.downloadedBytes(len); // Counts len, the number of bytes requested, not b.length.
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ reporter.downloadedBytes(1); // A single-byte write always counts as one byte.
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush(); // Pure delegation; nothing is reported.
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close(); // Pure delegation; the reporter is not notified from here.
+ }
+ }
+
/** In-memory representation of action result metadata. */
static class ActionResultMetadata {