|  | // Copyright 2019 The Bazel Authors. All rights reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  | package com.google.devtools.build.lib.remote; | 
|  |  | 
|  | import build.bazel.remote.execution.v2.Digest; | 
|  | import com.google.common.annotations.VisibleForTesting; | 
|  | import com.google.common.base.Preconditions; | 
|  | import com.google.common.collect.ImmutableSet; | 
|  | import com.google.common.util.concurrent.FutureCallback; | 
|  | import com.google.common.util.concurrent.Futures; | 
|  | import com.google.common.util.concurrent.ListenableFuture; | 
|  | import com.google.common.util.concurrent.MoreExecutors; | 
|  | import com.google.devtools.build.lib.actions.ActionInput; | 
|  | import com.google.devtools.build.lib.actions.ActionInputPrefetcher; | 
|  | import com.google.devtools.build.lib.actions.FileArtifactValue; | 
|  | import com.google.devtools.build.lib.actions.MetadataProvider; | 
|  | import com.google.devtools.build.lib.actions.cache.VirtualActionInput; | 
|  | import com.google.devtools.build.lib.profiler.Profiler; | 
|  | import com.google.devtools.build.lib.profiler.ProfilerTask; | 
|  | import com.google.devtools.build.lib.profiler.SilentCloseable; | 
|  | import com.google.devtools.build.lib.remote.util.DigestUtil; | 
|  | import com.google.devtools.build.lib.remote.util.Utils; | 
|  | import com.google.devtools.build.lib.vfs.Path; | 
|  | import io.grpc.Context; | 
|  | import java.io.IOException; | 
|  | import java.io.OutputStream; | 
|  | import java.util.HashMap; | 
|  | import java.util.HashSet; | 
|  | import java.util.Map; | 
|  | import java.util.Set; | 
|  | import java.util.concurrent.ExecutionException; | 
|  | import java.util.logging.Level; | 
|  | import java.util.logging.Logger; | 
|  | import javax.annotation.concurrent.GuardedBy; | 
|  |  | 
|  | /** | 
|  | * Stages output files that are stored remotely to the local filesystem. | 
|  | * | 
|  | * <p>This is necessary for remote caching/execution when {@code | 
|  | * --experimental_remote_download_outputs=minimal} is specified. | 
|  | */ | 
|  | class RemoteActionInputFetcher implements ActionInputPrefetcher { | 
|  |  | 
|  | private static final Logger logger = Logger.getLogger(RemoteActionInputFetcher.class.getName()); | 
|  |  | 
|  | private final Object lock = new Object(); | 
|  |  | 
|  | /** Set of successfully downloaded output files. */ | 
|  | @GuardedBy("lock") | 
|  | private final Set<Path> downloadedPaths = new HashSet<>(); | 
|  |  | 
|  | @VisibleForTesting | 
|  | @GuardedBy("lock") | 
|  | final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>(); | 
|  |  | 
|  | private final AbstractRemoteActionCache remoteCache; | 
|  | private final Path execRoot; | 
|  | private final Context ctx; | 
|  |  | 
|  | RemoteActionInputFetcher(AbstractRemoteActionCache remoteCache, Path execRoot, Context ctx) { | 
|  | this.remoteCache = Preconditions.checkNotNull(remoteCache); | 
|  | this.execRoot = Preconditions.checkNotNull(execRoot); | 
|  | this.ctx = Preconditions.checkNotNull(ctx); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Fetches remotely stored action outputs, that are inputs to this spawn, and stores them under | 
|  | * their path in the output base. | 
|  | * | 
|  | * <p>This method blocks until all downloads have finished. | 
|  | * | 
|  | * <p>This method is safe to be called concurrently from spawn runners before running any local | 
|  | * spawn. | 
|  | */ | 
|  | @Override | 
|  | public void prefetchFiles( | 
|  | Iterable<? extends ActionInput> inputs, MetadataProvider metadataProvider) | 
|  | throws IOException, InterruptedException { | 
|  | try (SilentCloseable c = | 
|  | Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "stage remote inputs")) { | 
|  | Map<Path, ListenableFuture<Void>> downloadsToWaitFor = new HashMap<>(); | 
|  | for (ActionInput input : inputs) { | 
|  | if (input instanceof VirtualActionInput) { | 
|  | VirtualActionInput paramFileActionInput = (VirtualActionInput) input; | 
|  | Path outputPath = execRoot.getRelative(paramFileActionInput.getExecPath()); | 
|  | outputPath.getParentDirectory().createDirectoryAndParents(); | 
|  | try (OutputStream out = outputPath.getOutputStream()) { | 
|  | paramFileActionInput.writeTo(out); | 
|  | } | 
|  | } else { | 
|  | FileArtifactValue metadata = metadataProvider.getMetadata(input); | 
|  | if (metadata == null || !metadata.isRemote()) { | 
|  | continue; | 
|  | } | 
|  |  | 
|  | Path path = execRoot.getRelative(input.getExecPath()); | 
|  | synchronized (lock) { | 
|  | if (downloadedPaths.contains(path)) { | 
|  | continue; | 
|  | } | 
|  | ListenableFuture<Void> download = downloadFileAsync(path, metadata); | 
|  | downloadsToWaitFor.putIfAbsent(path, download); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | IOException ioException = null; | 
|  | InterruptedException interruptedException = null; | 
|  | for (Map.Entry<Path, ListenableFuture<Void>> entry : downloadsToWaitFor.entrySet()) { | 
|  | try { | 
|  | Utils.getFromFuture(entry.getValue()); | 
|  | } catch (IOException e) { | 
|  | if (e instanceof CacheNotFoundException) { | 
|  | e = | 
|  | new IOException( | 
|  | String.format( | 
|  | "Failed to fetch file with hash '%s' because it does not exist remotely." | 
|  | + " --experimental_remote_outputs=minimal does not work if" | 
|  | + " your remote cache evicts files during builds.", | 
|  | ((CacheNotFoundException) e).getMissingDigest().getHash())); | 
|  | } | 
|  | ioException = ioException == null ? e : ioException; | 
|  | } catch (InterruptedException e) { | 
|  | interruptedException = interruptedException == null ? e : interruptedException; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (interruptedException != null) { | 
|  | throw interruptedException; | 
|  | } | 
|  | if (ioException != null) { | 
|  | throw ioException; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | ImmutableSet<Path> downloadedFiles() { | 
|  | synchronized (lock) { | 
|  | return ImmutableSet.copyOf(downloadedPaths); | 
|  | } | 
|  | } | 
|  |  | 
|  | void downloadFile(Path path, FileArtifactValue metadata) | 
|  | throws IOException, InterruptedException { | 
|  | try { | 
|  | downloadFileAsync(path, metadata).get(); | 
|  | } catch (ExecutionException e) { | 
|  | if (e.getCause() instanceof IOException) { | 
|  | throw (IOException) e.getCause(); | 
|  | } | 
|  | throw new IOException(e.getCause()); | 
|  | } | 
|  | } | 
|  |  | 
|  | private ListenableFuture<Void> downloadFileAsync(Path path, FileArtifactValue metadata) | 
|  | throws IOException { | 
|  | synchronized (lock) { | 
|  | if (downloadedPaths.contains(path)) { | 
|  | return Futures.immediateFuture(null); | 
|  | } | 
|  |  | 
|  | ListenableFuture<Void> download = downloadsInProgress.get(path); | 
|  | if (download == null) { | 
|  | Context prevCtx = ctx.attach(); | 
|  | try { | 
|  | Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize()); | 
|  | download = remoteCache.downloadFile(path, digest); | 
|  | downloadsInProgress.put(path, download); | 
|  | Futures.addCallback( | 
|  | download, | 
|  | new FutureCallback<Void>() { | 
|  | @Override | 
|  | public void onSuccess(Void v) { | 
|  | synchronized (lock) { | 
|  | downloadsInProgress.remove(path); | 
|  | downloadedPaths.add(path); | 
|  | } | 
|  |  | 
|  | try { | 
|  | path.chmod(0755); | 
|  | } catch (IOException e) { | 
|  | logger.log(Level.WARNING, "Failed to chmod 755 on " + path, e); | 
|  | } | 
|  | } | 
|  |  | 
|  | @Override | 
|  | public void onFailure(Throwable t) { | 
|  | synchronized (lock) { | 
|  | downloadsInProgress.remove(path); | 
|  | } | 
|  | try { | 
|  | path.delete(); | 
|  | } catch (IOException e) { | 
|  | logger.log( | 
|  | Level.WARNING, | 
|  | "Failed to delete output file after incomplete download: " + path, | 
|  | e); | 
|  | } | 
|  | } | 
|  | }, | 
|  | MoreExecutors.directExecutor()); | 
|  | } finally { | 
|  | ctx.detach(prevCtx); | 
|  | } | 
|  | } | 
|  | return download; | 
|  | } | 
|  | } | 
|  | } |