| // Copyright 2017 The Bazel Authors. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| package com.google.devtools.build.lib.remote; |
| |
| import static com.google.common.base.Strings.isNullOrEmpty; |
| import static com.google.devtools.build.lib.profiler.ProfilerTask.REMOTE_DOWNLOAD; |
| import static com.google.devtools.build.lib.remote.util.Utils.createExecExceptionForCredentialHelperException; |
| import static com.google.devtools.build.lib.remote.util.Utils.createExecExceptionFromRemoteExecutionCapabilitiesException; |
| import static com.google.devtools.build.lib.remote.util.Utils.createSpawnResult; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Stopwatch; |
| import com.google.devtools.build.lib.actions.ActionInput; |
| import com.google.devtools.build.lib.actions.ExecException; |
| import com.google.devtools.build.lib.actions.FileArtifactValue; |
| import com.google.devtools.build.lib.actions.ForbiddenActionInputException; |
| import com.google.devtools.build.lib.actions.Spawn; |
| import com.google.devtools.build.lib.actions.SpawnMetrics; |
| import com.google.devtools.build.lib.actions.SpawnResult; |
| import com.google.devtools.build.lib.actions.cache.VirtualActionInput; |
| import com.google.devtools.build.lib.authandtls.credentialhelper.CredentialHelperException; |
| import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; |
| import com.google.devtools.build.lib.events.Event; |
| import com.google.devtools.build.lib.exec.SpawnCache; |
| import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; |
| import com.google.devtools.build.lib.profiler.Profiler; |
| import com.google.devtools.build.lib.profiler.ProfilerTask; |
| import com.google.devtools.build.lib.profiler.SilentCloseable; |
| import com.google.devtools.build.lib.remote.RemoteExecutionService.LocalExecution; |
| import com.google.devtools.build.lib.remote.RemoteExecutionService.RemoteActionResult; |
| import com.google.devtools.build.lib.remote.common.BulkTransferException; |
| import com.google.devtools.build.lib.remote.common.CacheNotFoundException; |
| import com.google.devtools.build.lib.remote.common.RemoteCacheClient; |
| import com.google.devtools.build.lib.remote.common.RemoteExecutionCapabilitiesException; |
| import com.google.devtools.build.lib.remote.options.RemoteOptions; |
| import com.google.devtools.build.lib.remote.util.DigestUtil; |
| import com.google.devtools.build.lib.remote.util.Utils; |
| import com.google.devtools.build.lib.remote.util.Utils.InMemoryOutput; |
| import com.google.devtools.build.lib.vfs.Path; |
| import java.io.IOException; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** A remote {@link SpawnCache} implementation: {@link #lookup} checks the remote cache, then any in-flight execution with the same action key, and otherwise may return a handle that uploads the locally produced result. */ |
| @ThreadSafe // If the RemoteActionCache implementation is thread-safe. |
| final class RemoteSpawnCache implements SpawnCache { |
| |
| private final Path execRoot; |
| private final RemoteOptions options; |
| private final RemoteExecutionService remoteExecutionService; |
| private final DigestUtil digestUtil; |
| private final boolean verboseFailures; /* Forwarded to Utils.grpcAwareErrorMessage when a cache lookup fails with an I/O error. */ |
| private final ConcurrentHashMap<RemoteCacheClient.ActionKey, LocalExecution> inFlightExecutions = |
| new ConcurrentHashMap<>(); /* Entries are merged in by lookup() and removed by the callback given to uploadOutputs() in store(). */ |
| /** Stores each argument in the field of the same name. */ |
| RemoteSpawnCache( |
| Path execRoot, |
| RemoteOptions options, |
| boolean verboseFailures, |
| RemoteExecutionService remoteExecutionService, |
| DigestUtil digestUtil) { |
| this.execRoot = execRoot; |
| this.options = options; |
| this.verboseFailures = verboseFailures; |
| this.remoteExecutionService = remoteExecutionService; |
| this.digestUtil = digestUtil; |
| } |
| /** Returns the {@link RemoteExecutionService} this cache was constructed with. */ |
| @VisibleForTesting |
| RemoteExecutionService getRemoteExecutionService() { |
| return remoteExecutionService; |
| } |
| /** Returns a successful handle on a remote cache hit or when a concurrent identical execution's outputs could be reused; otherwise a store-only handle if the write policy allows it, else {@link SpawnCache#NO_RESULT_NO_STORE}. */ |
| @Override |
| public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context) |
| throws InterruptedException, IOException, ExecException, ForbiddenActionInputException { |
| boolean shouldAcceptCachedResult = |
| remoteExecutionService.getReadCachePolicy(spawn).allowAnyCache(); |
| boolean shouldUploadLocalResults = |
| remoteExecutionService.getWriteCachePolicy(spawn).allowAnyCache(); |
| if (!shouldAcceptCachedResult && !shouldUploadLocalResults) { |
| return SpawnCache.NO_RESULT_NO_STORE; |
| } |
| /* Started before the remote action is built, so the reported total time includes that step. */ |
| Stopwatch totalTime = Stopwatch.createStarted(); |
| |
| RemoteAction action = remoteExecutionService.buildRemoteAction(spawn, context); |
| SpawnMetrics.Builder spawnMetrics = |
| SpawnMetrics.Builder.forRemoteExec() |
| .setInputBytes(action.getInputBytes()) |
| .setInputFiles(action.getInputFiles()); |
| |
| context.setDigest(digestUtil.asSpawnLogProto(action.getActionKey())); |
| |
| Profiler prof = Profiler.instance(); |
| LocalExecution thisExecution = null; /* Set only inside the branch below; createIfDeduplicatable may also leave it null. */ |
| if (shouldAcceptCachedResult) { |
| // With path mapping enabled, different Spawns in a single build can have the same ActionKey. |
| // When their result isn't in the cache and two of them are scheduled concurrently, neither |
| // will result in a cache hit before the other finishes and uploads its result, which results |
| // in unnecessary work. To avoid this, we keep track of in-flight executions as long as their |
| // results haven't been uploaded to the cache yet and deduplicate all of them against the |
| // first one. |
| LocalExecution previousExecution = null; /* Becomes non-null only when an existing in-flight execution accepted registerForOutputReuse(). */ |
| try { |
| thisExecution = LocalExecution.createIfDeduplicatable(action); |
| if (shouldUploadLocalResults && thisExecution != null) { |
| LocalExecution previousOrThisExecution = |
| inFlightExecutions.merge( |
| action.getActionKey(), |
| thisExecution, |
| (existingExecution, thisExecutionArg) -> { |
| if (existingExecution.registerForOutputReuse()) { |
| return existingExecution; |
| } else { |
| // The existing execution has completed and its results may have already |
| // been modified by its action, so we can't deduplicate against it. Instead, |
| // start a new in-flight execution. |
| return thisExecutionArg; |
| } |
| }); |
| previousExecution = |
| previousOrThisExecution == thisExecution ? null : previousOrThisExecution; |
| } |
| try { /* Step 1: ask the remote cache; a hit with exit code 0 returns from here. */ |
| RemoteActionResult result; |
| try (SilentCloseable c = |
| prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { |
| result = remoteExecutionService.lookupCache(action); |
| } |
| // In case the remote cache returned a failed action (exit code != 0) we treat it as a |
| // cache miss |
| if (result != null && result.getExitCode() == 0) { |
| Stopwatch fetchTime = Stopwatch.createStarted(); |
| InMemoryOutput inMemoryOutput; |
| try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) { |
| inMemoryOutput = remoteExecutionService.downloadOutputs(action, result); |
| } |
| fetchTime.stop(); |
| totalTime.stop(); |
| spawnMetrics |
| .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) |
| .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) |
| .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); |
| SpawnResult spawnResult = |
| createSpawnResult( |
| digestUtil, |
| action.getActionKey(), |
| result.getExitCode(), |
| /* cacheHit= */ true, |
| result.cacheName(), |
| inMemoryOutput, |
| result.getExecutionMetadata().getExecutionStartTimestamp(), |
| result.getExecutionMetadata().getExecutionCompletedTimestamp(), |
| spawnMetrics.build(), |
| spawn.getMnemonic()); |
| return SpawnCache.success(spawnResult); |
| } |
| } catch (CacheNotFoundException e) { |
| // Intentionally left blank: a missing entry is a cache miss, so fall through to the reuse / local-execution path. |
| } catch (CredentialHelperException e) { |
| throw createExecExceptionForCredentialHelperException(e); |
| } catch (RemoteExecutionCapabilitiesException e) { |
| throw createExecExceptionFromRemoteExecutionCapabilitiesException(e); |
| } catch (IOException e) { |
| if (BulkTransferException.allCausedByCacheNotFoundException(e)) { |
| // Intentionally left blank: every underlying failure was a cache-not-found, which is treated as a cache miss. |
| } else { /* Any other I/O error is only reported as a warning; the lookup then proceeds as on a miss. */ |
| String errorMessage = Utils.grpcAwareErrorMessage(e, verboseFailures); |
| if (isNullOrEmpty(errorMessage)) { |
| errorMessage = e.getClass().getSimpleName(); |
| } |
| errorMessage = "Remote Cache: " + errorMessage; |
| remoteExecutionService.report(Event.warn(errorMessage)); |
| } |
| } |
| if (previousExecution != null) { /* Step 2: wait on the earlier identical execution and try to reuse its outputs. */ |
| Stopwatch fetchTime = Stopwatch.createStarted(); |
| SpawnResult previousResult; |
| try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "reuse outputs")) { |
| previousResult = |
| remoteExecutionService.waitForAndReuseOutputs(action, previousExecution); |
| } |
| if (previousResult != null) { |
| spawnMetrics |
| .setFetchTimeInMs((int) fetchTime.elapsed().toMillis()) |
| .setTotalTimeInMs((int) totalTime.elapsed().toMillis()) |
| .setNetworkTimeInMs((int) action.getNetworkTime().getDuration().toMillis()); |
| SpawnMetrics buildMetrics = spawnMetrics.build(); |
| return SpawnCache.success( |
| new SpawnResult.DelegateSpawnResult(previousResult) { |
| @Override |
| public String getRunnerName() { |
| return "deduplicated"; |
| } |
| |
| @Override |
| public SpawnMetrics getMetrics() { |
| return buildMetrics; |
| } |
| }); |
| } |
| // If we reach here, the previous execution was not successful (it encountered an |
| // exception or the spawn had an exit code != 0). Since it isn't possible to accurately |
| // recreate the failure without rerunning the action, we fall back to running the action |
| // locally. This means that we have introduced an unnecessary wait, but that can only |
| // happen in the case of a failing build with --keep_going. |
| } |
| } finally { /* Runs on every exit path, including the early returns above; unregister() presumably balances registerForOutputReuse() (confirm in LocalExecution). */ |
| if (previousExecution != null) { |
| previousExecution.unregister(); |
| } |
| } |
| } |
| /* No cached or reused result: return a store-only handle if uploads are allowed. */ |
| if (shouldUploadLocalResults) { |
| final LocalExecution thisExecutionFinal = thisExecution; /* Final copy so the anonymous class below can capture it. */ |
| return new CacheHandle() { |
| @Override |
| public boolean hasResult() { |
| return false; |
| } |
| /** Always throws {@link NoSuchElementException}, since this handle carries no result. */ |
| @Override |
| public SpawnResult getResult() { |
| throw new NoSuchElementException(); |
| } |
| /** Always {@code true}: this handle is only created when uploading local results is allowed. */ |
| @Override |
| public boolean willStore() { |
| return true; |
| } |
| /** Uploads {@code result} unless the service declines the upload or, with the guard option enabled, the concurrent-modification check fails (reported as a warning). */ |
| @Override |
| public void store(SpawnResult result) throws ExecException, InterruptedException { |
| if (!remoteExecutionService.commitResultAndDecideWhetherToUpload( |
| result, thisExecutionFinal)) { |
| return; |
| } |
| |
| if (options.experimentalGuardAgainstConcurrentChanges) { |
| try (SilentCloseable c = prof.profile("checkForConcurrentModifications")) { |
| checkForConcurrentModifications(); |
| } catch (IOException | ForbiddenActionInputException e) { |
| var msg = |
| spawn.getTargetLabel() |
| + ": Skipping uploading outputs because of concurrent modifications " |
| + "with --experimental_guard_against_concurrent_changes enabled: " |
| + e.getMessage(); |
| remoteExecutionService.report(Event.warn(msg)); |
| return; |
| } |
| } |
| |
| // As soon as the result is in the cache, actions can get the result from it instead of |
| // from the first in-flight execution. Not keeping in-flight executions around |
| // indefinitely is important to avoid excessive memory pressure - Spawns can be very |
| // large. |
| remoteExecutionService.uploadOutputs( |
| action, result, () -> inFlightExecutions.remove(action.getActionKey())); |
| if (thisExecutionFinal != null |
| && action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) { |
| // In this case outputs have been uploaded synchronously and the callback above has run, |
| // so no new executions will be deduplicated against this one. We can safely await all |
| // existing executions finish the reuse. |
| // Note that while this call itself isn't interruptible, all operations it awaits are |
| // interruptible. |
| try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "await output reuse")) { |
| thisExecutionFinal.awaitAllOutputReuse(); |
| } |
| } |
| } |
| /** Throws {@link IOException} if any non-virtual input reports {@code wasModifiedSinceDigest} for its path under the exec root. */ |
| private void checkForConcurrentModifications() |
| throws IOException, ForbiddenActionInputException { |
| for (ActionInput input : action.getInputMap(true).values()) { |
| if (input instanceof VirtualActionInput) { |
| continue; |
| } |
| FileArtifactValue metadata = context.getInputMetadataProvider().getInputMetadata(input); |
| Path path = execRoot.getRelative(input.getExecPath()); |
| if (metadata.wasModifiedSinceDigest(path)) { |
| throw new IOException(path + " was modified during execution"); |
| } |
| } |
| } |
| /** Cancels the in-flight execution tracked by this handle, if one was created. */ |
| @Override |
| public void close() { |
| if (thisExecutionFinal != null) { |
| thisExecutionFinal.cancel(); |
| } |
| } |
| }; |
| } else { |
| return SpawnCache.NO_RESULT_NO_STORE; |
| } |
| } |
| /** Always returns {@code false}. */ |
| @Override |
| public boolean usefulInDynamicExecution() { |
| return false; |
| } |
| } |