Progressively retrying uploads to ByteStream

Attempt to continue uploads through retriable exceptions, using a ProgressiveBackoff that is reset as progress is made. A write status query is issued after each retriable exception to determine whether progress has occurred. For a service that does not support write resumption, whether through persistent write resets or failure status responses, the behavior is identical to the non-progressive implementation.
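
To illustrate, a minimal sketch of the retry loop under stated assumptions: QueryWriteStatus and its committed_size field are the real ByteStream protocol surface, while uploadWithProgressiveRetry, Writer, isRetriable, and the backoff's reset() method (a resettable backoff is sketched after the next paragraph) are hypothetical stand-ins for the actual collaborators.

  import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
  import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
  import java.io.IOException;
  import java.util.function.Predicate;

  // Hypothetical placeholder for whatever streams WriteRequests from an offset.
  interface Writer {
    void writeFrom(long offset) throws IOException;
  }

  static void uploadWithProgressiveRetry(
      ByteStreamBlockingStub stub,
      String resourceName,
      Writer writer,
      Predicate<Exception> isRetriable,
      ProgressiveBackoff backoff)
      throws Exception {
    long committedOffset = 0;
    while (true) {
      try {
        writer.writeFrom(committedOffset); // stream WriteRequests from the offset
        return;
      } catch (Exception e) {
        if (!isRetriable.test(e)) {
          throw e;
        }
        // Ask the server how many bytes it has already persisted.
        long committedSize =
            stub.queryWriteStatus(
                    QueryWriteStatusRequest.newBuilder()
                        .setResourceName(resourceName)
                        .build())
                .getCommittedSize();
        if (committedSize > committedOffset) {
          // Progress was made since the last attempt: resume from the new
          // offset and reset the backoff so retries are not exhausted.
          committedOffset = committedSize;
          backoff.reset();
        }
        Thread.sleep(backoff.nextDelayMillis());
      }
    }
  }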

The ProgressiveBackoff is also used on the read side for executeAsync invocations, ensuring that it is consulted instead of the retrier's backoff supplier, and the Retrier's executeAsync method that accepts an injected backoff is now exposed.
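
For reference, a minimal sketch of a resettable backoff, assuming a Backoff interface with nextDelayMillis() and getRetryAttempts(); the interface shape here is an assumption for the sketch, not the exact Retrier API.

  import java.util.function.Supplier;

  // Assumed minimal Backoff shape for this sketch.
  interface Backoff {
    long nextDelayMillis();
    int getRetryAttempts();
  }

  // Wraps a backoff supplier and can be reset to a fresh backoff whenever the
  // caller observes progress, so a long but advancing upload does not exhaust
  // its retry budget.
  class ProgressiveBackoff implements Backoff {
    private final Supplier<Backoff> supplier;
    private Backoff currentBackoff;
    private int retries;

    ProgressiveBackoff(Supplier<Backoff> supplier) {
      this.supplier = supplier;
      this.currentBackoff = supplier.get();
    }

    // Called when progress is observed: bank the attempts consumed so far and
    // start over with a fresh backoff schedule.
    void reset() {
      retries += currentBackoff.getRetryAttempts();
      currentBackoff = supplier.get();
    }

    @Override
    public long nextDelayMillis() {
      return currentBackoff.nextDelayMillis();
    }

    @Override
    public int getRetryAttempts() {
      return retries + currentBackoff.getRetryAttempts();
    }
  }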

Closes #8179, #7846

PiperOrigin-RevId: 246311713
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
index c8c2a81..110f3cb 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -20,6 +20,7 @@
 import build.bazel.remote.execution.v2.Digest;
 import com.google.bytestream.ByteStreamProto.WriteRequest;
 import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.common.hash.HashCode;
 import com.google.common.io.BaseEncoding;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -124,7 +125,7 @@
   @Test
   public void uploadsShouldWork() throws Exception {
     int numUploads = 2;
-    Map<String, byte[]> blobsByHash = new HashMap<>();
+    Map<HashCode, byte[]> blobsByHash = new HashMap<>();
     Map<Path, LocalFile> filesToUpload = new HashMap<>();
     Random rand = new Random();
     for (int i = 0; i < numUploads; i++) {
@@ -135,7 +136,7 @@
       rand.nextBytes(blob);
       out.write(blob);
       out.close();
-      blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
+      blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
       filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
     }
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
@@ -185,7 +186,7 @@
     // error is propagated correctly.
 
     int numUploads = 10;
-    Map<String, byte[]> blobsByHash = new HashMap<>();
+    Map<HashCode, byte[]> blobsByHash = new HashMap<>();
     Map<Path, LocalFile> filesToUpload = new HashMap<>();
     Random rand = new Random();
     for (int i = 0; i < numUploads; i++) {
@@ -197,10 +198,10 @@
       out.write(blob);
       out.flush();
       out.close();
-      blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
+      blobsByHash.put(HashCode.fromString(DIGEST_UTIL.compute(file).getHash()), blob);
       filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
     }
-    String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next();
+    String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next().toString();
     serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash) {
       @Override
       public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {