Rewrite the Command API

Important: the simplified API now defaults to forwarding interrupts to
subprocesses. I did audit all the call sites, and I think this is a safe change
to make.

- Properly support timeouts with all implementations
- Simplify the API
  - only provide two flavours of blocking calls, which require no input and
    forward interrupts; this is the most common usage
  - provide a number of async calls, which optionally take input, and a flag
    indicating whether to forward interrupts
  - only support input streams, no byte arrays or other 'convenience features'
    that are rarely needed and unnecessarily increase the surface area
  - use java.time.Duration to specify timeout; for consistency, interpret a
    timeout of <= 0 as no timeout (i.e., including rather than excluding 0)
  - KillableObserver and subclasses are no longer part of the public API, but
    are still used to implement timeouts if the Subprocess.Factory does not
    support them
- Update the documentation for Command
- Update all callers; most callers now use the simplified API

PiperOrigin-RevId: 164716782
diff --git a/src/main/java/com/google/devtools/build/lib/shell/Consumers.java b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
index 47515d7..b3864d3 100644
--- a/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
+++ b/src/main/java/com/google/devtools/build/lib/shell/Consumers.java
@@ -13,6 +13,7 @@
 // limitations under the License.
 package com.google.devtools.build.lib.shell;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
@@ -31,11 +32,10 @@
 /**
  * This class provides convenience methods for consuming (actively reading)
  * output and error streams with different consumption policies:
- * discarding ({@link #createDiscardingConsumers()},
  * accumulating ({@link #createAccumulatingConsumers()},
  * and streaming ({@link #createStreamingConsumers(OutputStream, OutputStream)}).
  */
-class Consumers {
+final class Consumers {
 
   private static final Logger log =
     Logger.getLogger("com.google.devtools.build.lib.shell.Command");
@@ -45,33 +45,26 @@
   private static final ExecutorService pool =
     Executors.newCachedThreadPool(new AccumulatorThreadFactory());
 
-  static OutErrConsumers createDiscardingConsumers() {
-    return new OutErrConsumers(new DiscardingConsumer(),
-                               new DiscardingConsumer());
-  }
-
   static OutErrConsumers createAccumulatingConsumers() {
-    return new OutErrConsumers(new AccumulatingConsumer(),
-                               new AccumulatingConsumer());
+    return new OutErrConsumers(new AccumulatingConsumer(), new AccumulatingConsumer());
   }
 
-  static OutErrConsumers createStreamingConsumers(OutputStream out,
-                                                  OutputStream err) {
-    return new OutErrConsumers(new StreamingConsumer(out),
-                               new StreamingConsumer(err));
+  static OutErrConsumers createStreamingConsumers(OutputStream out, OutputStream err) {
+    Preconditions.checkNotNull(out);
+    Preconditions.checkNotNull(err);
+    return new OutErrConsumers(new StreamingConsumer(out), new StreamingConsumer(err));
   }
 
   static class OutErrConsumers {
-
     private final OutputConsumer out;
     private final OutputConsumer err;
 
-    private OutErrConsumers(final OutputConsumer out, final OutputConsumer err){
+    private OutErrConsumers(final OutputConsumer out, final OutputConsumer err) {
       this.out = out;
       this.err = err;
     }
 
-    void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams){
+    void registerInputs(InputStream outInput, InputStream errInput, boolean closeStreams) {
       out.registerInput(outInput, closeStreams);
       err.registerInput(errInput, closeStreams);
     }