Rewrite blob upload to use temporary files

Previously, the upload path allocated an in-memory buffer for the entire blob,
which caused it to run out of memory on large file uploads (or when many small
uploads ran simultaneously).

Unfortunately, we don't create the temporary file in the right location (due
to separation of the BlobStore from the ByteStreamServer), so we copy the file
again to write it to the OnDiskBlobStore, which isn't ideal. There'll need to
be another BlobStore API change to make that work, when the InMemoryBlobStore
is gone.

PiperOrigin-RevId: 160974550
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index 8c0fcdc..daa4ab0 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -241,7 +241,7 @@
     }
   }
 
-  private Digest uploadBlob(Digest digest, InputStream in)
+  public Digest uploadBlob(Digest digest, InputStream in)
       throws IOException, InterruptedException {
     blobStore.put(digest.getHash(), in);
     return digest;
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index 87ca590..cf3ccd0 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -14,6 +14,7 @@
 
 package com.google.devtools.build.remote;
 
+import static java.util.logging.Level.SEVERE;
 import static java.util.logging.Level.WARNING;
 
 import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
@@ -25,12 +26,18 @@
 import com.google.devtools.build.lib.remote.Chunker;
 import com.google.devtools.build.lib.remote.Digests;
 import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
+import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.remoteexecution.v1test.Digest;
 import com.google.protobuf.ByteString;
 import com.google.rpc.Code;
 import com.google.rpc.Status;
 import io.grpc.protobuf.StatusProto;
 import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.UUID;
 import java.util.logging.Logger;
 import javax.annotation.Nullable;
 
@@ -38,6 +45,7 @@
 final class ByteStreamServer extends ByteStreamImplBase {
   private static final Logger logger = Logger.getLogger(ByteStreamServer.class.getName());
   private final SimpleBlobStoreActionCache cache;
+  private final Path workPath;
 
   static @Nullable Digest parseDigestFromResourceName(String resourceName) {
     try {
@@ -53,8 +61,9 @@
     }
   }
 
-  public ByteStreamServer(SimpleBlobStoreActionCache cache) {
+  public ByteStreamServer(SimpleBlobStoreActionCache cache, Path workPath) {
     this.cache = cache;
+    this.workPath = workPath;
   }
 
   @Override
@@ -87,12 +96,22 @@
 
   @Override
   public StreamObserver<WriteRequest> write(final StreamObserver<WriteResponse> responseObserver) {
+    Path temp = workPath.getRelative("upload").getRelative(UUID.randomUUID().toString());
+    try {
+      FileSystemUtils.createDirectoryAndParents(temp.getParentDirectory());
+      temp.getOutputStream().close();
+    } catch (IOException e) {
+      logger.log(SEVERE, "Failed to create temporary file for upload", e);
+      responseObserver.onError(StatusUtils.internalError(e));
+      // We need to make sure that subsequent onNext or onCompleted calls don't make any further
+      // calls on the responseObserver after the onError above, so we return a no-op observer.
+      return new NoOpStreamObserver<>();
+    }
     return new StreamObserver<WriteRequest>() {
-      byte[] blob;
-      Digest digest;
-      long offset;
-      String resourceName;
-      boolean closed;
+      private Digest digest;
+      private long offset;
+      private String resourceName;
+      private boolean closed;
 
       @Override
       public void onNext(WriteRequest request) {
@@ -103,7 +122,6 @@
         if (digest == null) {
           resourceName = request.getResourceName();
           digest = parseDigestFromResourceName(resourceName);
-          blob = new byte[(int) digest.getSizeBytes()];
         }
 
         if (digest == null) {
@@ -137,7 +155,13 @@
         long size = request.getData().size();
 
         if (size > 0) {
-          request.getData().copyTo(blob, (int) offset);
+          try (OutputStream out = temp.getOutputStream(true)) {
+            request.getData().writeTo(out);
+          } catch (IOException e) {
+            responseObserver.onError(StatusUtils.internalError(e));
+            closed = true;
+            return;
+          }
           offset += size;
         }
 
@@ -149,6 +173,7 @@
                   "finish_write",
                   "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite()));
           closed = true;
+          return;
         }
       }
 
@@ -156,6 +181,11 @@
       public void onError(Throwable t) {
         logger.log(WARNING, "Write request failed remotely.", t);
         closed = true;
+        try {
+          temp.delete();
+        } catch (IOException e) {
+          logger.log(WARNING, "Could not delete temp file.", e);
+        }
       }
 
       @Override
@@ -176,7 +206,15 @@
         }
 
         try {
-          Digest d = cache.uploadBlob(blob);
+          Digest d = Digests.computeDigest(temp);
+          try (InputStream in = temp.getInputStream()) {
+            cache.uploadBlob(d, in);
+          }
+          try {
+            temp.delete();
+          } catch (IOException e) {
+            logger.log(WARNING, "Could not delete temp file.", e);
+          }
 
           if (!d.equals(digest)) {
             responseObserver.onError(
@@ -197,4 +235,18 @@
       }
     };
   }
+
+  private static class NoOpStreamObserver<T> implements StreamObserver<T> {
+    @Override
+    public void onNext(T value) {
+    }
+
+    @Override
+    public void onError(Throwable t) {
+    }
+
+    @Override
+    public void onCompleted() {
+    }
+  }
 }
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 49e63d7..f245ae6 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -77,16 +77,32 @@
   }
 
   public RemoteWorker(
-      RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache, Path sandboxPath)
+      FileSystem fs, RemoteWorkerOptions workerOptions, SimpleBlobStoreActionCache cache,
+      Path sandboxPath)
       throws IOException {
     this.workerOptions = workerOptions;
     this.actionCacheServer = new ActionCacheServer(cache);
-    this.bsServer = new ByteStreamServer(cache);
+    Path workPath;
+    if (workerOptions.workPath != null) {
+      workPath = fs.getPath(workerOptions.workPath);
+    } else {
+      // TODO(ulfjack): The plan is to make the on-disk storage the default, so we always need to
+      // provide a path to the remote worker, and we can then also use that as the work path. E.g.:
+      // /given/path/cas/
+      // /given/path/upload/
+      // /given/path/work/
+      // We could technically use a different path for temporary files and execution, but we want
+      // the cas/ directory to be on the same file system as the upload/ and work/ directories so
+      // that we can atomically move files between them, and / or use hard-links for the exec
+      // directories.
+      // For now, we use a temporary path if no work path was provided.
+      workPath = fs.getPath("/tmp/remote-worker");
+    }
+    this.bsServer = new ByteStreamServer(cache, workPath);
     this.casServer = new CasServer(cache);
 
     if (workerOptions.workPath != null) {
       ConcurrentHashMap<String, ExecuteRequest> operationsCache = new ConcurrentHashMap<>();
-      Path workPath = getFileSystem().getPath(workerOptions.workPath);
       FileSystemUtils.createDirectoryAndParents(workPath);
       watchServer = new WatcherServer(workPath, cache, workerOptions, operationsCache, sandboxPath);
       execServer = new ExecutionServer(operationsCache);
@@ -157,9 +173,10 @@
       rootLog.getHandlers()[0].setLevel(FINE);
     }
 
+    FileSystem fs = getFileSystem();
     Path sandboxPath = null;
     if (remoteWorkerOptions.sandboxing) {
-      sandboxPath = prepareSandboxRunner(remoteWorkerOptions);
+      sandboxPath = prepareSandboxRunner(fs, remoteWorkerOptions);
     }
 
     logger.info("Initializing in-memory cache server.");
@@ -169,7 +186,7 @@
     }
     if ((remoteWorkerOptions.casPath != null)
         && (!PathFragment.create(remoteWorkerOptions.casPath).isAbsolute()
-            || !getFileSystem().getPath(remoteWorkerOptions.casPath).exists())) {
+            || !fs.getPath(remoteWorkerOptions.casPath).exists())) {
       logger.severe("--cas_path must refer to an existing, absolute path!");
       System.exit(1);
       return;
@@ -179,19 +196,19 @@
         usingRemoteCache
             ? SimpleBlobStoreFactory.create(remoteOptions)
             : remoteWorkerOptions.casPath != null
-                ? new OnDiskBlobStore(getFileSystem().getPath(remoteWorkerOptions.casPath))
+                ? new OnDiskBlobStore(fs.getPath(remoteWorkerOptions.casPath))
                 : new ConcurrentMapBlobStore(new ConcurrentHashMap<String, byte[]>());
 
     RemoteWorker worker =
         new RemoteWorker(
-            remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath);
+            fs, remoteWorkerOptions, new SimpleBlobStoreActionCache(blobStore), sandboxPath);
 
     final Server server = worker.startServer();
     worker.createPidFile();
     server.awaitTermination();
   }
 
-  private static Path prepareSandboxRunner(RemoteWorkerOptions remoteWorkerOptions) {
+  private static Path prepareSandboxRunner(FileSystem fs, RemoteWorkerOptions remoteWorkerOptions) {
     if (OS.getCurrent() != OS.LINUX) {
       logger.severe("Sandboxing requested, but it is currently only available on Linux.");
       System.exit(1);
@@ -212,7 +229,7 @@
 
     Path sandboxPath = null;
     try {
-      sandboxPath = getFileSystem().getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
+      sandboxPath = fs.getPath(remoteWorkerOptions.workPath).getChild("linux-sandbox");
       try (FileOutputStream fos = new FileOutputStream(sandboxPath.getPathString())) {
         ByteStreams.copy(sandbox, fos);
       }