diff --git a/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java b/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java
index ef04d6c..ed54ed9 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/BazelRepositoryModule.java
@@ -22,7 +22,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.devtools.build.lib.analysis.BlazeDirectories;
 import com.google.devtools.build.lib.analysis.ConfiguredRuleClassProvider;
 import com.google.devtools.build.lib.analysis.RuleDefinition;
@@ -64,6 +63,7 @@
 import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.CheckDirectDepsMode;
 import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.LockfileMode;
 import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.RepositoryOverride;
+import com.google.devtools.build.lib.bazel.repository.RepositoryOptions.WorkerForRepoFetching;
 import com.google.devtools.build.lib.bazel.repository.cache.RepositoryCache;
 import com.google.devtools.build.lib.bazel.repository.downloader.DelegatingDownloader;
 import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
@@ -121,9 +121,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -163,9 +160,6 @@
   private List<String> allowedYankedVersions = ImmutableList.of();
   private boolean disableNativeRepoRules;
   private SingleExtensionEvalFunction singleExtensionEvalFunction;
-  private final ExecutorService repoFetchingWorkerThreadPool =
-      Executors.newFixedThreadPool(
-          100, new ThreadFactoryBuilder().setNameFormat("repo-fetching-worker-%d").build());
 
   @Nullable private CredentialModule credentialModule;
 
@@ -316,37 +310,8 @@
 
     RepositoryOptions repoOptions = env.getOptions().getOptions(RepositoryOptions.class);
     if (repoOptions != null) {
-      switch (repoOptions.workerForRepoFetching) {
-        case OFF:
-          starlarkRepositoryFunction.setWorkerExecutorService(null);
-          break;
-        case PLATFORM:
-          starlarkRepositoryFunction.setWorkerExecutorService(repoFetchingWorkerThreadPool);
-          break;
-        case VIRTUAL:
-        case AUTO:
-          try {
-            // Since Google hasn't migrated to JDK 21 yet, we can't directly call
-            // Executors.newVirtualThreadPerTaskExecutor here. But a bit of reflection never hurt
-            // anyone... right? (OSS Bazel already ships with a bundled JDK 21)
-            starlarkRepositoryFunction.setWorkerExecutorService(
-                (ExecutorService)
-                    Executors.class
-                        .getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class)
-                        .invoke(
-                            null, Thread.ofVirtual().name("starlark-repository-", 0).factory()));
-          } catch (ReflectiveOperationException e) {
-            if (repoOptions.workerForRepoFetching == RepositoryOptions.WorkerForRepoFetching.AUTO) {
-              starlarkRepositoryFunction.setWorkerExecutorService(null);
-            } else {
-              throw new AbruptExitException(
-                  detailedExitCode(
-                      "couldn't create virtual worker thread executor for repo fetching",
-                      Code.BAD_DOWNLOADER_CONFIG),
-                  e);
-            }
-          }
-      }
+      starlarkRepositoryFunction.setUseWorkers(
+          repoOptions.workerForRepoFetching != WorkerForRepoFetching.OFF);
       downloadManager.setDisableDownload(repoOptions.disableDownload);
       if (repoOptions.repositoryDownloaderRetries >= 0) {
         downloadManager.setRetries(repoOptions.repositoryDownloaderRetries);
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java
index 7bc1692..63a527d 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/RepositoryOptions.java
@@ -246,10 +246,8 @@
       effectTags = {OptionEffectTag.UNKNOWN},
       help =
           "The threading mode to use for repo fetching. If set to 'off', no worker thread is used,"
-              + " and the repo fetching is subject to restarts. Otherwise, uses a platform thread"
-              + " (i.e. OS thread) if set to 'platform' or a virtual thread if set to 'virtual'. If"
-              + " set to 'auto', virtual threads are used if available (i.e. running on JDK 21+),"
-              + " otherwise no worker thread is used.")
+              + " and the repo fetching is subject to restarts. Otherwise, uses a virtual worker"
+              + " thread.")
   public WorkerForRepoFetching workerForRepoFetching;
 
   @Option(
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java
index 19ce5cb..cef52f3 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/RepoFetchingSkyKeyComputeState.java
@@ -14,15 +14,22 @@
 
 package com.google.devtools.build.lib.bazel.repository.starlark;
 
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
 import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
 import com.google.devtools.build.skyframe.SkyFunction;
 import com.google.devtools.build.skyframe.SkyFunction.Environment.SkyKeyComputeState;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import javax.annotation.Nullable;
 
 /**
@@ -30,36 +37,30 @@
  * com.google.devtools.build.lib.rules.repository.RepositoryDelegatorFunction}, specifically {@link
  * StarlarkRepositoryFunction}.
  *
- * <p>This class is used to hold on to a worker thread (in reality just a {@link Future} object)
- * when fetching repos using a worker thread is enabled. The worker thread uses a {@link
- * SkyFunction.Environment} object acquired from the host thread, and can signal the host thread to
- * restart to get a fresh environment object.
+ * <p>This class is used to hold on to a worker thread when fetching repos using a worker thread is
+ * enabled. The worker thread uses a {@link SkyFunction.Environment} object acquired from the host
+ * thread, and can signal the host thread to restart to get a fresh environment object.
  */
 class RepoFetchingSkyKeyComputeState implements SkyKeyComputeState {
 
-  /** A signal that the worker thread can send to the host Skyframe thread. */
-  enum Signal {
-    /**
-     * Indicates that the host thread should return {@code null}, causing a Skyframe restart. After
-     * sending this signal, the client will immediately block on {@code delegateEnvQueue}, waiting
-     * for the host thread to send a fresh {@link SkyFunction.Environment} over.
-     */
-    RESTART,
-    /**
-     * Indicates that the worker thread has finished running, either yielding a result or an
-     * exception.
-     */
-    DONE
-  }
-
-  /** The channel for the worker thread to send a signal to the host Skyframe thread. */
-  final BlockingQueue<Signal> signalQueue = new SynchronousQueue<>();
+  /**
+   * A semaphore with 0 or 1 permit. The worker can release a permit either when it's finished
+   * (successfully or otherwise), or to indicate that the host thread should return {@code null},
+   * causing a Skyframe restart. In the latter case, the worker will immediately block on {@code
+   * delegateEnvQueue}, waiting for the host thread to send a fresh {@link SkyFunction.Environment}
+   * over.
+   */
+  // A Semaphore is useful here because, crucially, releasing a permit never blocks and thus cannot
+  // be interrupted.
+  final Semaphore signalSemaphore = new Semaphore(0);
 
   /**
    * The channel for the host Skyframe thread to send fresh {@link SkyFunction.Environment} objects
    * back to the worker thread.
    */
-  final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new SynchronousQueue<>();
+  // We use an ArrayBlockingQueue of size 1 instead of a SynchronousQueue, so that if the worker
+  // gets interrupted before the host thread restarts, the host thread doesn't hang forever.
+  final BlockingQueue<SkyFunction.Environment> delegateEnvQueue = new ArrayBlockingQueue<>(1);
 
   /**
    * This future holds on to the worker thread in order to cancel it when necessary; it also serves
@@ -69,7 +70,14 @@
   // could happen on multiple threads. Canceling a future multiple times is safe, though, so we
   // only need to worry about nullness. Using a mutex/synchronization is an alternative but it means
   // we might block in `close()`, which is potentially bad (see its javadoc).
-  @Nullable volatile Future<RepositoryDirectoryValue.Builder> workerFuture = null;
+  @Nullable volatile ListenableFuture<RepositoryDirectoryValue.Builder> workerFuture = null;
+
+  /** The executor service that manages the worker thread. */
+  // We hold on to this alongside `workerFuture` because it offers a convenient mechanism to make
+  // sure the worker thread has shut down (with its blocking `close()` method).
+  ListeningExecutorService workerExecutorService;
+
+  private final String repoName;
 
   /**
    * This is where the recorded inputs & values for the whole invocation is collected.
@@ -79,11 +87,48 @@
    */
   final Map<RepoRecordedInput, String> recordedInputValues = new TreeMap<>();
 
+  RepoFetchingSkyKeyComputeState(String repoName) {
+    this.repoName = repoName;
+    reset();
+  }
+
+  // This may only be called from the host Skyframe thread, *and* only when no worker thread is
+  // running.
+  private void reset() {
+    workerExecutorService =
+        MoreExecutors.listeningDecorator(
+            Executors.newThreadPerTaskExecutor(
+                Thread.ofVirtual().name("starlark-repository-" + repoName).factory()));
+    signalSemaphore.drainPermits();
+    delegateEnvQueue.clear();
+    recordedInputValues.clear();
+  }
+
+  /**
+   * Releases a permit on the {@code signalSemaphore} and immediately expect a fresh Environment
+   * back. This may only be called from the worker thread.
+   */
   SkyFunction.Environment signalForFreshEnv() throws InterruptedException {
-    signalQueue.put(Signal.RESTART);
+    signalSemaphore.release();
     return delegateEnvQueue.take();
   }
 
+  /**
+   * Starts a worker thread running the given callable. This sets the {@code workerFuture} field,
+   * and makes sure to release a permit on the {@code signalSemaphore} when the worker finishes,
+   * successfully or otherwise. Returns the worker future. This may only be called from the host
+   * Skyframe thread.
+   */
+  ListenableFuture<RepositoryDirectoryValue.Builder> startWorker(
+      Callable<RepositoryDirectoryValue.Builder> c) {
+    var workerFuture = workerExecutorService.submit(c);
+    this.workerFuture = workerFuture;
+    workerFuture.addListener(signalSemaphore::release, directExecutor());
+    return workerFuture;
+  }
+
+  // This may be called from any thread, including the host Skyframe thread and the
+  // high-memory-pressure listener thread.
   @Override
   public void close() {
     var myWorkerFuture = workerFuture;
@@ -91,5 +136,22 @@
     if (myWorkerFuture != null) {
       myWorkerFuture.cancel(true);
     }
+    workerExecutorService.shutdownNow();
+  }
+
+  /**
+   * Closes the state object, and blocks until all pending async work is finished. The state object
+   * will reset to a clean slate after this method finishes. This may only be called from the host
+   * Skyframe thread.
+   */
+  public void closeAndWaitForTermination() throws InterruptedException {
+    close();
+    workerExecutorService.close(); // This blocks
+    // We reset the state object back to its very initial state, since the host SkyFunction may be
+    // re-entered (for example b/330892334 and  https://github.com/bazelbuild/bazel/issues/21238).
+    reset();
+    if (Thread.interrupted()) {
+      throw new InterruptedException();
+    }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java
index 12e5e38..63545a9 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/repository/starlark/StarlarkRepositoryFunction.java
@@ -21,12 +21,10 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Table;
-import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.analysis.BlazeDirectories;
 import com.google.devtools.build.lib.analysis.RuleDefinition;
 import com.google.devtools.build.lib.bazel.repository.RepositoryResolvedEvent;
 import com.google.devtools.build.lib.bazel.repository.downloader.DownloadManager;
-import com.google.devtools.build.lib.bazel.repository.starlark.RepoFetchingSkyKeyComputeState.Signal;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.LabelConstants;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
@@ -59,7 +57,6 @@
 import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import net.starlark.java.eval.EvalException;
 import net.starlark.java.eval.Mutability;
@@ -73,7 +70,7 @@
 public final class StarlarkRepositoryFunction extends RepositoryFunction {
   private final DownloadManager downloadManager;
   private double timeoutScaling = 1.0;
-  @Nullable private ExecutorService workerExecutorService = null;
+  private boolean useWorkers;
   @Nullable private ProcessWrapper processWrapper = null;
   @Nullable private RepositoryRemoteExecutor repositoryRemoteExecutor;
   @Nullable private SyscallCache syscallCache;
@@ -94,15 +91,15 @@
     this.syscallCache = checkNotNull(syscallCache);
   }
 
-  public void setWorkerExecutorService(@Nullable ExecutorService workerExecutorService) {
-    this.workerExecutorService = workerExecutorService;
+  public void setUseWorkers(boolean useWorkers) {
+    this.useWorkers = useWorkers;
   }
 
   @Override
   protected void setupRepoRootBeforeFetching(Path repoRoot) throws RepositoryFunctionException {
     // DON'T delete the repo root here if we're using a worker thread, since when this SkyFunction
     // restarts, fetching is still happening inside the worker thread.
-    if (workerExecutorService == null) {
+    if (!useWorkers) {
       setupRepoRoot(repoRoot);
     }
   }
@@ -111,11 +108,23 @@
   public void reportSkyframeRestart(Environment env, RepositoryName repoName) {
     // DON'T report a "restarting." event if we're using a worker thread, since the actual fetch
     // function run by the worker thread never restarts.
-    if (workerExecutorService == null) {
+    if (!useWorkers) {
       super.reportSkyframeRestart(env, repoName);
     }
   }
 
+  private record FetchArgs(
+      Rule rule,
+      Path outputDirectory,
+      BlazeDirectories directories,
+      Environment env,
+      Map<RepoRecordedInput, String> recordedInputValues,
+      SkyKey key) {
+    FetchArgs toWorkerArgs(Environment env, Map<RepoRecordedInput, String> recordedInputValues) {
+      return new FetchArgs(rule, outputDirectory, directories, env, recordedInputValues, key);
+    }
+  }
+
   @Nullable
   @Override
   public RepositoryDirectoryValue.Builder fetch(
@@ -126,89 +135,84 @@
       Map<RepoRecordedInput, String> recordedInputValues,
       SkyKey key)
       throws RepositoryFunctionException, InterruptedException {
-    if (workerExecutorService == null
-        || env.inErrorBubblingForSkyFunctionsThatCanFullyRecoverFromErrors()) {
-      // Don't use the worker thread if we're in Skyframe error bubbling. For some reason, using a
-      // worker thread during error bubbling very frequently causes deadlocks on Linux platforms.
-      // The deadlock is rather elusive and this is just the immediate thing that seems to help.
-      // Fortunately, no Skyframe restarts should happen during error bubbling anyway, so this
-      // shouldn't be a performance concern. See https://github.com/bazelbuild/bazel/issues/21238
-      // for more context.
-      return fetchInternal(rule, outputDirectory, directories, env, recordedInputValues, key);
+    var args = new FetchArgs(rule, outputDirectory, directories, env, recordedInputValues, key);
+    if (!useWorkers) {
+      return fetchInternal(args);
     }
-    var state = env.getState(RepoFetchingSkyKeyComputeState::new);
-    var workerFuture = state.workerFuture;
-    if (workerFuture == null) {
-      // No worker is running yet, which means we're just starting to fetch this repo. Start with a
-      // clean slate, and create the worker.
-      setupRepoRoot(outputDirectory);
-      Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
-      workerFuture =
-          workerExecutorService.submit(
-              () -> {
-                try {
-                  return fetchInternal(
-                      rule,
-                      outputDirectory,
-                      directories,
-                      workerEnv,
-                      state.recordedInputValues,
-                      key);
-                } finally {
-                  state.signalQueue.put(Signal.DONE);
-                }
-              });
-      state.workerFuture = workerFuture;
-    } else {
-      // A worker is already running. This can only mean one thing -- we just had a Skyframe
-      // restart, and need to send over a fresh Environment.
-      state.delegateEnvQueue.put(env);
+    var state = env.getState(() -> new RepoFetchingSkyKeyComputeState(rule.getName()));
+    if (state.workerExecutorService.isShutdown()) {
+      // If we get here and the worker executor is shut down, this can only mean that the worker
+      // future was cancelled while we (the host Skyframe thread) were inactive (as in, having
+      // returned `null` but not yet restarted). So we wait for the previous worker thread to finish
+      // first.
+      // TODO: instead of this complicated dance, consider making it legal for
+      //  `SkyKeyComputeState#close()` to block. This would undo the advice added in commit 8ef0a51,
+      //  but would allow us to merge `close()` and `closeAndWaitForTermination()` and avoid some
+      //  headache.
+      state.closeAndWaitForTermination();
     }
-    Signal signal;
+    boolean shouldShutDownWorkerExecutorInFinally = true;
     try {
-      signal = state.signalQueue.take();
-    } catch (InterruptedException e) {
-      // This means that we caught a Ctrl-C. Make sure to close the state object to interrupt the
-      // worker thread, wait for it to finish, and then propagate the InterruptedException.
-      state.close();
-      signal = Uninterruptibles.takeUninterruptibly(state.signalQueue);
-      // The call to Uninterruptibles.takeUninterruptibly() above may set the thread interrupted
-      // status if it suppressed an InterruptedException, so we clear it again.
-      Thread.interrupted();
-      throw new InterruptedException();
-    }
-    switch (signal) {
-      case RESTART:
+      var workerFuture = state.workerFuture;
+      if (workerFuture == null) {
+        // No worker is running yet, which means we're just starting to fetch this repo. Start with
+        // a clean slate, and create the worker.
+        setupRepoRoot(outputDirectory);
+        Environment workerEnv = new RepoFetchingWorkerSkyFunctionEnvironment(state, env);
+        workerFuture =
+            state.startWorker(
+                () -> fetchInternal(args.toWorkerArgs(workerEnv, state.recordedInputValues)));
+      } else {
+        // A worker is already running. This can only mean one thing -- we just had a Skyframe
+        // restart, and need to send over a fresh Environment.
+        state.delegateEnvQueue.put(env);
+      }
+      state.signalSemaphore.acquire();
+      if (!workerFuture.isDone()) {
+        // This means that the worker is still running, and expecting a fresh Environment. Return
+        // null to trigger a Skyframe restart, but *don't* shut down the worker executor.
+        shouldShutDownWorkerExecutorInFinally = false;
         return null;
-      case DONE:
-        try {
-          RepositoryDirectoryValue.Builder result = workerFuture.get();
-          recordedInputValues.putAll(state.recordedInputValues);
-          return result;
-        } catch (ExecutionException e) {
-          Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
-          Throwables.throwIfUnchecked(e.getCause());
-          throw new IllegalStateException(
-              "unexpected exception type: " + e.getClass(), e.getCause());
-        } catch (CancellationException e) {
-          // This can only happen if the state object was invalidated due to memory pressure, in
-          // which case we can simply reattempt the fetch.
-          env.getListener()
-              .post(
-                  RepositoryFetchProgress.ongoing(
-                      RepositoryName.createUnvalidated(rule.getName()),
-                      "fetch interrupted due to memory pressure; restarting."));
-          return fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
-        } finally {
-          // At this point, the worker thread has definitely finished. But in some corner cases (see
-          // b/330892334), a Skyframe restart might still happen; to ensure we're not tricked into
-          // a deadlock, we clean up the worker thread and so that next time we come into fetch(),
-          // we actually restart the entire computation.
-          state.close();
-        }
+      }
+      RepositoryDirectoryValue.Builder result = workerFuture.get();
+      recordedInputValues.putAll(state.recordedInputValues);
+      return result;
+    } catch (ExecutionException e) {
+      Throwables.throwIfInstanceOf(e.getCause(), RepositoryFunctionException.class);
+      Throwables.throwIfUnchecked(e.getCause());
+      throw new IllegalStateException("unexpected exception type: " + e.getClass(), e.getCause());
+    } catch (CancellationException e) {
+      // This can only happen if the state object was invalidated due to memory pressure, in
+      // which case we can simply reattempt the fetch.
+      env.getListener()
+          .post(
+              RepositoryFetchProgress.ongoing(
+                  RepositoryName.createUnvalidated(rule.getName()),
+                  "fetch interrupted due to memory pressure; restarting."));
+      return fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
+    } finally {
+      if (shouldShutDownWorkerExecutorInFinally) {
+        // Unless we know the worker is waiting on a fresh Environment, we should *always* shut down
+        // the worker executor and reset the state by the time we finish executing (successfully or
+        // otherwise). This ensures that 1) no background work happens without our knowledge, and
+        // 2) if the SkyFunction is re-entered for any reason (for example b/330892334 and
+        // https://github.com/bazelbuild/bazel/issues/21238), we don't have lingering state messing
+        // things up.
+        state.closeAndWaitForTermination();
+      }
     }
-    // TODO(wyv): use a switch expression above instead and remove this.
-    throw new IllegalStateException();
+  }
+
+  @Nullable
+  private RepositoryDirectoryValue.Builder fetchInternal(FetchArgs args)
+      throws RepositoryFunctionException, InterruptedException {
+    return fetchInternal(
+        args.rule,
+        args.outputDirectory,
+        args.directories,
+        args.env,
+        args.recordedInputValues,
+        args.key);
   }
 
   @Nullable
