Pull out `NestedSetCache` to a top-level class, renaming it `NestedSetSerializationCache`.

The whole thing is already `@VisibleForTesting` anyway, and I'm planning to add more logic to it for configuration trimming (preview: `NestedSet`s that differ only in configuration will have the same serialized representation and thus fingerprint).

Add more thorough unit tests for `NestedSetSerializationCache`.

PiperOrigin-RevId: 402946970
diff --git a/src/main/java/com/google/devtools/build/lib/collect/nestedset/BUILD b/src/main/java/com/google/devtools/build/lib/collect/nestedset/BUILD
index d7d4d29..7099785 100644
--- a/src/main/java/com/google/devtools/build/lib/collect/nestedset/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/collect/nestedset/BUILD
@@ -17,6 +17,7 @@
         "NestedSet.java",
         "NestedSetBuilder.java",
         "NestedSetCodecWithStore.java",
+        "NestedSetSerializationCache.java",
         "NestedSetStore.java",
         "NestedSetVisitor.java",
         "Order.java",
diff --git a/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetSerializationCache.java b/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetSerializationCache.java
new file mode 100644
index 0000000..9c74ce0
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetSerializationCache.java
@@ -0,0 +1,147 @@
+// Copyright 2021 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.collect.nestedset;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.bugreport.BugReporter;
+import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.FingerprintComputationResult;
+import com.google.devtools.build.lib.skyframe.serialization.SerializationConstants;
+import com.google.protobuf.ByteString;
+import javax.annotation.Nullable;
+
+/**
+ * A bidirectional, in-memory, weak cache for fingerprint ⟺ {@link NestedSet} associations.
+ *
+ * <p>For use by {@link NestedSetStore} to minimize work during {@link NestedSet} (de)serialization.
+ */
+class NestedSetSerializationCache {
+
+  /**
+   * Fingerprint to array cache.
+   *
+   * <p>The values in this cache are always {@code Object[]} or {@code ListenableFuture<Object[]>}.
+   * We avoid a common wrapper object both for memory efficiency and because our cache eviction
+   * policy is based on value GC, and wrapper objects would defeat that.
+   *
+   * <p>While a fetch for the contents is outstanding, the value in the cache will be a {@link
+   * ListenableFuture}. When it is resolved, it is replaced with the unwrapped {@code Object[]}.
+   * This is done because if the array is a transitive member, its future may be GC'd, and we want
+   * entries to stay in this cache while the contents are still live.
+   */
+  private final Cache<ByteString, Object> fingerprintToContents =
+      Caffeine.newBuilder()
+          .initialCapacity(SerializationConstants.DESERIALIZATION_POOL_SIZE)
+          .weakValues()
+          .build();
+
+  /** {@code Object[]} contents to fingerprint. Maintained for fast fingerprinting. */
+  private final Cache<Object[], FingerprintComputationResult> contentsToFingerprint =
+      Caffeine.newBuilder()
+          .initialCapacity(SerializationConstants.DESERIALIZATION_POOL_SIZE)
+          .weakKeys()
+          .build();
+
+  private final BugReporter bugReporter;
+
+  NestedSetSerializationCache(BugReporter bugReporter) {
+    this.bugReporter = bugReporter;
+  }
+
+  /**
+   * Returns contents (an {@code Object[]} or a {@code ListenableFuture<Object[]>}) for the {@link
+   * NestedSet} associated with the given fingerprint if there was already one. Otherwise associates
+   * {@code future} with {@code fingerprint} and returns {@code null}.
+   *
+   * <p>Upon a {@code null} return, the caller should ensure that the given future is eventually set
+   * with the fetched contents.
+   *
+   * <p>Upon a non-{@code null} return, the caller should discard the given future in favor of the
+   * returned contents, blocking for them if the return value is itself a future.
+   *
+   * @param fingerprint the fingerprint of the desired {@link NestedSet} contents
+   * @param future a freshly created {@link SettableFuture}
+   */
+  @Nullable
+  Object putIfAbsent(ByteString fingerprint, SettableFuture<Object[]> future) {
+    checkArgument(!future.isDone(), "Must pass a fresh future: %s", future);
+    Object existing = fingerprintToContents.asMap().putIfAbsent(fingerprint, future);
+    if (existing != null) {
+      return existing;
+    }
+    // This is the first request of this fingerprint.
+    unwrapWhenDone(fingerprint, future);
+    return null;
+  }
+
+  /**
+   * Registers a {@link FutureCallback} that associates the provided {@code fingerprint} and the
+   * contents of the future, when it completes.
+   *
+   * <p>There may be a race between this call and calls to {@link #put}. Those races are benign,
+   * since the fingerprint should be the same regardless. We may pessimistically end up having a
+   * future to wait on for serialization that isn't actually necessary, but that isn't a big
+   * concern.
+   */
+  private void unwrapWhenDone(ByteString fingerprint, ListenableFuture<Object[]> futureContents) {
+    Futures.addCallback(
+        futureContents,
+        new FutureCallback<Object[]>() {
+          @Override
+          public void onSuccess(Object[] contents) {
+            // Replace the cache entry with the unwrapped contents, since the Future may be GC'd.
+            fingerprintToContents.put(fingerprint, contents);
+            // There may already be an entry here, but it's better to put a fingerprint result with
+            // an immediate future, since then later readers won't need to block unnecessarily. It
+            // would be nice to check the old value, but Cache#put doesn't provide it to us.
+            contentsToFingerprint.put(
+                contents, FingerprintComputationResult.create(fingerprint, immediateVoidFuture()));
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            // Failure to fetch the NestedSet contents is unexpected, but the failed future can be
+            // stored as the NestedSet children. This way the exception is only propagated if the
+            // NestedSet is consumed (unrolled).
+            bugReporter.sendBugReport(t);
+          }
+        },
+        directExecutor());
+  }
+
+  /**
+   * Retrieves the fingerprint associated with the given {@link NestedSet} contents, or {@code null}
+   * if the given contents are not known.
+   */
+  @Nullable
+  FingerprintComputationResult fingerprintForContents(Object[] contents) {
+    return contentsToFingerprint.getIfPresent(contents);
+  }
+
+  // TODO(janakr): Currently, racing threads can overwrite each other's
+  // fingerprintComputationResult, leading to confusion and potential performance drag. Fix this.
+  void put(FingerprintComputationResult result, Object[] contents) {
+    contentsToFingerprint.put(contents, result);
+    fingerprintToContents.put(result.fingerprint(), contents);
+  }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetStore.java b/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetStore.java
index ca8b5fd..48672fc 100644
--- a/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetStore.java
+++ b/src/main/java/com/google/devtools/build/lib/collect/nestedset/NestedSetStore.java
@@ -13,10 +13,10 @@
 // limitations under the License.
 package com.google.devtools.build.lib.collect.nestedset;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
@@ -25,11 +25,9 @@
 import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.bugreport.BugReporter;
 import com.google.devtools.build.lib.skyframe.serialization.DeserializationContext;
-import com.google.devtools.build.lib.skyframe.serialization.SerializationConstants;
 import com.google.devtools.build.lib.skyframe.serialization.SerializationContext;
 import com.google.devtools.build.lib.skyframe.serialization.SerializationException;
 import com.google.protobuf.ByteString;
@@ -41,7 +39,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
 
@@ -130,117 +127,6 @@
     }
   }
 
-  /** An in-memory cache for fingerprint <-> NestedSet associations. */
-  @VisibleForTesting
-  static class NestedSetCache {
-    private final BugReporter bugReporter;
-
-    /**
-     * Fingerprint to array cache.
-     *
-     * <p>The values in this cache are always {@code Object[]} or {@code
-     * ListenableFuture<Object[]>}. We avoid a common wrapper object both for memory efficiency and
-     * because our cache eviction policy is based on value GC, and wrapper objects would defeat
-     * that.
-     *
-     * <p>While a fetch for the contents is outstanding, the value in the cache will be a {@link
-     * ListenableFuture}. When it is resolved, it is replaced with the unwrapped {@code Object[]}.
-     * This is done because if the array is a transitive member, its future may be GC'd, and we want
-     * entries to stay in this cache while the contents are still live.
-     */
-    private final Cache<ByteString, Object> fingerprintToContents =
-        Caffeine.newBuilder()
-            .initialCapacity(SerializationConstants.DESERIALIZATION_POOL_SIZE)
-            .weakValues()
-            .build();
-
-    /** {@code Object[]} contents to fingerprint. Maintained for fast fingerprinting. */
-    private final Cache<Object[], FingerprintComputationResult> contentsToFingerprint =
-        Caffeine.newBuilder()
-            .initialCapacity(SerializationConstants.DESERIALIZATION_POOL_SIZE)
-            .weakKeys()
-            .build();
-
-    NestedSetCache() {
-      this(BugReporter.defaultInstance());
-    }
-
-    private NestedSetCache(BugReporter bugReporter) {
-      this.bugReporter = bugReporter;
-    }
-
-    /**
-     * Returns children (an {@code Object[]} or a {@code ListenableFuture<Object[]>}) for NestedSet
-     * contents associated with the given fingerprint if there was already one. Otherwise associates
-     * {@code future} with {@code fingerprint} and returns null.
-     *
-     * <p>Since the associated future is used as the basis for equality comparisons for deserialized
-     * nested sets, it is critical that multiple calls with the same fingerprint don't override the
-     * association.
-     */
-    @VisibleForTesting
-    @Nullable
-    Object putIfAbsent(ByteString fingerprint, ListenableFuture<Object[]> future) {
-      Object result = fingerprintToContents.get(fingerprint, unused -> future);
-      if (result.equals(future)) {
-        // This is the first request of this fingerprint. We should put it.
-        putAsync(fingerprint, future);
-        return null;
-      }
-      return result;
-    }
-
-    /**
-     * Retrieves the fingerprint associated with the given NestedSet contents, or null if the given
-     * contents are not known.
-     */
-    @VisibleForTesting
-    @Nullable
-    FingerprintComputationResult fingerprintForContents(Object[] contents) {
-      return contentsToFingerprint.getIfPresent(contents);
-    }
-
-    /**
-     * Associates the provided {@code fingerprint} and contents of the future, when it completes.
-     *
-     * <p>There may be a race between this call and calls to {@link #put}. Those races are benign,
-     * since the fingerprint should be the same regardless. We may pessimistically end up having a
-     * future to wait on for serialization that isn't actually necessary, but that isn't a big
-     * concern.
-     */
-    private void putAsync(ByteString fingerprint, ListenableFuture<Object[]> futureContents) {
-      futureContents.addListener(
-          () -> {
-            try {
-              Object[] contents = Futures.getDone(futureContents);
-              // Replace the cache entry with the unwrapped contents, since the Future may be GC'd.
-              fingerprintToContents.put(fingerprint, contents);
-              // There may already be an entry here, but it's better to put a fingerprint result
-              // with an immediate future, since then later readers won't need to block
-              // unnecessarily. It would be nice to check the old value, but Cache#put
-              // doesn't provide it to us.
-              contentsToFingerprint.put(
-                  contents,
-                  FingerprintComputationResult.create(fingerprint, immediateFuture(null)));
-
-            } catch (ExecutionException e) {
-              // Failure to fetch the NestedSet contents is unexpected, but the failed future can
-              // be stored as the NestedSet children. This way the exception is only propagated if
-              // the NestedSet is consumed (unrolled).
-              bugReporter.sendBugReport(e);
-            }
-          },
-          MoreExecutors.directExecutor());
-    }
-
-    // TODO(janakr): Currently, racing threads can overwrite each other's
-    // fingerprintComputationResult, leading to confusion and potential performance drag. Fix this.
-    public void put(FingerprintComputationResult fingerprintComputationResult, Object[] contents) {
-      contentsToFingerprint.put(contents, fingerprintComputationResult);
-      fingerprintToContents.put(fingerprintComputationResult.fingerprint(), contents);
-    }
-  }
-
   /** The result of a fingerprint computation, including the status of its storage. */
   @AutoValue
   abstract static class FingerprintComputationResult {
@@ -251,18 +137,17 @@
 
     abstract ByteString fingerprint();
 
-    @VisibleForTesting
     abstract ListenableFuture<Void> writeStatus();
   }
 
-  private final NestedSetCache nestedSetCache;
-  private final NestedSetStorageEndpoint nestedSetStorageEndpoint;
+  private final NestedSetSerializationCache nestedSetCache;
+  private final NestedSetStorageEndpoint endpoint;
   private final Executor executor;
 
   /** Creates a NestedSetStore with the provided {@link NestedSetStorageEndpoint} as a backend. */
   @VisibleForTesting
-  public NestedSetStore(NestedSetStorageEndpoint nestedSetStorageEndpoint) {
-    this(nestedSetStorageEndpoint, new NestedSetCache(), MoreExecutors.directExecutor());
+  public NestedSetStore(NestedSetStorageEndpoint endpoint) {
+    this(endpoint, directExecutor(), BugReporter.defaultInstance());
   }
 
   /**
@@ -270,20 +155,18 @@
    * deserialization.
    */
   public NestedSetStore(
-      NestedSetStorageEndpoint nestedSetStorageEndpoint,
-      Executor executor,
-      BugReporter bugReporter) {
-    this(nestedSetStorageEndpoint, new NestedSetCache(bugReporter), executor);
+      NestedSetStorageEndpoint endpoint, Executor executor, BugReporter bugReporter) {
+    this(endpoint, new NestedSetSerializationCache(bugReporter), executor);
   }
 
   @VisibleForTesting
   NestedSetStore(
-      NestedSetStorageEndpoint nestedSetStorageEndpoint,
-      NestedSetCache nestedSetCache,
+      NestedSetStorageEndpoint endpoint,
+      NestedSetSerializationCache nestedSetCache,
       Executor executor) {
-    this.nestedSetStorageEndpoint = nestedSetStorageEndpoint;
-    this.nestedSetCache = nestedSetCache;
-    this.executor = executor;
+    this.endpoint = checkNotNull(endpoint);
+    this.nestedSetCache = checkNotNull(nestedSetCache);
+    this.executor = checkNotNull(executor);
   }
 
   /** Creates a NestedSetStore with an in-memory storage backend. */
@@ -345,7 +228,7 @@
     byte[] serializedBytes = byteArrayOutputStream.toByteArray();
     ByteString fingerprint =
         ByteString.copyFrom(Hashing.md5().hashBytes(serializedBytes).asBytes());
-    futureBuilder.add(nestedSetStorageEndpoint.put(fingerprint, serializedBytes));
+    futureBuilder.add(endpoint.put(fingerprint, serializedBytes));
 
     // If this is a NestedSet<NestedSet>, serialization of the contents will itself have writes.
     ListenableFuture<Void> innerWriteFutures =
@@ -355,8 +238,7 @@
     }
 
     ListenableFuture<Void> writeFuture =
-        Futures.whenAllComplete(futureBuilder.build())
-            .call(() -> null, MoreExecutors.directExecutor());
+        Futures.whenAllComplete(futureBuilder.build()).call(() -> null, directExecutor());
     FingerprintComputationResult fingerprintComputationResult =
         FingerprintComputationResult.create(fingerprint, writeFuture);
 
@@ -393,7 +275,7 @@
     if (contents != null) {
       return contents;
     }
-    ListenableFuture<byte[]> retrieved = nestedSetStorageEndpoint.get(fingerprint);
+    ListenableFuture<byte[]> retrieved = endpoint.get(fingerprint);
     Stopwatch fetchStopwatch = Stopwatch.createStarted();
     future.setFuture(
         Futures.transformAsync(
diff --git a/src/test/java/com/google/devtools/build/lib/collect/BUILD b/src/test/java/com/google/devtools/build/lib/collect/BUILD
index ec68e4e..2c3b447 100644
--- a/src/test/java/com/google/devtools/build/lib/collect/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/collect/BUILD
@@ -22,6 +22,7 @@
     ),
     deps = [
         "//src/main/java/com/google/devtools/build/lib/actions:commandline_item",
+        "//src/main/java/com/google/devtools/build/lib/bugreport",
         "//src/main/java/com/google/devtools/build/lib/collect",
         "//src/main/java/com/google/devtools/build/lib/collect/nestedset",
         "//src/main/java/com/google/devtools/build/lib/collect/nestedset:fingerprint_cache",
diff --git a/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetCodecTest.java b/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetCodecTest.java
index e5d066a..e019c97 100644
--- a/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetCodecTest.java
+++ b/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetCodecTest.java
@@ -15,22 +15,26 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
-import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableClassToInstanceMap;
 import com.google.common.testing.GcFinalization;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.bugreport.BugReporter;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.InMemoryNestedSetStorageEndpoint;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.MissingNestedSetException;
-import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.NestedSetCache;
 import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.NestedSetStorageEndpoint;
 import com.google.devtools.build.lib.skyframe.serialization.AutoRegistry;
 import com.google.devtools.build.lib.skyframe.serialization.DeserializationContext;
@@ -49,7 +53,6 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 
 /** Tests for {@link NestedSet} serialization. */
 @RunWith(JUnit4.class)
@@ -248,22 +251,6 @@
   }
 
   @Test
-  public void cacheEntryHasLifetimeOfContents() {
-    NestedSetCache cache = new NestedSetCache();
-    Object[] contents = new Object[0];
-    ByteString fingerprint = ByteString.copyFrom(new byte[2]);
-    cache.put(
-        NestedSetStore.FingerprintComputationResult.create(fingerprint, immediateVoidFuture()),
-        contents);
-    GcFinalization.awaitFullGc();
-    assertThat(cache.putIfAbsent(fingerprint, immediateFuture(null))).isEqualTo(contents);
-    WeakReference<Object[]> weakRef = new WeakReference<>(contents);
-    contents = null;
-    fingerprint = null;
-    GcFinalization.awaitClear(weakRef);
-  }
-
-  @Test
   public void serializationWeaklyCachesNestedSet() throws Exception {
     // Avoid NestedSetBuilder.wrap/create - they use their own cache which interferes with what
     // we're testing.
@@ -277,12 +264,10 @@
 
   @Test
   public void testDeserializationInParallel() throws Exception {
-    NestedSetStorageEndpoint nestedSetStorageEndpoint =
-        Mockito.spy(new InMemoryNestedSetStorageEndpoint());
-    NestedSetCache emptyNestedSetCache = mock(NestedSetCache.class);
+    NestedSetStorageEndpoint nestedSetStorageEndpoint = spy(new InMemoryNestedSetStorageEndpoint());
+    NestedSetSerializationCache emptyNestedSetCache = mock(NestedSetSerializationCache.class);
     NestedSetStore nestedSetStore =
-        new NestedSetStore(
-            nestedSetStorageEndpoint, emptyNestedSetCache, MoreExecutors.directExecutor());
+        new NestedSetStore(nestedSetStorageEndpoint, emptyNestedSetCache, directExecutor());
 
     ObjectCodecs objectCodecs = createCodecs(nestedSetStore);
 
@@ -306,12 +291,11 @@
             .computeFingerprintAndStore(
                 (Object[]) set.getChildren(), objectCodecs.getSerializationContext())
             .fingerprint();
-    Mockito.verify(nestedSetStorageEndpoint, Mockito.times(3))
-        .put(fingerprintCaptor.capture(), any());
-    Mockito.doReturn(subset1Future)
+    verify(nestedSetStorageEndpoint, times(3)).put(fingerprintCaptor.capture(), any());
+    doReturn(subset1Future)
         .when(nestedSetStorageEndpoint)
         .get(fingerprintCaptor.getAllValues().get(0));
-    Mockito.doReturn(subset2Future)
+    doReturn(subset2Future)
         .when(nestedSetStorageEndpoint)
         .get(fingerprintCaptor.getAllValues().get(1));
     when(emptyNestedSetCache.putIfAbsent(any(), any())).thenAnswer(invocation -> null);
@@ -324,7 +308,7 @@
     // At this point, we expect deserializationFuture to be waiting on both of the underlying
     // fetches, which should have both been started.
     assertThat(deserializationFuture.isDone()).isFalse();
-    Mockito.verify(nestedSetStorageEndpoint, Mockito.times(3)).get(any());
+    verify(nestedSetStorageEndpoint, times(3)).get(any());
 
     // Once the underlying fetches complete, we expect deserialization to complete.
     subset1Future.set(ByteString.copyFrom("mock bytes", Charset.defaultCharset()).toByteArray());
@@ -335,17 +319,17 @@
   @Test
   public void racingDeserialization() throws Exception {
     NestedSetStorageEndpoint nestedSetStorageEndpoint = mock(NestedSetStorageEndpoint.class);
-    NestedSetCache nestedSetCache = Mockito.spy(new NestedSetCache());
+    NestedSetSerializationCache nestedSetCache =
+        spy(new NestedSetSerializationCache(BugReporter.defaultInstance()));
     NestedSetStore nestedSetStore =
-        new NestedSetStore(
-            nestedSetStorageEndpoint, nestedSetCache, MoreExecutors.directExecutor());
+        new NestedSetStore(nestedSetStorageEndpoint, nestedSetCache, directExecutor());
     DeserializationContext deserializationContext = mock(DeserializationContext.class);
     ByteString fingerprint = ByteString.copyFromUtf8("fingerprint");
     // Future never completes, so we don't have to exercise that code in NestedSetStore.
     SettableFuture<byte[]> storageFuture = SettableFuture.create();
     when(nestedSetStorageEndpoint.get(fingerprint)).thenReturn(storageFuture);
     CountDownLatch fingerprintRequested = new CountDownLatch(2);
-    Mockito.doAnswer(
+    doAnswer(
             invocation -> {
               fingerprintRequested.countDown();
               @SuppressWarnings("unchecked")
@@ -355,7 +339,7 @@
               return result;
             })
         .when(nestedSetCache)
-        .putIfAbsent(Mockito.eq(fingerprint), any());
+        .putIfAbsent(eq(fingerprint), any());
     AtomicReference<ListenableFuture<Object[]>> asyncResult = new AtomicReference<>();
     Thread asyncThread =
         new Thread(
@@ -377,7 +361,7 @@
         (ListenableFuture<Object[]>)
             nestedSetStore.getContentsAndDeserialize(fingerprint, deserializationContext);
     asyncThread.join();
-    Mockito.verify(nestedSetStorageEndpoint, times(1)).get(Mockito.eq(fingerprint));
+    verify(nestedSetStorageEndpoint, times(1)).get(eq(fingerprint));
     assertThat(result).isSameInstanceAs(asyncResult.get());
     assertThat(result.isDone()).isFalse();
   }
@@ -385,17 +369,17 @@
   @Test
   public void bugInRacingSerialization() throws Exception {
     NestedSetStorageEndpoint nestedSetStorageEndpoint = mock(NestedSetStorageEndpoint.class);
-    NestedSetCache nestedSetCache = Mockito.spy(new NestedSetCache());
+    NestedSetSerializationCache nestedSetCache =
+        spy(new NestedSetSerializationCache(BugReporter.defaultInstance()));
     NestedSetStore nestedSetStore =
-        new NestedSetStore(
-            nestedSetStorageEndpoint, nestedSetCache, MoreExecutors.directExecutor());
+        new NestedSetStore(nestedSetStorageEndpoint, nestedSetCache, directExecutor());
     SerializationContext serializationContext = mock(SerializationContext.class);
     Object[] contents = {new Object()};
     when(serializationContext.getNewMemoizingContext()).thenReturn(serializationContext);
     when(nestedSetStorageEndpoint.put(any(), any()))
         .thenAnswer(invocation -> SettableFuture.create());
     CountDownLatch fingerprintRequested = new CountDownLatch(2);
-    Mockito.doAnswer(
+    doAnswer(
             invocation -> {
               fingerprintRequested.countDown();
               NestedSetStore.FingerprintComputationResult result =
@@ -423,7 +407,7 @@
         nestedSetStore.computeFingerprintAndStore(contents, serializationContext);
     asyncThread.join();
     // TODO(janakr): This should be one fetch, but we currently do two.
-    Mockito.verify(nestedSetStorageEndpoint, times(2)).put(any(), any());
+    verify(nestedSetStorageEndpoint, times(2)).put(any(), any());
     // TODO(janakr): These should be the same element.
     assertThat(result).isNotEqualTo(asyncResult.get());
   }
diff --git a/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetSerializationCacheTest.java b/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetSerializationCacheTest.java
new file mode 100644
index 0000000..9b1a6c1
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/collect/nestedset/NestedSetSerializationCacheTest.java
@@ -0,0 +1,174 @@
+// Copyright 2021 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.collect.nestedset;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.testing.GcFinalization;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.devtools.build.lib.bugreport.BugReporter;
+import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.FingerprintComputationResult;
+import com.google.devtools.build.lib.collect.nestedset.NestedSetStore.MissingNestedSetException;
+import com.google.protobuf.ByteString;
+import java.lang.ref.WeakReference;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link NestedSetSerializationCache}. */
+@RunWith(JUnit4.class)
+public final class NestedSetSerializationCacheTest {
+
+  private final NestedSetSerializationCache cache =
+      new NestedSetSerializationCache(BugReporter.defaultInstance());
+
+  @Test
+  public void putIfAbsent_newFingerprint_returnsNull() {
+    ByteString fingerprint1 = ByteString.copyFromUtf8("abc");
+    ByteString fingerprint2 = ByteString.copyFromUtf8("xyz");
+    SettableFuture<Object[]> future1 = SettableFuture.create();
+    SettableFuture<Object[]> future2 = SettableFuture.create();
+
+    assertThat(cache.putIfAbsent(fingerprint1, future1)).isNull();
+    assertThat(cache.putIfAbsent(fingerprint2, future2)).isNull();
+  }
+
+  @Test
+  public void putIfAbsent_existingFingerprint_returnsExistingFuture() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    SettableFuture<Object[]> future1 = SettableFuture.create();
+
+    assertThat(cache.putIfAbsent(fingerprint, future1)).isNull();
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isSameInstanceAs(future1);
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isSameInstanceAs(future1);
+  }
+
+  @Test
+  public void putIfAbsent_rejectsAlreadyDoneFuture() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    SettableFuture<Object[]> future = SettableFuture.create();
+    future.set(new Object[0]);
+
+    assertThrows(IllegalArgumentException.class, () -> cache.putIfAbsent(fingerprint, future));
+  }
+
+  @Test
+  public void putIfAbsent_futureCompletes_unwrapsContents() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    SettableFuture<Object[]> future1 = SettableFuture.create();
+    SettableFuture<Object[]> future2 = SettableFuture.create();
+    Object[] contents = new Object[0];
+
+    cache.putIfAbsent(fingerprint, future1);
+    future1.set(contents);
+
+    assertThat(cache.putIfAbsent(fingerprint, future2)).isSameInstanceAs(contents);
+  }
+
+  @Test
+  public void putIfAbsent_futureCompletes_cachesFingerprint() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    SettableFuture<Object[]> future = SettableFuture.create();
+    Object[] contents = new Object[0];
+
+    cache.putIfAbsent(fingerprint, future);
+    future.set(contents);
+
+    FingerprintComputationResult result = cache.fingerprintForContents(contents);
+    assertThat(result.fingerprint()).isEqualTo(fingerprint);
+    assertThat(result.writeStatus().isDone()).isTrue();
+  }
+
+  @Test
+  public void putIfAbsent_futureFails_notifiesBugReporter() {
+    BugReporter mockBugReporter = mock(BugReporter.class);
+    NestedSetSerializationCache cacheWithCustomBugReporter =
+        new NestedSetSerializationCache(mockBugReporter);
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    SettableFuture<Object[]> future = SettableFuture.create();
+    Exception e = new MissingNestedSetException(fingerprint);
+
+    cacheWithCustomBugReporter.putIfAbsent(fingerprint, future);
+    future.setException(e);
+
+    verify(mockBugReporter).sendBugReport(e);
+  }
+
+  @Test
+  public void put_cachesBothDirections() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    Object[] contents = new Object[0];
+    FingerprintComputationResult result =
+        FingerprintComputationResult.create(fingerprint, immediateVoidFuture());
+
+    cache.put(result, contents);
+
+    assertThat(cache.fingerprintForContents(contents)).isEqualTo(result);
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isSameInstanceAs(contents);
+  }
+
+  @Test
+  public void putIfAbsent_cacheEntriesHaveLifetimeOfContents() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    SettableFuture<Object[]> future = SettableFuture.create();
+
+    cache.putIfAbsent(fingerprint, future);
+
+    // Before completing, still cached while future in memory.
+    GcFinalization.awaitFullGc();
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isSameInstanceAs(future);
+
+    // After completing, still cached while contents in memory even if future is gone.
+    WeakReference<SettableFuture<Object[]>> futureRef = new WeakReference<>(future);
+    Object[] contents = new Object[0];
+    future.set(contents);
+    future = null;
+    GcFinalization.awaitClear(futureRef);
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isSameInstanceAs(contents);
+    FingerprintComputationResult result = cache.fingerprintForContents(contents);
+    assertThat(result.fingerprint()).isEqualTo(fingerprint);
+    assertThat(result.writeStatus().isDone()).isTrue();
+
+    // Cleared after references are gone, and the cycle of putIfAbsent starts over.
+    WeakReference<Object[]> contentsRef = new WeakReference<>(contents);
+    contents = null;
+    GcFinalization.awaitClear(contentsRef);
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isNull();
+  }
+
+  @Test
+  public void put_cacheEntriesHaveLifetimeOfContents() {
+    ByteString fingerprint = ByteString.copyFromUtf8("abc");
+    Object[] contents = new Object[0];
+    FingerprintComputationResult result =
+        FingerprintComputationResult.create(fingerprint, immediateVoidFuture());
+
+    cache.put(result, contents);
+
+    // Still cached while in memory.
+    GcFinalization.awaitFullGc();
+    assertThat(cache.fingerprintForContents(contents)).isEqualTo(result);
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isSameInstanceAs(contents);
+
+    // Cleared after references are gone, and the cycle of putIfAbsent starts over.
+    WeakReference<Object[]> ref = new WeakReference<>(contents);
+    contents = null;
+    GcFinalization.awaitClear(ref);
+    assertThat(cache.putIfAbsent(fingerprint, SettableFuture.create())).isNull();
+  }
+}