Fix race condition with releaseUploadChannel().

A race condition can occur in `HttpBlobStore.releaseUploadChannel()` when `ChannelHandlerContext` is closed by `HttpUploadHandler.channelRead0`. Because `ChannelHandlerContext.close` runs asynchronously and closes all pipelines in reverse order (before the channel is marked as closed), `HttpBlobStore.releaseUploadChannel()` can be executed while channel is still open and handlers are being removed, causing an exception to be thrown by the `ch.pipeline().remove(X)` calls. This fix saves the status of the keepAlive response from the `HttpUploadHandler` so that the channel can be fully closed before releasing.

Resolves #5952

Closes #5953.

PiperOrigin-RevId: 211437717
diff --git a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
index ca9a8eb..a1ec3b2 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/blobstore/http/HttpBlobStore.java
@@ -65,6 +65,7 @@
 import java.net.SocketAddress;
 import java.net.URI;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,7 +100,6 @@
  * <p>The implementation currently does not support transfer encoding chunked.
  */
 public final class HttpBlobStore implements SimpleBlobStore {
-
   private static final Pattern INVALID_TOKEN_ERROR =
       Pattern.compile("\\s*error\\s*=\\s*\"?invalid_token\"?");
 
@@ -107,6 +107,7 @@
   private final ChannelPool channelPool;
   private final URI uri;
   private final int timeoutMillis;
+  private final boolean useTls;
 
   private final Object closeLock = new Object();
 
@@ -159,7 +160,7 @@
       URI uri, int timeoutMillis, int remoteMaxConnections, @Nullable final Credentials creds,
       @Nullable SocketAddress socketAddress)
       throws Exception {
-    boolean useTls = uri.getScheme().equals("https");
+    useTls = uri.getScheme().equals("https");
     if (uri.getPort() == -1) {
       int port = useTls ? 443 : 80;
       uri =
@@ -237,6 +238,13 @@
               try {
                 Channel ch = channelAcquired.getNow();
                 ChannelPipeline p = ch.pipeline();
+
+                if (!isChannelPipelineEmpty(p)) {
+                  channelReady.setFailure(
+                      new IllegalStateException("Channel pipeline is not empty."));
+                  return;
+                }
+
                 p.addLast(new HttpResponseDecoder());
                 // The 10KiB limit was chosen at random. We only expect HTTP servers to respond with
                 // an error message in the body and that should always be less than 10KiB.
@@ -264,11 +272,18 @@
   @SuppressWarnings("FutureReturnValueIgnored")
   private void releaseUploadChannel(Channel ch) {
     if (ch.isOpen()) {
-      ch.pipeline().remove(HttpResponseDecoder.class);
-      ch.pipeline().remove(HttpObjectAggregator.class);
-      ch.pipeline().remove(HttpRequestEncoder.class);
-      ch.pipeline().remove(ChunkedWriteHandler.class);
-      ch.pipeline().remove(HttpUploadHandler.class);
+      try {
+        ch.pipeline().remove(HttpResponseDecoder.class);
+        ch.pipeline().remove(HttpObjectAggregator.class);
+        ch.pipeline().remove(HttpRequestEncoder.class);
+        ch.pipeline().remove(ChunkedWriteHandler.class);
+        ch.pipeline().remove(HttpUploadHandler.class);
+      } catch (NoSuchElementException e) {
+        // If the channel is in the process of closing but not yet closed, some handlers could have
+        // been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
+        // removed in reverse-order, if we get a NoSuchElement exception, the following handlers
+        // should have been removed.
+      }
     }
     channelPool.release(ch);
   }
@@ -288,6 +303,13 @@
               try {
                 Channel ch = channelAcquired.getNow();
                 ChannelPipeline p = ch.pipeline();
+
+                if (!isChannelPipelineEmpty(p)) {
+                  channelReady.setFailure(
+                      new IllegalStateException("Channel pipeline is not empty."));
+                  return;
+                }
+
                 ch.pipeline()
                     .addFirst("read-timeout-handler", new ReadTimeoutHandler(timeoutMillis));
                 p.addLast(new HttpClientCodec());
@@ -309,13 +331,27 @@
     if (ch.isOpen()) {
       // The channel might have been closed due to an error, in which case its pipeline
       // has already been cleared. Closed channels can't be reused.
-      ch.pipeline().remove(ReadTimeoutHandler.class);
-      ch.pipeline().remove(HttpClientCodec.class);
-      ch.pipeline().remove(HttpDownloadHandler.class);
+      try {
+        ch.pipeline().remove(ReadTimeoutHandler.class);
+        ch.pipeline().remove(HttpClientCodec.class);
+        ch.pipeline().remove(HttpDownloadHandler.class);
+      } catch (NoSuchElementException e) {
+        // If the channel is in the process of closing but not yet closed, some handlers could have
+        // been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
+        // removed in reverse-order, if we get a NoSuchElement exception, the following handlers
+        // should have been removed.
+      }
     }
     channelPool.release(ch);
   }
 
+  private boolean isChannelPipelineEmpty(ChannelPipeline pipeline) {
+    return (pipeline.first() == null)
+        || (useTls
+            && "ssl-handler".equals(pipeline.firstContext().name())
+            && pipeline.first() == pipeline.last());
+  }
+
   @Override
   public boolean containsKey(String key) {
     throw new UnsupportedOperationException("HTTP Caching does not use this method.");