Refactor combined cache. (#16110)

Instead of guessing when to use remote/disk part, combined cache now uses read/write cache policy provided by the context. The call sites can change the policy based on the requirements.

Fixes #15934. But unfortunately, I am not able to add an integration test for it because our own remote worker doesn't support the asset API.

Closes #16039.

PiperOrigin-RevId: 465577383
Change-Id: I99effab1cdcba0890671ea64c4660ea31b059ce7
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
index 15c12bb..ecc9a3c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java
@@ -30,7 +30,7 @@
 import com.google.devtools.build.lib.events.Event;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
-import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext.Step;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext.CachePolicy;
 import com.google.devtools.build.lib.remote.util.DigestUtil;
 import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
 import com.google.devtools.build.lib.vfs.Path;
@@ -274,8 +274,9 @@
 
     RequestMetadata metadata =
         TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null);
-    RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
-    context.setStep(Step.UPLOAD_BES_FILES);
+    RemoteActionExecutionContext context =
+        RemoteActionExecutionContext.create(metadata)
+            .withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY);
 
     return Single.using(
         remoteCache::retain,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
index 57741a8..debde1a 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java
@@ -44,12 +44,11 @@
       PathFragment diskCachePath,
       boolean remoteVerifyDownloads,
       DigestUtil digestUtil,
-      RemoteCacheClient remoteCacheClient,
-      RemoteOptions options)
+      RemoteCacheClient remoteCacheClient)
       throws IOException {
     DiskCacheClient diskCacheClient =
         createDiskCache(workingDirectory, diskCachePath, remoteVerifyDownloads, digestUtil);
-    return new DiskAndRemoteCacheClient(diskCacheClient, remoteCacheClient, options);
+    return new DiskAndRemoteCacheClient(diskCacheClient, remoteCacheClient);
   }
 
   public static RemoteCacheClient create(
@@ -147,12 +146,7 @@
 
     RemoteCacheClient httpCache = createHttp(options, cred, digestUtil);
     return createDiskAndRemoteClient(
-        workingDirectory,
-        diskCachePath,
-        options.remoteVerifyDownloads,
-        digestUtil,
-        httpCache,
-        options);
+        workingDirectory, diskCachePath, options.remoteVerifyDownloads, digestUtil, httpCache);
   }
 
   public static boolean isDiskCache(RemoteOptions options) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
index e94875e..63af731 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java
@@ -27,12 +27,7 @@
 import static com.google.devtools.build.lib.remote.util.Utils.getInMemoryOutputPath;
 import static com.google.devtools.build.lib.remote.util.Utils.grpcAwareErrorMessage;
 import static com.google.devtools.build.lib.remote.util.Utils.hasFilesToDownload;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldAcceptCachedResultFromCombinedCache;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldAcceptCachedResultFromDiskCache;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldAcceptCachedResultFromRemoteCache;
 import static com.google.devtools.build.lib.remote.util.Utils.shouldDownloadAllSpawnOutputs;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldUploadLocalResultsToCombinedDisk;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldUploadLocalResultsToDiskCache;
 import static com.google.devtools.build.lib.remote.util.Utils.shouldUploadLocalResultsToRemoteCache;
 
 import build.bazel.remote.execution.v2.Action;
@@ -97,8 +92,9 @@
 import com.google.devtools.build.lib.remote.common.BulkTransferException;
 import com.google.devtools.build.lib.remote.common.OperationObserver;
 import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
+import com.google.devtools.build.lib.remote.common.ProgressStatusListener;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
-import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext.Step;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext.CachePolicy;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
 import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
@@ -281,41 +277,72 @@
     return options.diskCache != null && !options.diskCache.isEmpty();
   }
 
-  /** Returns {@code true} if the {@code spawn} should accept cached results from remote cache. */
-  public boolean shouldAcceptCachedResult(Spawn spawn) {
+  public CachePolicy getReadCachePolicy(Spawn spawn) {
     if (remoteCache == null) {
-      return false;
+      return CachePolicy.NO_CACHE;
     }
 
+    boolean allowDiskCache = false;
+    boolean allowRemoteCache = false;
+
     if (useRemoteCache(remoteOptions)) {
+      allowRemoteCache = remoteOptions.remoteAcceptCached && Spawns.mayBeCachedRemotely(spawn);
       if (useDiskCache(remoteOptions)) {
-        return shouldAcceptCachedResultFromCombinedCache(remoteOptions, spawn);
-      } else {
-        return shouldAcceptCachedResultFromRemoteCache(remoteOptions, spawn);
+        // Combined cache
+        if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
+          // --incompatible_remote_results_ignore_disk is set. Disk cache is treated as local cache.
+          // Actions which are tagged with `no-remote-cache` can still hit the disk cache.
+          allowDiskCache = Spawns.mayBeCached(spawn);
+        } else {
+          // Disk cache is treated as a remote cache and disabled for `no-remote-cache`.
+          allowDiskCache = allowRemoteCache;
+        }
       }
     } else {
-      return shouldAcceptCachedResultFromDiskCache(remoteOptions, spawn);
+      // Disk cache only
+      if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
+        allowDiskCache = Spawns.mayBeCached(spawn);
+      } else {
+        allowDiskCache = remoteOptions.remoteAcceptCached && Spawns.mayBeCached(spawn);
+      }
     }
+
+    return CachePolicy.create(allowRemoteCache, allowDiskCache);
   }
 
-  /**
-   * Returns {@code true} if the local results of the {@code spawn} should be uploaded to remote
-   * cache.
-   */
-  public boolean shouldUploadLocalResults(Spawn spawn) {
+  public CachePolicy getWriteCachePolicy(Spawn spawn) {
     if (remoteCache == null) {
-      return false;
+      return CachePolicy.NO_CACHE;
     }
 
+    boolean allowDiskCache = false;
+    boolean allowRemoteCache = false;
+
     if (useRemoteCache(remoteOptions)) {
+      allowRemoteCache =
+          shouldUploadLocalResultsToRemoteCache(remoteOptions, spawn.getExecutionInfo());
       if (useDiskCache(remoteOptions)) {
-        return shouldUploadLocalResultsToCombinedDisk(remoteOptions, spawn);
-      } else {
-        return shouldUploadLocalResultsToRemoteCache(remoteOptions, spawn);
+        // Combined cache
+        if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
+          // If --incompatible_remote_results_ignore_disk is set, we treat the disk cache part as
+          // local cache. Actions which are tagged with `no-remote-cache` can still hit the disk
+          // cache.
+          allowDiskCache = Spawns.mayBeCached(spawn);
+        } else {
+          // Otherwise, it's treated as a remote cache and disabled for `no-remote-cache`.
+          allowDiskCache = allowRemoteCache;
+        }
       }
     } else {
-      return shouldUploadLocalResultsToDiskCache(remoteOptions, spawn);
+      // Disk cache only
+      if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
+        allowDiskCache = Spawns.mayBeCached(spawn);
+      } else {
+        allowDiskCache = remoteOptions.remoteUploadLocalResults && Spawns.mayBeCached(spawn);
+      }
     }
+
+    return CachePolicy.create(allowRemoteCache, allowDiskCache);
   }
 
   /** Returns {@code true} if the spawn may be executed remotely. */
@@ -443,7 +470,8 @@
         TracingMetadataUtils.buildMetadata(
             buildRequestId, commandId, actionKey.getDigest().getHash(), spawn.getResourceOwner());
     RemoteActionExecutionContext remoteActionExecutionContext =
-        RemoteActionExecutionContext.create(spawn, metadata);
+        RemoteActionExecutionContext.create(
+            spawn, metadata, getWriteCachePolicy(spawn), getReadCachePolicy(spawn));
 
     return new RemoteAction(
         spawn,
@@ -579,9 +607,9 @@
   @Nullable
   public RemoteActionResult lookupCache(RemoteAction action)
       throws IOException, InterruptedException {
-    checkState(shouldAcceptCachedResult(action.getSpawn()), "spawn doesn't accept cached result");
-
-    action.getRemoteActionExecutionContext().setStep(Step.CHECK_ACTION_CACHE);
+    checkState(
+        action.getRemoteActionExecutionContext().getReadCachePolicy().allowAnyCache(),
+        "spawn doesn't accept cached result");
 
     CachedActionResult cachedActionResult =
         remoteCache.downloadActionResult(
@@ -601,18 +629,21 @@
         .getRelative(actualPath.getBaseName() + ".tmp");
   }
 
-  private ListenableFuture<FileMetadata> downloadFile(RemoteAction action, FileMetadata file) {
+  private ListenableFuture<FileMetadata> downloadFile(
+      RemoteActionExecutionContext context,
+      ProgressStatusListener progressStatusListener,
+      FileMetadata file) {
     checkNotNull(remoteCache, "remoteCache can't be null");
 
     try {
       ListenableFuture<Void> future =
           remoteCache.downloadFile(
-              action.getRemoteActionExecutionContext(),
+              context,
               remotePathResolver.localPathToOutputPath(file.path()),
               toTmpDownloadPath(file.path()),
               file.digest(),
               new RemoteCache.DownloadProgressReporter(
-                  action.getSpawnExecutionContext()::report,
+                  progressStatusListener,
                   remotePathResolver.localPathToOutputPath(file.path()),
                   file.digest().getSizeBytes()));
       return transform(future, (d) -> file, directExecutor());
@@ -905,7 +936,8 @@
     return new DirectoryMetadata(filesBuilder.build(), symlinksBuilder.build());
   }
 
-  ActionResultMetadata parseActionResultMetadata(RemoteAction action, RemoteActionResult result)
+  ActionResultMetadata parseActionResultMetadata(
+      RemoteActionExecutionContext context, RemoteActionResult result)
       throws IOException, InterruptedException {
     checkNotNull(remoteCache, "remoteCache can't be null");
 
@@ -915,8 +947,7 @@
       dirMetadataDownloads.put(
           remotePathResolver.outputPathToLocalPath(dir.getPath()),
           Futures.transformAsync(
-              remoteCache.downloadBlob(
-                  action.getRemoteActionExecutionContext(), dir.getTreeDigest()),
+              remoteCache.downloadBlob(context, dir.getTreeDigest()),
               (treeBytes) ->
                   immediateFuture(Tree.parseFrom(treeBytes, ExtensionRegistry.getEmptyRegistry())),
               directExecutor()));
@@ -974,11 +1005,16 @@
     checkState(!shutdown.get(), "shutdown");
     checkNotNull(remoteCache, "remoteCache can't be null");
 
-    action.getRemoteActionExecutionContext().setStep(Step.DOWNLOAD_OUTPUTS);
+    ProgressStatusListener progressStatusListener = action.getSpawnExecutionContext()::report;
+    RemoteActionExecutionContext context = action.getRemoteActionExecutionContext();
+    if (result.executeResponse != null) {
+      // Always read from remote cache for just remotely executed action.
+      context = context.withReadCachePolicy(context.getReadCachePolicy().addRemoteCache());
+    }
 
     ActionResultMetadata metadata;
     try (SilentCloseable c = Profiler.instance().profile("Remote.parseActionResultMetadata")) {
-      metadata = parseActionResultMetadata(action, result);
+      metadata = parseActionResultMetadata(context, result);
     }
 
     if (result.success()) {
@@ -1009,11 +1045,10 @@
       // When downloading outputs from just remotely executed action, the action result comes from
       // Execution response which means, if disk cache is enabled, action result hasn't been
       // uploaded to it. Upload action result to disk cache here so next build could hit it.
-      if (useDiskCache(remoteOptions)
-          && action.getRemoteActionExecutionContext().getExecuteResponse() != null) {
+      if (useDiskCache(remoteOptions) && result.executeResponse != null) {
         getFromFuture(
             remoteCache.uploadActionResult(
-                action.getRemoteActionExecutionContext(),
+                context.withWriteCachePolicy(CachePolicy.DISK_CACHE_ONLY),
                 action.getActionKey(),
                 result.actionResult));
       }
@@ -1033,7 +1068,7 @@
     ImmutableList<ListenableFuture<FileMetadata>> forcedDownloads = ImmutableList.of();
 
     if (downloadOutputs) {
-      downloadsBuilder.addAll(buildFilesToDownload(metadata, action));
+      downloadsBuilder.addAll(buildFilesToDownload(context, progressStatusListener, metadata));
     } else {
       checkState(
           result.getExitCode() == 0,
@@ -1046,14 +1081,14 @@
       }
       if (shouldForceDownloads) {
         forcedDownloads =
-            buildFilesToDownloadWithPredicate(metadata, action, shouldForceDownloadPredicate);
+            buildFilesToDownloadWithPredicate(
+                context, progressStatusListener, metadata, shouldForceDownloadPredicate);
       }
     }
 
     FileOutErr tmpOutErr = outErr.childOutErr();
     List<ListenableFuture<Void>> outErrDownloads =
-        remoteCache.downloadOutErr(
-            action.getRemoteActionExecutionContext(), result.actionResult, tmpOutErr);
+        remoteCache.downloadOutErr(context, result.actionResult, tmpOutErr);
     for (ListenableFuture<Void> future : outErrDownloads) {
       downloadsBuilder.add(transform(future, (v) -> null, directExecutor()));
     }
@@ -1136,8 +1171,7 @@
       try (SilentCloseable c = Profiler.instance().profile("Remote.downloadInMemoryOutput")) {
         if (inMemoryOutput != null) {
           ListenableFuture<byte[]> inMemoryOutputDownload =
-              remoteCache.downloadBlob(
-                  action.getRemoteActionExecutionContext(), inMemoryOutputDigest);
+              remoteCache.downloadBlob(context, inMemoryOutputDigest);
           waitForBulkTransfer(
               ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true);
           byte[] data = getFromFuture(inMemoryOutputDownload);
@@ -1150,20 +1184,25 @@
   }
 
   private ImmutableList<ListenableFuture<FileMetadata>> buildFilesToDownload(
-      ActionResultMetadata metadata, RemoteAction action) {
+      RemoteActionExecutionContext context,
+      ProgressStatusListener progressStatusListener,
+      ActionResultMetadata metadata) {
     Predicate<String> alwaysTrue = unused -> true;
-    return buildFilesToDownloadWithPredicate(metadata, action, alwaysTrue);
+    return buildFilesToDownloadWithPredicate(context, progressStatusListener, metadata, alwaysTrue);
   }
 
   private ImmutableList<ListenableFuture<FileMetadata>> buildFilesToDownloadWithPredicate(
-      ActionResultMetadata metadata, RemoteAction action, Predicate<String> predicate) {
+      RemoteActionExecutionContext context,
+      ProgressStatusListener progressStatusListener,
+      ActionResultMetadata metadata,
+      Predicate<String> predicate) {
     HashSet<PathFragment> queuedFilePaths = new HashSet<>();
     ImmutableList.Builder<ListenableFuture<FileMetadata>> builder = new ImmutableList.Builder<>();
 
     for (FileMetadata file : metadata.files()) {
       PathFragment filePath = file.path().asFragment();
       if (queuedFilePaths.add(filePath) && predicate.test(file.path.toString())) {
-        builder.add(downloadFile(action, file));
+        builder.add(downloadFile(context, progressStatusListener, file));
       }
     }
 
@@ -1171,7 +1210,7 @@
       for (FileMetadata file : entry.getValue().files()) {
         PathFragment filePath = file.path().asFragment();
         if (queuedFilePaths.add(filePath) && predicate.test(file.path.toString())) {
-          builder.add(downloadFile(action, file));
+          builder.add(downloadFile(context, progressStatusListener, file));
         }
       }
     }
@@ -1235,13 +1274,13 @@
   public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
       throws InterruptedException, ExecException {
     checkState(!shutdown.get(), "shutdown");
-    checkState(shouldUploadLocalResults(action.getSpawn()), "spawn shouldn't upload local result");
+    checkState(
+        action.getRemoteActionExecutionContext().getWriteCachePolicy().allowAnyCache(),
+        "spawn shouldn't upload local result");
     checkState(
         SpawnResult.Status.SUCCESS.equals(spawnResult.status()) && spawnResult.exitCode() == 0,
         "shouldn't upload outputs of failed local action");
 
-    action.getRemoteActionExecutionContext().setStep(Step.UPLOAD_OUTPUTS);
-
     if (remoteOptions.remoteCacheAsync) {
       Single.using(
               remoteCache::retain,
@@ -1303,15 +1342,18 @@
     checkState(!shutdown.get(), "shutdown");
     checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");
 
-    action.getRemoteActionExecutionContext().setStep(Step.UPLOAD_INPUTS);
-
     RemoteExecutionCache remoteExecutionCache = (RemoteExecutionCache) remoteCache;
     // Upload the command and all the inputs into the remote cache.
     Map<Digest, Message> additionalInputs = Maps.newHashMapWithExpectedSize(2);
     additionalInputs.put(action.getActionKey().getDigest(), action.getAction());
     additionalInputs.put(action.getCommandHash(), action.getCommand());
     remoteExecutionCache.ensureInputsPresent(
-        action.getRemoteActionExecutionContext(), action.getMerkleTree(), additionalInputs, force);
+        action
+            .getRemoteActionExecutionContext()
+            .withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
+        action.getMerkleTree(),
+        additionalInputs,
+        force);
   }
 
   /**
@@ -1326,8 +1368,6 @@
     checkState(!shutdown.get(), "shutdown");
     checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");
 
-    action.getRemoteActionExecutionContext().setStep(Step.EXECUTE_REMOTELY);
-
     ExecuteRequest.Builder requestBuilder =
         ExecuteRequest.newBuilder()
             .setInstanceName(remoteOptions.remoteInstanceName)
@@ -1347,8 +1387,6 @@
     ExecuteResponse reply =
         remoteExecutor.executeRemotely(action.getRemoteActionExecutionContext(), request, observer);
 
-    action.getRemoteActionExecutionContext().setExecuteResponse(reply);
-
     return RemoteActionResult.createFromResponse(reply);
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
index 5fed432..2a60fbc 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java
@@ -567,8 +567,7 @@
                   remoteOptions.diskCache,
                   remoteOptions.remoteVerifyDownloads,
                   digestUtil,
-                  cacheClient,
-                  remoteOptions);
+                  cacheClient);
         } catch (IOException e) {
           handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
           return;
@@ -626,8 +625,7 @@
                   remoteOptions.diskCache,
                   remoteOptions.remoteVerifyDownloads,
                   digestUtil,
-                  cacheClient,
-                  remoteOptions);
+                  cacheClient);
         } catch (IOException e) {
           handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
           return;
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
index 3ce1981..9527b34 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java
@@ -82,8 +82,10 @@
   @Override
   public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
       throws InterruptedException, IOException, ExecException, ForbiddenActionInputException {
-    boolean shouldAcceptCachedResult = remoteExecutionService.shouldAcceptCachedResult(spawn);
-    boolean shouldUploadLocalResults = remoteExecutionService.shouldUploadLocalResults(spawn);
+    boolean shouldAcceptCachedResult =
+        remoteExecutionService.getReadCachePolicy(spawn).allowAnyCache();
+    boolean shouldUploadLocalResults =
+        remoteExecutionService.getWriteCachePolicy(spawn).allowAnyCache();
     if (!shouldAcceptCachedResult && !shouldUploadLocalResults) {
       return SpawnCache.NO_RESULT_NO_STORE;
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index 218ba97..c50dd30 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -183,10 +183,11 @@
         "Spawn can't be executed remotely. This is a bug.");
 
     Stopwatch totalTime = Stopwatch.createStarted();
-    boolean uploadLocalResults = remoteExecutionService.shouldUploadLocalResults(spawn);
-    boolean acceptCachedResult = remoteExecutionService.shouldAcceptCachedResult(spawn);
+    boolean acceptCachedResult = remoteExecutionService.getReadCachePolicy(spawn).allowAnyCache();
+    boolean uploadLocalResults = remoteExecutionService.getWriteCachePolicy(spawn).allowAnyCache();
 
     RemoteAction action = remoteExecutionService.buildRemoteAction(spawn, context);
+
     SpawnMetrics.Builder spawnMetrics =
         SpawnMetrics.Builder.forRemoteExec()
             .setInputBytes(action.getInputBytes())
diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteActionExecutionContext.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteActionExecutionContext.java
index 78cac39..46a58e5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteActionExecutionContext.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteActionExecutionContext.java
@@ -13,7 +13,6 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote.common;
 
-import build.bazel.remote.execution.v2.ExecuteResponse;
 import build.bazel.remote.execution.v2.RequestMetadata;
 import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
 import com.google.devtools.build.lib.actions.Spawn;
@@ -21,40 +20,82 @@
 
 /** A context that provide remote execution related information for executing an action remotely. */
 public class RemoteActionExecutionContext {
-  /** The current step of the context. */
-  public enum Step {
-    INIT,
-    CHECK_ACTION_CACHE,
-    UPLOAD_INPUTS,
-    EXECUTE_REMOTELY,
-    UPLOAD_OUTPUTS,
-    DOWNLOAD_OUTPUTS,
-    UPLOAD_BES_FILES,
+  /** Determines whether to read/write remote cache, disk cache or both. */
+  public enum CachePolicy {
+    NO_CACHE,
+    REMOTE_CACHE_ONLY,
+    DISK_CACHE_ONLY,
+    ANY_CACHE;
+
+    public static CachePolicy create(boolean allowRemoteCache, boolean allowDiskCache) {
+      if (allowRemoteCache && allowDiskCache) {
+        return ANY_CACHE;
+      } else if (allowRemoteCache) {
+        return REMOTE_CACHE_ONLY;
+      } else if (allowDiskCache) {
+        return DISK_CACHE_ONLY;
+      } else {
+        return NO_CACHE;
+      }
+    }
+
+    public boolean allowAnyCache() {
+      return this != NO_CACHE;
+    }
+
+    public boolean allowRemoteCache() {
+      return this == REMOTE_CACHE_ONLY || this == ANY_CACHE;
+    }
+
+    public boolean allowDiskCache() {
+      return this == DISK_CACHE_ONLY || this == ANY_CACHE;
+    }
+
+    public CachePolicy addRemoteCache() {
+      if (this == DISK_CACHE_ONLY || this == ANY_CACHE) {
+        return ANY_CACHE;
+      }
+
+      return REMOTE_CACHE_ONLY;
+    }
   }
 
   @Nullable private final Spawn spawn;
   private final RequestMetadata requestMetadata;
   private final NetworkTime networkTime;
-
-  @Nullable private ExecuteResponse executeResponse;
-  private Step step;
+  private final CachePolicy writeCachePolicy;
+  private final CachePolicy readCachePolicy;
 
   private RemoteActionExecutionContext(
       @Nullable Spawn spawn, RequestMetadata requestMetadata, NetworkTime networkTime) {
     this.spawn = spawn;
     this.requestMetadata = requestMetadata;
     this.networkTime = networkTime;
-    this.step = Step.INIT;
+    this.writeCachePolicy = CachePolicy.ANY_CACHE;
+    this.readCachePolicy = CachePolicy.ANY_CACHE;
   }
 
-  /** Returns current {@link Step} of the context. */
-  public Step getStep() {
-    return step;
+  private RemoteActionExecutionContext(
+      @Nullable Spawn spawn,
+      RequestMetadata requestMetadata,
+      NetworkTime networkTime,
+      CachePolicy writeCachePolicy,
+      CachePolicy readCachePolicy) {
+    this.spawn = spawn;
+    this.requestMetadata = requestMetadata;
+    this.networkTime = networkTime;
+    this.writeCachePolicy = writeCachePolicy;
+    this.readCachePolicy = readCachePolicy;
   }
 
-  /** Sets current {@link Step} of the context. */
-  public void setStep(Step step) {
-    this.step = step;
+  public RemoteActionExecutionContext withWriteCachePolicy(CachePolicy writeCachePolicy) {
+    return new RemoteActionExecutionContext(
+        spawn, requestMetadata, networkTime, writeCachePolicy, readCachePolicy);
+  }
+
+  public RemoteActionExecutionContext withReadCachePolicy(CachePolicy readCachePolicy) {
+    return new RemoteActionExecutionContext(
+        spawn, requestMetadata, networkTime, writeCachePolicy, readCachePolicy);
   }
 
   /** Returns the {@link Spawn} of the action being executed or {@code null}. */
@@ -86,13 +127,12 @@
     return spawn.getResourceOwner();
   }
 
-  public void setExecuteResponse(@Nullable ExecuteResponse executeResponse) {
-    this.executeResponse = executeResponse;
+  public CachePolicy getWriteCachePolicy() {
+    return writeCachePolicy;
   }
 
-  @Nullable
-  public ExecuteResponse getExecuteResponse() {
-    return executeResponse;
+  public CachePolicy getReadCachePolicy() {
+    return readCachePolicy;
   }
 
   /** Creates a {@link RemoteActionExecutionContext} with given {@link RequestMetadata}. */
@@ -108,4 +148,13 @@
       @Nullable Spawn spawn, RequestMetadata metadata) {
     return new RemoteActionExecutionContext(spawn, metadata, new NetworkTime());
   }
+
+  public static RemoteActionExecutionContext create(
+      @Nullable Spawn spawn,
+      RequestMetadata requestMetadata,
+      CachePolicy writeCachePolicy,
+      CachePolicy readCachePolicy) {
+    return new RemoteActionExecutionContext(
+        spawn, requestMetadata, new NetworkTime(), writeCachePolicy, readCachePolicy);
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
index c337a5e..1fc6200 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java
@@ -13,9 +13,9 @@
 // limitations under the License.
 package com.google.devtools.build.lib.remote.disk;
 
+import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
+import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldAcceptCachedResultFromRemoteCache;
-import static com.google.devtools.build.lib.remote.util.Utils.shouldUploadLocalResultsToRemoteCache;
 
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.Digest;
@@ -25,9 +25,7 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
 import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
-import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext.Step;
 import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
-import com.google.devtools.build.lib.remote.options.RemoteOptions;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -43,24 +41,22 @@
 
   private final RemoteCacheClient remoteCache;
   private final DiskCacheClient diskCache;
-  private final RemoteOptions options;
 
-  public DiskAndRemoteCacheClient(
-      DiskCacheClient diskCache, RemoteCacheClient remoteCache, RemoteOptions options) {
+  public DiskAndRemoteCacheClient(DiskCacheClient diskCache, RemoteCacheClient remoteCache) {
     this.diskCache = Preconditions.checkNotNull(diskCache);
     this.remoteCache = Preconditions.checkNotNull(remoteCache);
-    this.options = options;
   }
 
   @Override
   public ListenableFuture<Void> uploadActionResult(
       RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
-    ListenableFuture<Void> future = diskCache.uploadActionResult(context, actionKey, actionResult);
-    // Only upload action result to remote cache if we are uploading local outputs. This method
-    // could be called when we are downloading outputs from remote executor if disk cache is enabled
-    // because we want to upload the action result to it.
-    if (context.getStep() == Step.UPLOAD_OUTPUTS
-        && shouldUploadLocalResultsToRemoteCache(options, context.getSpawn())) {
+    ListenableFuture<Void> future = Futures.immediateVoidFuture();
+
+    if (context.getWriteCachePolicy().allowDiskCache()) {
+      future = diskCache.uploadActionResult(context, actionKey, actionResult);
+    }
+
+    if (context.getWriteCachePolicy().allowRemoteCache()) {
       future =
           Futures.transformAsync(
               future,
@@ -79,20 +75,13 @@
   @Override
   public ListenableFuture<Void> uploadFile(
       RemoteActionExecutionContext context, Digest digest, Path file) {
-    RemoteActionExecutionContext.Step step = context.getStep();
+    ListenableFuture<Void> future = Futures.immediateVoidFuture();
 
-    // For UPLOAD_INPUTS, only upload to remote cache.
-    if (step == Step.UPLOAD_INPUTS) {
-      return remoteCache.uploadFile(context, digest, file);
+    if (context.getWriteCachePolicy().allowDiskCache()) {
+      future = diskCache.uploadFile(context, digest, file);
     }
 
-    // For UPLOAD_BES_FILES, only upload to remote cache.
-    if (step == Step.UPLOAD_BES_FILES) {
-      return remoteCache.uploadFile(context, digest, file);
-    }
-
-    ListenableFuture<Void> future = diskCache.uploadFile(context, digest, file);
-    if (shouldUploadLocalResultsToRemoteCache(options, context.getSpawn())) {
+    if (context.getWriteCachePolicy().allowRemoteCache()) {
       future =
           Futures.transformAsync(
               future, v -> remoteCache.uploadFile(context, digest, file), directExecutor());
@@ -103,20 +92,13 @@
   @Override
   public ListenableFuture<Void> uploadBlob(
       RemoteActionExecutionContext context, Digest digest, ByteString data) {
-    RemoteActionExecutionContext.Step step = context.getStep();
+    ListenableFuture<Void> future = Futures.immediateVoidFuture();
 
-    // For UPLOAD_INPUTS, only upload to remote cache.
-    if (step == Step.UPLOAD_INPUTS) {
-      return remoteCache.uploadBlob(context, digest, data);
+    if (context.getWriteCachePolicy().allowDiskCache()) {
+      future = diskCache.uploadBlob(context, digest, data);
     }
 
-    // For BES upload, only upload to the remote cache.
-    if (step == Step.UPLOAD_BES_FILES) {
-      return remoteCache.uploadBlob(context, digest, data);
-    }
-
-    ListenableFuture<Void> future = diskCache.uploadBlob(context, digest, data);
-    if (shouldUploadLocalResultsToRemoteCache(options, context.getSpawn())) {
+    if (context.getWriteCachePolicy().allowRemoteCache()) {
       future =
           Futures.transformAsync(
               future, v -> remoteCache.uploadBlob(context, digest, data), directExecutor());
@@ -127,38 +109,27 @@
   @Override
   public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
       RemoteActionExecutionContext context, Iterable<Digest> digests) {
-    RemoteActionExecutionContext.Step step = context.getStep();
-
-    // For UPLOAD_INPUTS, find missing digests should only look at
-    // the remote cache, not the disk cache because the remote executor only
-    // has access to the remote cache, not the disk cache.
-    // Also, the DiskCache always returns all digests as missing
-    // and we don't want to transfer all the files all the time.
-    if (step == Step.UPLOAD_INPUTS) {
-      return remoteCache.findMissingDigests(context, digests);
+    ListenableFuture<ImmutableSet<Digest>> diskQuery = immediateFuture(ImmutableSet.of());
+    if (context.getWriteCachePolicy().allowDiskCache()) {
+      diskQuery = diskCache.findMissingDigests(context, digests);
     }
 
-    // For UPLOAD_BES_FILES, we only check the remote cache.
-    if (step == Step.UPLOAD_BES_FILES) {
-      return remoteCache.findMissingDigests(context, digests);
+    ListenableFuture<ImmutableSet<Digest>> remoteQuery = immediateFuture(ImmutableSet.of());
+    if (context.getWriteCachePolicy().allowRemoteCache()) {
+      remoteQuery = remoteCache.findMissingDigests(context, digests);
     }
 
-    ListenableFuture<ImmutableSet<Digest>> diskQuery =
-        diskCache.findMissingDigests(context, digests);
-    if (shouldUploadLocalResultsToRemoteCache(options, context.getSpawn())) {
-      ListenableFuture<ImmutableSet<Digest>> remoteQuery =
-          remoteCache.findMissingDigests(context, digests);
-      return Futures.whenAllSucceed(remoteQuery, diskQuery)
-          .call(
-              () ->
-                  ImmutableSet.<Digest>builder()
-                      .addAll(remoteQuery.get())
-                      .addAll(diskQuery.get())
-                      .build(),
-              directExecutor());
-    } else {
-      return diskQuery;
-    }
+    ListenableFuture<ImmutableSet<Digest>> diskQueryFinal = diskQuery;
+    ListenableFuture<ImmutableSet<Digest>> remoteQueryFinal = remoteQuery;
+
+    return Futures.whenAllSucceed(remoteQueryFinal, diskQueryFinal)
+        .call(
+            () ->
+                ImmutableSet.<Digest>builder()
+                    .addAll(remoteQueryFinal.get())
+                    .addAll(diskQueryFinal.get())
+                    .build(),
+            directExecutor());
   }
 
   private Path newTempPath() {
@@ -176,7 +147,7 @@
           } catch (IOException e) {
             rootCause.addSuppressed(e);
           }
-          return Futures.immediateFailedFuture(rootCause);
+          return immediateFailedFuture(rootCause);
         },
         directExecutor());
   }
@@ -184,7 +155,7 @@
   @Override
   public ListenableFuture<Void> downloadBlob(
       RemoteActionExecutionContext context, Digest digest, OutputStream out) {
-    if (diskCache.contains(digest)) {
+    if (context.getReadCachePolicy().allowDiskCache() && diskCache.contains(digest)) {
       return diskCache.downloadBlob(context, digest, out);
     }
 
@@ -192,9 +163,7 @@
     final OutputStream tempOut;
     tempOut = new LazyFileOutputStream(tempPath);
 
-    // Always download outputs for just remotely executed action.
-    if (context.getExecuteResponse() != null
-        || shouldAcceptCachedResultFromRemoteCache(options, context.getSpawn())) {
+    if (context.getReadCachePolicy().allowRemoteCache()) {
       ListenableFuture<Void> download =
           closeStreamOnError(remoteCache.downloadBlob(context, digest, tempOut), tempOut);
       return Futures.transformAsync(
@@ -204,29 +173,30 @@
               tempOut.close();
               diskCache.captureFile(tempPath, digest, /* isActionCache= */ false);
             } catch (IOException e) {
-              return Futures.immediateFailedFuture(e);
+              return immediateFailedFuture(e);
             }
             return diskCache.downloadBlob(context, digest, out);
           },
           directExecutor());
     } else {
-      return Futures.immediateFuture(null);
+      return immediateFuture(null);
     }
   }
 
   @Override
   public ListenableFuture<CachedActionResult> downloadActionResult(
       RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
-    if (diskCache.containsActionResult(actionKey)) {
+    if (context.getReadCachePolicy().allowDiskCache()
+        && diskCache.containsActionResult(actionKey)) {
       return diskCache.downloadActionResult(context, actionKey, inlineOutErr);
     }
 
-    if (shouldAcceptCachedResultFromRemoteCache(options, context.getSpawn())) {
+    if (context.getReadCachePolicy().allowRemoteCache()) {
       return Futures.transformAsync(
           remoteCache.downloadActionResult(context, actionKey, inlineOutErr),
           (cachedActionResult) -> {
             if (cachedActionResult == null) {
-              return Futures.immediateFuture(null);
+              return immediateFuture(null);
             } else {
               return Futures.transform(
                   diskCache.uploadActionResult(
@@ -237,7 +207,7 @@
           },
           directExecutor());
     } else {
-      return Futures.immediateFuture(null);
+      return immediateFuture(null);
     }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
index c775040..2705f2c 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
@@ -25,7 +25,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.AsyncCallable;
 import com.google.common.util.concurrent.FluentFuture;
@@ -583,33 +582,6 @@
     return String.format("%s %s", BYTE_COUNT_FORMAT.format(value / 1024.0), UNITS.get(unitIndex));
   }
 
-  public static boolean shouldAcceptCachedResultFromRemoteCache(
-      RemoteOptions remoteOptions, @Nullable Spawn spawn) {
-    return remoteOptions.remoteAcceptCached && (spawn == null || Spawns.mayBeCachedRemotely(spawn));
-  }
-
-  public static boolean shouldAcceptCachedResultFromDiskCache(
-      RemoteOptions remoteOptions, @Nullable Spawn spawn) {
-    if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
-      return spawn == null || Spawns.mayBeCached(spawn);
-    } else {
-      return remoteOptions.remoteAcceptCached && (spawn == null || Spawns.mayBeCached(spawn));
-    }
-  }
-
-  public static boolean shouldAcceptCachedResultFromCombinedCache(
-      RemoteOptions remoteOptions, @Nullable Spawn spawn) {
-    if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
-      // --incompatible_remote_results_ignore_disk is set. Disk cache is treated as local cache.
-      // Actions which are tagged with `no-remote-cache` can still hit the disk cache.
-      return spawn == null || Spawns.mayBeCached(spawn);
-    } else {
-      // Disk cache is treated as a remote cache and disabled for `no-remote-cache`.
-      return remoteOptions.remoteAcceptCached
-          && (spawn == null || Spawns.mayBeCachedRemotely(spawn));
-    }
-  }
-
   public static boolean shouldUploadLocalResultsToRemoteCache(
       RemoteOptions remoteOptions, @Nullable Map<String, String> executionInfo) {
     return remoteOptions.remoteUploadLocalResults
@@ -617,53 +589,4 @@
             || (Spawns.mayBeCachedRemotely(executionInfo)
                 && !executionInfo.containsKey(ExecutionRequirements.NO_REMOTE_CACHE_UPLOAD)));
   }
-
-  public static boolean shouldUploadLocalResultsToRemoteCache(
-      RemoteOptions remoteOptions, @Nullable Spawn spawn) {
-    ImmutableMap<String, String> executionInfo = null;
-    if (spawn != null) {
-      executionInfo = spawn.getExecutionInfo();
-    }
-    return shouldUploadLocalResultsToRemoteCache(remoteOptions, executionInfo);
-  }
-
-  public static boolean shouldUploadLocalResultsToDiskCache(
-      RemoteOptions remoteOptions, @Nullable Map<String, String> executionInfo) {
-    if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
-      return executionInfo == null || Spawns.mayBeCached(executionInfo);
-    } else {
-      return remoteOptions.remoteUploadLocalResults
-          && (executionInfo == null || Spawns.mayBeCached(executionInfo));
-    }
-  }
-
-  public static boolean shouldUploadLocalResultsToDiskCache(
-      RemoteOptions remoteOptions, @Nullable Spawn spawn) {
-    ImmutableMap<String, String> executionInfo = null;
-    if (spawn != null) {
-      executionInfo = spawn.getExecutionInfo();
-    }
-    return shouldUploadLocalResultsToDiskCache(remoteOptions, executionInfo);
-  }
-
-  public static boolean shouldUploadLocalResultsToCombinedDisk(
-      RemoteOptions remoteOptions, @Nullable Map<String, String> executionInfo) {
-    if (remoteOptions.incompatibleRemoteResultsIgnoreDisk) {
-      // If --incompatible_remote_results_ignore_disk is set, we treat the disk cache part as local
-      // cache. Actions which are tagged with `no-remote-cache` can still hit the disk cache.
-      return shouldUploadLocalResultsToDiskCache(remoteOptions, executionInfo);
-    } else {
-      // Otherwise, it's treated as a remote cache and disabled for `no-remote-cache`.
-      return shouldUploadLocalResultsToRemoteCache(remoteOptions, executionInfo);
-    }
-  }
-
-  public static boolean shouldUploadLocalResultsToCombinedDisk(
-      RemoteOptions remoteOptions, @Nullable Spawn spawn) {
-    ImmutableMap<String, String> executionInfo = null;
-    if (spawn != null) {
-      executionInfo = spawn.getExecutionInfo();
-    }
-    return shouldUploadLocalResultsToCombinedDisk(remoteOptions, executionInfo);
-  }
 }