Add Skyframe support for external computations

Add a SkyFunction.Environment.dependOnFuture() method to register a
dependency on the completion of a ListenableFuture. Once all such
futures are completed, the current node is re-enqueued.

We combine all futures that are registered in the same pass, and treat
them as a single logical dependency. To that end, we add another counter
to NodeEntry that does just that, but otherwise use the same signalDep
method.

This is in preparation for supporting async action execution, i.e.,
allowing actions to run in parallel without blocking a Skyframe thread.
We have reason to believe that this will be a significant performance
win for large builds: if there are more actions that could be run than
available jobs, but cranking up jobs is prohibitively costly.

In addition, async action execution also improves Bazel's command-line
output; since we no longer block Skyframe threads on action execution,
these threads are free to continue exploring the action graph, which
means we'll see the number of outstanding actions more quickly go to the
expected maximum for that build (excluding flaky test retries and
exclusive tests, which are not modeled in the action graph).

Progress on #6394.

PiperOrigin-RevId: 234450510
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index d2cef5f..6891c56 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -16,6 +16,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification;
 import java.util.Map;
@@ -446,6 +448,12 @@
     return remainingTasks.get();
   }
 
+  @Override
+  public void dependOnFuture(ListenableFuture<?> future) {
+    remainingTasks.incrementAndGet();
+    future.addListener(this::decrementRemainingTasks, MoreExecutors.directExecutor());
+  }
+
   /**
    * Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks
    * will be rejected if this is true.
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 3424bb4..d833ac4 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -14,6 +14,7 @@
 package com.google.devtools.build.lib.concurrent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
@@ -52,6 +53,9 @@
    */
   void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
 
+  /** Prevent quiescence of the executor until the given future is completed. */
+  void dependOnFuture(ListenableFuture<?> future);
+
   /** Get latch that is released if a task throws an exception. Used only in tests. */
   @VisibleForTesting
   CountDownLatch getExceptionLatchForTestingOnly();
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/ProgressEventSuppressingEnvironment.java b/src/main/java/com/google/devtools/build/lib/skyframe/ProgressEventSuppressingEnvironment.java
index c14d091..6983f69 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/ProgressEventSuppressingEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/ProgressEventSuppressingEnvironment.java
@@ -14,6 +14,7 @@
 package com.google.devtools.build.lib.skyframe;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.util.GroupedList;
 import com.google.devtools.build.skyframe.SkyFunction;
@@ -217,4 +218,8 @@
     return delegate.inErrorBubblingForTesting();
   }
 
+  @Override
+  public void dependOnFuture(ListenableFuture<?> future) {
+    delegate.dependOnFuture(future);
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/SkyFunctionEnvironmentForTesting.java b/src/main/java/com/google/devtools/build/lib/skyframe/SkyFunctionEnvironmentForTesting.java
index 03b22ee..70ccb5a 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/SkyFunctionEnvironmentForTesting.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/SkyFunctionEnvironmentForTesting.java
@@ -17,6 +17,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.skyframe.AbstractSkyFunctionEnvironment;
 import com.google.devtools.build.skyframe.EvaluationResult;
@@ -64,4 +65,9 @@
   public boolean inErrorBubblingForTesting() {
     return false;
   }
+
+  @Override
+  public void dependOnFuture(ListenableFuture<?> future) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/StateInformingSkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/lib/skyframe/StateInformingSkyFunctionEnvironment.java
index f4bbf7d..e3ffb33 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/StateInformingSkyFunctionEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/StateInformingSkyFunctionEnvironment.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.skyframe;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.util.GroupedList;
 import com.google.devtools.build.skyframe.SkyFunction;
@@ -269,4 +270,9 @@
   interface Informee {
     void inform() throws InterruptedException;
   }
+
+  @Override
+  public void dependOnFuture(ListenableFuture<?> future) {
+    delegate.dependOnFuture(future);
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
index 7115951..8efa4e9 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
@@ -20,6 +20,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.clock.BlazeClock;
 import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
 import com.google.devtools.build.lib.events.Event;
@@ -620,10 +621,11 @@
         // direct deps. uniqueNewDeps will be the set of unique keys contained in newDirectDeps.
         Set<SkyKey> uniqueNewDeps = state.addTemporaryDirectDeps(newDirectDeps);
 
+        List<ListenableFuture<?>> externalDeps = env.externalDeps;
         // If there were no newly requested dependencies, at least one of them was in error or there
         // is a bug in the SkyFunction implementation. The environment has collected its errors, so
         // we just order it to be built.
-        if (uniqueNewDeps.isEmpty()) {
+        if (uniqueNewDeps.isEmpty() && externalDeps == null) {
           // TODO(bazel-team): This means a bug in the SkyFunction. What to do?
           Preconditions.checkState(
               !env.getChildErrorInfos().isEmpty(),
@@ -643,6 +645,13 @@
           return;
         }
 
+        // If there are external deps, we register that fact on the NodeEntry before we enqueue
+        // child nodes in order to prevent the current node from being re-enqueued between here and
+        // the call to registerExternalDeps below.
+        if (externalDeps != null) {
+          state.addExternalDep();
+        }
+
         // We want to split apart the dependencies that existed for this node the last time we did
         // an evaluation and those that were introduced in this evaluation. To be clear, the prefix
         // "newDeps" refers to newly discovered this time around after a SkyFunction#compute call
@@ -660,6 +669,10 @@
         handleKnownChildrenForDirtyNode(
             newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority);
 
+        // Due to multi-threading, this can potentially cause the current node to be re-enqueued if
+        // all 'new' children of this node are already done. Therefore, there should not be any
+        // code after this loop, as it would potentially race with the re-evaluation in another
+        // thread.
         for (Map.Entry<SkyKey, ? extends NodeEntry> e :
             newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) {
           SkyKey newDirectDep = e.getKey();
@@ -672,7 +685,15 @@
               /*depAlreadyExists=*/ false,
               childEvaluationPriority);
         }
-        // It is critical that there is no code below this point in the try block.
+        if (externalDeps != null) {
+          // This can cause the current node to be re-enqueued if all futures are already done.
+          // This is an exception to the rule above that there must not be code below the for
+          // loop. It is safe because we call state.addExternalDep above, which prevents
+          // re-enqueueing of the current node in the above loop if externalDeps != null.
+          evaluatorContext.getVisitor().registerExternalDeps(skyKey, state, externalDeps);
+        }
+        // Do not put any code here! Any code here can race with a re-evaluation of this same node
+        // in another thread.
       } catch (InterruptedException ie) {
         // InterruptedException cannot be thrown by Runnable.run, so we must wrap it.
         // Interrupts can be caught by both the Evaluator and the AbstractQueueVisitor.
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractSkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/skyframe/AbstractSkyFunctionEnvironment.java
index 813dce7..4ec831a 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractSkyFunctionEnvironment.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractSkyFunctionEnvironment.java
@@ -16,8 +16,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.util.GroupedList;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 
@@ -35,6 +38,7 @@
   // #getValueOrUntypedExceptions.
   protected boolean errorMightHaveBeenFound = false;
   @Nullable private final GroupedList<SkyKey> temporaryDirectDeps;
+  @Nullable protected List<ListenableFuture<?>> externalDeps;
 
   public AbstractSkyFunctionEnvironment(@Nullable GroupedList<SkyKey> temporaryDirectDeps) {
     this.temporaryDirectDeps = temporaryDirectDeps;
@@ -301,6 +305,18 @@
 
   @Override
   public boolean valuesMissing() {
-    return valuesMissing;
+    return valuesMissing || (externalDeps != null);
+  }
+
+  @Override
+  public void dependOnFuture(ListenableFuture<?> future) {
+    if (future.isDone()) {
+      // No need to track a dependency on something that's already done.
+      return;
+    }
+    if (externalDeps == null) {
+      externalDeps = new ArrayList<>();
+    }
+    externalDeps.add(future);
   }
 }
diff --git a/src/main/java/com/google/devtools/build/skyframe/DelegatingNodeEntry.java b/src/main/java/com/google/devtools/build/skyframe/DelegatingNodeEntry.java
index 823c127..4eb3996 100644
--- a/src/main/java/com/google/devtools/build/skyframe/DelegatingNodeEntry.java
+++ b/src/main/java/com/google/devtools/build/skyframe/DelegatingNodeEntry.java
@@ -216,4 +216,9 @@
   public void addTemporaryDirectDepsGroupToDirtyEntry(List<SkyKey> group) {
     getDelegate().addTemporaryDirectDepsGroupToDirtyEntry(group);
   }
+
+  @Override
+  public void addExternalDep() {
+    getDelegate().addExternalDep();
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/skyframe/DirtyBuildingState.java b/src/main/java/com/google/devtools/build/skyframe/DirtyBuildingState.java
index 977f31f..d1e8fb6 100644
--- a/src/main/java/com/google/devtools/build/skyframe/DirtyBuildingState.java
+++ b/src/main/java/com/google/devtools/build/skyframe/DirtyBuildingState.java
@@ -74,6 +74,20 @@
   private int signaledDeps = NOT_EVALUATING_SENTINEL;
 
   /**
+   * The number of external dependencies (in contrast to the number of internal dependencies which
+   * are tracked in NodeEntry. We never keep information about external dependencies across Skyframe
+   * calls.
+   */
+  // We do not strictly require a counter here; all external deps from one SkyFunction evaluation
+  // pass are registered as a single logical dependency, and the SkyFunction is only re-evaluated if
+  // all of them complete. Therefore, we only need a single bit to track this fact. If the mere
+  // existence of this field turns out to be a significant memory burden, we could change the
+  // implementation by moving to a single-bit approach, and then store that bit as part of the
+  // dirtyState field, e.g., by adding a REBUILDING_WAITING_FOR_EXTERNAL_DEPS enum value, as this
+  // can only happen during evaluation.
+  private int externalDeps;
+
+  /**
    * The dependencies requested (with group markers) last time the node was built (and below, the
    * value last time the node was built). They will be compared to dependencies requested on this
    * build to check whether this node has changed in {@link NodeEntry#setValue}. If they are null,
@@ -133,7 +147,7 @@
   }
 
   final void forceRebuild(int numTemporaryDirectDeps) {
-    Preconditions.checkState(numTemporaryDirectDeps == signaledDeps, this);
+    Preconditions.checkState(numTemporaryDirectDeps + externalDeps == signaledDeps, this);
     Preconditions.checkState(
         (dirtyState == DirtyState.CHECK_DEPENDENCIES
                 && getNumOfGroupsInLastBuildDirectDeps() == dirtyDirectDepIndex)
@@ -167,6 +181,11 @@
     signaledDeps++;
   }
 
+  final void addExternalDep() {
+    Preconditions.checkState(isEvaluating());
+    externalDeps++;
+  }
+
   /**
    * If this node is not yet known to need rebuilding, sets {@link #dirtyState} to {@link
    * DirtyState#NEEDS_REBUILDING} if the child has changed, and {@link DirtyState#VERIFIED_CLEAN} if
@@ -293,14 +312,20 @@
 
   /** Returns whether all known children of this node have signaled that they are done. */
   boolean isReady(int numDirectDeps) {
-    Preconditions.checkState(signaledDeps <= numDirectDeps, "%s %s", numDirectDeps, this);
-    return signaledDeps == numDirectDeps;
+    Preconditions.checkState(
+        signaledDeps <= numDirectDeps + externalDeps,
+        "%s %s %s",
+        numDirectDeps,
+        externalDeps,
+        this);
+    return signaledDeps == numDirectDeps + externalDeps;
   }
 
   protected MoreObjects.ToStringHelper getStringHelper() {
     return MoreObjects.toStringHelper(this)
         .add("dirtyState", dirtyState)
         .add("signaledDeps", signaledDeps)
+        .add("externalDeps", externalDeps)
         .add("dirtyDirectDepIndex", dirtyDirectDepIndex);
   }
 
diff --git a/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java b/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java
index 058a3dc..233127d 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InMemoryNodeEntry.java
@@ -244,6 +244,12 @@
     dirtyBuildingState = null;
   }
 
+  @Override
+  public synchronized void addExternalDep() {
+    Preconditions.checkNotNull(dirtyBuildingState, this);
+    dirtyBuildingState.addExternalDep();
+  }
+
   protected final synchronized Set<SkyKey> setStateFinishedAndReturnReverseDepsToSignal() {
     Set<SkyKey> reverseDepsToSignal =
         ReverseDepsUtility.consolidateDataAndReturnNewElements(this, getOpToStoreBare());
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntry.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntry.java
index 61bbc24..5b60c36 100644
--- a/src/main/java/com/google/devtools/build/skyframe/NodeEntry.java
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntry.java
@@ -428,6 +428,10 @@
   @ThreadSafe
   void addTemporaryDirectDepsGroupToDirtyEntry(List<SkyKey> group);
 
+  default void addExternalDep() {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Returns true if the node is ready to be evaluated, i.e., it has been signaled exactly as many
    * times as it has temporary dependencies. This may only be called while the node is being
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
index db4f32f..ad640f7 100644
--- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
@@ -15,10 +15,14 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.concurrent.ErrorClassifier;
 import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
 import com.google.devtools.build.skyframe.ParallelEvaluatorContext.RunnableMaker;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,6 +101,31 @@
   }
 
   /**
+   * Registers a listener with all passed futures that causes the node to be re-enqueued when all
+   * futures are completed.
+   */
+  void registerExternalDeps(
+      SkyKey skyKey, NodeEntry entry, List<ListenableFuture<?>> externalDeps) {
+    // Generally speaking, there is no ordering guarantee for listeners registered with a single
+    // listenable future. If we used a listener here, there would be a potential race condition
+    // between re-enqueuing the key and notifying the quiescing executor, in which case the executor
+    // could shut down even though the work is not done yet. That would be bad.
+    //
+    // However, the whenAllComplete + run API guarantees that the Runnable is run before the
+    // returned future completes, i.e., before the quiescing executor is notified.
+    ListenableFuture<?> future =
+        Futures.whenAllComplete(externalDeps)
+            .run(
+                () -> {
+                  if (entry.signalDep(entry.getVersion(), null)) {
+                    enqueueEvaluation(skyKey, Integer.MAX_VALUE);
+                  }
+                },
+                MoreExecutors.directExecutor());
+    quiescingExecutor.dependOnFuture(future);
+  }
+
+  /**
    * Stop any new evaluations from being enqueued. Returns whether this was the first thread to
    * request a halt.
    *
diff --git a/src/main/java/com/google/devtools/build/skyframe/RecordingSkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/skyframe/RecordingSkyFunctionEnvironment.java
index 7937032..01de512 100644
--- a/src/main/java/com/google/devtools/build/skyframe/RecordingSkyFunctionEnvironment.java
+++ b/src/main/java/com/google/devtools/build/skyframe/RecordingSkyFunctionEnvironment.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.skyframe;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.util.GroupedList;
 import com.google.devtools.build.skyframe.SkyFunction.Environment;
@@ -283,4 +284,9 @@
   public void injectVersionForNonHermeticFunction(Version version) {
     delegate.injectVersionForNonHermeticFunction(version);
   }
+
+  @Override
+  public void dependOnFuture(ListenableFuture<?> future) {
+    delegate.dependOnFuture(future);
+  }
 }
diff --git a/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java b/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java
index 9346c2a..3d86e36 100644
--- a/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java
+++ b/src/main/java/com/google/devtools/build/skyframe/SkyFunction.java
@@ -15,6 +15,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
 import com.google.devtools.build.lib.events.ExtendedEventHandler;
 import com.google.devtools.build.lib.util.GroupedList;
@@ -352,5 +353,21 @@
     /** Returns whether we are currently in error bubbling. */
     @VisibleForTesting
     boolean inErrorBubblingForTesting();
+
+    /**
+     * Adds a dependency on a Skyframe-external event. If the given future is already complete, this
+     * method silently returns without doing anything (to avoid unnecessary function restarts).
+     * Otherwise, Skyframe adds a listener to the passed-in future, and only re-enqueues the current
+     * node after the future completes and all requested deps are done. The added listener will
+     * perform the minimum amount of work on the thread completing the future necessary for Skyframe
+     * bookkeeping.
+     *
+     * <p>Callers of this method must check {@link #valuesMissing} before returning {@code null}
+     * from a {@link SkyFunction}.
+     *
+     * <p>This API is intended for performing async computations (e.g., remote execution) in another
+     * thread pool without blocking the current Skyframe thread.
+     */
+    void dependOnFuture(ListenableFuture<?> future);
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
index 79ab720..7df53e5 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
@@ -19,6 +19,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification;
 import com.google.devtools.build.lib.testutil.TestThread;
@@ -53,6 +54,31 @@
   }
 
   @Test
+  public void externalDep() throws Exception {
+    SettableFuture<Object> future = SettableFuture.create();
+    AbstractQueueVisitor counter =
+        new AbstractQueueVisitor(
+            /*parallelism=*/ 2,
+            /* keepAliveTime= */ 3L,
+            TimeUnit.SECONDS,
+            /* failFastOnException= */ true,
+            "FOO-BAR",
+            ErrorClassifier.DEFAULT);
+    counter.dependOnFuture(future);
+    new Thread(
+            () -> {
+              try {
+                Thread.sleep(5);
+                future.set(new Object());
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            })
+        .start();
+    counter.awaitQuiescence(/*interruptWorkers=*/ false);
+  }
+
+  @Test
   public void callerOwnedPool() throws Exception {
     ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                                          new LinkedBlockingQueue<Runnable>());
diff --git a/src/test/java/com/google/devtools/build/skyframe/InMemoryNodeEntryTest.java b/src/test/java/com/google/devtools/build/skyframe/InMemoryNodeEntryTest.java
index a344ab2..a648dba 100644
--- a/src/test/java/com/google/devtools/build/skyframe/InMemoryNodeEntryTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/InMemoryNodeEntryTest.java
@@ -97,6 +97,22 @@
   }
 
   @Test
+  public void signalExternalDep() throws InterruptedException {
+    NodeEntry entry = new InMemoryNodeEntry();
+    entry.addReverseDepAndCheckIfDone(null); // Start evaluation.
+    entry.markRebuilding();
+    entry.addExternalDep();
+    assertThat(entry.isReady()).isFalse();
+    assertThat(entry.signalDep(ZERO_VERSION, null)).isTrue();
+    assertThat(entry.isReady()).isTrue();
+    entry.addExternalDep();
+    assertThat(entry.isReady()).isFalse();
+    assertThat(entry.signalDep(ZERO_VERSION, null)).isTrue();
+    assertThat(entry.isReady()).isTrue();
+    assertThatNodeEntry(entry).hasTemporaryDirectDepsThat().containsExactly();
+  }
+
+  @Test
   public void reverseDeps() throws InterruptedException {
     NodeEntry entry = new InMemoryNodeEntry();
     SkyKey mother = key("mother");
diff --git a/src/test/java/com/google/devtools/build/skyframe/NotifyingHelper.java b/src/test/java/com/google/devtools/build/skyframe/NotifyingHelper.java
index 302e469..8f1729c 100644
--- a/src/test/java/com/google/devtools/build/skyframe/NotifyingHelper.java
+++ b/src/test/java/com/google/devtools/build/skyframe/NotifyingHelper.java
@@ -148,6 +148,7 @@
   public enum EventType {
     CREATE_IF_ABSENT,
     ADD_REVERSE_DEP,
+    ADD_EXTERNAL_DEP,
     REMOVE_REVERSE_DEP,
     GET_TEMPORARY_DIRECT_DEPS,
     SIGNAL,
@@ -233,6 +234,12 @@
     }
 
     @Override
+    public void addExternalDep() {
+      super.addExternalDep();
+      graphListener.accept(myKey, EventType.ADD_EXTERNAL_DEP, Order.AFTER, null);
+    }
+
+    @Override
     public void removeReverseDep(SkyKey reverseDep) throws InterruptedException {
       graphListener.accept(myKey, EventType.REMOVE_REVERSE_DEP, Order.BEFORE, reverseDep);
       super.removeReverseDep(reverseDep);
diff --git a/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java b/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java
index 9b464e9..0a19689 100644
--- a/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java
@@ -30,6 +30,10 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
 import com.google.devtools.build.lib.concurrent.BlazeInterners;
@@ -52,6 +56,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -157,6 +163,272 @@
     assertThat(storedEventHandler.getPosts()).isEmpty();
   }
 
+  @Test
+  public void externalDep() throws Exception {
+    externalDep(1, 0);
+    externalDep(2, 0);
+    externalDep(1, 1);
+    externalDep(1, 2);
+    externalDep(2, 1);
+    externalDep(2, 2);
+  }
+
+  private void externalDep(int firstPassCount, int secondPassCount) throws Exception {
+    final SkyKey parentKey = GraphTester.toSkyKey("parentKey");
+    final CountDownLatch firstPassLatch = new CountDownLatch(1);
+    final CountDownLatch secondPassLatch = new CountDownLatch(1);
+    tester
+        .getOrCreate(parentKey)
+        .setBuilder(
+            new SkyFunction() {
+              // Skyframe doesn't have native support for continuations, so we use fields here. A
+              // simple continuation API in Skyframe could be Environment providing a
+              // setContinuation(SkyContinuation) method, where SkyContinuation provides a compute
+              // method similar to SkyFunction. When restarting the node, Skyframe would then call
+              // the continuation rather than the original SkyFunction. If we do that, we should
+              // consider only allowing calls to dependOnFuture in combination with setContinuation.
+              private List<SettableFuture<SkyValue>> firstPass;
+              private List<SettableFuture<SkyValue>> secondPass;
+
+              @Override
+              public SkyValue compute(SkyKey skyKey, Environment env) throws InterruptedException {
+                if (firstPass == null) {
+                  firstPass = new ArrayList<>();
+                  for (int i = 0; i < firstPassCount; i++) {
+                    SettableFuture<SkyValue> future = SettableFuture.create();
+                    firstPass.add(future);
+                    env.dependOnFuture(future);
+                  }
+                  assertThat(env.valuesMissing()).isTrue();
+                  Thread helper =
+                      new Thread(
+                          () -> {
+                            try {
+                              firstPassLatch.await();
+                              // Thread.sleep(5);
+                              for (int i = 0; i < firstPassCount; i++) {
+                                firstPass.get(i).set(new StringValue("value1"));
+                              }
+                            } catch (InterruptedException e) {
+                              throw new RuntimeException(e);
+                            }
+                          });
+                  helper.start();
+                  return null;
+                } else if (secondPass == null && secondPassCount > 0) {
+                  for (int i = 0; i < firstPassCount; i++) {
+                    assertThat(firstPass.get(i).isDone()).isTrue();
+                  }
+                  secondPass = new ArrayList<>();
+                  for (int i = 0; i < secondPassCount; i++) {
+                    SettableFuture<SkyValue> future = SettableFuture.create();
+                    secondPass.add(future);
+                    env.dependOnFuture(future);
+                  }
+                  assertThat(env.valuesMissing()).isTrue();
+                  Thread helper =
+                      new Thread(
+                          () -> {
+                            try {
+                              secondPassLatch.await();
+                              for (int i = 0; i < secondPassCount; i++) {
+                                secondPass.get(i).set(new StringValue("value2"));
+                              }
+                            } catch (InterruptedException e) {
+                              throw new RuntimeException(e);
+                            }
+                          });
+                  helper.start();
+                  return null;
+                }
+                for (int i = 0; i < secondPassCount; i++) {
+                  assertThat(secondPass.get(i).isDone()).isTrue();
+                }
+                return new StringValue("done!");
+              }
+
+              @Override
+              public String extractTag(SkyKey skyKey) {
+                return null;
+              }
+            });
+    graph =
+        NotifyingHelper.makeNotifyingTransformer(
+                new Listener() {
+                  private boolean firstPassDone;
+
+                  @Override
+                  public void accept(SkyKey key, EventType type, Order order, Object context) {
+                    // NodeEntry.addExternalDep is called as part of bookkeeping at the end of
+                    // AbstractParallelEvaluator.Evaluate#run.
+                    if (key == parentKey && type == EventType.ADD_EXTERNAL_DEP) {
+                      if (!firstPassDone) {
+                        firstPassLatch.countDown();
+                        firstPassDone = true;
+                      } else {
+                        secondPassLatch.countDown();
+                      }
+                    }
+                  }
+                })
+            .transform(new InMemoryGraphImpl());
+    EvaluationResult<StringValue> result = eval(/*keepGoing=*/ false, ImmutableList.of(parentKey));
+    assertThat(result.hasError()).isFalse();
+    assertThat(result.get(parentKey)).isEqualTo(new StringValue("done!"));
+  }
+
+  @Test
+  public void enqueueDoneFuture() throws Exception {
+    final SkyKey parentKey = GraphTester.toSkyKey("parentKey");
+    tester
+        .getOrCreate(parentKey)
+        .setBuilder(
+            new SkyFunction() {
+              @Override
+              public SkyValue compute(SkyKey skyKey, Environment env) throws InterruptedException {
+                SettableFuture<SkyValue> future = SettableFuture.create();
+                future.set(new StringValue("good"));
+                env.dependOnFuture(future);
+                assertThat(env.valuesMissing()).isFalse();
+                try {
+                  return future.get();
+                } catch (ExecutionException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+
+              @Override
+              public String extractTag(SkyKey skyKey) {
+                return null;
+              }
+            });
+    graph = new InMemoryGraphImpl();
+    EvaluationResult<StringValue> result = eval(/*keepGoing=*/ false, ImmutableList.of(parentKey));
+    assertThat(result.hasError()).isFalse();
+    assertThat(result.get(parentKey)).isEqualTo(new StringValue("good"));
+  }
+
+  @Test
+  public void enqueueBadFuture() throws Exception {
+    final SkyKey parentKey = GraphTester.toSkyKey("parentKey");
+    final CountDownLatch doneLatch = new CountDownLatch(1);
+    final ListeningExecutorService executor =
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+    tester
+        .getOrCreate(parentKey)
+        .setBuilder(
+            new SkyFunction() {
+              private ListenableFuture<SkyValue> future;
+
+              @Override
+              public SkyValue compute(SkyKey skyKey, Environment env) throws InterruptedException {
+                if (future == null) {
+                  future =
+                      executor.submit(
+                          () -> {
+                            doneLatch.await();
+                            throw new UnsupportedOperationException();
+                          });
+                  env.dependOnFuture(future);
+                  assertThat(env.valuesMissing()).isTrue();
+                  return null;
+                }
+                assertThat(future.isDone()).isTrue();
+                try {
+                  future.get();
+                  fail();
+                } catch (ExecutionException expected) {
+                  assertThat(expected.getCause()).isInstanceOf(UnsupportedOperationException.class);
+                }
+                return new StringValue("Caught!");
+              }
+
+              @Override
+              public String extractTag(SkyKey skyKey) {
+                return null;
+              }
+            });
+    graph =
+        NotifyingHelper.makeNotifyingTransformer(
+                new Listener() {
+                  @Override
+                  public void accept(SkyKey key, EventType type, Order order, Object context) {
+                    // NodeEntry.addExternalDep is called as part of bookkeeping at the end of
+                    // AbstractParallelEvaluator.Evaluate#run.
+                    if (key == parentKey && type == EventType.ADD_EXTERNAL_DEP) {
+                      doneLatch.countDown();
+                    }
+                  }
+                })
+            .transform(new InMemoryGraphImpl());
+    EvaluationResult<StringValue> result = eval(/*keepGoing=*/ false, ImmutableList.of(parentKey));
+    assertThat(result.hasError()).isFalse();
+    assertThat(result.get(parentKey)).isEqualTo(new StringValue("Caught!"));
+  }
+
+  @Test
+  public void dependsOnKeyAndFuture() throws Exception {
+    final SkyKey parentKey = GraphTester.toSkyKey("parentKey");
+    final SkyKey childKey = GraphTester.toSkyKey("childKey");
+    final CountDownLatch doneLatch = new CountDownLatch(1);
+    tester.getOrCreate(childKey).setConstantValue(new StringValue("child"));
+    tester
+        .getOrCreate(parentKey)
+        .setBuilder(
+            new SkyFunction() {
+              private SettableFuture<SkyValue> future;
+
+              @Override
+              public SkyValue compute(SkyKey skyKey, Environment env) throws InterruptedException {
+                SkyValue child = env.getValue(childKey);
+                if (future == null) {
+                  assertThat(child).isNull();
+                  future = SettableFuture.create();
+                  env.dependOnFuture(future);
+                  assertThat(env.valuesMissing()).isTrue();
+                  new Thread(
+                          () -> {
+                            try {
+                              doneLatch.await();
+                            } catch (InterruptedException e) {
+                              throw new RuntimeException(e);
+                            }
+                            future.set(new StringValue("future"));
+                          })
+                      .start();
+                  return null;
+                }
+                assertThat(child).isEqualTo(new StringValue("child"));
+                assertThat(future.isDone()).isTrue();
+                try {
+                  assertThat(future.get()).isEqualTo(new StringValue("future"));
+                } catch (ExecutionException e) {
+                  throw new RuntimeException(e);
+                }
+                return new StringValue("All done!");
+              }
+
+              @Override
+              public String extractTag(SkyKey skyKey) {
+                return null;
+              }
+            });
+    graph =
+        NotifyingHelper.makeNotifyingTransformer(
+                new Listener() {
+                  @Override
+                  public void accept(SkyKey key, EventType type, Order order, Object context) {
+                    if (key == childKey && type == EventType.SET_VALUE) {
+                      doneLatch.countDown();
+                    }
+                  }
+                })
+            .transform(new InMemoryGraphImpl());
+    EvaluationResult<StringValue> result = eval(/*keepGoing=*/ false, ImmutableList.of(parentKey));
+    assertThat(result.hasError()).isFalse();
+    assertThat(result.get(parentKey)).isEqualTo(new StringValue("All done!"));
+  }
+
   /**
    * Test interruption handling when a long-running SkyFunction gets interrupted.
    */