Fix #3416: catch the ALREADY_EXISTS status code on upload, and treat it as success.

This can happen per spec, if multiple builds try to upload the same blob concurrently.
Also, added this to the RemoteWorker, per spec.

PiperOrigin-RevId: 162647548
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 4ee108e..aa8f9b5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -39,6 +39,7 @@
 import io.grpc.ClientCall;
 import io.grpc.Metadata;
 import io.grpc.Status;
+import io.grpc.Status.Code;
 import io.grpc.StatusException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -352,10 +353,10 @@
 
             @Override
             public void onClose(Status status, Metadata trailers) {
-              if (!status.isOk()) {
-                listener.failure(status);
-              } else {
+              if (status.isOk() || Code.ALREADY_EXISTS.equals(status.getCode())) {
                 listener.success();
+              } else {
+                listener.failure(status);
               }
             }
 
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 1c97f21..c637a8d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -145,15 +145,15 @@
     ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
 
     // Only upload data that was missing from the cache.
-    ArrayList<ActionInput> actionInputs = new ArrayList<>();
-    ArrayList<Directory> treeNodes = new ArrayList<>();
-    repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
+    ArrayList<ActionInput> missingActionInputs = new ArrayList<>();
+    ArrayList<Directory> missingTreeNodes = new ArrayList<>();
+    repository.getDataFromDigests(missingDigests, missingActionInputs, missingTreeNodes);
 
-    if (!treeNodes.isEmpty()) {
+    if (!missingTreeNodes.isEmpty()) {
       // TODO(olaola): split this into multiple requests if total size is > 10MB.
       BatchUpdateBlobsRequest.Builder treeBlobRequest =
           BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
-      for (Directory d : treeNodes) {
+      for (Directory d : missingTreeNodes) {
         byte[] data = d.toByteArray();
         treeBlobRequest
             .addRequestsBuilder()
@@ -173,17 +173,12 @@
           });
     }
     uploadBlob(command.toByteArray());
-    if (!actionInputs.isEmpty()) {
+    if (!missingActionInputs.isEmpty()) {
       List<Chunker> inputsToUpload = new ArrayList<>();
       ActionInputFileCache inputFileCache = repository.getInputFileCache();
-      for (ActionInput actionInput : actionInputs) {
-        Digest digest = Digests.getDigestFromInputCache(actionInput, inputFileCache);
-        if (missingDigests.contains(digest)) {
-          Chunker chunker = new Chunker(actionInput, inputFileCache, execRoot);
-          inputsToUpload.add(chunker);
-        }
+      for (ActionInput actionInput : missingActionInputs) {
+        inputsToUpload.add(new Chunker(actionInput, inputFileCache, execRoot));
       }
-
       uploader.uploadBlobs(inputsToUpload);
     }
   }
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index 81b63fa..c43a09f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -59,6 +59,7 @@
 import io.grpc.stub.StreamObserver;
 import io.grpc.util.MutableHandlerRegistry;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -284,6 +285,49 @@
     assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
   }
 
+  @Test
+  public void testUploadBlobRemoteAlreadyExists() throws Exception {
+    final GrpcRemoteCache client = newClient();
+    final Digest digest = Digests.computeDigestUtf8("abcdefg");
+    serviceRegistry.addService(
+        new ContentAddressableStorageImplBase() {
+          @Override
+          public void findMissingBlobs(
+              FindMissingBlobsRequest request,
+              StreamObserver<FindMissingBlobsResponse> responseObserver) {
+            responseObserver.onNext(
+                FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
+            responseObserver.onCompleted();
+          }
+        });
+    final AtomicBoolean sentError = new AtomicBoolean(false);
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(
+              final StreamObserver<WriteResponse> responseObserver) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest request) {
+                responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
+                sentError.set(true);
+              }
+
+              @Override
+              public void onCompleted() {
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                fail("An error occurred: " + t);
+              }
+            };
+          }
+        });
+    assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
+    assertThat(sentError.get()).isTrue();
+  }
+
   static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> {
     private final StreamObserver<WriteResponse> responseObserver;
     private final String contents;
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index acc6685..50f8857 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -132,6 +132,25 @@
           return;
         }
 
+        if (offset == 0) {
+          try {
+            if (cache.containsKey(digest)) {
+              responseObserver.onError(StatusUtils.alreadyExistsError(digest));
+              closed = true;
+              return;
+            }
+          } catch (InterruptedException e) {
+            responseObserver.onError(StatusUtils.interruptedError(digest));
+            Thread.currentThread().interrupt();
+            closed = true;
+            return;
+          } catch (IOException e) {
+            responseObserver.onError(StatusUtils.internalError(e));
+            closed = true;
+            return;
+          }
+        }
+
         if (request.getWriteOffset() != offset) {
           responseObserver.onError(
               StatusUtils.invalidArgumentError(
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
index 494ee58..be606d2 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/StatusUtils.java
@@ -51,6 +51,17 @@
         .build();
   }
 
+  static StatusException alreadyExistsError(Digest digest) {
+    return StatusProto.toStatusException(alreadyExistsStatus(digest));
+  }
+
+  static com.google.rpc.Status alreadyExistsStatus(Digest digest) {
+    return Status.newBuilder()
+        .setCode(Code.ALREADY_EXISTS.getNumber())
+        .setMessage("Digest already uploaded:" + digest)
+        .build();
+  }
+
   static StatusException interruptedError(Digest digest) {
     return StatusProto.toStatusException(interruptedStatus(digest));
   }