Add integration tests for `GrpcRemoteDownloader` (https://github.com/bazelbuild/bazel/pull/28490)
This required adding support for the Remote Asset API to the remote worker used in tests.
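The tests serve a small archive over HTTP and fetch it through the worker by
pointing both the remote cache and the downloader at it (ports and target
names below are the ones used by the test harness):

  bazel build \
    --remote_cache=grpc://localhost:${worker_port} \
    --experimental_remote_downloader=grpc://localhost:${worker_port} \
    //:test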
Closes #28490.
PiperOrigin-RevId: 865832629
Change-Id: Ifdeb8ab3b9093ed0daf1ecd5aa545408a9948090
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
index f9118fd..41607cf 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -60,6 +60,7 @@
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/protobuf:spawn_java_proto",
"//third_party:guava",
+ "//third_party:jsr305",
"@com_google_protobuf//:protobuf_java",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java
index 9e3e3ba..4fd23d4 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestOutputStream.java
@@ -22,6 +22,7 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import javax.annotation.WillCloseWhenClosed;
/**
* An {@link OutputStream} that maintains a {@link Digest} of the data written to it.
@@ -39,7 +40,7 @@
*
* <p>The {@link OutputStream} should not be written to before or after the hand-off.
*/
- public DigestOutputStream(HashFunction hashFunction, OutputStream out) {
+ public DigestOutputStream(HashFunction hashFunction, @WillCloseWhenClosed OutputStream out) {
super(checkNotNull(out));
this.hasher = checkNotNull(hashFunction.newHasher());
}
diff --git a/src/test/shell/bazel/remote/BUILD b/src/test/shell/bazel/remote/BUILD
index 6365b68..222ed32 100644
--- a/src/test/shell/bazel/remote/BUILD
+++ b/src/test/shell/bazel/remote/BUILD
@@ -141,3 +141,25 @@
"@bazel_tools//tools/bash/runfiles",
],
)
+
+sh_test(
+ name = "remote_downloader_test",
+ size = "large",
+ srcs = ["remote_downloader_test.sh"],
+ data = [
+ ":remote_utils",
+ "//src/test/shell/bazel:test-deps",
+ "//src/tools/remote:worker",
+ "@bazel_tools//tools/bash/runfiles",
+ "@rules_java//toolchains:current_java_runtime",
+ ],
+ env = {
+ "JAVA_ROOTPATH": "$(JAVA_ROOTPATH)",
+ },
+ tags = [
+ "requires-network",
+ ],
+ toolchains = [
+ "@rules_java//toolchains:current_java_runtime",
+ ],
+)
diff --git a/src/test/shell/bazel/remote/remote_downloader_test.sh b/src/test/shell/bazel/remote/remote_downloader_test.sh
new file mode 100755
index 0000000..b307e8e
--- /dev/null
+++ b/src/test/shell/bazel/remote/remote_downloader_test.sh
@@ -0,0 +1,328 @@
+#!/usr/bin/env bash
+#
+# Copyright 2026 The Bazel Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Tests for the remote downloader backed by the Remote Asset API.
+
+set -euo pipefail
+
+# --- begin runfiles.bash initialization ---
+if [[ ! -d "${RUNFILES_DIR:-/dev/null}" && ! -f "${RUNFILES_MANIFEST_FILE:-/dev/null}" ]]; then
+ if [[ -f "$0.runfiles_manifest" ]]; then
+ export RUNFILES_MANIFEST_FILE="$0.runfiles_manifest"
+ elif [[ -f "$0.runfiles/MANIFEST" ]]; then
+ export RUNFILES_MANIFEST_FILE="$0.runfiles/MANIFEST"
+ elif [[ -f "$0.runfiles/bazel_tools/tools/bash/runfiles/runfiles.bash" ]]; then
+ export RUNFILES_DIR="$0.runfiles"
+ fi
+fi
+if [[ -f "${RUNFILES_DIR:-/dev/null}/bazel_tools/tools/bash/runfiles/runfiles.bash" ]]; then
+ source "${RUNFILES_DIR}/bazel_tools/tools/bash/runfiles/runfiles.bash"
+elif [[ -f "${RUNFILES_MANIFEST_FILE:-/dev/null}" ]]; then
+ source "$(grep -m1 "^bazel_tools/tools/bash/runfiles/runfiles.bash " \
+ "$RUNFILES_MANIFEST_FILE" | cut -d ' ' -f 2-)"
+else
+ echo >&2 "ERROR: cannot find @bazel_tools//tools/bash/runfiles:runfiles.bash"
+ exit 1
+fi
+# --- end runfiles.bash initialization ---
+
+source "$(rlocation "io_bazel/src/test/shell/integration_test_setup.sh")" \
+ || { echo "integration_test_setup.sh not found!" >&2; exit 1; }
+source "$(rlocation "io_bazel/src/test/shell/bazel/remote_helpers.sh")" \
+ || { echo "remote_helpers.sh not found!" >&2; exit 1; }
+source "$(rlocation "io_bazel/src/test/shell/bazel/remote/remote_utils.sh")" \
+ || { echo "remote_utils.sh not found!" >&2; exit 1; }
+
+function set_up() {
+ start_worker
+}
+
+function tear_down() {
+ bazel clean >& $TEST_log
+ stop_worker
+ shutdown_server
+}
+
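+# Each test below packages a small repository as a tarball or zip, serves it
+# over HTTP with serve_file, and fetches it through the worker's Fetch service.
+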
+function test_remote_downloader_http_archive() {
+ local archive_dir="${TEST_TMPDIR}/archive"
+ mkdir -p "${archive_dir}"
+ cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+ name = "files",
+ srcs = ["data.txt"],
+ visibility = ["//visibility:public"],
+)
+EOF
+ echo "Hello from remote archive" > "${archive_dir}/data.txt"
+ touch "${archive_dir}/REPO.bazel"
+
+ # Create the archive
+ local archive_file="${TEST_TMPDIR}/mylib.tar.gz"
+ tar -czf "${archive_file}" -C "${archive_dir}" .
+ local sha256=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+ # Serve the archive
+ serve_file "${archive_file}"
+
+ # Set up the workspace
+ mkdir -p main
+ cd main
+ cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+ name = "mylib",
+ url = "http://127.0.0.1:${nc_port}/served_file.$$",
+ sha256 = "${sha256}",
+ type = "tar.gz",
+)
+EOF
+
+ cat > BUILD.bazel <<'EOF'
+filegroup(
+ name = "test",
+ srcs = ["@mylib//:files"],
+)
+EOF
+
+ # Build using the remote downloader
+ bazel build \
+ --remote_cache=grpc://localhost:${worker_port} \
+ --experimental_remote_downloader=grpc://localhost:${worker_port} \
+ //:test >& $TEST_log \
+ || fail "Failed to build with remote downloader"
+
+ # Verify the content was downloaded correctly
+ local output_base=$(bazel info output_base)
+ local output_file="${output_base}/external/+http_archive+mylib/data.txt"
+ assert_contains "Hello from remote archive" "${output_file}"
+}
+
+function test_remote_downloader_checksum_mismatch() {
+ # Test that a checksum mismatch from the remote downloader is handled correctly
+ local archive_dir="${TEST_TMPDIR}/archive2"
+ mkdir -p "${archive_dir}"
+ cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+ name = "data",
+ srcs = ["content.txt"],
+ visibility = ["//visibility:public"],
+)
+EOF
+ echo "Checksum mismatch content" > "${archive_dir}/content.txt"
+ touch "${archive_dir}/REPO.bazel"
+
+ local archive_file="${TEST_TMPDIR}/pkg.zip"
+ (cd "${archive_dir}" && zip -r "${archive_file}" .)
+ # Use a wrong checksum intentionally
+ local wrong_integrity="sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
+
+ serve_file "${archive_file}"
+
+ mkdir -p main2
+ cd main2
+ cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+ name = "pkg",
+ url = "http://127.0.0.1:${nc_port}/served_file.$$",
+ integrity = "${wrong_integrity}",
+ type = "zip",
+)
+EOF
+
+ cat > BUILD.bazel <<'EOF'
+filegroup(
+ name = "check",
+ srcs = ["@pkg//:data"],
+)
+EOF
+
+ # The build should fail due to checksum mismatch
+ bazel build \
+ --remote_cache=grpc://localhost:${worker_port} \
+ --experimental_remote_downloader=grpc://localhost:${worker_port} \
+ //:check >& $TEST_log \
+ && fail "Expected build to fail due to checksum mismatch"
+
+ expect_log "Checksum was"
+ expect_log "${wrong_integrity}"
+}
+
+function test_remote_downloader_canonical_id() {
+ # Test that the canonical_id qualifier is respected - same URL with different
+ # canonical IDs should be treated as different entries
+
+ local archive_dir="${TEST_TMPDIR}/archive4"
+ mkdir -p "${archive_dir}"
+ cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+ name = "files",
+ srcs = ["version.txt"],
+ visibility = ["//visibility:public"],
+)
+EOF
+ echo "Version 1" > "${archive_dir}/version.txt"
+ touch "${archive_dir}/REPO.bazel"
+
+ local archive_file="${TEST_TMPDIR}/canonical.tar.gz"
+ tar -czf "${archive_file}" -C "${archive_dir}" .
+ local sha256=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+ serve_file "${archive_file}"
+
+ mkdir -p main4
+ cd main4
+
+ # First, fetch with one canonical ID
+ cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+ name = "canonical_repo",
+ url = "http://127.0.0.1:${nc_port}/served_file.$$",
+ sha256 = "${sha256}",
+ type = "tar.gz",
+ canonical_id = "version-1",
+)
+EOF
+
+ cat > BUILD.bazel <<'EOF'
+filegroup(
+ name = "test",
+ srcs = ["@canonical_repo//:files"],
+)
+EOF
+
+ bazel build \
+ --remote_cache=grpc://localhost:${worker_port} \
+ --experimental_remote_downloader=grpc://localhost:${worker_port} \
+ //:test >& $TEST_log \
+ || fail "Failed first build with canonical_id"
+
+ # Verify the first content
+ local output_base=$(bazel info output_base)
+ local output_file="${output_base}/external/+http_archive+canonical_repo/version.txt"
+ assert_contains "Version 1" "${output_file}"
+
+ # Now update the archive with different content
+ echo "Version 2" > "${archive_dir}/version.txt"
+ tar -czf "${archive_file}" -C "${archive_dir}" .
+ local sha256_v2=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+  # Overwrite the copy that serve_file serves (it serves a copy, not the original)
+ cat "${archive_file}" > "${TEST_TMPDIR}/served_file.$$"
+
+ # Clean to force re-fetch with new repository definition
+ bazel clean >& $TEST_log
+
+ # Update MODULE.bazel with new canonical ID and checksum
+ cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+ name = "canonical_repo",
+ url = "http://127.0.0.1:${nc_port}/served_file.$$",
+ sha256 = "${sha256_v2}",
+ type = "tar.gz",
+ canonical_id = "version-2",
+)
+EOF
+
+ # Build again with the new canonical ID - the remote downloader should
+ # recognize this as a different request due to the different canonical_id
+ # and re-download the file
+ bazel build \
+ --remote_cache=grpc://localhost:${worker_port} \
+ --experimental_remote_downloader=grpc://localhost:${worker_port} \
+ //:test >& $TEST_log \
+ || fail "Failed to build with new canonical_id"
+
+ # Verify the updated content was fetched
+ assert_contains "Version 2" "${output_file}"
+}
+
+function test_remote_downloader_caching() {
+ # Test that the remote downloader caches downloaded files - when the same
+ # URL with the same checksum is requested again, it should use the cached
+ # version without re-downloading.
+
+ local archive_dir="${TEST_TMPDIR}/archive5"
+ mkdir -p "${archive_dir}"
+ cat > "${archive_dir}/BUILD.bazel" <<'EOF'
+filegroup(
+ name = "files",
+ srcs = ["cached.txt"],
+ visibility = ["//visibility:public"],
+)
+EOF
+ echo "Cached content" > "${archive_dir}/cached.txt"
+ touch "${archive_dir}/REPO.bazel"
+
+ local archive_file="${TEST_TMPDIR}/cached.tar.gz"
+ tar -czf "${archive_file}" -C "${archive_dir}" .
+ local sha256=$(sha256sum "${archive_file}" | cut -f 1 -d ' ')
+
+ serve_file "${archive_file}"
+
+ mkdir -p main5
+ cd main5
+
+ cat > $(setup_module_dot_bazel) <<EOF
+http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+http_archive(
+ name = "cached_repo",
+ url = "http://127.0.0.1:${nc_port}/served_file.$$",
+ sha256 = "${sha256}",
+ type = "tar.gz",
+)
+EOF
+
+ cat > BUILD.bazel <<'EOF'
+filegroup(
+ name = "test",
+ srcs = ["@cached_repo//:files"],
+)
+EOF
+
+ # First build - should download from the origin
+ bazel build \
+ --remote_cache=grpc://localhost:${worker_port} \
+ --experimental_remote_downloader=grpc://localhost:${worker_port} \
+ //:test >& $TEST_log \
+ || fail "Failed first build"
+
+ # Shut down the file server to verify that the second fetch uses cache
+ shutdown_server
+
+ # Clean Bazel's local cache but keep the remote worker running
+ bazel clean --expunge >& $TEST_log
+
+ # Second build - should use the cached version from the remote downloader
+  # (no HTTP server is running, so any attempt to re-download would fail)
+ bazel build \
+ --remote_cache=grpc://localhost:${worker_port} \
+ --experimental_remote_downloader=grpc://localhost:${worker_port} \
+ //:test >& $TEST_log \
+ || fail "Failed second build - should have used cache"
+
+ # Verify the content
+ local output_base=$(bazel info output_base)
+ local output_file="${output_base}/external/+http_archive+cached_repo/cached.txt"
+ assert_contains "Cached content" "${output_file}"
+}
+
+run_suite "Remote downloader tests"
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
index a952ef9..f0cea66 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD
@@ -30,6 +30,7 @@
visibility = ["//src/tools/remote:__subpackages__"],
deps = [
"//src/main/java/com/google/devtools/build/lib/actions",
+ "//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader",
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/exec:bin_tools",
"//src/main/java/com/google/devtools/build/lib/exec/local",
@@ -65,6 +66,8 @@
"@googleapis//google/bytestream:bytestream_java_proto",
"@googleapis//google/longrunning:longrunning_java_proto",
"@googleapis//google/rpc:rpc_java_proto",
+ "@remoteapis//:build_bazel_remote_asset_v1_remote_asset_java_grpc",
+ "@remoteapis//:build_bazel_remote_asset_v1_remote_asset_java_proto",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_grpc",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
"@rules_java//java/runfiles",
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/FetchServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/FetchServer.java
new file mode 100644
index 0000000..b120057
--- /dev/null
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/FetchServer.java
@@ -0,0 +1,352 @@
+// Copyright 2026 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.remote.worker;
+
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+
+import build.bazel.remote.asset.v1.FetchBlobRequest;
+import build.bazel.remote.asset.v1.FetchBlobResponse;
+import build.bazel.remote.asset.v1.FetchDirectoryRequest;
+import build.bazel.remote.asset.v1.FetchDirectoryResponse;
+import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase;
+import build.bazel.remote.asset.v1.Qualifier;
+import build.bazel.remote.execution.v2.Digest;
+import build.bazel.remote.execution.v2.RequestMetadata;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.flogger.GoogleLogger;
+import com.google.devtools.build.lib.bazel.repository.downloader.Checksum;
+import com.google.devtools.build.lib.bazel.repository.downloader.HashOutputStream;
+import com.google.devtools.build.lib.bazel.repository.downloader.UnrecoverableHttpException;
+import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
+import com.google.devtools.build.lib.remote.util.DigestOutputStream;
+import com.google.devtools.build.lib.remote.util.DigestUtil;
+import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.rpc.Code;
+import io.grpc.StatusException;
+import io.grpc.stub.StreamObserver;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import java.util.SequencedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+/** A basic implementation of a {@link FetchImplBase} service. */
+final class FetchServer extends FetchImplBase {
+ private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+
+ private static final String QUALIFIER_CANONICAL_ID = "bazel.canonical_id";
+ private static final String QUALIFIER_CHECKSUM_SRI = "checksum.sri";
+ private static final String QUALIFIER_HTTP_HEADER_PREFIX = "http_header:";
+ private static final String QUALIFIER_HTTP_HEADER_URL_PREFIX = "http_header_url:";
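+  // e.g. "http_header:Authorization" and "http_header_url:0:Authorization".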
+
+ private final OnDiskBlobStoreCache cache;
+ private final DigestUtil digestUtil;
+ private final Path tempPath;
+ private final ConcurrentHashMap<CacheKey, CacheValue> knownUrls = new ConcurrentHashMap<>();
+
+ private record CacheKey(String url, @Nullable String canonicalId) {}
+
+ private record CacheValue(Digest digest, Instant downloadedAt) {}
+
+ private record Qualifiers(
+ @Nullable Checksum expectedChecksum,
+ ImmutableMap<String, String> globalHeaders,
+ ImmutableTable<Integer, String, String> urlSpecificHeaders,
+ @Nullable String canonicalId) {}
+
+ @Override
+ public void fetchBlob(
+ FetchBlobRequest request, StreamObserver<FetchBlobResponse> responseObserver) {
+ if (request.getUrisCount() == 0) {
+ responseObserver.onError(
+ StatusUtils.invalidArgumentError("uris", "at least one URI must be provided"));
+ return;
+ }
+
+ Qualifiers qualifiers;
+ try {
+ qualifiers = parseQualifiers(request.getQualifiersList());
+ } catch (StatusException e) {
+ responseObserver.onError(e);
+ return;
+ }
+
+    // oldest_content_accepted is a Timestamp; cached content fetched before it is unacceptable.
+    Instant cutoff =
+        request.hasOldestContentAccepted()
+            ? Instant.ofEpochSecond(
+                request.getOldestContentAccepted().getSeconds(),
+                request.getOldestContentAccepted().getNanos())
+            : Instant.MIN;
+ Optional<Digest> cacheHit = checkCache(request.getUrisList(), qualifiers.canonicalId(), cutoff);
+ if (cacheHit.isPresent()) {
+ responseObserver.onNext(
+ FetchBlobResponse.newBuilder()
+ .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.OK_VALUE).build())
+ .setUri(request.getUris(0))
+ .setBlobDigest(cacheHit.get())
+ .setDigestFunction(digestUtil.getDigestFunction())
+ .build());
+ responseObserver.onCompleted();
+ return;
+ }
+
+ Path tempDownloadDir;
+ try {
+ tempPath.createDirectoryAndParents();
+ tempDownloadDir = tempPath.createTempDirectory("download-");
+ } catch (IOException e) {
+ responseObserver.onError(StatusUtils.internalError(e));
+ return;
+ }
+ try {
+ DownloadResult result = tryDownload(request, qualifiers, tempDownloadDir);
+
+ RequestMetadata requestMetadata = TracingMetadataUtils.fromCurrentContext();
+ RemoteActionExecutionContext context = RemoteActionExecutionContext.create(requestMetadata);
+ getFromFuture(cache.uploadFile(context, result.digest(), result.path()));
+ addToCache(result.uri(), qualifiers.canonicalId(), result.digest());
+
+ responseObserver.onNext(
+ FetchBlobResponse.newBuilder()
+ .setStatus(com.google.rpc.Status.newBuilder().setCode(Code.OK_VALUE).build())
+ .setUri(result.uri())
+ .setBlobDigest(result.digest())
+ .setDigestFunction(digestUtil.getDigestFunction())
+ .build());
+ responseObserver.onCompleted();
+ } catch (IOException e) {
+ responseObserver.onNext(
+ FetchBlobResponse.newBuilder()
+ .setStatus(
+ com.google.rpc.Status.newBuilder()
+ .setCode(determineCode(e).getNumber())
+ .setMessage("Failed to fetch from any URI: " + e.getMessage())
+ .build())
+ .setUri(request.getUris(0))
+ .build());
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ logger.atWarning().withCause(e).log("Failed to upload blob to CAS");
+ responseObserver.onError(StatusUtils.internalError(e));
+ } finally {
+ try {
+ tempDownloadDir.deleteTree();
+ } catch (IOException e) {
+ logger.atWarning().withCause(e).log(
+ "Failed to delete temporary download directory %s", tempDownloadDir);
+ }
+ }
+ }
+
+ @Override
+ public void fetchDirectory(
+ FetchDirectoryRequest request, StreamObserver<FetchDirectoryResponse> responseObserver) {
+ // FetchDirectory is not used by Bazel's GrpcRemoteDownloader client.
+ responseObserver.onError(
+ io.grpc.Status.UNIMPLEMENTED
+ .withDescription("FetchDirectory is not implemented")
+ .asRuntimeException());
+ }
+
+ public FetchServer(OnDiskBlobStoreCache cache, DigestUtil digestUtil, Path tempPath) {
+ this.cache = cache;
+ this.digestUtil = digestUtil;
+ this.tempPath = tempPath;
+ }
+
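+  /**
+   * Parses the qualifiers understood by this server into download parameters; any unknown
+   * qualifier is rejected with INVALID_ARGUMENT.
+   */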
+ private static Qualifiers parseQualifiers(Iterable<Qualifier> qualifiersList)
+ throws StatusException {
+ Checksum expectedChecksum = null;
+ var globalHeaders = ImmutableMap.<String, String>builder();
+ var urlSpecificHeaders = ImmutableTable.<Integer, String, String>builder();
+ String canonicalId = null;
+
+ for (var qualifier : qualifiersList) {
+ String name = qualifier.getName();
+ String value = qualifier.getValue();
+
+ if (name.equals(QUALIFIER_CANONICAL_ID)) {
+ canonicalId = value;
+ } else if (name.equals(QUALIFIER_CHECKSUM_SRI)) {
+ try {
+ expectedChecksum = Checksum.fromSubresourceIntegrity(value);
+ } catch (Checksum.InvalidChecksumException e) {
+ throw StatusUtils.invalidArgumentError(
+ "qualifiers",
+ "invalid '%s' qualifier: %s".formatted(QUALIFIER_CHECKSUM_SRI, e.getMessage()));
+ }
+ } else if (name.startsWith(QUALIFIER_HTTP_HEADER_URL_PREFIX)) {
+ // Format: http_header_url:<url_index>:<header_name>
+ String remainder = name.substring(QUALIFIER_HTTP_HEADER_URL_PREFIX.length());
+ int colonIndex = remainder.indexOf(':');
+ if (colonIndex > 0) {
+ try {
+ int urlIndex = Integer.parseInt(remainder.substring(0, colonIndex));
+ String headerName = remainder.substring(colonIndex + 1);
+ urlSpecificHeaders.put(urlIndex, headerName, value);
+ } catch (NumberFormatException e) {
+ throw StatusUtils.invalidArgumentError(
+ "qualifiers",
+ "invalid '%s' qualifier: %s"
+ .formatted(QUALIFIER_HTTP_HEADER_URL_PREFIX, e.getMessage()));
+ }
+ }
+ } else if (name.startsWith(QUALIFIER_HTTP_HEADER_PREFIX)) {
+ String headerName = name.substring(QUALIFIER_HTTP_HEADER_PREFIX.length());
+ globalHeaders.put(headerName, value);
+ } else {
+ throw StatusUtils.invalidArgumentError(
+ "qualifiers", "unknown qualifier: '%s'".formatted(name));
+ }
+ }
+
+ return new Qualifiers(
+ expectedChecksum,
+ globalHeaders.buildOrThrow(),
+ urlSpecificHeaders.buildOrThrow(),
+ canonicalId);
+ }
+
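+  /**
+   * Returns the digest of a previously downloaded URI, but only if it was fetched after {@code
+   * cutoff} (derived from the request's oldest_content_accepted field).
+   */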
+ private Optional<Digest> checkCache(
+ Iterable<String> uris, @Nullable String canonicalId, Instant cutoff) {
+ for (var uri : uris) {
+ var cacheValue = knownUrls.get(new CacheKey(uri, canonicalId));
+      if (cacheValue != null && cacheValue.downloadedAt().isAfter(cutoff)) {
+        return Optional.of(cacheValue.digest());
+ }
+ }
+ return Optional.empty();
+ }
+
+ private void addToCache(String uri, @Nullable String canonicalId, Digest digest) {
+ knownUrls.put(new CacheKey(uri, canonicalId), new CacheValue(digest, Instant.now()));
+ }
+
+ private record DownloadResult(String uri, Path path, Digest digest) {}
+
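+  /**
+   * Attempts each URI in order and returns the first successful download; the expected checksum,
+   * if any, is verified while streaming.
+   */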
+ private DownloadResult tryDownload(
+ FetchBlobRequest request, Qualifiers qualifiers, Path tempDownloadDir) throws IOException {
+ IOException lastException = null;
+
+ for (int i = 0; i < request.getUrisCount(); i++) {
+ String uri = request.getUris(i);
+ Path downloadPath = tempDownloadDir.getChild("attempt_" + i);
+ try {
+ var out = downloadPath.getOutputStream();
+ var digestOut =
+ new DigestOutputStream(
+ downloadPath.getFileSystem().getDigestFunction().getHashFunction(), out);
+ var maybeChecksumOut =
+ qualifiers.expectedChecksum() != null
+ ? new HashOutputStream(digestOut, qualifiers.expectedChecksum())
+ : digestOut;
+ try (maybeChecksumOut) {
+ var headers = new LinkedHashMap<>(qualifiers.globalHeaders());
+ headers.putAll(qualifiers.urlSpecificHeaders().row(i));
+ fetchFromUrl(
+ uri,
+ headers,
+ Duration.ofSeconds(request.getTimeout().getSeconds()),
+ maybeChecksumOut);
+ return new DownloadResult(uri, downloadPath, digestOut.digest());
+ }
+ } catch (IOException e) {
+ try {
+ downloadPath.delete();
+ } catch (IOException ex) {
+ logger.atWarning().withCause(ex).log(
+ "Failed to delete partially downloaded file %s", downloadPath);
+ }
+ lastException = e;
+ logger.atFine().withCause(e).log("Failed to fetch from %s", uri);
+ }
+ }
+
+ throw lastException != null ? lastException : new IOException("No URIs to fetch");
+ }
+
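+  /** Maps a download failure onto the rpc Code reported in the FetchBlobResponse status. */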
+ private Code determineCode(@Nullable IOException lastException) {
+ return switch (lastException) {
+ case SocketTimeoutException e -> Code.DEADLINE_EXCEEDED;
+ case FileNotFoundException e -> Code.NOT_FOUND;
+ // See HashOutputStream#verifyHash.
+ case UnrecoverableHttpException e when e.getMessage().startsWith("Checksum was ") ->
+ Code.ABORTED;
+ case null, default -> Code.UNKNOWN;
+ };
+ }
+
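+  /**
+   * Performs a plain HTTP GET and streams the response body to {@code out}; a zero timeout falls
+   * back to 30 seconds.
+   */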
+ private void fetchFromUrl(
+ String urlString, SequencedMap<String, String> headers, Duration timeout, OutputStream out)
+ throws IOException {
+ HttpURLConnection connection;
+ try {
+ connection = (HttpURLConnection) new URI(urlString).toURL().openConnection();
+ } catch (URISyntaxException e) {
+ throw new IOException("Invalid URI: " + urlString, e);
+ }
+    var timeoutMillis = timeout.isZero() ? 30_000 : (int) timeout.toMillis();
+ try {
+ connection.setRequestMethod("GET");
+ connection.setConnectTimeout(timeoutMillis);
+ connection.setReadTimeout(timeoutMillis);
+ headers.forEach(connection::setRequestProperty);
+
+ int responseCode = connection.getResponseCode();
+ if (responseCode != HttpURLConnection.HTTP_OK) {
+ throw new IOException("HTTP request failed with status " + responseCode);
+ }
+
+ try (var in = connection.getInputStream()) {
+ in.transferTo(out);
+ }
+ } finally {
+ connection.disconnect();
+ }
+ }
+}
diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
index f2fbced..69ddc65 100644
--- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
+++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/RemoteWorker.java
@@ -18,6 +18,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
+import build.bazel.remote.asset.v1.FetchGrpc.FetchImplBase;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheImplBase;
import build.bazel.remote.execution.v2.ActionResult;
import build.bazel.remote.execution.v2.CapabilitiesGrpc.CapabilitiesImplBase;
@@ -105,6 +106,7 @@
private final ContentAddressableStorageImplBase casServer;
private final ExecutionImplBase execServer;
private final CapabilitiesImplBase capabilitiesServer;
+ private final FetchImplBase fetchServer;
static FileSystem getFileSystem() {
final DigestHashFunction hashFunction;
@@ -206,6 +208,7 @@
execServer = null;
}
this.capabilitiesServer = new CapabilitiesServer(digestUtil, execServer != null, workerOptions);
+ this.fetchServer = new FetchServer(cache, digestUtil, workPath.getRelative("fetch-temp"));
}
public Server startServer() throws IOException {
@@ -223,7 +226,8 @@
.addService(ServerInterceptors.intercept(actionCacheServer, interceptors))
.addService(ServerInterceptors.intercept(bsServer, interceptors))
.addService(ServerInterceptors.intercept(casServer, interceptors))
- .addService(ServerInterceptors.intercept(capabilitiesServer, interceptors));
+ .addService(ServerInterceptors.intercept(capabilitiesServer, interceptors))
+ .addService(ServerInterceptors.intercept(fetchServer, interceptors));
if (workerOptions.tlsCertificate != null) {
b.sslContext(getSslContextBuilder(workerOptions).build());