Refactor WorkRequestHandler to be an interface, of which Proto is one implementation.

This lays the foundation for WorkRequestHandler to be used for JSON workers as well.

RELNOTES: None.
PiperOrigin-RevId: 341078345
diff --git a/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java b/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java
index 6642dd2..ae78c44 100644
--- a/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java
+++ b/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java
@@ -23,6 +23,7 @@
 import com.google.devtools.build.buildjar.javac.plugins.BlazeJavaCompilerPlugin;
 import com.google.devtools.build.buildjar.javac.plugins.dependency.DependencyModule;
 import com.google.devtools.build.buildjar.javac.plugins.errorprone.ErrorPronePlugin;
+import com.google.devtools.build.lib.worker.ProtoWorkerMessageProcessor;
 import com.google.devtools.build.lib.worker.WorkRequestHandler;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -41,8 +42,17 @@
   public static void main(String[] args) {
     BazelJavaBuilder builder = new BazelJavaBuilder();
     if (args.length == 1 && args[0].equals("--persistent_worker")) {
-      WorkRequestHandler workerHandler = new WorkRequestHandler(builder::parseAndBuild);
-      System.exit(workerHandler.processRequests(System.in, System.out, System.err));
+      WorkRequestHandler workerHandler =
+          new WorkRequestHandler(
+              builder::parseAndBuild,
+              System.err,
+              new ProtoWorkerMessageProcessor(System.in, System.out));
+      try {
+        workerHandler.processRequests();
+      } catch (IOException e) {
+        System.err.println(e.getMessage());
+        System.exit(1);
+      }
     } else {
       PrintWriter pw =
           new PrintWriter(new OutputStreamWriter(System.err, Charset.defaultCharset()));
diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD
index 6188dcf..4262f42 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD
@@ -50,6 +50,7 @@
 java_library(
     name = "work_request_handlers",
     srcs = [
+        "ProtoWorkerMessageProcessor.java",
         "WorkRequestHandler.java",
     ],
     deps = [
diff --git a/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java b/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java
new file mode 100644
index 0000000..182695d
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java
@@ -0,0 +1,54 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.worker;
+
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/** Implementation of the Worker Protocol using Proto to communicate with Bazel. */
+public final class ProtoWorkerMessageProcessor
+    implements WorkRequestHandler.WorkerMessageProcessor {
+
+  /** This worker's stdin. */
+  private final InputStream stdin;
+
+  /** This worker's stdout. Only {@link WorkRequest}s should be written here. */
+  private final OutputStream stdout;
+
+  /** Constructs a {@link WorkRequestHandler} that reads and writes Protocol Buffers. */
+  public ProtoWorkerMessageProcessor(InputStream stdin, OutputStream stdout) {
+    this.stdin = stdin;
+    this.stdout = stdout;
+  }
+
+  @Override
+  public WorkRequest readWorkRequest() throws IOException {
+    return WorkRequest.parseDelimitedFrom(stdin);
+  }
+
+  @Override
+  public void writeWorkResponse(WorkResponse workResponse) throws IOException {
+    try {
+      workResponse.writeDelimitedTo(stdout);
+    } finally {
+      stdout.flush();
+    }
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java
index c34de02..2f5deb4 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java
@@ -17,7 +17,6 @@
 import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
 import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
@@ -29,59 +28,76 @@
  * (https://docs.bazel.build/versions/master/persistent-workers.html), including multiplex workers
  * (https://docs.bazel.build/versions/master/multiplex-worker.html).
  */
-public class WorkRequestHandler {
+public class WorkRequestHandler implements AutoCloseable {
 
+  /** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */
+  public interface WorkerMessageProcessor {
+    /** Reads the next incoming request from this worker's stdin. */
+    public WorkRequest readWorkRequest() throws IOException;
+
+    /**
+     * Writes the provided {@link WorkResponse} to this worker's stdout. This function is also
+     * responsible for flushing the stdout.
+     */
+    public void writeWorkResponse(WorkResponse workResponse) throws IOException;
+
+    /** Clean up. */
+    public void close() throws IOException;
+  }
+
+  /** The function to be called after each {@link WorkRequest} is read. */
   private final BiFunction<List<String>, PrintWriter, Integer> callback;
 
+  /** This worker's stderr. */
+  private final PrintStream stderr;
+
+  private final WorkerMessageProcessor messageProcessor;
+
   /**
    * Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
    * received. The first argument to {@code callback} is the set of command-line arguments, the
-   * second is where all error messages and similar should be written to.
+   * second is where all error messages and similar should be written to. The callback should return
+   * an exit code indicating success (0) or failure (nonzero).
    */
-  public WorkRequestHandler(BiFunction<List<String>, PrintWriter, Integer> callback) {
+  public WorkRequestHandler(
+      BiFunction<List<String>, PrintWriter, Integer> callback,
+      PrintStream stderr,
+      WorkerMessageProcessor messageProcessor) {
     this.callback = callback;
+    this.stderr = stderr;
+    this.messageProcessor = messageProcessor;
   }
 
   /**
-   * Runs an infinite loop of reading {@code WorkRequest} from {@code in}, running the callback,
-   * then writing the corresponding {@code WorkResponse} to {@code out}. If there is an error
+   * Runs an infinite loop of reading {@link WorkRequest} from {@code in}, running the callback,
+   * then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
    * reading or writing the requests or responses, it writes an error message on {@code err} and
    * returns. If {@code in} reaches EOF, it also returns.
-   *
-   * @return 0 if we reached EOF, 1 if there was an error.
    */
-  public int processRequests(InputStream in, PrintStream out, PrintStream err) {
+  public void processRequests() throws IOException {
     while (true) {
-      try {
-        WorkRequest request = WorkRequest.parseDelimitedFrom(in);
-
+      WorkRequest request = messageProcessor.readWorkRequest();
         if (request == null) {
           break;
         }
-
         if (request.getRequestId() != 0) {
-          Thread t = createResponseThread(request, out, err);
+        Thread t = createResponseThread(request);
           t.start();
         } else {
-          respondToRequest(request, out);
-        }
-      } catch (IOException e) {
-        e.printStackTrace(err);
-        return 1;
+        respondToRequest(request);
       }
     }
-    return 0;
   }
 
-  /** Creates a new {@code Thread} to process a multiplex request. */
-  public Thread createResponseThread(WorkRequest request, PrintStream out, PrintStream err) {
+  /** Creates a new {@link Thread} to process a multiplex request. */
+  public Thread createResponseThread(WorkRequest request) {
     Thread currentThread = Thread.currentThread();
     return new Thread(
         () -> {
           try {
-            respondToRequest(request, out);
+            respondToRequest(request);
           } catch (IOException e) {
-            e.printStackTrace(err);
+            e.printStackTrace(stderr);
             // In case of error, shut down the entire worker.
             currentThread.interrupt();
           }
@@ -89,9 +105,9 @@
         "multiplex-request-" + request.getRequestId());
   }
 
-  /** Responds to {@code request}, writing the {@code WorkResponse} proto to {@code out}. */
+  /** Handles and responds to the given {@link WorkRequest}. */
   @VisibleForTesting
-  void respondToRequest(WorkRequest request, PrintStream out) throws IOException {
+  void respondToRequest(WorkRequest request) throws IOException {
     try (StringWriter sw = new StringWriter();
         PrintWriter pw = new PrintWriter(sw)) {
       int exitCode;
@@ -109,14 +125,13 @@
               .setRequestId(request.getRequestId())
               .build();
       synchronized (this) {
-        workResponse.writeDelimitedTo(out);
+        messageProcessor.writeWorkResponse(workResponse);
       }
     }
-    out.flush();
+  }
 
-    // Hint to the system that now would be a good time to run a gc.  After a compile
-    // completes lots of objects should be available for collection and it should be cheap to
-    // collect them.
-    System.gc();
+  @Override
+  public void close() throws IOException {
+    messageProcessor.close();
   }
 }
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java
index 5aebafe..1975b03 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java
@@ -41,12 +41,16 @@
 
   @Test
   public void testNormalWorkRequest() throws IOException {
-    WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 1);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    WorkRequestHandler handler =
+        new WorkRequestHandler(
+            (args, err) -> 1,
+            new PrintStream(new ByteArrayOutputStream()),
+            new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
 
     List<String> args = Arrays.asList("--sources", "A.java");
     WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.respondToRequest(request, new PrintStream(out));
+    handler.respondToRequest(request);
 
     WorkResponse response =
         WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -57,12 +61,16 @@
 
   @Test
   public void testMultiplexWorkRequest() throws IOException {
-    WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 0);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    WorkRequestHandler handler =
+        new WorkRequestHandler(
+            (args, err) -> 0,
+            new PrintStream(new ByteArrayOutputStream()),
+            new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
 
     List<String> args = Arrays.asList("--sources", "A.java");
     WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build();
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.respondToRequest(request, new PrintStream(out));
+    handler.respondToRequest(request);
 
     WorkResponse response =
         WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -73,17 +81,19 @@
 
   @Test
   public void testOutput() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
     WorkRequestHandler handler =
         new WorkRequestHandler(
             (args, err) -> {
               err.println("Failed!");
               return 1;
-            });
+            },
+            new PrintStream(new ByteArrayOutputStream()),
+            new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
 
     List<String> args = Arrays.asList("--sources", "A.java");
     WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.respondToRequest(request, new PrintStream(out));
+    handler.respondToRequest(request);
 
     WorkResponse response =
         WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -94,16 +104,18 @@
 
   @Test
   public void testException() throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
     WorkRequestHandler handler =
         new WorkRequestHandler(
             (args, err) -> {
               throw new RuntimeException("Exploded!");
-            });
+            },
+            new PrintStream(new ByteArrayOutputStream()),
+            new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
 
     List<String> args = Arrays.asList("--sources", "A.java");
     WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    handler.respondToRequest(request, new PrintStream(out));
+    handler.respondToRequest(request);
 
     WorkResponse response =
         WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));