Push remote upload consolidation into the Uploader when using --experimental_stream_log_file_uploads
PiperOrigin-RevId: 281536757
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
index 9263c04..58474d7 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java
@@ -74,7 +74,7 @@
@Override
public BuildEventArtifactUploader getUploader() {
- return besUploader.getLocalFileUploader();
+ return besUploader.getBuildEventUploader();
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
index e03162e..8dacd2b 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java
@@ -94,7 +94,7 @@
private static final int DELAY_MILLIS = 1000;
private final BuildEventServiceClient besClient;
- private final BuildEventArtifactUploader localFileUploader;
+ private final BuildEventArtifactUploader buildEventUploader;
private final BuildEventServiceProtoUtil besProtoUtil;
private final BuildEventProtocolOptions buildEventProtocolOptions;
private final boolean publishLifecycleEvents;
@@ -154,7 +154,7 @@
ArtifactGroupNamer namer,
EventBus eventBus) {
this.besClient = besClient;
- this.localFileUploader = localFileUploader;
+ this.buildEventUploader = localFileUploader;
this.besProtoUtil = besProtoUtil;
this.buildEventProtocolOptions = buildEventProtocolOptions;
this.publishLifecycleEvents = publishLifecycleEvents;
@@ -168,8 +168,8 @@
() -> halfCloseFuture.setFuture(closeFuture), MoreExecutors.directExecutor());
}
- BuildEventArtifactUploader getLocalFileUploader() {
- return localFileUploader;
+ BuildEventArtifactUploader getBuildEventUploader() {
+ return buildEventUploader;
}
/** Enqueues an event for uploading to a BES backend. */
@@ -177,7 +177,7 @@
// This needs to happen outside a synchronized block as it may trigger
// stdout/stderr and lead to a deadlock. See b/109725432
ListenableFuture<PathConverter> localFileUploadFuture =
- localFileUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
+ buildEventUploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
// The generation of the sequence number and the addition to the {@link #eventQueue} should be
// atomic since BES expects the events in that exact order.
@@ -317,7 +317,7 @@
logger.severe("BES upload failed due to a RuntimeException / Error. This is a bug.");
throw e;
} finally {
- localFileUploader.shutdown();
+ buildEventUploader.shutdown();
MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS);
closeFuture.set(null);
}
@@ -613,19 +613,9 @@
throws LocalFileUploadException, InterruptedException {
try {
// Wait for the local file and pending remote uploads to complete.
- ListenableFuture<?> remoteUploads =
- Futures.successfulAsList(orderedBuildEvent.getEvent().remoteUploads());
-
- if (localFileUploader.timeout() != null) {
- remoteUploads =
- Futures.withTimeout(
- remoteUploads,
- localFileUploader.timeout().toMillis(),
- TimeUnit.MILLISECONDS,
- timeoutExecutor);
- }
- // TODO(bazel-team): Consider failing softy if remote upload fails.
- remoteUploads.get();
+ buildEventUploader
+ .waitForRemoteUploads(orderedBuildEvent.getEvent().remoteUploads(), timeoutExecutor)
+ .get();
return orderedBuildEvent.localFileUploadProgress().get();
} catch (ExecutionException e) {
logger.log(
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
index 3d4b1c6..aeb6afd 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java
@@ -23,9 +23,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.time.Duration;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;
@@ -42,12 +42,6 @@
*/
ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files);
- /** The timeout on individual file uploads, or null if none. */
- @Nullable
- default Duration timeout() {
- return null;
- }
-
/** The context associated with an in-flight remote upload. */
interface UploadContext {
@@ -109,4 +103,14 @@
}
return upload(localFileMap);
}
+
+ /**
+ * Blocks on the completion of pending remote uploads, enforcing the relevant timeout if
+ * applicable.
+ */
+ default ListenableFuture<?> waitForRemoteUploads(
+ Collection<ListenableFuture<String>> remoteUploads,
+ ScheduledExecutorService timeoutExecutor) {
+ return Futures.allAsList(remoteUploads);
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
index 9932b15..b99a1b9 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildToolLogs.java
@@ -93,11 +93,17 @@
String name = directFuturePair.getFirst();
ListenableFuture<String> directFuture = directFuturePair.getSecond();
try {
- toolLogs.addLog(
- BuildEventStreamProtos.File.newBuilder()
- .setName(name)
- .setUri(Futures.getDone(directFuture))
- .build());
+ String uri =
+ directFuture.isDone() && !directFuture.isCancelled()
+ ? Futures.getDone(directFuture)
+ : null;
+ if (uri != null) {
+ toolLogs.addLog(
+ BuildEventStreamProtos.File.newBuilder()
+ .setName(name)
+ .setUri(Futures.getDone(directFuture))
+ .build());
+ }
} catch (ExecutionException e) {
logger.log(Level.WARNING, "Skipping build tool log upload " + name, e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
index e317cdf..efef270 100644
--- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
+++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java
@@ -23,6 +23,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
@@ -40,7 +41,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,6 +65,11 @@
private final SequentialWriter writer;
private final ArtifactGroupNamer namer;
+ private final ScheduledExecutorService timeoutExecutor =
+ MoreExecutors.listeningDecorator(
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("file-uploader-timeout-%d").build()));
+
FileTransport(
BufferedOutputStream outputStream,
BuildEventProtocolOptions options,
@@ -69,7 +77,8 @@
ArtifactGroupNamer namer) {
this.uploader = uploader;
this.options = options;
- this.writer = new SequentialWriter(outputStream, this::serializeEvent, uploader);
+ this.writer =
+ new SequentialWriter(outputStream, this::serializeEvent, uploader, timeoutExecutor);
this.namer = namer;
}
@@ -96,18 +105,20 @@
final BlockingQueue<ListenableFuture<BuildEventStreamProtos.BuildEvent>> pendingWrites =
new LinkedBlockingDeque<>();
+ private ScheduledExecutorService timeoutExecutor;
+
SequentialWriter(
BufferedOutputStream outputStream,
Function<BuildEventStreamProtos.BuildEvent, byte[]> serializeFunc,
- BuildEventArtifactUploader uploader) {
- checkNotNull(outputStream);
- checkNotNull(serializeFunc);
+ BuildEventArtifactUploader uploader,
+ ScheduledExecutorService timeoutExecutor) {
checkNotNull(uploader);
- this.out = outputStream;
+ this.out = checkNotNull(outputStream);
this.writerThread = new Thread(this, "bep-local-writer");
- this.serializeFunc = serializeFunc;
- this.uploader = uploader;
+ this.serializeFunc = checkNotNull(serializeFunc);
+ this.uploader = checkNotNull(uploader);
+ this.timeoutExecutor = checkNotNull(timeoutExecutor);
writerThread.start();
}
@@ -142,6 +153,7 @@
out.close();
} finally {
uploader.shutdown();
+ timeoutExecutor.shutdown();
}
} catch (IOException e) {
logger.log(Level.SEVERE, "Failed to close BEP file output stream.", e);
@@ -239,8 +251,10 @@
ListenableFuture<PathConverter> converterFuture =
uploader.uploadReferencedLocalFiles(event.referencedLocalFiles());
+ ListenableFuture<?> remoteUploads =
+ uploader.waitForRemoteUploads(event.remoteUploads(), timeoutExecutor);
return Futures.transform(
- Futures.allAsList(converterFuture, Futures.successfulAsList(event.remoteUploads())),
+ Futures.allAsList(converterFuture, remoteUploads),
results -> {
BuildEventContext context =
new BuildEventContext() {