Refactor WorkRequestHandler to be an interface, of which Proto is one implementation.
This lays the foundation for WorkRequestHandler to be used for JSON workers as well.
RELNOTES: None.
PiperOrigin-RevId: 341078345
diff --git a/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java b/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java
index 6642dd2..ae78c44 100644
--- a/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java
+++ b/src/java_tools/buildjar/java/com/google/devtools/build/buildjar/BazelJavaBuilder.java
@@ -23,6 +23,7 @@
import com.google.devtools.build.buildjar.javac.plugins.BlazeJavaCompilerPlugin;
import com.google.devtools.build.buildjar.javac.plugins.dependency.DependencyModule;
import com.google.devtools.build.buildjar.javac.plugins.errorprone.ErrorPronePlugin;
+import com.google.devtools.build.lib.worker.ProtoWorkerMessageProcessor;
import com.google.devtools.build.lib.worker.WorkRequestHandler;
import java.io.IOException;
import java.io.OutputStreamWriter;
@@ -41,8 +42,17 @@
public static void main(String[] args) {
BazelJavaBuilder builder = new BazelJavaBuilder();
if (args.length == 1 && args[0].equals("--persistent_worker")) {
- WorkRequestHandler workerHandler = new WorkRequestHandler(builder::parseAndBuild);
- System.exit(workerHandler.processRequests(System.in, System.out, System.err));
+ WorkRequestHandler workerHandler =
+ new WorkRequestHandler(
+ builder::parseAndBuild,
+ System.err,
+ new ProtoWorkerMessageProcessor(System.in, System.out));
+ try {
+ workerHandler.processRequests();
+ } catch (IOException e) {
+ System.err.println(e.getMessage());
+ System.exit(1);
+ }
} else {
PrintWriter pw =
new PrintWriter(new OutputStreamWriter(System.err, Charset.defaultCharset()));
diff --git a/src/main/java/com/google/devtools/build/lib/worker/BUILD b/src/main/java/com/google/devtools/build/lib/worker/BUILD
index 6188dcf..4262f42 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/worker/BUILD
@@ -50,6 +50,7 @@
java_library(
name = "work_request_handlers",
srcs = [
+ "ProtoWorkerMessageProcessor.java",
"WorkRequestHandler.java",
],
deps = [
diff --git a/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java b/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java
new file mode 100644
index 0000000..182695d
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/worker/ProtoWorkerMessageProcessor.java
@@ -0,0 +1,54 @@
+// Copyright 2020 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.worker;
+
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
+import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/** Implementation of the Worker Protocol using Proto to communicate with Bazel. */
+public final class ProtoWorkerMessageProcessor
+ implements WorkRequestHandler.WorkerMessageProcessor {
+
+ /** This worker's stdin. */
+ private final InputStream stdin;
+
+ /** This worker's stdout. Only {@link WorkRequest}s should be written here. */
+ private final OutputStream stdout;
+
+ /** Constructs a {@link WorkRequestHandler} that reads and writes Protocol Buffers. */
+ public ProtoWorkerMessageProcessor(InputStream stdin, OutputStream stdout) {
+ this.stdin = stdin;
+ this.stdout = stdout;
+ }
+
+ @Override
+ public WorkRequest readWorkRequest() throws IOException {
+ return WorkRequest.parseDelimitedFrom(stdin);
+ }
+
+ @Override
+ public void writeWorkResponse(WorkResponse workResponse) throws IOException {
+ try {
+ workResponse.writeDelimitedTo(stdout);
+ } finally {
+ stdout.flush();
+ }
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java
index c34de02..2f5deb4 100644
--- a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java
+++ b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java
@@ -17,7 +17,6 @@
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -29,59 +28,76 @@
* (https://docs.bazel.build/versions/master/persistent-workers.html), including multiplex workers
* (https://docs.bazel.build/versions/master/multiplex-worker.html).
*/
-public class WorkRequestHandler {
+public class WorkRequestHandler implements AutoCloseable {
+ /** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */
+ public interface WorkerMessageProcessor {
+ /** Reads the next incoming request from this worker's stdin. */
+ public WorkRequest readWorkRequest() throws IOException;
+
+ /**
+ * Writes the provided {@link WorkResponse} to this worker's stdout. This function is also
+ * responsible for flushing the stdout.
+ */
+ public void writeWorkResponse(WorkResponse workResponse) throws IOException;
+
+ /** Clean up. */
+ public void close() throws IOException;
+ }
+
+ /** The function to be called after each {@link WorkRequest} is read. */
private final BiFunction<List<String>, PrintWriter, Integer> callback;
+ /** This worker's stderr. */
+ private final PrintStream stderr;
+
+ private final WorkerMessageProcessor messageProcessor;
+
/**
* Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
* received. The first argument to {@code callback} is the set of command-line arguments, the
- * second is where all error messages and similar should be written to.
+ * second is where all error messages and similar should be written to. The callback should return
+ * an exit code indicating success (0) or failure (nonzero).
*/
- public WorkRequestHandler(BiFunction<List<String>, PrintWriter, Integer> callback) {
+ public WorkRequestHandler(
+ BiFunction<List<String>, PrintWriter, Integer> callback,
+ PrintStream stderr,
+ WorkerMessageProcessor messageProcessor) {
this.callback = callback;
+ this.stderr = stderr;
+ this.messageProcessor = messageProcessor;
}
/**
- * Runs an infinite loop of reading {@code WorkRequest} from {@code in}, running the callback,
- * then writing the corresponding {@code WorkResponse} to {@code out}. If there is an error
+ * Runs an infinite loop of reading {@link WorkRequest} from {@code in}, running the callback,
+ * then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
* reading or writing the requests or responses, it writes an error message on {@code err} and
* returns. If {@code in} reaches EOF, it also returns.
- *
- * @return 0 if we reached EOF, 1 if there was an error.
*/
- public int processRequests(InputStream in, PrintStream out, PrintStream err) {
+ public void processRequests() throws IOException {
while (true) {
- try {
- WorkRequest request = WorkRequest.parseDelimitedFrom(in);
-
+ WorkRequest request = messageProcessor.readWorkRequest();
if (request == null) {
break;
}
-
if (request.getRequestId() != 0) {
- Thread t = createResponseThread(request, out, err);
+ Thread t = createResponseThread(request);
t.start();
} else {
- respondToRequest(request, out);
- }
- } catch (IOException e) {
- e.printStackTrace(err);
- return 1;
+ respondToRequest(request);
}
}
- return 0;
}
- /** Creates a new {@code Thread} to process a multiplex request. */
- public Thread createResponseThread(WorkRequest request, PrintStream out, PrintStream err) {
+ /** Creates a new {@link Thread} to process a multiplex request. */
+ public Thread createResponseThread(WorkRequest request) {
Thread currentThread = Thread.currentThread();
return new Thread(
() -> {
try {
- respondToRequest(request, out);
+ respondToRequest(request);
} catch (IOException e) {
- e.printStackTrace(err);
+ e.printStackTrace(stderr);
// In case of error, shut down the entire worker.
currentThread.interrupt();
}
@@ -89,9 +105,9 @@
"multiplex-request-" + request.getRequestId());
}
- /** Responds to {@code request}, writing the {@code WorkResponse} proto to {@code out}. */
+ /** Handles and responds to the given {@link WorkRequest}. */
@VisibleForTesting
- void respondToRequest(WorkRequest request, PrintStream out) throws IOException {
+ void respondToRequest(WorkRequest request) throws IOException {
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
int exitCode;
@@ -109,14 +125,13 @@
.setRequestId(request.getRequestId())
.build();
synchronized (this) {
- workResponse.writeDelimitedTo(out);
+ messageProcessor.writeWorkResponse(workResponse);
}
}
- out.flush();
+ }
- // Hint to the system that now would be a good time to run a gc. After a compile
- // completes lots of objects should be available for collection and it should be cheap to
- // collect them.
- System.gc();
+ @Override
+ public void close() throws IOException {
+ messageProcessor.close();
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java
index 5aebafe..1975b03 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java
@@ -41,12 +41,16 @@
@Test
public void testNormalWorkRequest() throws IOException {
- WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 1);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ WorkRequestHandler handler =
+ new WorkRequestHandler(
+ (args, err) -> 1,
+ new PrintStream(new ByteArrayOutputStream()),
+ new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- handler.respondToRequest(request, new PrintStream(out));
+ handler.respondToRequest(request);
WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -57,12 +61,16 @@
@Test
public void testMultiplexWorkRequest() throws IOException {
- WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 0);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ WorkRequestHandler handler =
+ new WorkRequestHandler(
+ (args, err) -> 0,
+ new PrintStream(new ByteArrayOutputStream()),
+ new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- handler.respondToRequest(request, new PrintStream(out));
+ handler.respondToRequest(request);
WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -73,17 +81,19 @@
@Test
public void testOutput() throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
WorkRequestHandler handler =
new WorkRequestHandler(
(args, err) -> {
err.println("Failed!");
return 1;
- });
+ },
+ new PrintStream(new ByteArrayOutputStream()),
+ new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- handler.respondToRequest(request, new PrintStream(out));
+ handler.respondToRequest(request);
WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
@@ -94,16 +104,18 @@
@Test
public void testException() throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
WorkRequestHandler handler =
new WorkRequestHandler(
(args, err) -> {
throw new RuntimeException("Exploded!");
- });
+ },
+ new PrintStream(new ByteArrayOutputStream()),
+ new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));
List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- handler.respondToRequest(request, new PrintStream(out));
+ handler.respondToRequest(request);
WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));