Execute sandboxfs reconfigurations in parallel.

I tried adding unit tests for this but doing so is very complicated in
their current form.  Given that this feature is still very experimental
and not in wide use, I'll postpone doing that until I'm able to prove
its worth.

For now, I ran a few builds by hand using the still-unreleased sandboxfs
0.2.0 with parallel support and confirmed this to work.

RELNOTES: None.
PiperOrigin-RevId: 280566455
diff --git a/src/main/java/com/google/devtools/build/lib/sandbox/RealSandboxfs02Process.java b/src/main/java/com/google/devtools/build/lib/sandbox/RealSandboxfs02Process.java
index acc54e6..b98119c 100644
--- a/src/main/java/com/google/devtools/build/lib/sandbox/RealSandboxfs02Process.java
+++ b/src/main/java/com/google/devtools/build/lib/sandbox/RealSandboxfs02Process.java
@@ -18,6 +18,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.devtools.build.lib.shell.Subprocess;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
@@ -26,10 +27,20 @@
 import com.google.gson.stream.JsonWriter;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A sandboxfs implementation that uses an external sandboxfs binary to manage the mount point.
@@ -38,17 +49,117 @@
  */
 final class RealSandboxfs02Process extends RealSandboxfsProcess {
 
+  private static final Logger log = Logger.getLogger(RealSandboxfsProcess.class.getName());
+
   /**
    * Writer with which to send data to the sandboxfs instance. Null only after {@link #destroy()}
    * has been invoked.
    */
-  private final JsonWriter processStdIn;
+  @GuardedBy("this")
+  private JsonWriter processStdIn;
 
   /**
-   * Reader with which to receive data from the sandboxfs instance. Null only after {@link
-   * #destroy()} has been invoked.
+   * Collection of active reconfiguration requests.
+   *
+   * <p>Each entry in this map is keyed by the identifier of the sandbox being affected by a
+   * reconfiguration request and points to a future that is set when the request completes.
+   *
+   * <p>New entries can be added to this map at any time, but only before {@link #destroy} is
+   * called. Once the sandboxfs instance has been destroyed, we do not expect any new requests to
+   * come in. However, existing requests will be drained by the {@link ResponsesReader} thread.
    */
-  private final JsonReader processStdOut;
+  private final ConcurrentMap<String, SettableFuture<Void>> inFlightRequests =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Thread that reads responses from sandboxfs and dispatches them to the futures maintained by
+   * {@link #inFlightRequests}.
+   */
+  private final Thread responsesReader;
+
+  /** Representation of a response returned by sandboxfs. */
+  private static class Response {
+    /** Identifier given in the request. Null if this carries a fatal error. */
+    @Nullable final String id;
+
+    /**
+     * Error message returned by sandboxfs if not null. If {@link #id} is not null, then this error
+     * corresponds to a specific request and is recoverable. Otherwise corresponds to a fatal
+     * condition, in which case sandboxfs will have stopped listening for requests.
+     */
+    @Nullable final String error;
+
+    /** Constructs a new response with the given values. */
+    Response(@Nullable String id, @Nullable String error) {
+      this.id = id;
+      this.error = error;
+    }
+  }
+
+  /**
+   * A thread that reads responses from the sandboxfs output stream and dispatches them to the
+   * futures awaiting for them.
+   */
+  private static class ResponsesReader extends Thread {
+
+    private final JsonReader reader;
+    private final ConcurrentMap<String, SettableFuture<Void>> inFlightRequests;
+
+    ResponsesReader(
+        JsonReader reader, ConcurrentMap<String, SettableFuture<Void>> inFlightRequests) {
+      this.reader = reader;
+      this.inFlightRequests = inFlightRequests;
+    }
+
+    /** Waits for responses and dispatches them. */
+    private void processResponses() throws IOException {
+      while (!Thread.interrupted() && reader.peek() != JsonToken.END_DOCUMENT) {
+        Response response = readResponse(reader);
+        if (response.id == null) {
+          // Non-recoverable error: abort.
+          throw new IOException(response.error != null ? response.error : "No error reported");
+        }
+
+        SettableFuture<Void> future = inFlightRequests.remove(response.id);
+        if (future == null) {
+          throw new IOException("sandboxfs returned response for unknown id " + response.id);
+        }
+        if (response.error == null) {
+          future.set(null);
+        } else {
+          future.setException(new IOException(response.error));
+        }
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        processResponses();
+      } catch (EOFException e) {
+        // OK, nothing to do.
+      } catch (IOException e) {
+        log.log(Level.WARNING, "Failed to read responses from sandboxfs", e);
+      }
+
+      // sandboxfs has either replied with an unrecoverable error or has stopped providing
+      // responses. Either way, we have to clean up any pending in-flight requests to unblock the
+      // threads waiting for them.
+      //
+      // Given that we only get here once destroy() has been called, we do not expect any new
+      // requests to show up in the inFlightRequests map. This is why we do not synchronize
+      // accesses to the map during the iteration.
+      while (!inFlightRequests.isEmpty()) {
+        Iterator<Map.Entry<String, SettableFuture<Void>>> iter =
+            inFlightRequests.entrySet().iterator();
+        while (iter.hasNext()) {
+          Map.Entry<String, SettableFuture<Void>> entry = iter.next();
+          entry.getValue().cancel(true);
+          iter.remove();
+        }
+      }
+    }
+  }
 
   /**
    * Initializes a new sandboxfs process instance.
@@ -61,42 +172,61 @@
     this.processStdIn =
         new JsonWriter(
             new BufferedWriter(new OutputStreamWriter(process.getOutputStream(), UTF_8)));
-    this.processStdOut =
+    JsonReader processStdOut =
         new JsonReader(new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8)));
 
     // Must use lenient writing and parsing to accept a stream of separate top-level JSON objects.
     this.processStdIn.setLenient(true);
-    this.processStdOut.setLenient(true);
+    processStdOut.setLenient(true);
+
+    responsesReader = new ResponsesReader(processStdOut, inFlightRequests);
+    responsesReader.start();
+  }
+
+  @Override
+  public synchronized void destroy() {
+    super.destroy();
+
+    responsesReader.interrupt();
+    try {
+      responsesReader.join();
+    } catch (InterruptedException e) {
+      log.warning("Interrupted while waiting for responses processor thread");
+      Thread.currentThread().interrupt();
+    }
+
+    processStdIn = null;
   }
 
   /**
-   * Waits for sandboxfs to accept the reconfiguration request.
+   * Waits for a single response from sandboxfs and returns it.
    *
-   * @param wantId name of the sandbox for which we are waiting a response
-   * @throws IOException if sandboxfs fails to accept the request for any reason
+   * @param input the stream connected to sandboxfs's stdout
+   * @return the response obtained from the stream
+   * @throws IOException if sandboxfs fails to read from the stream for any reason, including EOF
    */
-  private synchronized void waitAck(String wantId) throws IOException {
-    processStdOut.beginObject();
+  private static Response readResponse(JsonReader input) throws IOException {
+    input.beginObject();
     String id = null;
     String error = null;
-    while (processStdOut.hasNext()) {
-      String name = processStdOut.nextName();
+    while (input.hasNext()) {
+      String name = input.nextName();
       switch (name) {
         case "error":
-          if (processStdOut.peek() == JsonToken.NULL) {
-            processStdOut.nextNull();
+          if (input.peek() == JsonToken.NULL) {
+            input.nextNull();
           } else {
             checkState(error == null);
-            error = processStdOut.nextString();
+            error = input.nextString();
           }
           break;
 
         case "id":
-          if (processStdOut.peek() == JsonToken.NULL) {
-            processStdOut.nextNull();
+          if (input.peek() == JsonToken.NULL) {
+            input.nextNull();
           } else {
             checkState(id == null);
-            id = processStdOut.nextString();
+            id = input.nextString();
           }
           break;
 
@@ -104,12 +234,45 @@
           throw new IOException("Invalid field name in response: " + name);
       }
     }
-    processStdOut.endObject();
-    if (id == null || !id.equals(wantId)) {
-      throw new IOException("Got response with ID " + id + " but expected " + wantId);
-    }
-    if (error != null) {
-      throw new IOException(error);
+    input.endObject();
+    return new Response(id, error);
+  }
+
+  /**
+   * Registers a new in-flight operation for the given sandbox identifier.
+   *
+   * <p>The caller must wait for the returned operation using {@link #waitForRequest}.
+   *
+   * @param id the identifier of the sandbox for which the request will be issued. There can only be
+   *     one in-flight request per identifier.
+   * @return the future used to wait for the request's completion
+   */
+  private SettableFuture<Void> newRequest(String id) {
+    SettableFuture<Void> future = SettableFuture.create();
+    SettableFuture<Void> other = inFlightRequests.put(id, future);
+    checkState(other == null, "Cannot have two in-flight requests for sandbox '%s'", id);
+    return future;
+  }
+
+  /**
+   * Waits for a request to complete and unregisters its in-flight operation.
+   *
+   * @param future the value returned by {@link #newRequest}.
+   * @throws IOException if the request cannot be waited for or if it raised an error
+   */
+  private static void waitForRequest(SettableFuture<Void> future) throws IOException {
+    try {
+      future.get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new AssertionError("Unexpected exception type thrown by readResponse()", cause);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for sandboxfs response");
     }
   }
 
@@ -130,44 +293,50 @@
 
   @Override
   @SuppressWarnings("UnnecessaryParentheses")
-  public synchronized void createSandbox(String id, List<Mapping> mappings) throws IOException {
+  public void createSandbox(String id, List<Mapping> mappings) throws IOException {
     checkArgument(!PathFragment.containsSeparator(id));
 
-    processStdIn.beginObject();
-    {
-      processStdIn.name("CreateSandbox");
+    SettableFuture<Void> future = newRequest(id);
+    synchronized (this) {
       processStdIn.beginObject();
       {
-        processStdIn.name("id");
-        processStdIn.value(id);
-        processStdIn.name("mappings");
-        processStdIn.beginArray();
-        for (Mapping mapping : mappings) {
-          writeMapping(processStdIn, mapping);
+        processStdIn.name("CreateSandbox");
+        processStdIn.beginObject();
+        {
+          processStdIn.name("id");
+          processStdIn.value(id);
+          processStdIn.name("mappings");
+          processStdIn.beginArray();
+          for (Mapping mapping : mappings) {
+            writeMapping(processStdIn, mapping);
+          }
+          processStdIn.endArray();
         }
-        processStdIn.endArray();
+        processStdIn.endObject();
       }
       processStdIn.endObject();
-    }
-    processStdIn.endObject();
 
-    processStdIn.flush();
-    waitAck(id);
+      processStdIn.flush();
+    }
+    waitForRequest(future);
   }
 
   @Override
   @SuppressWarnings("UnnecessaryParentheses")
-  public synchronized void destroySandbox(String id) throws IOException {
+  public void destroySandbox(String id) throws IOException {
     checkArgument(!PathFragment.containsSeparator(id));
 
-    processStdIn.beginObject();
-    {
-      processStdIn.name("DestroySandbox");
-      processStdIn.value(id);
-    }
-    processStdIn.endObject();
+    SettableFuture<Void> future = newRequest(id);
+    synchronized (this) {
+      processStdIn.beginObject();
+      {
+        processStdIn.name("DestroySandbox");
+        processStdIn.value(id);
+      }
+      processStdIn.endObject();
 
-    processStdIn.flush();
-    waitAck(id);
+      processStdIn.flush();
+    }
+    waitForRequest(future);
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/sandbox/BaseRealSandboxfsProcessTest.java b/src/test/java/com/google/devtools/build/lib/sandbox/BaseRealSandboxfsProcessTest.java
index 49b0b8b..21de6e9 100644
--- a/src/test/java/com/google/devtools/build/lib/sandbox/BaseRealSandboxfsProcessTest.java
+++ b/src/test/java/com/google/devtools/build/lib/sandbox/BaseRealSandboxfsProcessTest.java
@@ -93,7 +93,7 @@
         new PrintWriter(
             new BufferedWriter(
                 new OutputStreamWriter(fakeSandboxfs.getOutputStream(), StandardCharsets.UTF_8)))) {
-      writer.println("#! /bin/sh");
+      writer.println("#! /bin/bash");
 
       // Ignore requests for termination. The real sandboxfs process must be sent a SIGTERM to stop
       // serving, but in our case we want to terminate cleanly after waiting for all input to be
@@ -111,13 +111,36 @@
       writer.println("  echo \"${arg}\" >>" + capturedArgs + ";");
       writer.println("done;");
 
-      // Emit all responses first to avoid blocking any reads from the Java side.
+      // Attempt to "parse" requests coming through stdin by just counting brace pairs, assuming
+      // that the input is composed of a stream of JSON objects. Then, for each request, emit one
+      // response.
+      //
+      // We must do this because the unordered response processor required to parse 0.2.0 output
+      // expects responses to come only after their requests have been issued. Ideally we'd match
+      // our mock responses to specific requests to allow for testing of unordered responses, but
+      // for now assume all requests and responses in the test are correctly ordered.
+      //
+      // TODO(jmmv): This has become pretty awful. Should rethink unit testing.
       for (String response : responses) {
-        writer.println("echo '" + response + "';");
+        writer.println("braces=0; started=no");
+        writer.println("while read -d '' -n 1 ch; do");
+        writer.println("  case \"${ch}\" in");
+        writer.println("    '{') braces=$((braces + 1)); started=yes ;;");
+        writer.println("    '[') braces=$((braces + 1)); started=yes ;;");
+        writer.println("    ']') braces=$((braces - 1)) ;;");
+        writer.println("    '}') braces=$((braces - 1)) ;;");
+        writer.println("  esac");
+        writer.println("  [[ \"${ch}\" != '' ]] || ch='\n'");
+        writer.println("  printf '%c' \"${ch}\" >>" + capturedRequests);
+        writer.println("  if [[ \"${started}\" = yes && \"${braces}\" -eq 0 ]]; then");
+        writer.println("    echo '" + response + "';");
+        writer.println("    break;");
+        writer.println("  fi");
+        writer.println("done");
       }
 
-      // And finally capture all requests for later inspection.
-      writer.println("cat >" + capturedRequests + ";");
+      // Capture any stray requests not expected by the test data.
+      writer.println("cat >>" + capturedRequests);
     }
     fakeSandboxfs.setExecutable(true);