blob: e52910912190841d1ea76bac7302e1a8dfb3023f [file] [log] [blame]
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * Supports execution with retries on particular gRPC Statuses. The retrier is ThreadSafe.
 *
 * <p>Example usage: The simple use-case is to call retrier.execute, e.g:
 *
 * <pre>
 * foo = retrier.execute(() -> grpcStub.getFoo(fooRequest));
 * </pre>
 */
public class Retrier {
  /**
   * Wraps around a StatusRuntimeException to make it pass through a single layer of retries.
   *
   * <p>If a callable passed to {@link #execute} throws this exception, the wrapped {@link
   * StatusRuntimeException} is unwrapped and rethrown immediately, without consulting the retry
   * policy.
   */
  public static class PassThroughException extends Exception {
    public PassThroughException(StatusRuntimeException e) {
      super(e);
    }
  }

  /**
   * Backoff is a stateful object providing a sequence of durations that are used to time delays
   * between retries. It is not ThreadSafe. The reason that Backoff needs to be stateful, rather
   * than a static map of attempt number to delay, is to allow driving retries manually, by calling
   * {@link Retrier#isRetriable} and {@link #nextDelayMillis()} directly (see the
   * ByteStreamUploader example).
   */
  public interface Backoff {
    /** Returned by {@link #nextDelayMillis()} to indicate that no more retries should be made. */
    long STOP = -1L;

    /** Returns the next delay in milliseconds, or < 0 if we should not continue retrying. */
    long nextDelayMillis();

    /**
     * Returns the number of calls to {@link #nextDelayMillis()} thus far, not counting any calls
     * that returned STOP.
     */
    int getRetryAttempts();

    /**
     * Creates a Backoff supplier for a Backoff which does not support any retries. Both the
     * Supplier and the Backoff are stateless and thread-safe.
     */
    Supplier<Backoff> NO_RETRIES =
        () ->
            new Backoff() {
              @Override
              public long nextDelayMillis() {
                return STOP;
              }

              @Override
              public int getRetryAttempts() {
                return 0;
              }
            };

    /**
     * Creates a Backoff supplier for an optionally jittered exponential backoff. The supplier is
     * ThreadSafe (non-synchronized calls to get() are fine), but the returned Backoff is not.
     *
     * @param initial The initial backoff duration.
     * @param max The maximum backoff duration. Only the non-jittered delay is capped at this value;
     *     a jittered delay may exceed it.
     * @param multiplier The amount the backoff should increase in each iteration. Must be > 1.
     * @param jitter The amount the backoff should be randomly varied, in the inclusive range [0, 1].
     *     0 provides no jitter, and 1 provides a duration that is 0-200% of the non-jittered
     *     duration.
     * @param maxAttempts The maximum number of retries; 0 means no retries.
     * @throws IllegalArgumentException if any of the numeric arguments is out of range.
     */
    static Supplier<Backoff> exponential(
        Duration initial, Duration max, double multiplier, double jitter, int maxAttempts) {
      Preconditions.checkArgument(multiplier > 1, "multiplier must be > 1");
      Preconditions.checkArgument(
          jitter >= 0 && jitter <= 1, "jitter must be in the range [0, 1]");
      Preconditions.checkArgument(maxAttempts >= 0, "maxAttempts must be >= 0");
      return () ->
          new Backoff() {
            private final long maxMillis = max.toMillis();
            private long nextDelayMillis = initial.toMillis();
            private int attempts = 0;

            @Override
            public long nextDelayMillis() {
              if (attempts == maxAttempts) {
                return STOP;
              }
              attempts++;
              // Uniform in [-jitter, jitter).
              double jitterRatio = jitter * (ThreadLocalRandom.current().nextDouble(2.0) - 1);
              long result = (long) (nextDelayMillis * (1 + jitterRatio));
              // Advance current by the non-jittered result, capped at maxMillis.
              nextDelayMillis = (long) (nextDelayMillis * multiplier);
              if (nextDelayMillis > maxMillis) {
                nextDelayMillis = maxMillis;
              }
              return result;
            }

            @Override
            public int getRetryAttempts() {
              return attempts;
            }
          };
    }
  }

  /**
   * Default retry predicate: retries transient-looking status codes. CANCELLED is retried only if
   * the current thread has not been interrupted.
   */
  public static final Predicate<Status> DEFAULT_IS_RETRIABLE =
      st -> {
        switch (st.getCode()) {
          case CANCELLED:
            return !Thread.currentThread().isInterrupted();
          case UNKNOWN:
          case DEADLINE_EXCEEDED:
          case ABORTED:
          case INTERNAL:
          case UNAVAILABLE:
          case UNAUTHENTICATED:
          case RESOURCE_EXHAUSTED:
            return true;
          default:
            return false;
        }
      };

  public static final Predicate<Status> RETRY_ALL = Predicates.alwaysTrue();
  public static final Predicate<Status> RETRY_NONE = Predicates.alwaysFalse();
  public static final Retrier NO_RETRIES = new Retrier(Backoff.NO_RETRIES, RETRY_NONE);

  private final Supplier<Backoff> backoffSupplier;
  private final Predicate<Status> isRetriable;

  @VisibleForTesting
  Retrier(Supplier<Backoff> backoffSupplier, Predicate<Status> isRetriable) {
    this.backoffSupplier = backoffSupplier;
    this.isRetriable = isRetriable;
  }

  /**
   * Creates a retrier configured from {@code options}. If {@code experimentalRemoteRetry} is set,
   * an exponential backoff built from the {@code experimentalRemoteRetry*} options is used;
   * otherwise no retries are performed. Statuses are classified by {@link #DEFAULT_IS_RETRIABLE}.
   */
  public Retrier(RemoteOptions options) {
    this(
        options.experimentalRemoteRetry
            ? Backoff.exponential(
                Duration.ofMillis(options.experimentalRemoteRetryStartDelayMillis),
                Duration.ofMillis(options.experimentalRemoteRetryMaxDelayMillis),
                options.experimentalRemoteRetryMultiplier,
                options.experimentalRemoteRetryJitter,
                options.experimentalRemoteRetryMaxAttempts)
            : Backoff.NO_RETRIES,
        DEFAULT_IS_RETRIABLE);
  }

  /** Returns {@code true} if the {@link Status} is retriable. */
  public boolean isRetriable(Status s) {
    return isRetriable.apply(s);
  }

  /**
   * Executes the given callable in a loop, retrying on retryable errors, as defined by the current
   * backoff/retry policy. Will raise the last encountered retriable error, or the first
   * non-retriable error.
   *
   * <p>This method does not propagate a {@link StatusRuntimeException} thrown directly by the
   * passed-in Callable; such errors are reported as a {@link RetryException}. The only exception is
   * a {@link StatusRuntimeException} wrapped in a {@link PassThroughException}, which is unwrapped
   * and rethrown as-is without retrying. A {@link RetryException} thrown by the callable (e.g. from
   * a nested retrier) is also rethrown as-is.
   *
   * @param c The callable to execute.
   * @return the value returned by the first successful call.
   * @throws InterruptedException if interrupted while sleeping between attempts, if the callable
   *     throws it, or if a CANCELLED status is observed while the current thread is interrupted.
   * @throws IOException if the callable throws one, or the final failure wrapped in a {@link
   *     RetryException}.
   */
  public <T> T execute(Callable<T> c) throws InterruptedException, IOException {
    Backoff backoff = backoffSupplier.get();
    while (true) {
      try {
        return c.call();
      } catch (PassThroughException e) {
        throw (StatusRuntimeException) e.getCause();
      } catch (RetryException e) {
        throw e; // Nested retries are always pass-through.
      } catch (StatusException | StatusRuntimeException e) {
        Status st = Status.fromThrowable(e);
        // Capture the retry count before nextDelayMillis() consumes another attempt.
        int attempts = backoff.getRetryAttempts();
        long delay = backoff.nextDelayMillis();
        if (st.getCode() == Status.Code.CANCELLED && Thread.currentThread().isInterrupted()) {
          // The interrupt flag is already set (checked above) and is deliberately left set.
          throw new InterruptedException();
        }
        if (delay < 0 || !isRetriable.apply(st)) {
          throw new RetryException(st.asRuntimeException(), attempts);
        }
        sleep(delay);
      } catch (Exception e) {
        // Generic catch because Callable is declared to throw Exception, we rethrow any unchecked
        // exception as well as any exception we declared above.
        Throwables.throwIfUnchecked(e);
        Throwables.throwIfInstanceOf(e, IOException.class);
        Throwables.throwIfInstanceOf(e, InterruptedException.class);
        throw new RetryException(e, backoff.getRetryAttempts());
      }
    }
  }

  /** Sleeps for {@code timeMillis} milliseconds; overridable in tests to avoid real delays. */
  @VisibleForTesting
  void sleep(long timeMillis) throws InterruptedException {
    Preconditions.checkArgument(
        timeMillis >= 0L, "timeMillis must not be negative: %s", timeMillis);
    TimeUnit.MILLISECONDS.sleep(timeMillis);
  }

  /**
   * Returns a fresh {@link Backoff} from this retrier's policy, for callers that drive retries
   * manually.
   */
  public Backoff newBackoff() {
    return backoffSupplier.get();
  }
}