remote: add a ByteStreamBuildEventArtifactUploader

This change allows local files referenced by the BEP/BES protocol
to be uploaded to a ByteStream gRPC service.

The ByteStreamUploader is now implicitly also used by the BES
module which has a different lifecycle than the remote module.
We introduce reference counting to ensure that the channel is
closed after it's no longer needed. This also fixes a bug where
we currently leak one socket per remote build until the Bazel
server is shut down.

RELNOTES: None
PiperOrigin-RevId: 204275316
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 7b2dc7a..06ad391 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -34,8 +34,8 @@
 import com.google.devtools.remoteexecution.v1test.RequestMetadata;
 import com.google.protobuf.ByteString;
 import io.grpc.BindableService;
-import io.grpc.Channel;
 import io.grpc.Context;
+import io.grpc.ManagedChannel;
 import io.grpc.Metadata;
 import io.grpc.Server;
 import io.grpc.ServerCall;
@@ -90,7 +90,7 @@
   private static ListeningScheduledExecutorService retryService;
 
   private Server server;
-  private Channel channel;
+  private ManagedChannel channel;
   private Context withEmptyMetadata;
   private Context prevContext;
 
@@ -137,7 +137,8 @@
     Context prevContext = withEmptyMetadata.attach();
     RemoteRetrier retrier =
         new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
     new Random().nextBytes(blob);
@@ -193,7 +194,7 @@
           }
         });
 
-    uploader.uploadBlob(chunker);
+    uploader.uploadBlob(chunker, true);
 
     // This test should not have triggered any retries.
     Mockito.verifyZeroInteractions(mockBackoff);
@@ -209,7 +210,8 @@
     RemoteRetrier retrier =
         new RemoteRetrier(
             () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     int numUploads = 10;
     Map<String, byte[]> blobsByHash = new HashMap<>();
@@ -224,70 +226,9 @@
       blobsByHash.put(chunker.digest().getHash(), blob);
     }
 
-    Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>());
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
 
-    serviceRegistry.addService(new ByteStreamImplBase() {
-      @Override
-      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
-        return new StreamObserver<WriteRequest>() {
-
-          private String digestHash;
-          private byte[] receivedData;
-          private long nextOffset;
-
-          @Override
-          public void onNext(WriteRequest writeRequest) {
-            if (nextOffset == 0) {
-              String resourceName = writeRequest.getResourceName();
-              assertThat(resourceName).isNotEmpty();
-
-              String[] components = resourceName.split("/");
-              assertThat(components).hasLength(6);
-              digestHash = components[4];
-              assertThat(blobsByHash).containsKey(digestHash);
-              receivedData = new byte[Integer.parseInt(components[5])];
-            }
-            assertThat(digestHash).isNotNull();
-            // An upload for a given blob has a 10% chance to fail once during its lifetime.
-            // This is to exercise the retry mechanism a bit.
-            boolean shouldFail =
-                rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash);
-            if (shouldFail) {
-              uploadsFailedOnce.add(digestHash);
-              response.onError(Status.INTERNAL.asException());
-              return;
-            }
-
-            ByteString data = writeRequest.getData();
-            System.arraycopy(
-                data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
-            nextOffset += data.size();
-
-            boolean lastWrite = nextOffset == receivedData.length;
-            assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
-          }
-
-          @Override
-          public void onError(Throwable throwable) {
-            fail("onError should never be called.");
-          }
-
-          @Override
-          public void onCompleted() {
-            byte[] expectedBlob = blobsByHash.get(digestHash);
-            assertThat(receivedData).isEqualTo(expectedBlob);
-
-            WriteResponse writeResponse =
-                WriteResponse.newBuilder().setCommittedSize(receivedData.length).build();
-
-            response.onNext(writeResponse);
-            response.onCompleted();
-          }
-        };
-      }
-    });
-
-    uploader.uploadBlobs(builders);
+    uploader.uploadBlobs(builders, true);
 
     blockUntilInternalStateConsistent(uploader);
 
@@ -302,7 +243,8 @@
     RemoteRetrier retrier =
         new RemoteRetrier(
             () -> new FixedBackoff(5, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     List<String> toUpload = ImmutableList.of("aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc");
     List<Chunker> builders = new ArrayList<>(toUpload.size());
@@ -372,7 +314,7 @@
               "build-req-id", "command-id", DIGEST_UTIL.asActionKey(chunker.digest()));
       ctx.call(
           () -> {
-            uploads.add(uploader.uploadBlobAsync(chunker));
+            uploads.add(uploader.uploadBlobAsync(chunker, true));
             return null;
           });
     }
@@ -393,7 +335,8 @@
     Context prevContext = withEmptyMetadata.attach();
     RemoteRetrier retrier =
         new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     byte[] blob = new byte[CHUNK_SIZE * 10];
     Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -435,8 +378,8 @@
       }
     });
 
-    Future<?> upload1 = uploader.uploadBlobAsync(chunker);
-    Future<?> upload2 = uploader.uploadBlobAsync(chunker);
+    Future<?> upload1 = uploader.uploadBlobAsync(chunker, true);
+    Future<?> upload2 = uploader.uploadBlobAsync(chunker, true);
 
     blocker.countDown();
 
@@ -455,7 +398,8 @@
     RemoteRetrier retrier =
         new RemoteRetrier(
             () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     byte[] blob = new byte[CHUNK_SIZE];
     Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
@@ -469,7 +413,7 @@
     });
 
     try {
-      uploader.uploadBlob(chunker);
+      uploader.uploadBlob(chunker, true);
       fail("Should have thrown an exception.");
     } catch (RetryException e) {
       assertThat(e.getAttempts()).isEqualTo(2);
@@ -485,7 +429,8 @@
     RemoteRetrier retrier =
         new RemoteRetrier(
             () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     CountDownLatch cancellations = new CountDownLatch(2);
 
@@ -520,8 +465,8 @@
     byte[] blob2 = new byte[CHUNK_SIZE + 1];
     Chunker chunker2 = new Chunker(blob2, CHUNK_SIZE, DIGEST_UTIL);
 
-    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1);
-    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2);
+    ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1, true);
+    ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2, true);
 
     assertThat(uploader.uploadsInProgress()).isTrue();
 
@@ -545,7 +490,8 @@
     RemoteRetrier retrier =
         new RemoteRetrier(
             () -> new FixedBackoff(1, 10), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
-    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
 
     serviceRegistry.addService(new ByteStreamImplBase() {
       @Override
@@ -564,7 +510,7 @@
     byte[] blob = new byte[1];
     Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
     try {
-      uploader.uploadBlob(chunker);
+      uploader.uploadBlob(chunker, true);
       fail("Should have thrown an exception.");
     } catch (RetryException e) {
       assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -579,7 +525,8 @@
     RemoteRetrier retrier =
         new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
     ByteStreamUploader uploader =
-        new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
+        new ByteStreamUploader(/* instanceName */ null,
+            new ReferenceCountedChannel(channel), null, 3, retrier);
 
     serviceRegistry.addService(new ByteStreamImplBase() {
       @Override
@@ -608,7 +555,7 @@
     byte[] blob = new byte[1];
     Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
 
-    uploader.uploadBlob(chunker);
+    uploader.uploadBlob(chunker, true);
 
     withEmptyMetadata.detach(prevContext);
   }
@@ -623,7 +570,8 @@
             retryService,
             Retrier.ALLOW_ALL_CALLS);
     ByteStreamUploader uploader =
-        new ByteStreamUploader(/* instanceName */ null, channel, null, 3, retrier);
+        new ByteStreamUploader(/* instanceName */ null,
+            new ReferenceCountedChannel(channel), null, 3, retrier);
 
     AtomicInteger numCalls = new AtomicInteger();
 
@@ -640,7 +588,7 @@
     Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
 
     try {
-      uploader.uploadBlob(chunker);
+      uploader.uploadBlob(chunker, true);
       fail("Should have thrown an exception.");
     } catch (RetryException e) {
       assertThat(numCalls.get()).isEqualTo(1);
@@ -649,6 +597,67 @@
     withEmptyMetadata.detach(prevContext);
   }
 
+  @Test
+  public void deduplicationOfUploadsShouldWork() throws Exception {
+    Context prevContext = withEmptyMetadata.attach();
+    RemoteRetrier retrier =
+        new RemoteRetrier(() -> mockBackoff, (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+    ByteStreamUploader uploader = new ByteStreamUploader(INSTANCE_NAME,
+        new ReferenceCountedChannel(channel), null, 3, retrier);
+
+    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+    new Random().nextBytes(blob);
+
+    Chunker chunker = new Chunker(blob, CHUNK_SIZE, DIGEST_UTIL);
+
+    AtomicInteger numUploads = new AtomicInteger();
+    serviceRegistry.addService(new ByteStreamImplBase() {
+      @Override
+      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+        numUploads.incrementAndGet();
+        return new StreamObserver<WriteRequest>() {
+
+          long nextOffset = 0;
+
+          @Override
+          public void onNext(WriteRequest writeRequest) {
+            nextOffset += writeRequest.getData().size();
+            boolean lastWrite = blob.length == nextOffset;
+            assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+          }
+
+          @Override
+          public void onError(Throwable throwable) {
+            fail("onError should never be called.");
+          }
+
+          @Override
+          public void onCompleted() {
+            assertThat(nextOffset).isEqualTo(blob.length);
+
+            WriteResponse response =
+                WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
+            streamObserver.onNext(response);
+            streamObserver.onCompleted();
+          }
+        };
+      }
+    });
+
+    uploader.uploadBlob(chunker, true);
+    // This should not trigger an upload.
+    uploader.uploadBlob(chunker, false);
+
+    assertThat(numUploads.get()).isEqualTo(1);
+
+    // This test should not have triggered any retries.
+    Mockito.verifyZeroInteractions(mockBackoff);
+
+    blockUntilInternalStateConsistent(uploader);
+
+    withEmptyMetadata.detach(prevContext);
+  }
+
   private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
     @Override
     public void onNext(WriteRequest writeRequest) {
@@ -663,7 +672,7 @@
     }
   }
 
-  private static class FixedBackoff implements Retrier.Backoff {
+  static class FixedBackoff implements Retrier.Backoff {
 
     private final int maxRetries;
     private final int delayMillis;
@@ -690,6 +699,80 @@
     }
   }
 
+  /**
+   * A byte stream service where an upload for a given blob may or may not fail on the first
+   * attempt but is guaranteed to succeed on the second try.
+   */
+  static class MaybeFailOnceUploadService extends ByteStreamImplBase {
+
+    private final Map<String, byte[]> blobsByHash;
+    private final Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>());
+    private final Random rand = new Random();
+
+    MaybeFailOnceUploadService(Map<String, byte[]> blobsByHash) {
+      this.blobsByHash = blobsByHash;
+    }
+
+    @Override
+    public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+      return new StreamObserver<WriteRequest>() {
+
+        private String digestHash;
+        private byte[] receivedData;
+        private long nextOffset;
+
+        @Override
+        public void onNext(WriteRequest writeRequest) {
+          if (nextOffset == 0) {
+            String resourceName = writeRequest.getResourceName();
+            assertThat(resourceName).isNotEmpty();
+
+            String[] components = resourceName.split("/");
+            assertThat(components).hasLength(6);
+            digestHash = components[4];
+            assertThat(blobsByHash).containsKey(digestHash);
+            receivedData = new byte[Integer.parseInt(components[5])];
+          }
+          assertThat(digestHash).isNotNull();
+          // An upload for a given blob has a 10% chance to fail once during its lifetime.
+          // This is to exercise the retry mechanism a bit.
+          boolean shouldFail =
+              rand.nextInt(10) == 0 && !uploadsFailedOnce.contains(digestHash);
+          if (shouldFail) {
+            uploadsFailedOnce.add(digestHash);
+            response.onError(Status.INTERNAL.asException());
+            return;
+          }
+
+          ByteString data = writeRequest.getData();
+          System.arraycopy(
+              data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
+          nextOffset += data.size();
+
+          boolean lastWrite = nextOffset == receivedData.length;
+          assertThat(writeRequest.getFinishWrite()).isEqualTo(lastWrite);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          fail("onError should never be called.");
+        }
+
+        @Override
+        public void onCompleted() {
+          byte[] expectedBlob = blobsByHash.get(digestHash);
+          assertThat(receivedData).isEqualTo(expectedBlob);
+
+          WriteResponse writeResponse =
+              WriteResponse.newBuilder().setCommittedSize(receivedData.length).build();
+
+          response.onNext(writeResponse);
+          response.onCompleted();
+        }
+      };
+    }
+  }
+
   private void blockUntilInternalStateConsistent(ByteStreamUploader uploader) throws Exception {
     // Poll until all upload futures have been removed from the internal hash map. The polling is
     // necessary, as listeners are executed after Future.get() calls are notified about completion.