blob: 1d87384a14e4d6f3308cf53bf1b5557bcd23840d [file] [log] [blame]
// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A partial implementation of {@link QueryEnvironment} that has trivial in-thread implementations
* of all the {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods.
*/
public abstract class AbstractQueryEnvironment<T> implements QueryEnvironment<T> {
/** Concrete implementation of {@link QueryTaskFuture}. */
protected static final class QueryTaskFutureImpl<T>
extends QueryTaskFutureImplBase<T> implements ListenableFuture<T> {
private final ListenableFuture<T> delegate;
private QueryTaskFutureImpl(ListenableFuture<T> delegate) {
this.delegate = delegate;
}
public static <R> QueryTaskFutureImpl<R> ofDelegate(ListenableFuture<R> delegate) {
return (delegate instanceof QueryTaskFutureImpl)
? (QueryTaskFutureImpl<R>) delegate
: new QueryTaskFutureImpl<>(delegate);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
@Override
public void addListener(Runnable listener, Executor executor) {
delegate.addListener(listener, executor);
}
@Override
public T getIfSuccessful() {
try {
return Futures.getDone(delegate);
} catch (CancellationException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
public T getChecked() throws InterruptedException, QueryException {
try {
return get();
} catch (CancellationException e) {
throw new InterruptedException();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, QueryException.class);
Throwables.propagateIfPossible(cause, InterruptedException.class);
throw new IllegalStateException(e.getCause());
}
}
}
@Override
public <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value) {
return new QueryTaskFutureImpl<>(Futures.immediateFuture(value));
}
@Override
public <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e) {
return new QueryTaskFutureImpl<>(Futures.<R>immediateFailedFuture(e));
}
@Override
public <R> QueryTaskFuture<R> immediateCancelledFuture() {
return new QueryTaskFutureImpl<>(Futures.<R>immediateCancelledFuture());
}
@Override
public QueryTaskFuture<Void> eval(
QueryExpression expr, QueryExpressionContext<T> context, final Callback<T> callback) {
// Not all QueryEnvironment implementations embrace the async+streaming evaluation framework. In
// particular, the streaming callbacks employed by functions like 'deps' use
// QueryEnvironment#buildTransitiveClosure. So if the implementation of that method does some
// heavyweight blocking work, then it's best to do this blocking work in a single batch.
// Importantly, the callback we pass in needs to maintain order.
final QueryUtil.AggregateAllCallback<T, ?> aggregateAllCallback =
QueryUtil.newOrderedAggregateAllOutputFormatterCallback(this);
QueryTaskFuture<Void> evalAllFuture = expr.eval(this, context, aggregateAllCallback);
return whenSucceedsCall(
evalAllFuture,
new QueryTaskCallable<Void>() {
@Override
public Void call() throws QueryException, InterruptedException {
callback.process(aggregateAllCallback.getResult());
return null;
}
});
}
@Override
public <R> QueryTaskFuture<R> execute(QueryTaskCallable<R> callable) {
try {
return immediateSuccessfulFuture(callable.call());
} catch (QueryException e) {
return immediateFailedFuture(e);
} catch (InterruptedException e) {
return immediateCancelledFuture();
}
}
@Override
public <R> QueryTaskFuture<R> executeAsync(QueryTaskAsyncCallable<R> callable) {
return callable.call();
}
@Override
public <R> QueryTaskFuture<R> whenSucceedsCall(
QueryTaskFuture<?> future, QueryTaskCallable<R> callable) {
return whenAllSucceedCall(ImmutableList.of(future), callable);
}
private static class Dummy implements QueryTaskCallable<Void> {
public static final Dummy INSTANCE = new Dummy();
private Dummy() {}
@Override
public Void call() {
return null;
}
}
@Override
public QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures) {
return whenAllSucceedCall(futures, Dummy.INSTANCE);
}
@Override
public <R> QueryTaskFuture<R> whenAllSucceedCall(
Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
return QueryTaskFutureImpl.ofDelegate(
Futures.whenAllSucceed(cast(futures)).call(callable, directExecutor()));
}
@Override
public <T1, T2> QueryTaskFuture<T2> transformAsync(
QueryTaskFuture<T1> future,
final Function<T1, QueryTaskFuture<T2>> function) {
return QueryTaskFutureImpl.ofDelegate(
Futures.transformAsync(
(QueryTaskFutureImpl<T1>) future,
input -> (QueryTaskFutureImpl<T2>) function.apply(input),
directExecutor()));
}
protected static Iterable<QueryTaskFutureImpl<?>> cast(
Iterable<? extends QueryTaskFuture<?>> futures) {
return Iterables.transform(futures, future -> (QueryTaskFutureImpl<?>) future);
}
}