remote: add a ByteStreamBuildEventArtifactUploader

This change allows local files referenced by the BEP/BES protocol
to be uploaded to a ByteStream gRPC service.

The ByteStreamUploader is now implicitly also used by the BES
module which has a different lifecycle than the remote module.
We introduce reference counting to ensure that the channel is
closed after its no longer needed. This also fixes a bug where
we currently leak one socket per remote build until the Bazel
server is shut down.

RELNOTES: None
PiperOrigin-RevId: 204275316
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
new file mode 100644
index 0000000..75b46c4
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java
@@ -0,0 +1,236 @@
+// Copyright 2018 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
+import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType;
+import com.google.devtools.build.lib.buildeventstream.PathConverter;
+import com.google.devtools.build.lib.clock.JavaClock;
+import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.FixedBackoff;
+import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService;
+import com.google.devtools.build.lib.remote.Retrier.RetryException;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
+import com.google.devtools.build.lib.vfs.DigestHashFunction;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import io.grpc.Context;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+
+/** Test for {@link ByteStreamBuildEventArtifactUploader}. */
+@RunWith(JUnit4.class)
+public class ByteStreamBuildEventArtifactUploaderTest {
+
+  private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256);
+
+  private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+  private static ListeningScheduledExecutorService retryService;
+
+  private Server server;
+  private ManagedChannel channel;
+  private Context withEmptyMetadata;
+  private Context prevContext;
+  private final FileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256);
+
+  @BeforeClass
+  public static void beforeEverything() {
+    retryService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1));
+  }
+
+  @Before
+  public final void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+
+    String serverName = "Server for " + this.getClass();
+    server =
+        InProcessServerBuilder.forName(serverName)
+            .fallbackHandlerRegistry(serviceRegistry)
+            .build()
+            .start();
+    channel = InProcessChannelBuilder.forName(serverName).build();
+    withEmptyMetadata =
+        TracingMetadataUtils.contextWithMetadata(
+            "none", "none", DIGEST_UTIL.asActionKey(Digest.getDefaultInstance()));
+    // Needs to be repeated in every test that uses the timeout setting, since the tests run
+    // on different threads than the setUp.
+    prevContext = withEmptyMetadata.attach();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // Needs to be repeated in every test that uses the timeout setting, since the tests run
+    // on different threads than the tearDown.
+    withEmptyMetadata.detach(prevContext);
+
+    server.shutdownNow();
+    server.awaitTermination();
+  }
+
+  @AfterClass
+  public static void afterEverything() {
+    retryService.shutdownNow();
+  }
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void uploadsShouldWork() throws Exception {
+    int numUploads = 2;
+    Map<String, byte[]> blobsByHash = new HashMap<>();
+    Map<Path, LocalFile> filesToUpload = new HashMap<>();
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      Path file = fs.getPath("/file" + i);
+      OutputStream out = file.getOutputStream();
+      int blobSize = rand.nextInt(100) + 1;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      out.write(blob);
+      out.close();
+      blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
+      filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
+    }
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
+
+    RemoteRetrier retrier =
+        new RemoteRetrier(
+            () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+    ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader("instance", refCntChannel, null, 3, retrier);
+    ByteStreamBuildEventArtifactUploader artifactUploader =
+        new ByteStreamBuildEventArtifactUploader(
+            uploader, "localhost", withEmptyMetadata, "instance");
+
+    PathConverter pathConverter = artifactUploader.upload(filesToUpload).get();
+    for (Path file : filesToUpload.keySet()) {
+      String hash = BaseEncoding.base16().lowerCase().encode(file.getDigest());
+      long size = file.getFileSize();
+      String conversion = pathConverter.apply(file);
+      assertThat(conversion)
+          .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size);
+    }
+
+    artifactUploader.shutdown();
+
+    assertThat(uploader.refCnt()).isEqualTo(0);
+    assertThat(refCntChannel.isShutdown()).isTrue();
+  }
+
+  @Test
+  public void someUploadsFail() throws Exception {
+    // Test that if one of multiple file uploads fails, the upload future fails and that the
+    // error is propagated correctly.
+
+    int numUploads = 10;
+    Map<String, byte[]> blobsByHash = new HashMap<>();
+    Map<Path, LocalFile> filesToUpload = new HashMap<>();
+    Random rand = new Random();
+    for (int i = 0; i < numUploads; i++) {
+      Path file = fs.getPath("/file" + i);
+      OutputStream out = file.getOutputStream();
+      int blobSize = rand.nextInt(100) + 1;
+      byte[] blob = new byte[blobSize];
+      rand.nextBytes(blob);
+      out.write(blob);
+      out.flush();
+      out.close();
+      blobsByHash.put(DIGEST_UTIL.compute(file).getHash(), blob);
+      filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT));
+    }
+    String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next();
+    serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash) {
+      @Override
+      public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response) {
+        StreamObserver<WriteRequest> delegate = super.write(response);
+        return new StreamObserver<WriteRequest>() {
+          @Override
+          public void onNext(WriteRequest value) {
+            if (value.getResourceName().contains(hashOfBlobThatShouldFail)) {
+              response.onError(Status.CANCELLED.asException());
+            } else {
+              delegate.onNext(value);
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            delegate.onError(t);
+          }
+
+          @Override
+          public void onCompleted() {
+            delegate.onCompleted();
+          }
+        };
+      }
+    });
+
+    RemoteRetrier retrier =
+        new RemoteRetrier(
+            () -> new FixedBackoff(1, 0), (e) -> true, retryService, Retrier.ALLOW_ALL_CALLS);
+    ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader("instance", refCntChannel, null, 3, retrier);
+    ByteStreamBuildEventArtifactUploader artifactUploader =
+        new ByteStreamBuildEventArtifactUploader(
+            uploader, "localhost", withEmptyMetadata, "instance");
+
+    try {
+      artifactUploader.upload(filesToUpload).get();
+      fail("exception expected.");
+    } catch (ExecutionException e) {
+      assertThat(e.getCause()).isInstanceOf(RetryException.class);
+      assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode());
+    }
+
+    artifactUploader.shutdown();
+
+    assertThat(uploader.refCnt()).isEqualTo(0);
+    assertThat(refCntChannel.isShutdown()).isTrue();
+  }
+}