blob: b8087c51c5510271b393d2d5244b5b72e34d1d69 [file] [log] [blame]
// Copyright 2015 The Bazel Authors. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
import java.util.Collections;
import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;
/** Several query utilities to make easier to work with query callbacks and uniquifiers. */
public final class QueryUtil {
private QueryUtil() { }
/** A {@link Callback} that can aggregate all the partial results into one set. */
public interface AggregateAllCallback<T> extends Callback<T> {
Set<T> getResult();
/** A {@link AggregateAllCallback} that is a {@link ThreadSafeCallback}. */
public interface ThreadSafeAggregateAllCallback<T>
extends AggregateAllCallback<T>, ThreadSafeCallback<T> {
/** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */
public abstract static class AggregateAllOutputFormatterCallback<T>
extends OutputFormatterCallback<T> implements AggregateAllCallback<T> {
private static class AggregateAllOutputFormatterCallbackImpl<T>
extends AggregateAllOutputFormatterCallback<T> {
// We only add to the CompactHashSet, so iteration order is the same as insertion order.
private final Set<T> result = CompactHashSet.create();
public final void processOutput(Iterable<T> partialResult) {
Iterables.addAll(result, partialResult);
public Set<T> getResult() {
return result;
private static class ThreadSafeAggregateAllOutputFormatterCallbackImpl<T>
implements ThreadSafeAggregateAllCallback<T> {
private final Set<T> result = Sets.newConcurrentHashSet();
public final void process(Iterable<T> partialResult) {
Iterables.addAll(result, partialResult);
public Set<T> getResult() {
return result;
* Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial
* results into one set.
* <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with
* {@link #newThreadSafeAggregateAllCallback}.
public static <T> AggregateAllOutputFormatterCallback<T>
newAggregateAllOutputFormatterCallback() {
return new AggregateAllOutputFormatterCallbackImpl<>();
* Returns a fresh {@link AggregateAllCallback}.
* <p>Intended to be used by {@link QueryEnvironment#eval} implementations that make use of
* neither of streaming evaluation nor parallel evaluation.
public static <T> AggregateAllCallback<T> newOrderedAggregateAllCallback() {
return new AggregateAllOutputFormatterCallbackImpl<>();
* Returns a fresh {@link ThreadSafeAggregateAllCallback}.
* <p>Intended to be used by {@link QueryExpression} implementations; contrast with
* {@link #newAggregateAllOutputFormatterCallback}.
public static <T> AggregateAllCallback<T> newThreadSafeAggregateAllCallback() {
return new ThreadSafeAggregateAllOutputFormatterCallbackImpl<>();
* Fully evaluate a {@code QueryExpression} and return a set with all the results.
* <p>Should only be used by QueryExpressions when it is the only way of achieving correctness.
public static <T> Set<T> evalAll(
QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr)
throws QueryException, InterruptedException {
AggregateAllCallback<T> callback = newThreadSafeAggregateAllCallback();
env.eval(expr, context, callback);
return callback.getResult();
/** A trivial {@link Uniquifier} base class. */
public abstract static class AbstractUniquifier<T, K>
extends AbstractUniquifierBase<T, K> {
private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
public final boolean unique(T element) {
return alreadySeen.add(extractKey(element));
* Extracts an unique key that can be used to dedupe the given {@code element}.
* <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
protected abstract K extractKey(T element);
/** A trivial {@link ThreadSafeUniquifier} base class. */
public abstract static class AbstractThreadSafeUniquifier<T, K>
extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
private final Set<K> alreadySeen;
protected AbstractThreadSafeUniquifier(int concurrencyLevel) {
this.alreadySeen = Collections.newSetFromMap(
new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
public final boolean unique(T element) {
return alreadySeen.add(extractKey(element));
* Extracts an unique key that can be used to dedupe the given {@code element}.
* <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
protected abstract K extractKey(T element);
private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> {
public final ImmutableList<T> unique(Iterable<T> newElements) {
ImmutableList.Builder<T> result = ImmutableList.builder();
for (T element : newElements) {
if (unique(element)) {
* An abstraction for sending results to a {@link Callback} based on partial query results.
* <p>Intended as a convenience for {@link QueryFunction#eval}/{@link QueryFunction#parEval}
* implementations.
interface Processor<T> {
* Processes the given {@code partialResult} and then invokes the given {@code callback} with
* a further result.
void process(Iterable<T> partialResult, Callback<T> callback)
throws QueryException, InterruptedException;
* Similar to {@link Processor}, except the {@link #process} function also gets access to a
* {@link Uniquifier}.
interface ProcessorWithUniquifier<T> {
* Processes the given {@code partialResult} using the given {@code uniquifier} and then invokes
* the given {@code callback} with a further result.
void process(Iterable<T> partialResult, Uniquifier<T> uniquifier, Callback<T> callback)
throws QueryException, InterruptedException;
* Returns a new {@link Callback} that composes {@code callback} with {@code processor}. That is,
* this method returns a {@code c} such that {@code c.process(r)} calls
* {@code callback.process(r, callback)}.
* <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback}
* is as well.
public static <T> Callback<T> compose(
final Processor<T> processor,
final Callback<T> callback) {
return callback instanceof ThreadSafeCallback
? new ThreadSafeCallback<T>() {
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, callback);
: new Callback<T>() {
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, callback);
* Returns a new {@link Callback} that composes {@code callback} with {@code processor} (applied
* to {@code uniquifier}). That is, this method returns a {@code c} such that {@code c.process(r)}
* calls {@code callback.process(r, uniquifier, c)}.
* <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} is a
* {@link ThreadSafeCallback} and {@code uniquifier} is a {@link ThreadSafeUniquifier}.
public static <T> Callback<T> compose(
final ProcessorWithUniquifier<T> processor,
final Uniquifier<T> uniquifier,
final Callback<T> callback) {
return (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier)
? new ThreadSafeCallback<T>() {
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, uniquifier, callback);
: new Callback<T>() {
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, uniquifier, callback);