blob: 80785ca0cab498addb6be71d65cdbd57feb5c713 [file] [log] [blame]
// Copyright 2014 The Bazel Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
import java.util.Collection;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
* <p>Visit the transitive closure of a label. Primarily used to "fault in"
* packages to the packageProvider and ensure the necessary targets exist, in
* advance of the configuration step, which is intolerant of missing
* packages/targets.
* <p>LabelVisitor loads packages concurrently where possible, to increase I/O
* parallelism. However, the public interface is not thread-safe: calls to
* public methods should not be made concurrently.
* <p>LabelVisitor is stateful: It remembers the previous visitation and can
* check its validity on subsequent calls to sync() instead of doing the normal
* visitation.
* <p>TODO(bazel-team): (2009) a small further optimization could be achieved if we
* create tasks at the package (not individual label) level, since package
* loading is the expensive step. This would require additional bookkeeping to
* maintain the list of labels that we need to visit once a package becomes
* available. Profiling suggests that there is still a potential benefit to be
* gained: when the set of packages is known a-priori, loading a set of packages
* that took 20 seconds can be done under 5 in the sequential case or 7 in the
* current (parallel) case.
* <h4>Concurrency</h4>
* <p>The sync() methods of this class are thread-compatible. The accessors
* ({@link #hasVisited} and similar) must not be called until the concurrent phase
* is over, i.e. all external calls to visit() methods have completed.
final class LabelVisitor {
* Attributes of a visitation which determine whether it is up-to-date or not.
// Records the inputs and the outcome of a single visitation so that a later
// request can be compared against it (see current()).
private class VisitationAttributes {
// Top-level targets the visitation starts from; assigned by syncWithVisitor.
private Collection<Target> targetsToVisit;
// Outcome of the visitation; stays false until syncWithVisitor stores the result of redoVisitation.
private boolean success = false;
// Not reassigned anywhere in the code visible here, so it is always true.
private boolean visitSubincludes = true;
// Depth bound requested for the visitation; assigned by syncWithVisitor.
private int maxDepth = 0;
* Returns true if and only if this visitation attribute is still up-to-date.
boolean current() {
// Compares this (requested) visitation with the enclosing LabelVisitor's lastVisitation:
// the target collections must be equal, the requested depth must not exceed the depth
// recorded in lastVisitation, and the visitSubincludes settings must match.
// targetsToVisit must be non-null here, since equals() is invoked on it.
return targetsToVisit.equals(lastVisitation.targetsToVisit)
&& maxDepth <= lastVisitation.maxDepth
&& visitSubincludes == lastVisitation.visitSubincludes;
* Interrupts during the loading phase ===================================
* Bazel can be interrupted in the middle of the loading phase. The mechanics
* of this are far from trivial, so there is an explanation of how they are
* supposed to work. For a description of how the same thing works in the
* execution phase, see .
* The sequence of events that happen when the user presses Ctrl-C is the
* following:
* 1. A SIGINT gets delivered to the Bazel client process.
* 2. The client process delivers the SIGINT to the server process.
* 3. The interruption state of the main thread is set to true.
* 4. Sooner or later, this results in an InterruptedException being thrown.
* Usually this takes place because the main thread is interrupted during
* AbstractQueueVisitor.awaitTermination(). The only exception to this is when
* the interruption occurs during the loading of a package of a label
* specified on the command line; in this case, the InterruptedException is
* thrown during the loading of an individual package (see below where this
* can occur).
* 5. The main thread calls ThreadPoolExecutor.shutdown(), which in turn
* interrupts every worker thread. Then the main thread waits for their
* termination.
* 6. An InterruptedException is thrown during the loading of an individual
* package in the worker threads.
* 7. All worker threads terminate.
* 8. An InterruptedException is thrown from
* AbstractQueueVisitor.awaitTermination()
* 9. This exception causes the execution of the currently running command to
* terminate prematurely.
* The interruption of the loading of an individual package can happen in two
* different ways depending on whether Python preprocessing is in effect or
* not.
* If there is no Python preprocessing:
* 1. We periodically check the interruption state of the thread in
* UnixGlob.reallyGlob(). If it is interrupted, an InterruptedException is
* thrown.
* 2. The stack is unwound until we are out of the part of the call stack
* responsible for package loading. This either means that the worker thread
* terminates or that the label parsing terminates if the package that is
* being loaded was specified on the command line.
* If there is Python preprocessing, events are a bit more complicated. In
* this case, the real work happens on the thread the Python preprocessor is
* called from, but in a bit more convoluted way: a new thread is spawned
* to handle the input from the Python process and
* the output to the Python process is handled on the main thread. The reading
* thread parses requests from the preprocessor, and passes them using a queue
* to the writing thread (that is, the main thread), so that we can do the
* work there. This is important because this way, we don't have any work that
* we need to interrupt in a thread that is not spawned by us. So:
* 1. The interrupted state of the main thread is set.
* 2. This results in an InterruptedException during the execution of the task
* in PythonStdinInputStream.getNextMessage().
* 3. We exit prematurely, set a flag to
* signal that we were interrupted, and throw an InterruptedIOException.
* 4. The Python child process and reading thread are terminated.
* 5. Based on the flag we set in step 3, we realize that the termination was
* due to an interruption, and an InterruptedException is thrown. This can
* either raise an AbnormalTerminationException, or make Command.execute()
* return normally, so we check for both cases.
* 6. This InterruptedException causes the loading of the package to terminate
* prematurely.
* Life is not simple.
// Resolves a Label to its Target; used by the runnables created in newVisitRunnable.
private final TargetProvider targetProvider;
// Consulted via apply(rule, attribute) when walking rule attributes and visibility.
private final DependencyFilter edgeFilter;
// Every visited target, keyed by its package; populated in Visitor.visit.
// Wrapped in a synchronized multimap because visits may run on worker threads.
private final SetMultimap<Package, Target> visitedMap =
Multimaps.synchronizedSetMultimap(HashMultimap.<Package, Target>create());
// Depth recorded for each visited label; maintained by Visitor.visitTargetNode and
// queried by hasVisited.
private final ConcurrentMap<Label, Integer> visitedTargets = new ConcurrentHashMap<>();
// Attributes of the most recent visitation attempt; replaced by syncWithVisitor whenever
// a visitation is (re)done.
private VisitationAttributes lastVisitation;
* Constant for limiting the permitted depth of recursion.
// Compared against the inline-call counter in enqueueTarget: below this limit a same-package
// visit may run inline on the current thread, otherwise it is handed to execute().
private static final int RECURSION_LIMIT = 100;
* Construct a LabelVisitor.
* @param targetProvider how to resolve labels to targets
* @param edgeFilter which edges may be traversed
public LabelVisitor(TargetProvider targetProvider, DependencyFilter edgeFilter) {
this.targetProvider = targetProvider;
// A fresh VisitationAttributes has success == false, so the first call to
// syncWithVisitor always performs a visitation.
this.lastVisitation = new VisitationAttributes();
this.edgeFilter = edgeFilter;
// Brings the visitation state in line with the requested targets and depth.
//
// A new visitation is performed when the previous one did not succeed or when the
// requested attributes are not current() with respect to it; otherwise the previous
// result is reused and true is returned without visiting anything (so the observers
// are not invoked in that case).
//
// Returns the value reported by redoVisitation, or true when the previous visitation
// is reused. Throws InterruptedException as propagated from redoVisitation.
boolean syncWithVisitor(
ExtendedEventHandler eventHandler,
Collection<Target> targetsToVisit,
boolean keepGoing,
int parallelThreads,
int maxDepth,
TargetEdgeObserver... observers)
throws InterruptedException {
// Describe the requested visitation so it can be compared with the last one.
VisitationAttributes nextVisitation = new VisitationAttributes();
nextVisitation.targetsToVisit = targetsToVisit;
nextVisitation.maxDepth = maxDepth;
if (!lastVisitation.success || !nextVisitation.current()) {
try {
nextVisitation.success = redoVisitation(eventHandler, nextVisitation, keepGoing,
parallelThreads, maxDepth, observers);
return nextVisitation.success;
} finally {
// Recorded even when redoVisitation throws; success is then still false, which
// forces a fresh visitation on the next call.
lastVisitation = nextVisitation;
} else {
return true;
// Does a bounded transitive visitation starting at the given top-level targets.
// Creates a new Visitor for this run and returns the value of visitor.finish().
//
// NOTE(review): in this copy of the file the try block below is empty and `uncaught`
// is assigned but never rethrown. Presumably the call that starts the traversal
// (e.g. visitor.visitTargets(visitation.targetsToVisit)) and the rethrow of `uncaught`
// were lost -- confirm against the upstream source before relying on this method.
private boolean redoVisitation(
ExtendedEventHandler eventHandler,
VisitationAttributes visitation,
boolean keepGoing,
int parallelThreads,
int maxDepth,
TargetEdgeObserver... observers)
throws InterruptedException {
Visitor visitor = new Visitor(eventHandler, keepGoing, parallelThreads, maxDepth, observers);
Throwable uncaught = null;
boolean result;
try {
} catch (Throwable t) {
// Remember the failure so that finish() below still runs.
uncaught = t;
} finally {
// Run finish() in finally block to ensure we don't leak threads on exceptions.
result = visitor.finish();
return result;
// Returns true if the given label has an entry in visitedTargets, i.e. it was recorded
// by Visitor.visitTargetNode during a visitation.
boolean hasVisited(Label target) {
return visitedTargets.containsKey(target);
// Queue-based visitor that performs one visitation. It is an inner (non-static) class:
// it reads targetProvider and edgeFilter and writes visitedMap and visitedTargets of the
// enclosing LabelVisitor.
@VisibleForTesting class Visitor extends AbstractQueueVisitor {
// Passed to the AbstractQueueVisitor constructor as its last argument.
private final static String THREAD_NAME = "LabelVisitor";
// Handed to targetProvider.getTarget when resolving labels.
private final ExtendedEventHandler eventHandler;
// When false, blockNewActions() reports true as soon as the error observer has errors.
private final boolean keepGoing;
// Depth bound checked in enqueueTarget and visit.
private final int maxDepth;
// Observers notified by observeEdge / observeNode / observeError.
private final Iterable<TargetEdgeObserver> observers;
// Tracks whether any error was observed; finish() returns !errorObserver.hasErrors().
private final TargetEdgeErrorObserver errorObserver;
// NOTE(review): presumably set by stopNewActions() and read by blockNewActions(); both
// are truncated in this copy of the file -- confirm.
private final AtomicBoolean stopNewActions = new AtomicBoolean(false);
// Passed to the AbstractQueueVisitor constructor as its first argument.
private static final boolean CONCURRENT = true;
// Creates a visitor for a single visitation run.
// NOTE(review): the meaning of the super-constructor arguments (pool size, 1 second
// keep-alive, fail-fast when !keepGoing) is inferred from their values and names --
// confirm against AbstractQueueVisitor.
public Visitor(
ExtendedEventHandler eventHandler,
boolean keepGoing,
int parallelThreads,
int maxDepth,
TargetEdgeObserver... observers) {
// Observing the loading phase of a typical large package (with all subpackages) shows
// maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is
// therefore conservative and should help us avoid hitting native limits.
// NOTE(review): no limit of 200 is applied in this constructor; the value handed to the
// superclass is the caller-supplied parallelThreads.
super(CONCURRENT, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, THREAD_NAME);
this.eventHandler = eventHandler;
this.maxDepth = maxDepth;
this.errorObserver = new TargetEdgeErrorObserver();
// NOTE(review): the builder is created but the statements that fill it, and the
// right-hand side of the assignment to this.observers, are missing in this copy.
// Presumably the observers list is built from `observers` plus errorObserver -- confirm.
ImmutableList.Builder<TargetEdgeObserver> builder = ImmutableList.builder();
this.observers =;
this.keepGoing = keepGoing;
* Visit the specified labels and follow the transitive closure of their outbound dependencies.
* @param targets the targets to visit
public void visitTargets(Iterable<Target> targets) throws InterruptedException {
for (Target target : targets) {
// Top-level targets have no originating target or attribute and start at depth 0
// with an inline-recursion count of 0.
visit(null, null, target, 0, 0);
// Waits for the visitor to become quiescent, then reports the outcome.
// Returns true if the error observer recorded no errors.
public boolean finish() throws InterruptedException {
awaitQuiescence(/*interruptWorkers=*/ true);
return !errorObserver.hasErrors();
// Reports whether new work should be refused: true when errors were observed and
// keepGoing is off, or when the superclass already blocks new actions.
// NOTE(review): the expression is cut off after the final `||` in this copy; the missing
// operand presumably reads the stopNewActions flag -- confirm.
protected boolean blockNewActions() {
return (!keepGoing && errorObserver.hasErrors()) || super.blockNewActions() ||
// NOTE(review): the body is missing in this copy; presumably it sets the stopNewActions
// flag to true -- confirm.
public void stopNewActions() {
// Schedules a visit of `label`, reached from `from` through `attr`, either inline on the
// current thread or as a task handed to execute().
// `count` is the number of consecutive inline calls made so far on this path.
private void enqueueTarget(
final Target from, final Attribute attr, final Label label, final int depth,
final int count) {
// Don't perform the targetProvider lookup if at the maximum depth already.
// NOTE(review): the body of this check (presumably an early return) is missing in this copy.
if (depth >= maxDepth) {
// Avoid thread-related overhead when not crossing packages.
// Can start a new thread when count reaches 100, to prevent infinite recursion.
// The package fragments are compared by reference (==), not equals(); when the test
// fails, the visit is simply handed to execute() below instead of running inline.
if (from != null && from.getLabel().getPackageFragment() == label.getPackageFragment() &&
!blockNewActions() && count < RECURSION_LIMIT) {
// Inline path: run on the current thread and bump the inline-call counter.
newVisitRunnable(from, attr, label, depth, count + 1).run();
} else {
// Queued path: the inline-call counter restarts at 0 for the new task.
execute(newVisitRunnable(from, attr, label, depth, 0));
// Builds the task that resolves `label` through targetProvider and visits the resulting
// target one level deeper (depth + 1).
private Runnable newVisitRunnable(final Target from, final Attribute attr, final Label label,
final int depth, final int count) {
return new Runnable() {
public void run() {
try {
try {
visit(from, attr, targetProvider.getTarget(eventHandler, label), depth + 1, count);
} catch (NoSuchThingException e) {
// A label that cannot be resolved is reported to the observers, not rethrown.
observeError(from, label, e);
// NOTE(review): the handling of InterruptedException is missing in this copy of the file.
} catch (InterruptedException e) {
// Enqueues the labels returned by target.getVisibility().getDependencyLabels() as
// dependencies of `target`.
// NOTE(review): the bodies of the two `if (!...)` checks below (presumably early returns)
// and the closing braces are missing in this copy, so the nesting shown here is not the
// real control flow -- confirm against the upstream source.
private void visitTargetVisibility(Target target, int depth, int count) {
// Stays null for non-rule targets; enqueueTarget then receives a null attribute.
Attribute attribute = null;
if (target instanceof Rule) {
Rule rule = (Rule) target;
RuleClass ruleClass = rule.getRuleClassObject();
if (!ruleClass.hasAttr("visibility", BuildType.NODEP_LABEL_LIST)) {
attribute = ruleClass.getAttributeByName("visibility");
if (!edgeFilter.apply(rule, attribute)) {
for (Label label : target.getVisibility().getDependencyLabels()) {
enqueueTarget(target, attribute, label, depth, count);
* Visit all the labels in a given rule.
* <p>Called in a worker thread if CONCURRENT.
* @param rule the rule to visit
private void visitRule(final Rule rule, final int depth, final int count)
throws InterruptedException {
// Follow all labels defined by this rule:
// each (label, attribute) pair reported by the attribute mapper is passed to the
// callback below, which consults edgeFilter and enqueues the label.
AggregatingAttributeMapper.of(rule).visitLabels(new AttributeMap.AcceptsLabelAttribute() {
public void acceptLabelAttribute(Label label, Attribute attribute) {
// NOTE(review): the body of this check (presumably an early return for filtered-out
// edges) is missing in this copy.
if (!edgeFilter.apply(rule, attribute)) {
enqueueTarget(rule, attribute, label, depth, count);
// Enqueues every label returned by packageGroup.getIncludes(); these edges have no
// attribute, so null is passed for it.
private void visitPackageGroup(PackageGroup packageGroup, int depth, int count) {
for (final Label include : packageGroup.getIncludes()) {
enqueueTarget(packageGroup, null, include, depth, count);
* Visits the target and its package.
* <p>Potentially blocking invocations into the package cache are enqueued in the worker pool if
* CONCURRENT.
// Visits `target`, which was reached from `from` through `attribute` (both null for
// top-level targets): reports the edge to the observers, follows aspect-induced labels,
// records the target under its package in visitedMap and then visits its own
// dependencies via visitTargetNode.
private void visit(Target from, Attribute attribute, final Target target, int depth, int count)
throws InterruptedException {
// A null target is a programming error; the message names the originating label and
// attribute to help locate it.
if (target == null) {
throw new NullPointerException(
String.format("'%s' attribute '%s'",
from == null ? "(null)" : from.getLabel().toString(),
attribute == null ? "(null)" : attribute.getName()));
// NOTE(review): the body of the depth check (presumably an early return) is missing
// in this copy.
if (depth > maxDepth) {
// Edge observation and aspect handling only make sense when there is an originating target.
if (from != null) {
observeEdge(from, attribute, target);
visitAspectsIfRequired(from, attribute, target, depth, count);
visitedMap.put(target.getPackage(), target);
visitTargetNode(target, depth, count);
// Asks AspectDefinition which extra (attribute, label) pairs arise for the edge
// from -> to, and enqueues each returned label with `from` as the originating target.
private void visitAspectsIfRequired(
Target from, Attribute attribute, final Target to, int depth, int count) {
ImmutableMultimap<Attribute, Label> labelsFromAspects =
AspectDefinition.visitAspectsIfRequired(from, attribute, to, edgeFilter);
// Create an edge from target to the attribute value.
for (Entry<Attribute, Label> entry : labelsFromAspects.entries()) {
enqueueTarget(from, entry.getKey(), entry.getValue(), depth, count);
* Visit the specified target. Called in a worker thread if CONCURRENT.
* @param target the target to visit
private void visitTargetNode(Target target, int depth, int count) throws InterruptedException {
// putIfAbsent returns null when this is the first visit of the label; otherwise it
// returns the depth that is already recorded for it.
Integer minTargetDepth = visitedTargets.putIfAbsent(target.getLabel(), depth);
if (minTargetDepth != null) {
// The target was already visited at the same or a smaller depth.
// The closure we are about to build is therefore a subset of what
// has already been built, and we can skip it.
// Also special case MAX_VALUE, where we never want to revisit targets.
// (This avoids loading phase overhead outside of queries).
// NOTE(review): the body of this check (presumably an early return) is missing in this copy.
if (maxDepth == Integer.MAX_VALUE || minTargetDepth <= depth) {
// Check again in case it was overwritten by another thread.
// The re-check and the update happen while holding the monitor of visitedTargets.
synchronized (visitedTargets) {
// NOTE(review): the body of this check (presumably an early return) is missing in
// this copy; as shown, the put would only run when the recorded depth is already <= depth.
if (visitedTargets.get(target.getLabel()) <= depth) {
visitedTargets.put(target.getLabel(), depth);
// Dispatch on the kind of target to decide which outgoing edges to follow.
if (target instanceof OutputFile) {
Rule rule = ((OutputFile) target).getGeneratingRule();
observeEdge(target, null, rule);
// This is the only recursive call to visit which doesn't pass through enqueueTarget().
visit(null, null, rule, depth + 1, count + 1);
visitTargetVisibility(target, depth, count);
} else if (target instanceof InputFile) {
visitTargetVisibility(target, depth, count);
} else if (target instanceof Rule) {
visitTargetVisibility(target, depth, count);
visitRule((Rule) target, depth, count);
} else if (target instanceof PackageGroup) {
visitPackageGroup((PackageGroup) target, depth, count);
// Notifies every registered observer of the edge from -> to through `attribute`.
private void observeEdge(Target from, Attribute attribute, Target to) {
for (TargetEdgeObserver observer : observers) {
observer.edge(from, attribute, to);
// Iterates over the registered observers for the given target.
// NOTE(review): the loop body is missing in this copy; presumably it forwards the target
// to each observer -- confirm.
private void observeNode(Target target) {
for (TargetEdgeObserver observer : observers) {
// Notifies every registered observer that `label`, referenced from `from`, could not be
// resolved, passing along the exception that describes the failure.
private void observeError(Target from, Label label, NoSuchThingException e)
throws InterruptedException {
for (TargetEdgeObserver observer : observers) {
observer.missingEdge(from, label, e);