blob: b8087c51c5510271b393d2d5244b5b72e34d1d69 [file] [log] [blame]
// Copyright 2015 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.collect.CompactHashSet;
import java.util.Collections;
import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;
/** Several query utilities to make easier to work with query callbacks and uniquifiers. */
public final class QueryUtil {
private QueryUtil() { }
/** A {@link Callback} that can aggregate all the partial results into one set. */
public interface AggregateAllCallback<T> extends Callback<T> {
Set<T> getResult();
}
/** A {@link AggregateAllCallback} that is a {@link ThreadSafeCallback}. */
public interface ThreadSafeAggregateAllCallback<T>
extends AggregateAllCallback<T>, ThreadSafeCallback<T> {
}
/** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */
public abstract static class AggregateAllOutputFormatterCallback<T>
extends OutputFormatterCallback<T> implements AggregateAllCallback<T> {
}
private static class AggregateAllOutputFormatterCallbackImpl<T>
extends AggregateAllOutputFormatterCallback<T> {
// We only add to the CompactHashSet, so iteration order is the same as insertion order.
private final Set<T> result = CompactHashSet.create();
@Override
public final void processOutput(Iterable<T> partialResult) {
Iterables.addAll(result, partialResult);
}
@Override
public Set<T> getResult() {
return result;
}
}
private static class ThreadSafeAggregateAllOutputFormatterCallbackImpl<T>
implements ThreadSafeAggregateAllCallback<T> {
private final Set<T> result = Sets.newConcurrentHashSet();
@Override
public final void process(Iterable<T> partialResult) {
Iterables.addAll(result, partialResult);
}
@Override
public Set<T> getResult() {
return result;
}
}
/**
* Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial
* results into one set.
*
* <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with
* {@link #newThreadSafeAggregateAllCallback}.
*/
public static <T> AggregateAllOutputFormatterCallback<T>
newAggregateAllOutputFormatterCallback() {
return new AggregateAllOutputFormatterCallbackImpl<>();
}
/**
* Returns a fresh {@link AggregateAllCallback}.
*
* <p>Intended to be used by {@link QueryEnvironment#eval} implementations that make use of
* neither of streaming evaluation nor parallel evaluation.
*/
public static <T> AggregateAllCallback<T> newOrderedAggregateAllCallback() {
return new AggregateAllOutputFormatterCallbackImpl<>();
}
/**
* Returns a fresh {@link ThreadSafeAggregateAllCallback}.
*
* <p>Intended to be used by {@link QueryExpression} implementations; contrast with
* {@link #newAggregateAllOutputFormatterCallback}.
*/
public static <T> AggregateAllCallback<T> newThreadSafeAggregateAllCallback() {
return new ThreadSafeAggregateAllOutputFormatterCallbackImpl<>();
}
/**
* Fully evaluate a {@code QueryExpression} and return a set with all the results.
*
* <p>Should only be used by QueryExpressions when it is the only way of achieving correctness.
*/
public static <T> Set<T> evalAll(
QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr)
throws QueryException, InterruptedException {
AggregateAllCallback<T> callback = newThreadSafeAggregateAllCallback();
env.eval(expr, context, callback);
return callback.getResult();
}
/** A trivial {@link Uniquifier} base class. */
public abstract static class AbstractUniquifier<T, K>
extends AbstractUniquifierBase<T, K> {
private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
@Override
public final boolean unique(T element) {
return alreadySeen.add(extractKey(element));
}
/**
* Extracts an unique key that can be used to dedupe the given {@code element}.
*
* <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
*/
protected abstract K extractKey(T element);
}
/** A trivial {@link ThreadSafeUniquifier} base class. */
public abstract static class AbstractThreadSafeUniquifier<T, K>
extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
private final Set<K> alreadySeen;
protected AbstractThreadSafeUniquifier(int concurrencyLevel) {
this.alreadySeen = Collections.newSetFromMap(
new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
}
@Override
public final boolean unique(T element) {
return alreadySeen.add(extractKey(element));
}
/**
* Extracts an unique key that can be used to dedupe the given {@code element}.
*
* <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
*/
protected abstract K extractKey(T element);
}
private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> {
@Override
public final ImmutableList<T> unique(Iterable<T> newElements) {
ImmutableList.Builder<T> result = ImmutableList.builder();
for (T element : newElements) {
if (unique(element)) {
result.add(element);
}
}
return result.build();
}
}
/**
* An abstraction for sending results to a {@link Callback} based on partial query results.
*
* <p>Intended as a convenience for {@link QueryFunction#eval}/{@link QueryFunction#parEval}
* implementations.
*/
@ThreadSafe
interface Processor<T> {
/**
* Processes the given {@code partialResult} and then invokes the given {@code callback} with
* a further result.
*/
void process(Iterable<T> partialResult, Callback<T> callback)
throws QueryException, InterruptedException;
}
/**
* Similar to {@link Processor}, except the {@link #process} function also gets access to a
* {@link Uniquifier}.
*/
@ThreadSafe
interface ProcessorWithUniquifier<T> {
/**
* Processes the given {@code partialResult} using the given {@code uniquifier} and then invokes
* the given {@code callback} with a further result.
*/
void process(Iterable<T> partialResult, Uniquifier<T> uniquifier, Callback<T> callback)
throws QueryException, InterruptedException;
}
/**
* Returns a new {@link Callback} that composes {@code callback} with {@code processor}. That is,
* this method returns a {@code c} such that {@code c.process(r)} calls
* {@code callback.process(r, callback)}.
*
* <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback}
* is as well.
*/
public static <T> Callback<T> compose(
final Processor<T> processor,
final Callback<T> callback) {
return callback instanceof ThreadSafeCallback
? new ThreadSafeCallback<T>() {
@Override
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, callback);
}
}
: new Callback<T>() {
@Override
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, callback);
}
};
}
/**
* Returns a new {@link Callback} that composes {@code callback} with {@code processor} (applied
* to {@code uniquifier}). That is, this method returns a {@code c} such that {@code c.process(r)}
* calls {@code callback.process(r, uniquifier, c)}.
*
* <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} is a
* {@link ThreadSafeCallback} and {@code uniquifier} is a {@link ThreadSafeUniquifier}.
*/
public static <T> Callback<T> compose(
final ProcessorWithUniquifier<T> processor,
final Uniquifier<T> uniquifier,
final Callback<T> callback) {
return (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier)
? new ThreadSafeCallback<T>() {
@Override
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, uniquifier, callback);
}
}
: new Callback<T>() {
@Override
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
processor.process(partialResult, uniquifier, callback);
}
};
}
}