Make the integration tests pass with gRPC client/server comms.

In particular:

- Make a SIGINT to the server interrupt every command
- Parse negative numbers on the command line correctly (std::stoi throws an exception, and I'd rather not start using C++ exceptions)
- Use "bytes" for command line arguments instead of "string" in the client/server proto . This is more principled, although we pretend all arguments are strings all over the place and it works for "blaze run" mostly by accident.

--
MOS_MIGRATED_REVID=120434432
diff --git a/src/main/cpp/util/numbers.cc b/src/main/cpp/util/numbers.cc
index 2e3b109..46feb81 100644
--- a/src/main/cpp/util/numbers.cc
+++ b/src/main/cpp/util/numbers.cc
@@ -52,10 +52,10 @@
   36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36, 36 };
 
 // Parse the sign.
-inline bool safe_parse_sign(const string &text  /*inout*/,
+inline bool safe_parse_sign(const char** rest, /*inout*/
                             bool* negative_ptr  /*output*/) {
-  const char* start = text.data();
-  const char* end = start + text.size();
+  const char* start = *rest;
+  const char* end = start + strlen(start);
 
   // Consume whitespace.
   while (start < end && ascii_isspace(start[0])) {
@@ -77,6 +77,7 @@
     }
   }
 
+  *rest = start;
   return true;
 }
 
@@ -104,13 +105,13 @@
 //
 // Overflow checking becomes simple.
 
-inline bool safe_parse_positive_int(const string &text, int* value_p) {
+inline bool safe_parse_positive_int(const char *text, int* value_p) {
   int value = 0;
   const int vmax = std::numeric_limits<int>::max();
   static_assert(vmax > 0, "");
   const int vmax_over_base = vmax / 10;
-  const char* start = text.data();
-  const char* end = start + text.size();
+  const char* start = text;
+  const char* end = start + strlen(text);
   // loop over digits
   for (; start < end; ++start) {
     unsigned char c = static_cast<unsigned char>(start[0]);
@@ -134,7 +135,7 @@
   return true;
 }
 
-inline bool safe_parse_negative_int(const string &text, int* value_p) {
+inline bool safe_parse_negative_int(const char *text, int* value_p) {
   int value = 0;
   const int vmin = std::numeric_limits<int>::min();
   static_assert(vmin < 0, "");
@@ -146,8 +147,8 @@
   if (vmin % 10 > 0) {
     vmin_over_base += 1;
   }
-  const char* start = text.data();
-  const char* end = start + text.size();
+  const char* start = text;
+  const char* end = start + strlen(text);
   // loop over digits
   for (; start < end; ++start) {
     unsigned char c = static_cast<unsigned char>(start[0]);
@@ -173,14 +174,15 @@
 
 bool safe_strto32(const string &text, int *value_p) {
   *value_p = 0;
+  const char* rest = text.c_str();
   bool negative;
-  if (!safe_parse_sign(text, &negative)) {
+  if (!safe_parse_sign(&rest, &negative)) {
     return false;
   }
   if (!negative) {
-    return safe_parse_positive_int(text, value_p);
+    return safe_parse_positive_int(rest, value_p);
   } else {
-    return safe_parse_negative_int(text, value_p);
+    return safe_parse_negative_int(rest, value_p);
   }
 }
 
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
index ace9459..2523136 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/BlazeRuntime.java
@@ -838,8 +838,19 @@
    * the program.
    */
   private static int serverMain(Iterable<BlazeModule> modules, OutErr outErr, String[] args) {
+    InterruptSignalHandler sigintHandler = null;
     try {
-      RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+      final RPCServer blazeServer = createBlazeRPCServer(modules, Arrays.asList(args));
+
+      // Register the signal handler.
+       sigintHandler = new InterruptSignalHandler() {
+        @Override
+        protected void onSignal() {
+          LOG.severe("User interrupt");
+          blazeServer.interrupt();
+        }
+      };
+
       blazeServer.serve();
       return ExitCode.SUCCESS.getNumericExitCode();
     } catch (OptionsParsingException e) {
@@ -851,6 +862,10 @@
     } catch (AbruptExitException e) {
       outErr.printErr(e.getMessage());
       return e.getExitCode().getNumericExitCode();
+    } finally {
+      if (sigintHandler != null) {
+        sigintHandler.uninstall();
+      }
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
index 26473e2..558b875 100644
--- a/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/AfUnixServer.java
@@ -19,7 +19,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteStreams;
 import com.google.devtools.build.lib.server.RPCService.UnknownCommandException;
-import com.google.devtools.build.lib.server.signal.InterruptSignalHandler;
 import com.google.devtools.build.lib.unix.LocalClientSocket;
 import com.google.devtools.build.lib.unix.LocalServerSocket;
 import com.google.devtools.build.lib.unix.LocalSocketAddress;
@@ -129,59 +128,54 @@
         serverDirectory, workspaceDir);
   }
 
+
+  private final AtomicBoolean inAction = new AtomicBoolean(false);
+  private final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
+  private final AtomicLong cmdNum = new AtomicLong();
+  private final Thread mainThread = Thread.currentThread();
+  private final Object interruptLock = new Object();
+
+  @Override
+  public void interrupt() {
+    // Only interrupt during actions - otherwise we may end up setting the interrupt bit
+    // at the end of a build and responding to it at the beginning of the subsequent build.
+    synchronized (interruptLock) {
+      if (allowingInterrupt.get()) {
+        mainThread.interrupt();
+      }
+    }
+
+    if (inAction.get()) {
+      Runnable interruptWatcher =
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                long originalCmd = cmdNum.get();
+                Thread.sleep(10 * 1000);
+                if (inAction.get() && cmdNum.get() == originalCmd) {
+                  // We're still operating on the same command.
+                  // Interrupt took too long.
+                  ThreadUtils.warnAboutSlowInterrupt();
+                }
+              } catch (InterruptedException e) {
+                // Ignore.
+              }
+            }
+          };
+      Thread interruptWatcherThread =
+          new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
+      interruptWatcherThread.setDaemon(true);
+      interruptWatcherThread.start();
+    }
+  }
+
   /**
    * Wait on a socket for business (answer requests). Note that this
    * method won't return until the server shuts down.
    */
   @Override
   public void serve() {
-    // Register the signal handler.
-    final AtomicBoolean inAction = new AtomicBoolean(false);
-    final AtomicBoolean allowingInterrupt = new AtomicBoolean(true);
-    final AtomicLong cmdNum = new AtomicLong();
-    final Thread mainThread = Thread.currentThread();
-    final Object interruptLock = new Object();
-
-    InterruptSignalHandler sigintHandler =
-        new InterruptSignalHandler() {
-          @Override
-          protected void onSignal() {
-            LOG.severe("User interrupt");
-
-            // Only interrupt during actions - otherwise we may end up setting the interrupt bit
-            // at the end of a build and responding to it at the beginning of the subsequent build.
-            synchronized (interruptLock) {
-              if (allowingInterrupt.get()) {
-                mainThread.interrupt();
-              }
-            }
-
-            if (inAction.get()) {
-              Runnable interruptWatcher =
-                  new Runnable() {
-                    @Override
-                    public void run() {
-                      try {
-                        long originalCmd = cmdNum.get();
-                        Thread.sleep(10 * 1000);
-                        if (inAction.get() && cmdNum.get() == originalCmd) {
-                          // We're still operating on the same command.
-                          // Interrupt took too long.
-                          ThreadUtils.warnAboutSlowInterrupt();
-                        }
-                      } catch (InterruptedException e) {
-                        // Ignore.
-                      }
-                    }
-                  };
-              Thread interruptWatcherThread =
-                  new Thread(interruptWatcher, "interrupt-watcher-" + cmdNum);
-              interruptWatcherThread.setDaemon(true);
-              interruptWatcherThread.start();
-            }
-          }
-        };
-
     try {
       while (!lameDuck) {
         try {
@@ -246,7 +240,6 @@
     } finally {
       rpcService.shutdown();
       LOG.info("Logging finished");
-      sigintHandler.uninstall();
     }
   }
 
diff --git a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
index 477f4b4..a5dbbf0 100644
--- a/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
+++ b/src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
@@ -14,6 +14,8 @@
 
 package com.google.devtools.build.lib.server;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.LockingMode;
 import com.google.devtools.build.lib.runtime.CommandExecutor;
 import com.google.devtools.build.lib.server.CommandProtos.CancelRequest;
@@ -25,6 +27,7 @@
 import com.google.devtools.build.lib.util.Clock;
 import com.google.devtools.build.lib.util.ExitCode;
 import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.util.ThreadUtils;
 import com.google.devtools.build.lib.util.io.OutErr;
 import com.google.devtools.build.lib.vfs.FileSystemUtils;
 import com.google.devtools.build.lib.vfs.Path;
@@ -39,10 +42,15 @@
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.nio.channels.ServerSocketChannel;
+import java.nio.charset.Charset;
 import java.security.SecureRandom;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * gRPC server class.
@@ -51,6 +59,11 @@
  * bootstrapping.
  */
 public class GrpcServerImpl extends RPCServer implements CommandServerGrpc.CommandServer {
+  // UTF-8 won't do because we want to be able to pass arbitrary binary strings.
+  // Not that the internals of Bazel handle that correctly, but why not make at least this little
+  // part correct?
+  private static final Charset CHARSET = Charset.forName("ISO-8859-1");
+
   private class RunningCommand implements AutoCloseable {
     private final Thread thread;
     private final String id;
@@ -129,12 +142,14 @@
   private static final String REQUEST_COOKIE_FILE = "request_cookie";
   private static final String RESPONSE_COOKIE_FILE = "response_cookie";
 
+  @GuardedBy("runningCommands")
   private final Map<String, RunningCommand> runningCommands = new HashMap<>();
   private final CommandExecutor commandExecutor;
   private final Clock clock;
   private final Path serverDirectory;
   private final String requestCookie;
   private final String responseCookie;
+  private final AtomicLong interruptCounter = new AtomicLong(0);
 
   private Server server;
   private int port;  // mutable so that we can overwrite it if port 0 is passed in
@@ -165,6 +180,47 @@
     return result.toString();
   }
 
+  private void startSlowInterruptWatcher(final ImmutableSet<String> commandIds) {
+    if (commandIds.isEmpty()) {
+      return;
+    }
+
+    Runnable interruptWatcher = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          boolean ok;
+          Thread.sleep(10 * 1000);
+          synchronized (runningCommands) {
+            ok = Collections.disjoint(commandIds, runningCommands.keySet());
+          }
+          if (!ok) {
+            // At least one command was not interrupted. Interrupt took too long.
+            ThreadUtils.warnAboutSlowInterrupt();
+          }
+        } catch (InterruptedException e) {
+          // Ignore.
+        }
+      }
+    };
+
+    Thread interruptWatcherThread =
+        new Thread(interruptWatcher, "interrupt-watcher-" + interruptCounter.incrementAndGet());
+    interruptWatcherThread.setDaemon(true);
+    interruptWatcherThread.start();
+  }
+
+  @Override
+  public void interrupt() {
+    synchronized (runningCommands) {
+      for (RunningCommand command : runningCommands.values()) {
+        command.thread.interrupt();
+      }
+
+      startSlowInterruptWatcher(ImmutableSet.copyOf(runningCommands.keySet()));
+    }
+  }
+
   @Override
   public void serve() throws IOException {
     Preconditions.checkState(!serving);
@@ -245,6 +301,11 @@
       return;
     }
 
+    ImmutableList.Builder<String> args = ImmutableList.builder();
+    for (ByteString requestArg : request.getArgList()) {
+      args.add(requestArg.toString(CHARSET));
+    }
+
     String commandId;
     int exitCode;
     try (RunningCommand command = new RunningCommand()) {
@@ -254,7 +315,7 @@
           new RpcOutputStream(observer, command.id, StreamType.STDERR));
 
       exitCode = commandExecutor.exec(
-          request.getArgList(), rpcOutErr,
+          args.build(), rpcOutErr,
           request.getBlockForLock() ? LockingMode.WAIT : LockingMode.ERROR_OUT,
           request.getClientDescription(), clock.currentTimeMillis());
     } catch (InterruptedException e) {
@@ -308,6 +369,8 @@
       if (command != null) {
         command.thread.interrupt();
       }
+
+      startSlowInterruptWatcher(ImmutableSet.of(request.getCommandId()));
     }
 
     streamObserver.onNext(CancelResponse.newBuilder().setCookie(responseCookie).build());
diff --git a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
index fc6b83f..a9663ea 100644
--- a/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
+++ b/src/main/java/com/google/devtools/build/lib/server/RPCServer.java
@@ -90,4 +90,9 @@
    * Start serving and block until the a shutdown command is received.
    */
   public abstract void serve() throws IOException;
+
+  /**
+   * Called when the server receives a SIGINT.
+   */
+  public abstract void interrupt();
 }
diff --git a/src/main/protobuf/command_server.proto b/src/main/protobuf/command_server.proto
index eef2422..69fc62e 100644
--- a/src/main/protobuf/command_server.proto
+++ b/src/main/protobuf/command_server.proto
@@ -25,7 +25,7 @@
 message RunRequest {
   // This must be the request cookie from the output base. A rudimentary form of authentication.
   string cookie = 1;
-  repeated string arg = 2;
+  repeated bytes arg = 2;
   bool block_for_lock = 3;  // If false, the client won't wait for another client to finish
   string client_description = 4;
 }
diff --git a/src/test/cpp/util/BUILD b/src/test/cpp/util/BUILD
index e47f35e..c9a4720 100644
--- a/src/test/cpp/util/BUILD
+++ b/src/test/cpp/util/BUILD
@@ -22,6 +22,15 @@
 )
 
 cc_test(
+    name = "numbers_test",
+    srcs = ["numbers_test.cc"],
+    deps = [
+        "//src/main/cpp/util",
+        "//third_party:gtest",
+    ],
+)
+
+cc_test(
     name = "strings_test",
     srcs = ["strings_test.cc"],
     shard_count = 2,
diff --git a/src/test/cpp/util/numbers_test.cc b/src/test/cpp/util/numbers_test.cc
new file mode 100644
index 0000000..71e6487
--- /dev/null
+++ b/src/test/cpp/util/numbers_test.cc
@@ -0,0 +1,38 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#include "src/main/cpp/util/numbers.h"
+#include "src/main/cpp/util/port.h"
+#include "gtest/gtest.h"
+
+namespace blaze_util {
+
+TEST(NumbersTest, TestSafeStrto32) {
+  int value;
+  ASSERT_TRUE(safe_strto32("0", &value));
+  ASSERT_EQ(0, value);
+  ASSERT_TRUE(safe_strto32("42", &value));
+  ASSERT_EQ(42, value);
+  ASSERT_TRUE(safe_strto32("007", &value));
+  ASSERT_EQ(7, value);
+  ASSERT_TRUE(safe_strto32("1234567", &value));
+  ASSERT_EQ(1234567, value);
+  ASSERT_TRUE(safe_strto32("-0", &value));
+  ASSERT_EQ(0, value);
+  ASSERT_TRUE(safe_strto32("-273", &value));
+  ASSERT_EQ(-273, value);
+  ASSERT_TRUE(safe_strto32("-0420", &value));
+  ASSERT_EQ(-420, value);
+}
+
+}  // namespace blaze_util