Create a NestedSet method toListWithTimeout.
This will be used to time out NestedSet fetches.
PiperOrigin-RevId: 300637023
diff --git a/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSet.java b/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSet.java
index 25911c4..49c9be4 100644
--- a/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSet.java
+++ b/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSet.java
@@ -16,6 +16,7 @@
import static java.util.stream.Collectors.joining;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
@@ -26,6 +27,7 @@
import com.google.devtools.build.lib.skyframe.serialization.autocodec.AutoCodec;
import com.google.devtools.build.lib.util.ExitCode;
import com.google.protobuf.ByteString;
+import java.time.Duration;
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
@@ -34,6 +36,8 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -213,7 +217,9 @@
/** Same as {@link #getChildren}, except propagates {@link InterruptedException}. */
Object getChildrenInterruptibly() throws InterruptedException {
- return getChildrenInternal(InterruptStrategy.PROPAGATE);
+ return children instanceof ListenableFuture
+ ? MoreFutures.waitForFutureAndGet((ListenableFuture<Object[]>) children)
+ : children;
}
/**
@@ -253,11 +259,7 @@
case CRASH:
return getChildrenUninterruptibly();
case PROPAGATE:
- if (children instanceof ListenableFuture) {
- return MoreFutures.waitForFutureAndGet((ListenableFuture<Object[]>) children);
- } else {
- return children;
- }
+ return getChildrenInterruptibly();
}
throw new IllegalStateException("Unknown interrupt strategy " + interruptStrategy);
}
@@ -302,7 +304,33 @@
* but will propagate an {@code InterruptedException} if one is thrown.
*/
ImmutableList<E> toListInterruptibly() throws InterruptedException {
- return toList(InterruptStrategy.PROPAGATE);
+ return actualChildrenToList(getChildrenInterruptibly());
+ }
+
+ /**
+ * Returns an immutable list of all unique elements of the this set, similar to {@link #toList},
+ * but will propagate an {@code InterruptedException} if one is thrown and will throw {@link
+ * TimeoutException} if this set is deserializing and does not become ready within the given
+ * timeout.
+ *
+ * <p>Note that the timeout only applies to blocking for the deserialization future to become
+ * available. The actual list transformation is untimed.
+ */
+ ImmutableList<E> toListWithTimeout(Duration timeout)
+ throws InterruptedException, TimeoutException {
+ Object actualChildren;
+ if (children instanceof ListenableFuture) {
+ try {
+ actualChildren =
+ ((ListenableFuture<Object[]>) children).get(timeout.toNanos(), TimeUnit.NANOSECONDS);
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause(), InterruptedException.class);
+ throw new IllegalStateException(e);
+ }
+ } else {
+ actualChildren = children;
+ }
+ return actualChildrenToList(actualChildren);
}
/**
@@ -313,29 +341,22 @@
* efficiency, as it saves an iteration.
*/
public ImmutableList<E> toList() {
- try {
- return toList(InterruptStrategy.CRASH);
- } catch (InterruptedException e) {
- throw new IllegalStateException("InterruptedException should have already been caught", e);
- }
+ return actualChildrenToList(getChildrenUninterruptibly());
}
/**
- * Private implementation of toList which will either propagate an {@code InterruptedException} if
- * one occurs while waiting for a {@code Future} in {@link #getChildren} or will have {@link
- * #getChildrenInternal} handle it.
+ * Private implementation of toList which takes the actual children (the deserialized {@code
+ * Object[]} if {@link #children} is a {@link ListenableFuture}).
*/
- private ImmutableList<E> toList(InterruptStrategy interruptStrategy) throws InterruptedException {
- if (isSingleton()) {
- // No need to check for ListenableFuture members - singletons can't have them.
- return ImmutableList.of((E) children);
- }
- if (isEmpty()) {
+ private ImmutableList<E> actualChildrenToList(Object actualChildren) {
+ if (actualChildren == EMPTY_CHILDREN) {
return ImmutableList.of();
}
- return getOrder() == Order.LINK_ORDER
- ? expand(interruptStrategy).reverse()
- : expand(interruptStrategy);
+ if (!(actualChildren instanceof Object[])) {
+ return ImmutableList.of((E) actualChildren);
+ }
+ ImmutableList<E> list = expand((Object[]) actualChildren);
+ return getOrder() == Order.LINK_ORDER ? list.reverse() : list;
}
/**
@@ -435,16 +456,15 @@
* this.memo}: wrap our direct items in a list, call {@link #lockedExpand} to perform the initial
* {@link #walk}, or call {@link #replay} if we have a nontrivial memo.
*/
- private ImmutableList<E> expand(InterruptStrategy interruptStrategy) throws InterruptedException {
+ private ImmutableList<E> expand(Object[] children) {
// This value is only set in the constructor, so safe to test here with no lock.
if (memo == LEAF_MEMO) {
- return ImmutableList.copyOf(new ArraySharingCollection<>((Object[]) children));
+ return ImmutableList.copyOf(new ArraySharingCollection<>(children));
}
- CompactHashSet<E> members = lockedExpand(interruptStrategy);
+ CompactHashSet<E> members = lockedExpand(children);
if (members != null) {
return ImmutableList.copyOf(members);
}
- Object[] children = (Object[]) getChildrenInternal(interruptStrategy);
ImmutableList.Builder<E> output = ImmutableList.builderWithExpectedSize(orderAndSize >> 2);
replay(output, children, memo, 0);
return output.build();
@@ -478,12 +498,10 @@
* If this is the first call for this object, fills {@code this.memo} and returns a set from
* {@link #walk}. Otherwise returns null; the caller should use {@link #replay} instead.
*/
- private synchronized CompactHashSet<E> lockedExpand(InterruptStrategy interruptStrategy)
- throws InterruptedException {
+ private synchronized CompactHashSet<E> lockedExpand(Object[] children) {
if (memo != null) {
return null;
}
- Object[] children = (Object[]) getChildrenInternal(interruptStrategy);
CompactHashSet<E> members = CompactHashSet.createWithExpectedSize(128);
CompactHashSet<Object> sets = CompactHashSet.createWithExpectedSize(128);
sets.add(children);
diff --git a/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetImplTest.java b/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetImplTest.java
index 2c235d3..ab91335 100644
--- a/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetImplTest.java
+++ b/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetImplTest.java
@@ -16,11 +16,16 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.testing.EqualsTester;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -275,4 +280,35 @@
Thread.currentThread().interrupt();
assertThrows(InterruptedException.class, deserialzingNestedSet::getChildrenInterruptibly);
}
+
+ @Test
+ public void toListWithTimeout_propagatesInterrupt() {
+ NestedSet<String> deserialzingNestedSet =
+ NestedSet.withFuture(Order.STABLE_ORDER, SettableFuture.create());
+ Thread.currentThread().interrupt();
+ assertThrows(
+ InterruptedException.class,
+ () -> deserialzingNestedSet.toListWithTimeout(Duration.ofDays(1)));
+ }
+
+ @Test
+ public void toListWithTimeout_timesOut() {
+ NestedSet<String> deserialzingNestedSet =
+ NestedSet.withFuture(Order.STABLE_ORDER, SettableFuture.create());
+ assertThrows(
+ TimeoutException.class, () -> deserialzingNestedSet.toListWithTimeout(Duration.ofNanos(1)));
+ }
+
+ @Test
+ public void toListWithTimeout_waits() throws Exception {
+ SettableFuture<Object[]> future = SettableFuture.create();
+ NestedSet<String> deserialzingNestedSet = NestedSet.withFuture(Order.STABLE_ORDER, future);
+ Future<ImmutableList<String>> result =
+ Executors.newSingleThreadExecutor()
+ .submit(() -> deserialzingNestedSet.toListWithTimeout(Duration.ofMinutes(1)));
+ Thread.sleep(100);
+ assertThat(result.isDone()).isFalse();
+ future.set(new Object[] {"a", "b"});
+ assertThat(result.get()).containsExactly("a", "b");
+ }
}