Add integration tests for `GrpcRemoteDownloader` (https://github.com/bazelbuild/bazel/pull/28490)

This required adding support for the Remote Asset API to the remote worker used in tests.

Closes #28490.

PiperOrigin-RevId: 865832629
Change-Id: Ifdeb8ab3b9093ed0daf1ecd5aa545408a9948090
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
index f9118fd..41607cf 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -60,6 +60,7 @@
         "//src/main/java/com/google/devtools/build/lib/vfs",
         "//src/main/protobuf:spawn_java_proto",
         "//third_party:guava",
+        "//third_party:jsr305",
         "@com_google_protobuf//:protobuf_java",
         "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
     ],
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java
index 9e3e3ba..4fd23d4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java
@@ -22,6 +22,7 @@
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import javax.annotation.WillCloseWhenClosed;
 
 /**
  * An {@link OutputStream} that maintains a {@link Digest} of the data written to it.
@@ -39,7 +40,7 @@
    *
    * <p>The {@link OutputStream} should not be written to before or after the hand-off.
    */
-  public DigestOutputStream(HashFunction hashFunction, OutputStream out) {
+  public DigestOutputStream(HashFunction hashFunction, @WillCloseWhenClosed OutputStream out) {
     super(checkNotNull(out));
     this.hasher = checkNotNull(hashFunction.newHasher());
   }
diff --git a/src/test/shell/bazel/remote/BUILD b/src/test/shell/bazel/remote/BUILD
index 6365b68..222ed32 100644
--- a/src/test/shell/bazel/remote/BUILD
+++ b/src/test/shell/bazel/remote/BUILD
@@ -141,3 +141,25 @@
         "@bazel_tools//tools/bash/runfiles",
     ],
 )
+
+sh_test(
+    name = "remote_downloader_test",
+    size = "large",
+    srcs = ["remote_downloader_test.sh"],
+    data = [
+        ":remote_utils",
+        "//src/test/shell/bazel:test-deps",
+        "//src/tools/remote:worker",
+        "@bazel_tools//tools/bash/runfiles",
+        "@rules_java//toolchains:current_java_runtime",
+    ],
+    env = {
+        "JAVA_ROOTPATH": "$(JAVA_ROOTPATH)",
+    },
+    tags = [
+        "requires-network",
+    ],
+    toolchains = [
+        "@rules_java//toolchains:current_java_runtime",
+    ],
+)
diff --git a/src/test/shell/bazel/remote/remote_downloader_test.sh b/src/test/shell/bazel/remote/remote_downloader_test.sh
new file mode 100755
index 0000000..b307e8e
--- /dev/null
+++ b/src/test/shell/bazel/remote/remote_downloader_test.sh
@@ -0,0 +1,325 @@
+#!/usr/bin/env bash
+#
+# Copyright 2026 The Bazel Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Tests for the remote downlaader backed by the Remote Asset API.
+
+set -euo pipefail
+
+# --- begin runfiles.bash initialization ---
+if [[ ! -d "${RUNFILES_DIR:-/dev/null}" && ! -f "${RUNFILES_MANIFEST_FILE:-/dev/null}" ]]; then
+  if [[ -f "$0.runfiles_manifest" ]]; then
+    export RUNFILES_MANIFEST_FILE="$0.runfiles_manifest"
+  elif [[ -f "$0.runfiles/MANIFEST" ]]; then
+    export RUNFILES_MANIFEST_FILE="$0.runfiles/MANIFEST"
+  elif [[ -f "$0.runfiles/bazel_tools/tools/bash/runfiles/runfiles.bash" ]]; then
+    export RUNFILES_DIR="$0.runfiles"
+  fi
+fi
+if [[ -f "${RUNFILES_DIR:-/dev/null}/bazel_tools/tools/bash/runfiles/runfiles.bash" ]]; then
+  source "${RUNFILES_DIR}/bazel_tools/tools/bash/runfiles/runfiles.bash"
+elif [[ -f "${RUNFILES_MANIFEST_FILE:-/dev/null}" ]]; then
+  source "$(grep -m1 "^bazel_tools/tools/bash/runfiles/runfiles.bash " \
+            "$RUNFILES_MANIFEST_FILE" | cut -d ' ' -f 2-)"
+else
+  echo >&2 "ERROR: cannot find @bazel_tools//tools/bash/runfiles:runfiles.bash"
+  exit 1
+fi
+# --- end runfiles.bash initialization ---
+
+source "$(rlocation "io_bazel/src/test/shell/integration_test_setup.sh")" \
+  || { echo "integration_test_setup.sh not found!" >&2; exit 1; }
+source "$(rlocation "io_bazel/src/test/shell/bazel/remote_helpers.sh")" \
+  || { echo "remote_helpers.sh not found!" >&2; exit 1; }
+source "$(rlocation "io_bazel/src/test/shell/bazel/remote/remote_utils.sh")" \
+  || { echo "remote_utils.sh not found!" >&2; exit 1; }
+
+function set_up() {
+  start_worker
+}
+
+function tear_down() {
+  bazel clean >& $TEST_log
+  stop_worker
+  shutdown_server
+}
+
+function test_remote_downloader_http_archive() {
+  local archive_dir="${TEST_TMPDIR}/archive"
+  mkdir -p "${archive_dir}"
+  cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+    name = "files",
+    srcs = ["data.txt"],
+    visibility = ["//visibility:public"],
+)
+EOF
+  echo "Hello from remote archive" > "${archive_dir}/data.txt"
+  touch "${archive_dir}/REPO.bazel"
+
+  # Create the archive
+  local archive_file="${TEST_TMPDIR}/mylib.tar.gz"
+  tar -czf "${archive_file}" -C "${archive_dir}" .
+  local sha256=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+  # Serve the archive
+  serve_file "${archive_file}"
+
+  # Set up the workspace
+  mkdir -p main
+  cd main
+  cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+    name = "mylib",
+    url = "http://127.0.0.1:${nc_port}/served_file.$$",
+    sha256 = "${sha256}",
+    type = "tar.gz",
+)
+EOF
+
+  cat > BUILD.bazel <<'EOF'
+filegroup(
+    name = "test",
+    srcs = ["@mylib//:files"],
+)
+EOF
+
+  # Build using the remote downloader
+  bazel build \
+      --remote_cache=grpc://localhost:${worker_port} \
+      --experimental_remote_downloader=grpc://localhost:${worker_port} \
+      //:test >& $TEST_log \
+      || fail "Failed to build with remote downloader"
+
+  # Verify the content was downloaded correctly
+  local output_base=$(bazel info output_base)
+  local output_file="${output_base}/external/+http_archive+mylib/data.txt"
+  assert_contains "Hello from remote archive" "${output_file}"
+}
+
+function test_remote_downloader_checksum_mismatch() {
+  # Test that a checksum mismatch from the remote downloader is handled correctly
+  local archive_dir="${TEST_TMPDIR}/archive2"
+  mkdir -p "${archive_dir}"
+  cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+    name = "data",
+    srcs = ["content.txt"],
+    visibility = ["//visibility:public"],
+)
+EOF
+  echo "Checksum mismatch content" > "${archive_dir}/content.txt"
+  touch "${archive_dir}/REPO.bazel"
+
+  local archive_file="${TEST_TMPDIR}/pkg.zip"
+  (cd "${archive_dir}" && zip -r "${archive_file}" .)
+  # Use a wrong checksum intentionally
+  local wrong_integrity="sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
+
+  serve_file "${archive_file}"
+
+  mkdir -p main2
+  cd main2
+  cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+    name = "pkg",
+    url = "http://127.0.0.1:${nc_port}/served_file.$$",
+    integrity = "${wrong_integrity}",
+    type = "zip",
+)
+EOF
+
+  cat > BUILD.bazel <<'EOF'
+filegroup(
+    name = "check",
+    srcs = ["@pkg//:data"],
+)
+EOF
+
+  # The build should fail due to checksum mismatch
+  bazel build \
+      --remote_cache=grpc://localhost:${worker_port} \
+      --experimental_remote_downloader=grpc://localhost:${worker_port} \
+      //:check >& $TEST_log \
+      && fail "Expected build to fail due to checksum mismatch"
+
+  expect_log "Checksum was"
+  expect_log "${wrong_integrity}"
+}
+
+function test_remote_downloader_canonical_id() {
+  # Test that the canonical_id qualifier is respected - same URL with different
+  # canonical IDs should be treated as different entries
+
+  local archive_dir="${TEST_TMPDIR}/archive4"
+  mkdir -p "${archive_dir}"
+  cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+    name = "files",
+    srcs = ["version.txt"],
+    visibility = ["//visibility:public"],
+)
+EOF
+  echo "Version 1" > "${archive_dir}/version.txt"
+  touch "${archive_dir}/REPO.bazel"
+
+  local archive_file="${TEST_TMPDIR}/canonical.tar.gz"
+  tar -czf "${archive_file}" -C "${archive_dir}" .
+  local sha256=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+  serve_file "${archive_file}"
+
+  mkdir -p main4
+  cd main4
+
+  # First, fetch with one canonical ID
+  cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+    name = "canonical_repo",
+    url = "http://127.0.0.1:${nc_port}/served_file.$$",
+    sha256 = "${sha256}",
+    type = "tar.gz",
+    canonical_id = "version-1",
+)
+EOF
+
+  cat > BUILD.bazel <<'EOF'
+filegroup(
+    name = "test",
+    srcs = ["@canonical_repo//:files"],
+)
+EOF
+
+  bazel build \
+      --remote_cache=grpc://localhost:${worker_port} \
+      --experimental_remote_downloader=grpc://localhost:${worker_port} \
+      //:test >& $TEST_log \
+      || fail "Failed first build with canonical_id"
+
+  # Verify the first content
+  local output_base=$(bazel info output_base)
+  local output_file="${output_base}/external/+http_archive+canonical_repo/version.txt"
+  assert_contains "Version 1" "${output_file}"
+
+  # Now update the archive with different content
+  echo "Version 2" > "${archive_dir}/version.txt"
+  tar -czf "${archive_file}" -C "${archive_dir}" .
+  local sha256_v2=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+  # Update the served file (serve_file copies to a different location)
+  cat "${archive_file}" > "${TEST_TMPDIR}/served_file.$$"
+
+  # Clean to force re-fetch with new repository definition
+  bazel clean >& $TEST_log
+
+  # Update MODULE.bazel with new canonical ID and checksum
+  cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+    name = "canonical_repo",
+    url = "http://127.0.0.1:${nc_port}/served_file.$$",
+    sha256 = "${sha256_v2}",
+    type = "tar.gz",
+    canonical_id = "version-2",
+)
+EOF
+
+  # Build again with the new canonical ID - the remote downloader should
+  # recognize this as a different request due to the different canonical_id
+  # and re-download the file
+  bazel build \
+      --remote_cache=grpc://localhost:${worker_port} \
+      --experimental_remote_downloader=grpc://localhost:${worker_port} \
+      //:test >& $TEST_log \
+      || fail "Failed to build with new canonical_id"
+
+  # Verify the updated content was fetched
+  assert_contains "Version 2" "${output_file}"
+}
+
+function test_remote_downloader_caching() {
+  # Test that the remote downloader caches downloaded files - when the same
+  # URL with the same checksum is requested again, it should use the cached
+  # version without re-downloading.
+
+  local archive_dir="${TEST_TMPDIR}/archive5"
+  mkdir -p "${archive_dir}"
+  cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+    name = "files",
+    srcs = ["cached.txt"],
+    visibility = ["//visibility:public"],
+)
+EOF
+  echo "Cached content" > "${archive_dir}/cached.txt"
+  touch "${archive_dir}/REPO.bazel"
+
+  local archive_file="${TEST_TMPDIR}/cached.tar.gz"
+  tar -czf "${archive_file}" -C "${archive_dir}" .
+  local sha256=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+  serve_file "${archive_file}"
+
+  mkdir -p main5
+  cd main5
+
+  cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+    name = "cached_repo",
+    url = "http://127.0.0.1:${nc_port}/served_file.$$",
+    sha256 = "${sha256}",
+    type = "tar.gz",
+)
+EOF
+
+  cat > BUILD.bazel <<'EOF'
+filegroup(
+    name = "test",
+    srcs = ["@cached_repo//:files"],
+)
+EOF
+
+  # First build - should download from the origin
+  bazel build \
+      --remote_cache=grpc://localhost:${worker_port} \
+      --experimental_remote_downloader=grpc://localhost:${worker_port} \
+      //:test >& $TEST_log \
+      || fail "Failed first build"
+
+  # Shut down the file server to verify that the second fetch uses cache
+  shutdown_server
+
+  # Clean Bazel's local cache but keep the remote worker running
+  bazel clean --expunge >& $TEST_log
+
+  # Second build - should use the cached version from the remote downloader
+  # (no HTTP server running, so it would fail if trying to re-download)
+  bazel build \
+      --remote_cache=grpc://localhost:${worker_port} \
+      --experimental_remote_downloader=grpc://localhost:${worker_port} \
+      //:test >& $TEST_log \
+      || fail "Failed second build - should have used cache"
+
+  # Verify the content
+  local output_base=$(bazel info output_base)
+  local output_file="${output_base}/external/+http_archive+cached_repo/cached.txt"
+  assert_contains "Cached content" "${output_file}"
+}
+
+run_suite "Remote downloader tests"
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
index a952ef9..f0cea66 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
@@ -30,6 +30,7 @@
     visibility = ["//src/tools/remote:__subpackages__"],
     deps = [
         "//src/main/java/com/google/devtools/build/lib/actions",
+        "//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader",
         "//src/main/java/com/google/devtools/build/lib/events",
         "//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
         "//src/main/java/com/google/devtools/build/lib/exec/local",
@@ -65,6 +66,8 @@
         "@googleapis//google/bytestream:bytestream_java_proto",
         "@googleapis//google/longrunning:longrunning_java_proto",
         "@googleapis//google/rpc:rpc_java_proto",
+        "@remoteapis//:build_bazel_remote_asset_v1_remote_asset_java_grpc",
+        "@remoteapis//:build_bazel_remote_asset_v1_remote_asset_java_proto",
         "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
         "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
         "@rules_java//java/runfiles",
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/FetchServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/FetchServer.java
new file mode 100644
index 0000000..b120057
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/FetchServer.java
@@ -0,0 +1,332 @@
+// Copyright 2026 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.remote.worker;
+
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import build.bazel.remote.asset.v1.FetchBlobRequest;
+import build.bazel.remote.asset.v1.FetchBlobResponse;
+import build.bazel.remote.asset.v1.FetchDirectoryRequest;
+import build.bazel.remote.asset.v1.FetchDirectoryResponse;
+import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase;
+import build.bazel.remote.asset.v1.Qualifier;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.flogger.GoogleLogger;
+import com.google.devtools.build.lib.bazel.repository.downloader.Checksum;
+import com.google.devtools.build.lib.bazel.repository.downloader.HashOutputStream;
+import com.google.devtools.build.lib.bazel.repository.downloader.UnrecoverableHttpException;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.util.DigestOutputStream;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.rpc.Code;
+import io.grpc.StatusException;
+import io.grpc.stub.StreamObserver;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.SequencedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+/** A basic implementation of a {@link FetchImplBase} service. */
+final class FetchServer extends FetchImplBase {
+  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+
+  private static final String QUALIFIER_CANONICAL_ID = "bazel.canonical_id";
+  private static final String QUALIFIER_CHECKSUM_SRI = "checksum.sri";
+  private static final String QUALIFIER_HTTP_HEADER_PREFIX = "http_header:";
+  private static final String QUALIFIER_HTTP_HEADER_URL_PREFIX = "http_header_url:";
+
+  private final OnDiskBlobStoreCache cache;
+  private final DigestUtil digestUtil;
+  private final Path tempPath;
+  private final ConcurrentHashMap<CacheKey, CacheValue> knownUrls = new ConcurrentHashMap<>();
+
+  private record CacheKey(String url, @Nullable String canonicalId) {}
+
+  private record CacheValue(Digest digest, Instant downloadedAt) {}
+
+  private record Qualifiers(
+      @Nullable Checksum expectedChecksum,
+      ImmutableMap<String, String> globalHeaders,
+      ImmutableTable<Integer, String, String> urlSpecificHeaders,
+      @Nullable String canonicalId) {}
+
+  @Override
+  public void fetchBlob(
+      FetchBlobRequest request, StreamObserver<FetchBlobResponse> responseObserver) {
+    if (request.getUrisCount() == 0) {
+      responseObserver.onError(
+          StatusUtils.invalidArgumentError("uris", "at least one URI must be provided"));
+      return;
+    }
+
+    Qualifiers qualifiers;
+    try {
+      qualifiers = parseQualifiers(request.getQualifiersList());
+    } catch (StatusException e) {
+      responseObserver.onError(e);
+      return;
+    }
+
+    Instant cutoff =
+        request.hasOldestContentAccepted()
+            ? Instant.now()
+                .minus(Duration.ofSeconds(request.getOldestContentAccepted().getSeconds()))
+            : Instant.MIN;
+    Optional<Digest> cacheHit = checkCache(request.getUrisList(), qualifiers.canonicalId(), cutoff);
+    if (cacheHit.isPresent()) {
+      responseObserver.onNext(
+          FetchBlobResponse.newBuilder()
+              .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.OK_VALUE).build())
+              .setUri(request.getUris(0))
+              .setBlobDigest(cacheHit.get())
+              .setDigestFunction(digestUtil.getDigestFunction())
+              .build());
+      responseObserver.onCompleted();
+      return;
+    }
+
+    Path tempDownloadDir;
+    try {
+      tempPath.createDirectoryAndParents();
+      tempDownloadDir = tempPath.createTempDirectory("download-");
+    } catch (IOException e) {
+      responseObserver.onError(StatusUtils.internalError(e));
+      return;
+    }
+    try {
+      DownloadResult result = tryDownload(request, qualifiers, tempDownloadDir);
+
+      RequestMetadata requestMetadata = TracingMetadataUtils.fromCurrentContext();
+      RemoteActionExecutionContext context = RemoteActionExecutionContext.create(requestMetadata);
+      getFromFuture(cache.uploadFile(context, result.digest(), result.path()));
+      addToCache(result.uri(), qualifiers.canonicalId(), result.digest());
+
+      responseObserver.onNext(
+          FetchBlobResponse.newBuilder()
+              .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.OK_VALUE).build())
+              .setUri(result.uri())
+              .setBlobDigest(result.digest())
+              .setDigestFunction(digestUtil.getDigestFunction())
+              .build());
+      responseObserver.onCompleted();
+    } catch (IOException e) {
+      responseObserver.onNext(
+          FetchBlobResponse.newBuilder()
+              .setStatus(
+                  com.google.rpc.Status.newBuilder()
+                      .setCode(determineCode(e).getNumber())
+                      .setMessage("Failed to fetch from any URI: " + e.getMessage())
+                      .build())
+              .setUri(request.getUris(0))
+              .build());
+      responseObserver.onCompleted();
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      logger.atWarning().withCause(e).log("Failed to upload blob to CAS");
+      responseObserver.onError(StatusUtils.internalError(e));
+    } finally {
+      try {
+        tempDownloadDir.deleteTree();
+      } catch (IOException e) {
+        logger.atWarning().withCause(e).log(
+            "Failed to delete temporary download directory %s", tempDownloadDir);
+      }
+    }
+  }
+
+  @Override
+  public void fetchDirectory(
+      FetchDirectoryRequest request, StreamObserver<FetchDirectoryResponse> responseObserver) {
+    // FetchDirectory is not used by Bazel's GrpcRemoteDownloader client.
+    responseObserver.onError(
+        io.grpc.Status.UNIMPLEMENTED
+            .withDescription("FetchDirectory is not implemented")
+            .asRuntimeException());
+  }
+
+  public FetchServer(OnDiskBlobStoreCache cache, DigestUtil digestUtil, Path tempPath) {
+    this.cache = cache;
+    this.digestUtil = digestUtil;
+    this.tempPath = tempPath;
+  }
+
+  private static Qualifiers parseQualifiers(Iterable<Qualifier> qualifiersList)
+      throws StatusException {
+    Checksum expectedChecksum = null;
+    var globalHeaders = ImmutableMap.<String, String>builder();
+    var urlSpecificHeaders = ImmutableTable.<Integer, String, String>builder();
+    String canonicalId = null;
+
+    for (var qualifier : qualifiersList) {
+      String name = qualifier.getName();
+      String value = qualifier.getValue();
+
+      if (name.equals(QUALIFIER_CANONICAL_ID)) {
+        canonicalId = value;
+      } else if (name.equals(QUALIFIER_CHECKSUM_SRI)) {
+        try {
+          expectedChecksum = Checksum.fromSubresourceIntegrity(value);
+        } catch (Checksum.InvalidChecksumException e) {
+          throw StatusUtils.invalidArgumentError(
+              "qualifiers",
+              "invalid '%s' qualifier: %s".formatted(QUALIFIER_CHECKSUM_SRI, e.getMessage()));
+        }
+      } else if (name.startsWith(QUALIFIER_HTTP_HEADER_URL_PREFIX)) {
+        // Format: http_header_url:<url_index>:<header_name>
+        String remainder = name.substring(QUALIFIER_HTTP_HEADER_URL_PREFIX.length());
+        int colonIndex = remainder.indexOf(':');
+        if (colonIndex > 0) {
+          try {
+            int urlIndex = Integer.parseInt(remainder.substring(0, colonIndex));
+            String headerName = remainder.substring(colonIndex + 1);
+            urlSpecificHeaders.put(urlIndex, headerName, value);
+          } catch (NumberFormatException e) {
+            throw StatusUtils.invalidArgumentError(
+                "qualifiers",
+                "invalid '%s' qualifier: %s"
+                    .formatted(QUALIFIER_HTTP_HEADER_URL_PREFIX, e.getMessage()));
+          }
+        }
+      } else if (name.startsWith(QUALIFIER_HTTP_HEADER_PREFIX)) {
+        String headerName = name.substring(QUALIFIER_HTTP_HEADER_PREFIX.length());
+        globalHeaders.put(headerName, value);
+      } else {
+        throw StatusUtils.invalidArgumentError(
+            "qualifiers", "unknown qualifier: '%s'".formatted(name));
+      }
+    }
+
+    return new Qualifiers(
+        expectedChecksum,
+        globalHeaders.buildOrThrow(),
+        urlSpecificHeaders.buildOrThrow(),
+        canonicalId);
+  }
+
+  private Optional<Digest> checkCache(
+      Iterable<String> uris, @Nullable String canonicalId, Instant cutoff) {
+    for (var uri : uris) {
+      var cacheValue = knownUrls.get(new CacheKey(uri, canonicalId));
+      if (cacheValue != null && cacheValue.downloadedAt.isAfter(cutoff)) {
+        return Optional.of(cacheValue.digest);
+      }
+    }
+    return Optional.empty();
+  }
+
+  private void addToCache(String uri, @Nullable String canonicalId, Digest digest) {
+    knownUrls.put(new CacheKey(uri, canonicalId), new CacheValue(digest, Instant.now()));
+  }
+
+  private record DownloadResult(String uri, Path path, Digest digest) {}
+
+  private DownloadResult tryDownload(
+      FetchBlobRequest request, Qualifiers qualifiers, Path tempDownloadDir) throws IOException {
+    IOException lastException = null;
+
+    for (int i = 0; i < request.getUrisCount(); i++) {
+      String uri = request.getUris(i);
+      Path downloadPath = tempDownloadDir.getChild("attempt_" + i);
+      try {
+        var out = downloadPath.getOutputStream();
+        var digestOut =
+            new DigestOutputStream(
+                downloadPath.getFileSystem().getDigestFunction().getHashFunction(), out);
+        var maybeChecksumOut =
+            qualifiers.expectedChecksum() != null
+                ? new HashOutputStream(digestOut, qualifiers.expectedChecksum())
+                : digestOut;
+        try (maybeChecksumOut) {
+          var headers = new LinkedHashMap<>(qualifiers.globalHeaders());
+          headers.putAll(qualifiers.urlSpecificHeaders().row(i));
+          fetchFromUrl(
+              uri,
+              headers,
+              Duration.ofSeconds(request.getTimeout().getSeconds()),
+              maybeChecksumOut);
+          return new DownloadResult(uri, downloadPath, digestOut.digest());
+        }
+      } catch (IOException e) {
+        try {
+          downloadPath.delete();
+        } catch (IOException ex) {
+          logger.atWarning().withCause(ex).log(
+              "Failed to delete partially downloaded file %s", downloadPath);
+        }
+        lastException = e;
+        logger.atFine().withCause(e).log("Failed to fetch from %s", uri);
+      }
+    }
+
+    throw lastException != null ? lastException : new IOException("No URIs to fetch");
+  }
+
+  private Code determineCode(@Nullable IOException lastException) {
+    return switch (lastException) {
+      case SocketTimeoutException e -> Code.DEADLINE_EXCEEDED;
+      case FileNotFoundException e -> Code.NOT_FOUND;
+      // See HashOutputStream#verifyHash.
+      case UnrecoverableHttpException e when e.getMessage().startsWith("Checksum was ") ->
+          Code.ABORTED;
+      case null, default -> Code.UNKNOWN;
+    };
+  }
+
+  private void fetchFromUrl(
+      String urlString, SequencedMap<String, String> headers, Duration timeout, OutputStream out)
+      throws IOException {
+    HttpURLConnection connection;
+    try {
+      connection = (HttpURLConnection) new URI(urlString).toURL().openConnection();
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid URI: " + urlString, e);
+    }
+    var timeoutMillis = timeout.equals(Duration.ZERO) ? 30000 : (int) timeout.toMillis();
+    try {
+      connection.setRequestMethod("GET");
+      connection.setConnectTimeout(timeoutMillis);
+      connection.setReadTimeout(timeoutMillis);
+      headers.forEach(connection::setRequestProperty);
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode != HttpURLConnection.HTTP_OK) {
+        throw new IOException("HTTP request failed with status " + responseCode);
+      }
+
+      try (var in = connection.getInputStream()) {
+        in.transferTo(out);
+      }
+    } finally {
+      connection.disconnect();
+    }
+  }
+}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index f2fbced..69ddc65 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -18,6 +18,7 @@
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.logging.Level.FINE;
 
+import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase;
 import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
 import build.bazel.remote.execution.v2.ActionResult;
 import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesImplBase;
@@ -105,6 +106,7 @@
   private final ContentAddressableStorageImplBase casServer;
   private final ExecutionImplBase execServer;
   private final CapabilitiesImplBase capabilitiesServer;
+  private final FetchImplBase fetchServer;
 
   static FileSystem getFileSystem() {
     final DigestHashFunction hashFunction;
@@ -206,6 +208,7 @@
       execServer = null;
     }
     this.capabilitiesServer = new CapabilitiesServer(digestUtil, execServer != null, workerOptions);
+    this.fetchServer = new FetchServer(cache, digestUtil, workPath.getRelative("fetch-temp"));
   }
 
   public Server startServer() throws IOException {
@@ -223,7 +226,8 @@
             .addService(ServerInterceptors.intercept(actionCacheServer, interceptors))
             .addService(ServerInterceptors.intercept(bsServer, interceptors))
             .addService(ServerInterceptors.intercept(casServer, interceptors))
-            .addService(ServerInterceptors.intercept(capabilitiesServer, interceptors));
+            .addService(ServerInterceptors.intercept(capabilitiesServer, interceptors))
+            .addService(ServerInterceptors.intercept(fetchServer, interceptors));
 
     if (workerOptions.tlsCertificate != null) {
       b.sslContext(getSslContextBuilder(workerOptions).build());