Fix flakiness of ByteStreamUploaderTest#failedUploadsShouldNotDeduplicate

The ByteStreamUploader has logic to internally deduplicate uploads. The test first triggers an upload that fails followed by an identical upload that succeeds. It asserts that the failed upload is not cached so that the subsequent upload is actually performed.

For a given file the logic for state transition of "being uploaded" to "uploaded/failed" is done in a listener of the upload future. This listener is executed *after* the future completes (i.e. after get()/isDone() returns true) and thus there would be a race between the second upload starting and the above mentioned state transition being executed. If the race was lost it would look to the second upload as if the first hadn't completed yet.

The fix is to introduce a future that only completes after the internal state tracking has been done.

RELNOTES: None
PiperOrigin-RevId: 245006516
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 24de889..97128b1 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -51,6 +51,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.logging.Level;
@@ -241,15 +242,28 @@
                 return null;
               },
               MoreExecutors.directExecutor());
-      uploadsInProgress.put(digest, uploadResult);
+      // A future that only completes once the upload and internal state updates have
+      // been completed.
+      SettableFuture<Void> uploadAndBookkeepingComplete = SettableFuture.create();
+      uploadsInProgress.put(digest, uploadAndBookkeepingComplete);
       uploadResult.addListener(
           () -> {
             synchronized (lock) {
               uploadsInProgress.remove(digest);
+              try {
+                uploadResult.get();
+                uploadAndBookkeepingComplete.set(null);
+              } catch (ExecutionException e) {
+                uploadAndBookkeepingComplete.setException(e.getCause());
+              } catch (CancellationException e) {
+                uploadAndBookkeepingComplete.cancel(true);
+              } catch (Throwable e) {
+                uploadAndBookkeepingComplete.setException(e);
+              }
             }
           },
           MoreExecutors.directExecutor());
-      return uploadResult;
+      return uploadAndBookkeepingComplete;
     }
   }