| // Copyright 2017 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.buildeventservice; |
| |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Strings; |
| import com.google.common.base.Supplier; |
| import com.google.common.base.Suppliers; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.flogger.GoogleLogger; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import com.google.devtools.build.lib.analysis.test.TestConfiguration.TestOptions; |
| import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; |
| import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; |
| import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventTransport; |
| import com.google.devtools.build.lib.buildeventstream.BuildEventTransportClosedEvent; |
| import com.google.devtools.build.lib.buildeventstream.LocalFilesArtifactUploader; |
| import com.google.devtools.build.lib.buildeventstream.transports.BinaryFormatFileTransport; |
| import com.google.devtools.build.lib.buildeventstream.transports.BuildEventStreamOptions; |
| import com.google.devtools.build.lib.buildeventstream.transports.JsonFormatFileTransport; |
| import com.google.devtools.build.lib.buildeventstream.transports.TextFormatFileTransport; |
| import com.google.devtools.build.lib.events.Event; |
| import com.google.devtools.build.lib.events.EventHandler; |
| import com.google.devtools.build.lib.events.Reporter; |
| import com.google.devtools.build.lib.network.ConnectivityStatus; |
| import com.google.devtools.build.lib.network.ConnectivityStatus.Status; |
| import com.google.devtools.build.lib.network.ConnectivityStatusProvider; |
| import com.google.devtools.build.lib.profiler.AutoProfiler; |
| import com.google.devtools.build.lib.runtime.BlazeModule; |
| import com.google.devtools.build.lib.runtime.BuildEventStreamer; |
| import com.google.devtools.build.lib.runtime.CommandEnvironment; |
| import com.google.devtools.build.lib.runtime.CountingArtifactGroupNamer; |
| import com.google.devtools.build.lib.runtime.SynchronizedOutputStream; |
| import com.google.devtools.build.lib.util.AbruptExitException; |
| import com.google.devtools.build.lib.util.ExitCode; |
| import com.google.devtools.build.lib.util.LoggingUtil; |
| import com.google.devtools.build.lib.util.io.OutErr; |
| import com.google.devtools.common.options.OptionsBase; |
| import com.google.devtools.common.options.OptionsParsingException; |
| import com.google.devtools.common.options.OptionsParsingResult; |
| import java.io.BufferedOutputStream; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.time.Duration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| import javax.annotation.Nullable; |
| |
/**
 * Module responsible for the Build Event Protocol (BEP) transports and Build Event Service (BES)
 * functionality.
 */
| public abstract class BuildEventServiceModule<BESOptionsT extends BuildEventServiceOptions> |
| extends BlazeModule { |
| |
// java.util.logging logger; passed to AutoProfiler.logged(...) when timing the wait for BES close.
private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());
// Flogger logger used by the remaining log statements in this class.
private static final GoogleLogger googleLogger = GoogleLogger.forEnclosingClass();

/**
 * TargetComplete BEP events scale with the value of --runs_per_test, thus setting a very large
 * value for it can result in BEP events that are too big for BES to handle.
 */
private static final int RUNS_PER_TEST_LIMIT = 100000;

// Parsed option objects for the current command; assigned in beforeCommand().
private BuildEventProtocolOptions bepOptions;
private AuthAndTLSOptions authTlsOptions;
private BuildEventStreamOptions besStreamOptions;
// True when any --runs_per_test value exceeds RUNS_PER_TEST_LIMIT; computed in beforeCommand().
private boolean isRunsPerTestOverTheLimit;

/**
 * Holds the close futures for the upload of each transport with timeouts attached to them using
 * {@link #constructCloseFuturesMapWithTimeouts(ImmutableMap)} obtained from {@link
 * BuildEventTransport#getTimeout()}.
 */
private ImmutableMap<BuildEventTransport, ListenableFuture<Void>> closeFuturesWithTimeoutsMap =
    ImmutableMap.of();

/**
 * Holds the half-close futures for the upload of each transport with timeouts attached to them
 * using {@link #constructCloseFuturesMapWithTimeouts(ImmutableMap)} obtained from {@link
 * BuildEventTransport#getTimeout()}.
 *
 * <p>The completion of the half-close indicates that the client has sent all of the data to the
 * server and is just waiting for acknowledgement. The client must still keep the data buffered
 * locally in case acknowledgement fails.
 */
private ImmutableMap<BuildEventTransport, ListenableFuture<Void>>
    halfCloseFuturesWithTimeoutsMap = ImmutableMap.of();

// Per-command state assigned while handling beforeCommand(). Everything below except
// connectivityProvider is reset to null in commandComplete().
// TODO(lpino): Use Optional instead of @Nullable for the members below.
@Nullable private OutErr outErr;
@Nullable private ImmutableSet<BuildEventTransport> bepTransports;
@Nullable private String buildRequestId;
@Nullable private String invocationId;
@Nullable private Reporter reporter;
@Nullable private BuildEventStreamer streamer;
@Nullable private ConnectivityStatusProvider connectivityProvider;
// Key passed to ConnectivityStatusProvider#getStatus whenever this module queries connectivity.
private static final String CONNECTIVITY_CACHE_KEY = "BES";

// BES options for the current command; assigned in beforeCommand() and visible to subclasses.
protected BESOptionsT besOptions;
| |
| protected void reportCommandLineError(EventHandler commandLineReporter, Exception exception) { |
| // Don't hide unchecked exceptions as part of the error reporting. |
| Throwables.throwIfUnchecked(exception); |
| commandLineReporter.handle(Event.error(exception.getMessage())); |
| } |
| |
| /** Maximum duration Bazel waits for the previous invocation to finish before cancelling it. */ |
| protected Duration getMaxWaitForPreviousInvocation() { |
| return Duration.ofSeconds(5); |
| } |
| |
| /** Report errors in the command line and possibly fail the build. */ |
| private void reportError( |
| EventHandler commandLineReporter, |
| ModuleEnvironment moduleEnvironment, |
| String msg, |
| Exception exception, |
| ExitCode exitCode) { |
| // Don't hide unchecked exceptions as part of the error reporting. |
| Throwables.throwIfUnchecked(exception); |
| |
| googleLogger.atSevere().withCause(exception).log(msg); |
| AbruptExitException abruptException = new AbruptExitException(msg, exitCode, exception); |
| reportCommandLineError(commandLineReporter, exception); |
| moduleEnvironment.exit(abruptException); |
| } |
| |
| @Override |
| public Iterable<Class<? extends OptionsBase>> getCommonCommandOptions() { |
| return ImmutableList.of( |
| optionsClass(), |
| AuthAndTLSOptions.class, |
| BuildEventStreamOptions.class, |
| BuildEventProtocolOptions.class); |
| } |
| |
| // Resets the maps tracking the state of closing/half-closing BES transports. |
| private void resetPendingUploads() { |
| closeFuturesWithTimeoutsMap = ImmutableMap.of(); |
| halfCloseFuturesWithTimeoutsMap = ImmutableMap.of(); |
| } |
| |
| // Cancels and interrupts any in-flight threads closing BES transports, then resets the maps |
| // tracking in-flight close operations. |
| private void cancelAndResetPendingUploads() { |
| closeFuturesWithTimeoutsMap |
| .values() |
| .forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true)); |
| resetPendingUploads(); |
| } |
| |
| private static boolean isTimeoutException(ExecutionException e) { |
| return e.getCause() instanceof TimeoutException; |
| } |
| |
| private void waitForPreviousInvocation() { |
| if (closeFuturesWithTimeoutsMap.isEmpty()) { |
| return; |
| } |
| |
| ConnectivityStatus status = connectivityProvider.getStatus(CONNECTIVITY_CACHE_KEY); |
| if (status.status != ConnectivityStatus.Status.OK) { |
| reporter.handle( |
| Event.info( |
| String.format( |
| "The Build Event Protocol encountered a connectivity problem: %s. Cancelling" |
| + " previous background uploads", |
| status))); |
| cancelAndResetPendingUploads(); |
| return; |
| } |
| |
| ImmutableMap<BuildEventTransport, ListenableFuture<Void>> waitingFutureMap = null; |
| boolean cancelCloseFutures = true; |
| switch (besOptions.besUploadMode) { |
| case FULLY_ASYNC: |
| waitingFutureMap = halfCloseFuturesWithTimeoutsMap; |
| cancelCloseFutures = false; |
| break; |
| case WAIT_FOR_UPLOAD_COMPLETE: |
| case NOWAIT_FOR_UPLOAD_COMPLETE: |
| waitingFutureMap = closeFuturesWithTimeoutsMap; |
| cancelCloseFutures = true; |
| break; |
| } |
| |
| Stopwatch stopwatch = Stopwatch.createStarted(); |
| try { |
| // TODO(b/234994611): It would be better to report before we wait, but the current |
| // infrastructure does not support that. At least we can report it afterwards. |
| Uninterruptibles.getUninterruptibly( |
| Futures.allAsList(waitingFutureMap.values()), |
| getMaxWaitForPreviousInvocation().getSeconds(), |
| TimeUnit.SECONDS); |
| long waitedMillis = stopwatch.elapsed().toMillis(); |
| if (waitedMillis > 100) { |
| reporter.handle( |
| Event.info( |
| String.format( |
| "Waited for the background upload of the Build Event Protocol for " |
| + "%d.%03d seconds.", |
| waitedMillis / 1000, waitedMillis % 1000))); |
| } |
| } catch (TimeoutException exception) { |
| long waitedMillis = stopwatch.elapsed().toMillis(); |
| String msg = |
| String.format( |
| "The background upload of the Build Event Protocol for the previous invocation " |
| + "failed to complete in %d.%03d seconds. " |
| + "Cancelling and starting a new invocation...", |
| waitedMillis / 1000, waitedMillis % 1000); |
| reporter.handle(Event.warn(msg)); |
| googleLogger.atWarning().withCause(exception).log(msg); |
| cancelCloseFutures = true; |
| } catch (ExecutionException e) { |
| String msg; |
| // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future |
| // times out. |
| if (isTimeoutException(e)) { |
| msg = |
| "The background upload of the Build Event Protocol for the previous invocation " |
| + "failed due to a network timeout. Ignoring the failure and starting a new " |
| + "invocation..."; |
| } else { |
| msg = |
| String.format( |
| "The background upload of the Build Event Protocol for the previous invocation " |
| + "failed with the following exception: '%s'. " |
| + "Ignoring the failure and starting a new invocation...", |
| e.getMessage()); |
| } |
| reporter.handle(Event.warn(msg)); |
| googleLogger.atWarning().withCause(e).log(msg); |
| cancelCloseFutures = true; |
| } finally { |
| if (cancelCloseFutures) { |
| cancelAndResetPendingUploads(); |
| } else { |
| resetPendingUploads(); |
| } |
| } |
| } |
| |
| @Override |
| public void beforeCommand(CommandEnvironment cmdEnv) { |
| this.invocationId = cmdEnv.getCommandId().toString(); |
| this.buildRequestId = cmdEnv.getBuildRequestId(); |
| this.reporter = cmdEnv.getReporter(); |
| |
| this.connectivityProvider = |
| Preconditions.checkNotNull( |
| cmdEnv.getRuntime().getBlazeModule(ConnectivityStatusProvider.class), |
| "No ConnectivityStatusProvider found in modules list"); |
| |
| OptionsParsingResult parsingResult = cmdEnv.getOptions(); |
| this.besOptions = Preconditions.checkNotNull(parsingResult.getOptions(optionsClass())); |
| this.bepOptions = |
| Preconditions.checkNotNull(parsingResult.getOptions(BuildEventProtocolOptions.class)); |
| this.authTlsOptions = |
| Preconditions.checkNotNull(parsingResult.getOptions(AuthAndTLSOptions.class)); |
| this.besStreamOptions = |
| Preconditions.checkNotNull(parsingResult.getOptions(BuildEventStreamOptions.class)); |
| this.isRunsPerTestOverTheLimit = |
| parsingResult.getOptions(TestOptions.class) != null |
| && parsingResult.getOptions(TestOptions.class).runsPerTest.stream() |
| .anyMatch( |
| (perLabelOptions) -> |
| Integer.parseInt(Iterables.getOnlyElement(perLabelOptions.getOptions())) |
| > RUNS_PER_TEST_LIMIT); |
| |
| ConnectivityStatus status = connectivityProvider.getStatus(CONNECTIVITY_CACHE_KEY); |
| String buildEventUploadStrategy = |
| status.status.equals(ConnectivityStatus.Status.OK) |
| ? this.bepOptions.buildEventUploadStrategy |
| : "local"; |
| |
| CountingArtifactGroupNamer artifactGroupNamer = new CountingArtifactGroupNamer(); |
| Supplier<BuildEventArtifactUploader> uploaderSupplier = |
| Suppliers.memoize( |
| () -> |
| cmdEnv |
| .getRuntime() |
| .getBuildEventArtifactUploaderFactoryMap() |
| .select(buildEventUploadStrategy) |
| .create(cmdEnv)); |
| |
| // We need to wait for the previous invocation before we check the whitelist of commands to |
| // allow completing previous runs using BES, for example: |
| // bazel build (..run with async BES..) |
| // bazel info <-- Doesn't run with BES unless we wait before checking the whitelist. |
| waitForPreviousInvocation(); |
| |
| if (!whitelistedCommands(besOptions).contains(cmdEnv.getCommandName())) { |
| // Exit early if the running command isn't supported. |
| return; |
| } |
| |
| bepTransports = createBepTransports(cmdEnv, uploaderSupplier, artifactGroupNamer); |
| if (bepTransports.isEmpty()) { |
| // Exit early if there are no transports to stream to. |
| return; |
| } |
| |
| streamer = |
| new BuildEventStreamer.Builder() |
| .buildEventTransports(bepTransports) |
| .besStreamOptions(besStreamOptions) |
| .artifactGroupNamer(artifactGroupNamer) |
| .build(); |
| |
| cmdEnv.getEventBus().register(streamer); |
| registerOutAndErrOutputStreams(); |
| |
| // This event should probably be posted in a more general place (e.g. {@link BuildTool}; |
| // however, so far the BES module is the only module that requires extra work after the build |
| // so we post it here until it's needed for other modules. |
| reporter.post(new AnnounceBuildEventTransportsEvent(bepTransports)); |
| } |
| |
| private void registerOutAndErrOutputStreams() { |
| int bufferSize = besOptions.besOuterrBufferSize; |
| int chunkSize = besOptions.besOuterrChunkSize; |
| final SynchronizedOutputStream out = new SynchronizedOutputStream(bufferSize, chunkSize); |
| final SynchronizedOutputStream err = new SynchronizedOutputStream(bufferSize, chunkSize); |
| |
| this.outErr = OutErr.create(out, err); |
| streamer.registerOutErrProvider( |
| new BuildEventStreamer.OutErrProvider() { |
| @Override |
| public Iterable<String> getOut() { |
| return out.readAndReset(); |
| } |
| |
| @Override |
| public Iterable<String> getErr() { |
| return err.readAndReset(); |
| } |
| }); |
| err.registerStreamer(streamer); |
| out.registerStreamer(streamer); |
| } |
| |
| @Override |
| public OutErr getOutputListener() { |
| return outErr; |
| } |
| |
| private void forceShutdownBuildEventStreamer() { |
| streamer.close(AbortReason.INTERNAL); |
| closeFuturesWithTimeoutsMap = |
| constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap()); |
| try { |
| googleLogger.atInfo().log("Closing pending build event transports"); |
| Uninterruptibles.getUninterruptibly(Futures.allAsList(closeFuturesWithTimeoutsMap.values())); |
| } catch (ExecutionException e) { |
| googleLogger.atSevere().withCause(e).log("Failed to close a build event transport"); |
| LoggingUtil.logToRemote(Level.SEVERE, "Failed to close a build event transport", e); |
| } finally { |
| cancelAndResetPendingUploads(); |
| } |
| } |
| |
| @Override |
| public void blazeShutdownOnCrash() { |
| if (streamer != null) { |
| googleLogger.atWarning().log("Attempting to close BES streamer on crash"); |
| forceShutdownBuildEventStreamer(); |
| } |
| } |
| |
| @Override |
| public void blazeShutdown() { |
| if (closeFuturesWithTimeoutsMap.isEmpty()) { |
| return; |
| } |
| |
| try { |
| Uninterruptibles.getUninterruptibly( |
| Futures.allAsList(closeFuturesWithTimeoutsMap.values()), |
| getMaxWaitForPreviousInvocation().getSeconds(), |
| TimeUnit.SECONDS); |
| } catch (TimeoutException | ExecutionException exception) { |
| googleLogger.atWarning().withCause(exception).log( |
| "Encountered Exception when closing BEP transports in Blaze's shutting down sequence"); |
| } finally { |
| cancelAndResetPendingUploads(); |
| } |
| } |
| |
| private void waitForBuildEventTransportsToClose( |
| Map<BuildEventTransport, ListenableFuture<Void>> transportFutures) |
| throws AbruptExitException { |
| final ScheduledExecutorService executor = |
| Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder().setNameFormat("bes-notify-ui-%d").build()); |
| ScheduledFuture<?> waitMessageFuture = null; |
| |
| try { |
| // Notify the UI handler when a transport finished closing. |
| transportFutures.forEach( |
| (bepTransport, closeFuture) -> |
| closeFuture.addListener( |
| () -> { |
| reporter.post(new BuildEventTransportClosedEvent(bepTransport)); |
| }, |
| executor)); |
| |
| try (AutoProfiler p = AutoProfiler.logged("waiting for BES close", logger)) { |
| Uninterruptibles.getUninterruptibly(Futures.allAsList(transportFutures.values())); |
| } |
| } catch (ExecutionException e) { |
| // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future |
| // times out. |
| if (isTimeoutException(e)) { |
| throw new AbruptExitException( |
| "The Build Event Protocol upload timed out", |
| ExitCode.TRANSIENT_BUILD_EVENT_SERVICE_UPLOAD_ERROR, |
| e); |
| } |
| |
| Throwables.throwIfInstanceOf(e.getCause(), AbruptExitException.class); |
| throw new RuntimeException( |
| String.format( |
| "Unexpected Exception '%s' when closing BEP transports, this is a bug.", |
| e.getCause().getMessage())); |
| } finally { |
| cancelAndResetPendingUploads(); |
| if (waitMessageFuture != null) { |
| waitMessageFuture.cancel(/* mayInterruptIfRunning= */ true); |
| } |
| executor.shutdown(); |
| } |
| } |
| |
| private static ImmutableMap<BuildEventTransport, ListenableFuture<Void>> |
| constructCloseFuturesMapWithTimeouts( |
| ImmutableMap<BuildEventTransport, ListenableFuture<Void>> bepTransportToCloseFuturesMap) { |
| ImmutableMap.Builder<BuildEventTransport, ListenableFuture<Void>> builder = |
| ImmutableMap.builder(); |
| |
| bepTransportToCloseFuturesMap.forEach( |
| (bepTransport, closeFuture) -> { |
| final ListenableFuture<Void> closeFutureWithTimeout; |
| if (bepTransport.getTimeout().isZero() || bepTransport.getTimeout().isNegative()) { |
| closeFutureWithTimeout = closeFuture; |
| } else { |
| final ScheduledExecutorService timeoutExecutor = |
| Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder() |
| .setNameFormat("bes-close-" + bepTransport.name() + "-%d") |
| .build()); |
| |
| // Make sure to avoid propagating the cancellation to the enclosing future since |
| // we handle cancellation ourselves in this class. |
| // Futures.withTimeout may cancel the enclosing future when the timeout is |
| // reached. |
| final ListenableFuture<Void> enclosingFuture = |
| Futures.nonCancellationPropagating(closeFuture); |
| |
| closeFutureWithTimeout = |
| Futures.withTimeout( |
| enclosingFuture, |
| bepTransport.getTimeout().toMillis(), |
| TimeUnit.MILLISECONDS, |
| timeoutExecutor); |
| closeFutureWithTimeout.addListener( |
| () -> timeoutExecutor.shutdown(), MoreExecutors.directExecutor()); |
| } |
| builder.put(bepTransport, closeFutureWithTimeout); |
| }); |
| |
| return builder.build(); |
| } |
| |
| private void closeBepTransports() throws AbruptExitException { |
| closeFuturesWithTimeoutsMap = |
| constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap()); |
| halfCloseFuturesWithTimeoutsMap = |
| constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap()); |
| |
| Map<BuildEventTransport, ListenableFuture<Void>> blockingTransportFutures = new HashMap<>(); |
| for (Map.Entry<BuildEventTransport, ListenableFuture<Void>> entry : |
| closeFuturesWithTimeoutsMap.entrySet()) { |
| BuildEventTransport bepTransport = entry.getKey(); |
| if (!bepTransport.mayBeSlow() |
| || besOptions.besUploadMode |
| == BuildEventServiceOptions.BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE) { |
| blockingTransportFutures.put(bepTransport, entry.getValue()); |
| } else { |
| // When running asynchronously notify the UI immediately since we won't wait for the |
| // uploads to close. |
| reporter.post(new BuildEventTransportClosedEvent(bepTransport)); |
| } |
| } |
| if (!blockingTransportFutures.isEmpty()) { |
| waitForBuildEventTransportsToClose(blockingTransportFutures); |
| } |
| } |
| |
| @Override |
| public void afterCommand() throws AbruptExitException { |
| if (streamer != null) { |
| if (!streamer.isClosed()) { |
| // This should not occur, but close with an internal error if a {@link BuildEventStreamer} |
| // bug manifests as an unclosed streamer. |
| googleLogger.atWarning().log("Attempting to close BES streamer after command"); |
| String msg = "BES was not properly closed"; |
| LoggingUtil.logToRemote(Level.WARNING, msg, new IllegalStateException(msg)); |
| forceShutdownBuildEventStreamer(); |
| } |
| |
| closeBepTransports(); |
| |
| if (!Strings.isNullOrEmpty(besOptions.besBackend)) { |
| constructAndMaybeReportInvocationIdUrl(); |
| } else if (!bepTransports.isEmpty()) { |
| reporter.handle(Event.info("Build Event Protocol files produced successfully.")); |
| } |
| } |
| |
| if (!besStreamOptions.keepBackendConnections) { |
| clearBesClient(); |
| } |
| } |
| |
| @Override |
| public void commandComplete() { |
| this.outErr = null; |
| this.bepTransports = null; |
| this.invocationId = null; |
| this.buildRequestId = null; |
| this.reporter = null; |
| this.streamer = null; |
| } |
| |
| private void constructAndMaybeReportInvocationIdUrl() { |
| if (!getInvocationIdPrefix().isEmpty()) { |
| reporter.handle( |
| Event.info("Streaming build results to: " + getInvocationIdPrefix() + invocationId)); |
| } |
| } |
| |
| private void constructAndMaybeReportBuildRequestIdUrl() { |
| if (!getBuildRequestIdPrefix().isEmpty()) { |
| reporter.handle( |
| Event.info( |
| "See " |
| + getBuildRequestIdPrefix() |
| + buildRequestId |
| + " for more information about your request.")); |
| } |
| } |
| |
| private void constructAndReportIds() { |
| reporter.handle( |
| Event.info( |
| String.format( |
| "Streaming Build Event Protocol to '%s' with build_request_id: '%s'" |
| + " and invocation_id: '%s'", |
| besOptions.besBackend, buildRequestId, invocationId))); |
| } |
| |
| @Nullable |
| private BuildEventServiceTransport createBesTransport( |
| CommandEnvironment cmdEnv, |
| Supplier<BuildEventArtifactUploader> uploaderSupplier, |
| CountingArtifactGroupNamer artifactGroupNamer) { |
| if (Strings.isNullOrEmpty(besOptions.besBackend)) { |
| clearBesClient(); |
| return null; |
| } |
| |
| if (isRunsPerTestOverTheLimit) { |
| String msg = |
| String.format( |
| "The value of --runs_per_test is bigger than %d and it will produce build events " |
| + "that are too big for the Build Event Service to handle.", |
| RUNS_PER_TEST_LIMIT); |
| reportError( |
| reporter, |
| cmdEnv.getBlazeModuleEnvironment(), |
| msg, |
| new OptionsParsingException(msg), |
| ExitCode.COMMAND_LINE_ERROR); |
| return null; |
| } |
| |
| constructAndReportIds(); |
| |
| ConnectivityStatus status = connectivityProvider.getStatus(CONNECTIVITY_CACHE_KEY); |
| if (status.status != Status.OK) { |
| clearBesClient(); |
| String message = |
| String.format( |
| "Build Event Service uploads disabled due to a connectivity problem: %s", status); |
| reporter.handle(Event.warn(message)); |
| googleLogger.atWarning().log(message); |
| return null; |
| } |
| |
| final BuildEventServiceClient besClient; |
| try { |
| besClient = getBesClient(besOptions, authTlsOptions); |
| } catch (IOException | OptionsParsingException e) { |
| reportError( |
| reporter, |
| cmdEnv.getBlazeModuleEnvironment(), |
| e.getMessage(), |
| e, |
| ExitCode.LOCAL_ENVIRONMENTAL_ERROR); |
| return null; |
| } |
| |
| BuildEventServiceProtoUtil besProtoUtil = |
| new BuildEventServiceProtoUtil.Builder() |
| .buildRequestId(buildRequestId) |
| .invocationId(invocationId) |
| .projectId(besOptions.projectId) |
| .commandName(cmdEnv.getCommandName()) |
| .keywords(getBesKeywords(besOptions, cmdEnv.getRuntime().getStartupOptionsProvider())) |
| .build(); |
| |
| return new BuildEventServiceTransport.Builder() |
| .localFileUploader(uploaderSupplier.get()) |
| .besClient(besClient) |
| .besOptions(besOptions) |
| .besProtoUtil(besProtoUtil) |
| .artifactGroupNamer(artifactGroupNamer) |
| .bepOptions(bepOptions) |
| .clock(cmdEnv.getRuntime().getClock()) |
| .eventBus(cmdEnv.getEventBus()) |
| .build(); |
| } |
| |
| private ImmutableSet<BuildEventTransport> createBepTransports( |
| CommandEnvironment cmdEnv, |
| Supplier<BuildEventArtifactUploader> uploaderSupplier, |
| CountingArtifactGroupNamer artifactGroupNamer) { |
| ImmutableSet.Builder<BuildEventTransport> bepTransportsBuilder = new ImmutableSet.Builder<>(); |
| |
| if (!Strings.isNullOrEmpty(besStreamOptions.buildEventTextFile)) { |
| try { |
| BufferedOutputStream bepTextOutputStream = |
| new BufferedOutputStream( |
| Files.newOutputStream(Paths.get(besStreamOptions.buildEventTextFile))); |
| |
| BuildEventArtifactUploader localFileUploader = |
| besStreamOptions.buildEventTextFilePathConversion |
| ? uploaderSupplier.get() |
| : new LocalFilesArtifactUploader(); |
| bepTransportsBuilder.add( |
| new TextFormatFileTransport( |
| bepTextOutputStream, |
| bepOptions, |
| localFileUploader, |
| artifactGroupNamer)); |
| } catch (IOException exception) { |
| // TODO(b/125216340): Consider making this a warning instead of an error once the |
| // associated bug has been resolved. |
| reportError( |
| reporter, |
| cmdEnv.getBlazeModuleEnvironment(), |
| "Unable to write to '" |
| + besStreamOptions.buildEventTextFile |
| + "'. Omitting --build_event_text_file.", |
| exception, |
| ExitCode.LOCAL_ENVIRONMENTAL_ERROR); |
| } |
| } |
| |
| if (!Strings.isNullOrEmpty(besStreamOptions.buildEventBinaryFile)) { |
| try { |
| BufferedOutputStream bepBinaryOutputStream = |
| new BufferedOutputStream( |
| Files.newOutputStream(Paths.get(besStreamOptions.buildEventBinaryFile))); |
| |
| BuildEventArtifactUploader localFileUploader = |
| besStreamOptions.buildEventBinaryFilePathConversion |
| ? uploaderSupplier.get() |
| : new LocalFilesArtifactUploader(); |
| bepTransportsBuilder.add( |
| new BinaryFormatFileTransport( |
| bepBinaryOutputStream, |
| bepOptions, |
| localFileUploader, |
| artifactGroupNamer)); |
| } catch (IOException exception) { |
| // TODO(b/125216340): Consider making this a warning instead of an error once the |
| // associated bug has been resolved. |
| reportError( |
| reporter, |
| cmdEnv.getBlazeModuleEnvironment(), |
| "Unable to write to '" |
| + besStreamOptions.buildEventBinaryFile |
| + "'. Omitting --build_event_binary_file.", |
| exception, |
| ExitCode.LOCAL_ENVIRONMENTAL_ERROR); |
| } |
| } |
| |
| if (!Strings.isNullOrEmpty(besStreamOptions.buildEventJsonFile)) { |
| try { |
| BufferedOutputStream bepJsonOutputStream = |
| new BufferedOutputStream( |
| Files.newOutputStream(Paths.get(besStreamOptions.buildEventJsonFile))); |
| BuildEventArtifactUploader localFileUploader = |
| besStreamOptions.buildEventJsonFilePathConversion |
| ? uploaderSupplier.get() |
| : new LocalFilesArtifactUploader(); |
| bepTransportsBuilder.add( |
| new JsonFormatFileTransport( |
| bepJsonOutputStream, |
| bepOptions, |
| localFileUploader, |
| artifactGroupNamer)); |
| } catch (IOException exception) { |
| // TODO(b/125216340): Consider making this a warning instead of an error once the |
| // associated bug has been resolved. |
| reportError( |
| reporter, |
| cmdEnv.getBlazeModuleEnvironment(), |
| "Unable to write to '" |
| + besStreamOptions.buildEventJsonFile |
| + "'. Omitting --build_event_json_file.", |
| exception, |
| ExitCode.LOCAL_ENVIRONMENTAL_ERROR); |
| } |
| } |
| |
| BuildEventServiceTransport besTransport = |
| createBesTransport(cmdEnv, uploaderSupplier, artifactGroupNamer); |
| if (besTransport != null) { |
| constructAndMaybeReportInvocationIdUrl(); |
| constructAndMaybeReportBuildRequestIdUrl(); |
| bepTransportsBuilder.add(besTransport); |
| } |
| |
| return bepTransportsBuilder.build(); |
| } |
| |
/**
 * Returns the concrete BES options class. This module registers it as a common command option
 * and uses it to look up {@link #besOptions} from the parsed options before each command.
 */
protected abstract Class<BESOptionsT> optionsClass();
| |
/**
 * Returns the client used to talk to the BES backend. Called while creating the BES transport,
 * i.e. only when a backend is configured and the connectivity status is OK.
 *
 * @throws IOException reported by this module as a local environmental error; no BES transport
 *     is created
 * @throws OptionsParsingException handled the same way as {@code IOException}
 */
protected abstract BuildEventServiceClient getBesClient(
    BESOptionsT besOptions, AuthAndTLSOptions authAndTLSOptions)
    throws IOException, OptionsParsingException;
| |
/**
 * Invoked when no BES backend is configured, when BES uploads are disabled because of a
 * connectivity problem, and after a command unless backend connections are to be kept.
 *
 * <p>NOTE(review): presumably releases any client held by the subclass; confirm against the
 * implementations.
 */
protected abstract void clearBesClient();
| |
/**
 * Returns the names of the commands for which build event transports are set up. For any other
 * command, {@code beforeCommand} returns before creating transports.
 */
protected abstract Set<String> whitelistedCommands(BESOptionsT besOptions);
| |
| protected Set<String> getBesKeywords( |
| BESOptionsT besOptions, @Nullable OptionsParsingResult startupOptionsProvider) { |
| return besOptions.besKeywords.stream() |
| .map(keyword -> "user_keyword=" + keyword) |
| .collect(ImmutableSet.toImmutableSet()); |
| } |
| |
/**
 * A prefix used when printing the invocation ID in the command line. It is placed directly in
 * front of the invocation ID in the "Streaming build results to: " message; when it is empty
 * that message is not printed.
 */
protected abstract String getInvocationIdPrefix();
| |
/**
 * A prefix used when printing the build request ID in the command line. It is placed directly
 * in front of the build request ID in the "See ... for more information about your request."
 * message; when it is empty that message is not printed.
 */
protected abstract String getBuildRequestIdPrefix();
| |
| // TODO(b/115961387): This method shouldn't exist. It only does because some tests are relying on |
| // the transport creation logic of this module directly. |
| @VisibleForTesting |
| ImmutableSet<BuildEventTransport> getBepTransports() { |
| return bepTransports; |
| } |
| } |