Make TargetPattern evaluation during query evaluation more parallel-friendly by introducing TargetPattern#parEval, which lets a TargetPattern's evaluation have an explicitly parallel implementation driven by a caller-supplied ForkJoinPool (so the target pattern resolver no longer needs to secretly hold and use a ForkJoinPool of its own).

--
MOS_MIGRATED_REVID=139101922
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
index 13e5578..2ca2215 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
@@ -24,6 +24,7 @@
 import com.google.devtools.build.lib.util.BatchCallback;
 import com.google.devtools.build.lib.util.Preconditions;
 import com.google.devtools.build.lib.util.StringUtilities;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.PathFragment;
 
 import java.io.Serializable;
@@ -31,6 +32,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ForkJoinPool;
 import java.util.regex.Pattern;
 
 import javax.annotation.concurrent.Immutable;
@@ -141,15 +143,6 @@
   }
 
   /**
-   * Evaluates the current target pattern and returns the result.
-   */
-  public <T, E extends Exception> void eval(
-      TargetPatternResolver<T> resolver, BatchCallback<T, E> callback, Class<E> exceptionClass)
-      throws TargetParsingException, E, InterruptedException {
-    eval(resolver, ImmutableSet.<PathFragment>of(), callback, exceptionClass);
-  }
-
-  /**
    * Evaluates the current target pattern, excluding targets under directories in
    * {@code excludedSubdirectories}, and returns the result.
    *
@@ -159,10 +152,25 @@
   public abstract <T, E extends Exception> void eval(
       TargetPatternResolver<T> resolver,
       ImmutableSet<PathFragment> excludedSubdirectories,
-      BatchCallback<T, E> callback, Class<E> exceptionClass)
+      BatchCallback<T, E> callback,
+      Class<E> exceptionClass)
       throws TargetParsingException, E, InterruptedException;
 
   /**
+   * Same as {@link #eval}, but optionally making use of the given {@link ForkJoinPool} to achieve
+   * parallelism.
+   */
+  public <T, E extends Exception> void parEval(
+      TargetPatternResolver<T> resolver,
+      ImmutableSet<PathFragment> excludedSubdirectories,
+      ThreadSafeBatchCallback<T, E> callback,
+      Class<E> exceptionClass,
+      ForkJoinPool forkJoinPool)
+      throws TargetParsingException, E, InterruptedException {
+    eval(resolver, excludedSubdirectories, callback, exceptionClass);
+  }
+
+  /**
    * Returns {@code true} iff this pattern has type {@code Type.TARGETS_BELOW_DIRECTORY} and
    * {@code directory} is contained by or equals this pattern's directory. For example,
    * returns {@code true} for {@code this = TargetPattern ("//...")} and {@code directory
@@ -469,7 +477,8 @@
     public <T, E extends Exception> void eval(
         TargetPatternResolver<T> resolver,
         ImmutableSet<PathFragment> excludedSubdirectories,
-        BatchCallback<T, E> callback, Class<E> exceptionClass)
+        BatchCallback<T, E> callback,
+        Class<E> exceptionClass)
         throws TargetParsingException, E, InterruptedException {
       resolver.findTargetsBeneathDirectory(
           directory.getRepository(),
@@ -482,6 +491,25 @@
     }
 
     @Override
+    public <T, E extends Exception> void parEval(
+        TargetPatternResolver<T> resolver,
+        ImmutableSet<PathFragment> excludedSubdirectories,
+        ThreadSafeBatchCallback<T, E> callback,
+        Class<E> exceptionClass,
+        ForkJoinPool forkJoinPool)
+        throws TargetParsingException, E, InterruptedException {
+      resolver.findTargetsBeneathDirectoryPar(
+          directory.getRepository(),
+          getOriginalPattern(),
+          directory.getPackageFragment().getPathString(),
+          rulesOnly,
+          excludedSubdirectories,
+          callback,
+          exceptionClass,
+          forkJoinPool);
+    }
+
+    @Override
     public boolean containsBelowDirectory(PackageIdentifier containedDirectory) {
       // Note that merely checking to see if the directory startsWith the TargetsBelowDirectory's
       // directory is insufficient. "food" begins with "foo", but "//foo/..." does not contain
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
index a3aea66..b6b384c 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
@@ -16,7 +16,9 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.devtools.build.lib.util.BatchCallback;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.PathFragment;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A callback interface that is used during the process of converting target patterns (such as
@@ -88,7 +90,23 @@
       String directory,
       boolean rulesOnly,
       ImmutableSet<PathFragment> excludedSubdirectories,
-      BatchCallback<T, E> callback, Class<E> exceptionClass)
+      BatchCallback<T, E> callback,
+      Class<E> exceptionClass)
+      throws TargetParsingException, E, InterruptedException;
+
+  /**
+   * Same as {@link #findTargetsBeneathDirectory}, but optionally making use of the given
+   * {@link ForkJoinPool} to achieve parallelism.
+   */
+  <E extends Exception> void findTargetsBeneathDirectoryPar(
+      RepositoryName repository,
+      String originalPattern,
+      String directory,
+      boolean rulesOnly,
+      ImmutableSet<PathFragment> excludedSubdirectories,
+      ThreadSafeBatchCallback<T, E> callback,
+      Class<E> exceptionClass,
+      ForkJoinPool forkJoinPool)
       throws TargetParsingException, E, InterruptedException;
 
   /**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
index cbff9d9..0a74cc0 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
@@ -46,6 +46,7 @@
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
 import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
 import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
 import com.google.devtools.build.lib.query2.engine.Uniquifier;
 import com.google.devtools.build.lib.query2.engine.VariableContext;
 import com.google.devtools.build.lib.util.Preconditions;
@@ -60,6 +61,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * The environment of a Blaze query. Not thread-safe.
@@ -182,6 +184,15 @@
   }
 
   @Override
+  public void getTargetsMatchingPatternPar(
+      QueryExpression caller,
+      String pattern,
+      ThreadSafeCallback<Target> callback,
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+    getTargetsMatchingPattern(caller, pattern, callback);
+  }
+
+  @Override
   public Target getTarget(Label label)
       throws TargetNotFoundException, QueryException, InterruptedException {
     // Can't use strictScope here because we are expecting a target back.
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 8ca4cc5..9e33d50 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -224,7 +224,6 @@
             graphBackedRecursivePackageProvider,
             eventHandler,
             TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
-            forkJoinPool,
             packageSemaphore);
   }
 
@@ -580,6 +579,18 @@
     return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
   }
 
+  private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
+      throws TargetParsingException, InterruptedException {
+    TargetPatternKey targetPatternKey =
+        ((TargetPatternKey)
+            TargetPatternValue.key(
+                    pattern, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, parserPrefix)
+                .argument());
+    ImmutableSet<PathFragment> subdirectoriesToExclude =
+        targetPatternKey.getAllSubdirectoriesToExclude(blacklistPatternsSupplier);
+    return Pair.of(targetPatternKey.getParsedPattern(), subdirectoriesToExclude);
+  }
+
   @ThreadSafe
   @Override
   public void getTargetsMatchingPattern(
@@ -587,15 +598,33 @@
       throws QueryException, InterruptedException {
     // Directly evaluate the target pattern, making use of packages in the graph.
     try {
-      TargetPatternKey targetPatternKey =
-          ((TargetPatternKey)
-              TargetPatternValue.key(
-                      pattern, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, parserPrefix)
-                  .argument());
-      TargetPattern parsedPattern = targetPatternKey.getParsedPattern();
+      Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
+          getPatternAndExcludes(pattern);
+      TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
       ImmutableSet<PathFragment> subdirectoriesToExclude =
-          targetPatternKey.getAllSubdirectoriesToExclude(blacklistPatternsSupplier);
-      parsedPattern.eval(resolver, subdirectoriesToExclude, callback, QueryException.class);
+          patternToEvalAndSubdirectoriesToExclude.getSecond();
+      patternToEval.eval(resolver, subdirectoriesToExclude, callback, QueryException.class);
+    } catch (TargetParsingException e) {
+      reportBuildFileError(owner, e.getMessage());
+    }
+  }
+
+  @Override
+  public void getTargetsMatchingPatternPar(
+      QueryExpression owner,
+      String pattern,
+      ThreadSafeCallback<Target> callback,
+      ForkJoinPool forkJoinPool)
+      throws QueryException, InterruptedException {
+    // Directly evaluate the target pattern, making use of packages in the graph.
+    try {
+      Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
+          getPatternAndExcludes(pattern);
+      TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
+      ImmutableSet<PathFragment> subdirectoriesToExclude =
+          patternToEvalAndSubdirectoriesToExclude.getSecond();
+      patternToEval.parEval(
+          resolver, subdirectoriesToExclude, callback, QueryException.class, forkJoinPool);
     } catch (TargetParsingException e) {
       reportBuildFileError(owner, e.getMessage());
     }
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
index 5cf7c6e..9245a68 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
@@ -159,6 +159,16 @@
   void getTargetsMatchingPattern(QueryExpression owner, String pattern, Callback<T> callback)
       throws QueryException, InterruptedException;
 
+  /**
+   * Same as {@link #getTargetsMatchingPattern}, but optionally making use of the given
+   * {@link ForkJoinPool} to achieve parallelism.
+   */
+  void getTargetsMatchingPatternPar(
+      QueryExpression owner,
+      String pattern,
+      ThreadSafeCallback<T> callback,
+      ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
+
   /** Ensures the specified target exists. */
   // NOTE(bazel-team): this method is left here as scaffolding from a previous refactoring. It may
   // be possible to remove it.
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
index 8b718ab..aeace9a 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
@@ -17,6 +17,7 @@
 
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 
 /**
  * A literal set of targets, using 'blaze build' syntax.  Or, a reference to a
@@ -44,23 +45,42 @@
     return LetExpression.isValidVarReference(pattern);
   }
 
+  private <T> void evalVarReference(VariableContext<T> context, Callback<T> callback)
+      throws QueryException, InterruptedException {
+    String varName = LetExpression.getNameFromReference(pattern);
+    Set<T> value = context.get(varName);
+    if (value == null) {
+      throw new QueryException(this, "undefined variable '" + varName + "'");
+    }
+    callback.process(value);
+  }
+
   @Override
   protected <T> void evalImpl(
       QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
           throws QueryException, InterruptedException {
     if (isVariableReference()) {
-      String varName = LetExpression.getNameFromReference(pattern);
-      Set<T> value = context.get(varName);
-      if (value == null) {
-        throw new QueryException(this, "undefined variable '" + varName + "'");
-      }
-      callback.process(value);
+      evalVarReference(context, callback);
     } else {
       env.getTargetsMatchingPattern(this, pattern, callback);
     }
   }
 
   @Override
+  protected <T> void parEvalImpl(
+      QueryEnvironment<T> env,
+      VariableContext<T> context,
+      ThreadSafeCallback<T> callback,
+      ForkJoinPool forkJoinPool)
+      throws QueryException, InterruptedException {
+    if (isVariableReference()) {
+      evalVarReference(context, callback);
+    } else {
+      env.getTargetsMatchingPatternPar(this, pattern, callback, forkJoinPool);
+    }
+  }
+
+  @Override
   public void collectTargetPatterns(Collection<String> literals) {
     if (!isVariableReference()) {
       literals.add(pattern);
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
index d391728..950335e 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
@@ -14,8 +14,10 @@
 package com.google.devtools.build.lib.query2.engine;
 
 import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 
 /** Marker interface for a {@link Callback} that is {@link ThreadSafe}. */
 @ThreadSafe
-public interface ThreadSafeCallback<T> extends Callback<T> {
+public interface ThreadSafeCallback<T>
+    extends Callback<T>, ThreadSafeBatchCallback<T, QueryException> {
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
index fc9cd81..df6351a 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
@@ -37,6 +37,7 @@
 import com.google.devtools.build.lib.util.BatchCallback;
 import com.google.devtools.build.lib.util.BatchCallback.NullCallback;
 import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import com.google.devtools.build.lib.vfs.RootedPath;
@@ -46,6 +47,7 @@
 import com.google.devtools.build.skyframe.SkyValue;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 
@@ -259,5 +261,26 @@
         }
       }
     }
+
+    @Override
+    public <E extends Exception> void findTargetsBeneathDirectoryPar(
+        RepositoryName repository,
+        String originalPattern,
+        String directory,
+        boolean rulesOnly,
+        ImmutableSet<PathFragment> excludedSubdirectories,
+        ThreadSafeBatchCallback<Void, E> callback,
+        Class<E> exceptionClass,
+        ForkJoinPool forkJoinPool)
+        throws TargetParsingException, E, InterruptedException {
+      findTargetsBeneathDirectory(
+          repository,
+          originalPattern,
+          directory,
+          rulesOnly,
+          excludedSubdirectories,
+          callback,
+          exceptionClass);
+    }
   }
 }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index 0071098..5492274 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.RepositoryName;
@@ -42,6 +43,8 @@
 import com.google.devtools.build.lib.pkgcache.RecursivePackageProvider;
 import com.google.devtools.build.lib.pkgcache.TargetPatternResolverUtil;
 import com.google.devtools.build.lib.util.BatchCallback;
+import com.google.devtools.build.lib.util.SynchronizedBatchCallback;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
 import com.google.devtools.build.lib.vfs.PathFragment;
 import java.util.ArrayList;
 import java.util.List;
@@ -49,6 +52,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -65,19 +69,16 @@
   private final RecursivePackageProvider recursivePackageProvider;
   private final EventHandler eventHandler;
   private final FilteringPolicy policy;
-  private final ExecutorService executor;
   private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
 
   public RecursivePackageProviderBackedTargetPatternResolver(
       RecursivePackageProvider recursivePackageProvider,
       EventHandler eventHandler,
       FilteringPolicy policy,
-      ExecutorService executor,
       MultisetSemaphore<PackageIdentifier> packageSemaphore) {
     this.recursivePackageProvider = recursivePackageProvider;
     this.eventHandler = eventHandler;
     this.policy = policy;
-    this.executor = executor;
     this.packageSemaphore = packageSemaphore;
   }
 
@@ -190,7 +191,51 @@
       String directory,
       boolean rulesOnly,
       ImmutableSet<PathFragment> excludedSubdirectories,
-      final BatchCallback<Target, E> callback, Class<E> exceptionClass)
+      BatchCallback<Target, E> callback,
+      Class<E> exceptionClass)
+      throws TargetParsingException, E, InterruptedException {
+    findTargetsBeneathDirectoryParImpl(
+        repository,
+        originalPattern,
+        directory,
+        rulesOnly,
+        excludedSubdirectories,
+        new SynchronizedBatchCallback<Target, E>(callback),
+        exceptionClass,
+        MoreExecutors.newDirectExecutorService());
+  }
+
+  @Override
+  public <E extends Exception> void findTargetsBeneathDirectoryPar(
+      final RepositoryName repository,
+      final String originalPattern,
+      String directory,
+      boolean rulesOnly,
+      ImmutableSet<PathFragment> excludedSubdirectories,
+      final ThreadSafeBatchCallback<Target, E> callback,
+      Class<E> exceptionClass,
+      ForkJoinPool forkJoinPool)
+      throws TargetParsingException, E, InterruptedException {
+    findTargetsBeneathDirectoryParImpl(
+        repository,
+        originalPattern,
+        directory,
+        rulesOnly,
+        excludedSubdirectories,
+        callback,
+        exceptionClass,
+        forkJoinPool);
+  }
+
+  private <E extends Exception> void findTargetsBeneathDirectoryParImpl(
+      final RepositoryName repository,
+      final String originalPattern,
+      String directory,
+      boolean rulesOnly,
+      ImmutableSet<PathFragment> excludedSubdirectories,
+      final ThreadSafeBatchCallback<Target, E> callback,
+      Class<E> exceptionClass,
+      ExecutorService executor)
       throws TargetParsingException, E, InterruptedException {
     final FilteringPolicy actualPolicy = rulesOnly
         ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy)
@@ -208,7 +253,6 @@
               }
             });
     final AtomicBoolean foundTarget = new AtomicBoolean(false);
-    final Object callbackLock = new Object();
 
     // For very large sets of packages, we may not want to process all of them at once, so we split
     // into batches.
@@ -236,9 +280,7 @@
                   }
                 }
               }
-              synchronized (callbackLock) {
-                callback.process(filteredTargets);
-              }
+              callback.process(filteredTargets);
             } finally {
               packageSemaphore.releaseAll(pkgIdBatchSet);
             }
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java
index ac75806..3ef3b11 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java
@@ -15,7 +15,6 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.devtools.build.lib.cmdline.Label;
 import com.google.devtools.build.lib.cmdline.PackageIdentifier;
 import com.google.devtools.build.lib.cmdline.ResolvedTargets;
@@ -58,7 +57,6 @@
               provider,
               env.getListener(),
               patternKey.getPolicy(),
-              MoreExecutors.newDirectExecutorService(),
               MultisetSemaphore.<PackageIdentifier>unbounded());
       TargetPattern parsedPattern = patternKey.getParsedPattern();
       ImmutableSet<PathFragment> excludedSubdirectories = patternKey.getExcludedSubdirectories();
diff --git a/src/main/java/com/google/devtools/build/lib/util/SynchronizedBatchCallback.java b/src/main/java/com/google/devtools/build/lib/util/SynchronizedBatchCallback.java
new file mode 100644
index 0000000..ec7bd17
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/SynchronizedBatchCallback.java
@@ -0,0 +1,33 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util;
+
+/**
+ * A {@link ThreadSafeBatchCallback} that trivially delegates to a {@link BatchCallback} in a
+ * synchronized manner.
+ */
+public class SynchronizedBatchCallback<T, E extends Exception>
+    implements ThreadSafeBatchCallback<T, E> {
+  private final BatchCallback<T, E> delegate;
+
+  public SynchronizedBatchCallback(BatchCallback<T, E> delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public synchronized void process(Iterable<T> partialResult) throws E, InterruptedException {
+    delegate.process(partialResult);
+  }
+}
+
diff --git a/src/main/java/com/google/devtools/build/lib/util/ThreadSafeBatchCallback.java b/src/main/java/com/google/devtools/build/lib/util/ThreadSafeBatchCallback.java
new file mode 100644
index 0000000..2fed9cc
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/util/ThreadSafeBatchCallback.java
@@ -0,0 +1,21 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.util;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
+/** Marker interface for a {@link BatchCallback} that is {@link ThreadSafe}. */
+@ThreadSafe
+public interface ThreadSafeBatchCallback<T, E extends Exception> extends BatchCallback<T, E> {
+}
\ No newline at end of file