remote: unix socket support for remote http caching

This adds support for Unix sockets to Bazel for the
remote http cache. See corresponding issue #5098
for discussion.

RELNOTES: Introduce the --remote_cache_proxy flag,
which allows for remote http caching to connect
via a unix domain socket.
PiperOrigin-RevId: 204111667
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 18e38ed..5c4ef45 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -37,6 +37,7 @@
         "//src/main/java/com/google/devtools/common/options",
         "//third_party:auth",
         "//third_party:guava",
+        "//third_party:netty",
         "//third_party/grpc:grpc-jar",
         "//third_party/protobuf:protobuf_java",
         "//third_party/protobuf:protobuf_java_util",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index bb61082..90f4207 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -36,6 +36,17 @@
   public String remoteHttpCache;
 
   @Option(
+      name = "remote_cache_proxy",
+      defaultValue = "null",
+      documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+      effectTags = {OptionEffectTag.UNKNOWN},
+      help =
+          "Connect to the remote cache through a proxy. Currently this flag can only be used to "
+              + "configure a Unix domain socket (unix:/path/to/socket) for the HTTP cache."
+  )
+  public String remoteCacheProxy;
+
+  @Option(
       name = "remote_max_connections",
       defaultValue = "100",
       documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
index 47a14bf..ef8ffe2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreFactory.java
@@ -22,6 +22,7 @@
 import com.google.devtools.build.lib.remote.blobstore.http.HttpBlobStore;
 import com.google.devtools.build.lib.vfs.Path;
 import com.google.devtools.build.lib.vfs.PathFragment;
+import io.netty.channel.unix.DomainSocketAddress;
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
@@ -37,11 +38,20 @@
 
   public static SimpleBlobStore createRest(RemoteOptions options, Credentials creds) {
     try {
-      return new HttpBlobStore(
-          URI.create(options.remoteHttpCache),
-          (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout),
-          options.remoteMaxConnections,
-          creds);
+      URI uri = URI.create(options.remoteHttpCache);
+      int timeoutMillis = (int) TimeUnit.SECONDS.toMillis(options.remoteTimeout);
+
+      if (options.remoteCacheProxy != null) {
+        if (options.remoteCacheProxy.startsWith("unix:")) {
+          return HttpBlobStore.create(
+            new DomainSocketAddress(options.remoteCacheProxy.replaceFirst("^unix:", "")),
+              uri, timeoutMillis, options.remoteMaxConnections, creds);
+        } else {
+          throw new Exception("Remote cache proxy unsupported: " + options.remoteCacheProxy);
+        }
+      } else {
+        return HttpBlobStore.create(uri, timeoutMillis, options.remoteMaxConnections, creds);
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
index a017d2a..0bcc68b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
@@ -24,12 +24,20 @@
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueDomainSocketChannel;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.pool.ChannelPool;
 import io.netty.channel.pool.ChannelPoolHandler;
 import io.netty.channel.pool.FixedChannelPool;
 import io.netty.channel.pool.SimpleChannelPool;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -53,11 +61,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
@@ -92,7 +103,7 @@
   private static final Pattern INVALID_TOKEN_ERROR =
       Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?");
 
-  private final NioEventLoopGroup eventLoop = new NioEventLoopGroup(2 /* number of threads */);
+  private final EventLoopGroup eventLoop;
   private final ChannelPool channelPool;
   private final URI uri;
   private final int timeoutMillis;
@@ -105,10 +116,44 @@
   @GuardedBy("credentialsLock")
   private long lastRefreshTime;
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  public HttpBlobStore(
+  public static HttpBlobStore create(URI uri, int timeoutMillis,
+      int remoteMaxConnections, @Nullable final Credentials creds)
+      throws Exception {
+    return new HttpBlobStore(
+        NioEventLoopGroup::new,
+        NioSocketChannel.class,
+        uri, timeoutMillis, remoteMaxConnections, creds,
+        null);
+  }
+
+  public static HttpBlobStore create(
+      DomainSocketAddress domainSocketAddress,
       URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds)
       throws Exception {
+
+      if (KQueue.isAvailable()) {
+        return new HttpBlobStore(
+            KQueueEventLoopGroup::new,
+            KQueueDomainSocketChannel.class,
+            uri, timeoutMillis, remoteMaxConnections, creds,
+            domainSocketAddress);
+      } else if (Epoll.isAvailable()) {
+        return new HttpBlobStore(
+            EpollEventLoopGroup::new,
+            EpollDomainSocketChannel.class,
+            uri, timeoutMillis, remoteMaxConnections, creds,
+            domainSocketAddress);
+      } else {
+        throw new Exception("Unix domain sockets are unsupported on this platform");
+      }
+  }
+
+  private HttpBlobStore(
+      Function<Integer, EventLoopGroup> newEventLoopGroup,
+      Class<? extends Channel> channelClass,
+      URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds,
+      @Nullable SocketAddress socketAddress)
+      throws Exception {
     boolean useTls = uri.getScheme().equals("https");
     if (uri.getPort() == -1) {
       int port = useTls ? 443 : 80;
@@ -123,6 +168,10 @@
               uri.getFragment());
     }
     this.uri = uri;
+    if (socketAddress == null) {
+      socketAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
+    }
+
     final SslContext sslCtx;
     if (useTls) {
       // OpenSsl gives us a > 2x speed improvement on fast networks, but requires netty tcnative
@@ -132,12 +181,15 @@
     } else {
       sslCtx = null;
     }
+
+    this.eventLoop = newEventLoopGroup.apply(2);
     Bootstrap clientBootstrap =
         new Bootstrap()
-            .channel(NioSocketChannel.class)
+            .channel(channelClass)
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
             .group(eventLoop)
-            .remoteAddress(uri.getHost(), uri.getPort());
+            .remoteAddress(socketAddress);
+
     ChannelPoolHandler channelPoolHandler =
         new ChannelPoolHandler() {
           @Override