| #!/usr/bin/env python3 |
| # |
| # Copyright 2025 The Bazel Authors. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import hashlib |
| import logging |
| import os |
| import re |
| import subprocess |
| import sys |
| import tempfile |
| from collections import namedtuple |
| from typing import Dict, Set |
| |
| import requests |
| from bazelci import BuildkiteClient, BuildkiteException, execute_command |
| |
| # --- Color Constants for Terminal Output --- |
| # These work best in terminals that support ANSI escape codes. |
| class Colors: |
| SUCCESS = "\033[92m" # Green |
| SKIPPED = "\033[93m" # Yellow |
| FAILED = "\033[91m" # Red |
| RESET = "\033[0m" # Reset color |
| |
| |
| # --- Constants --- |
| GCS_BUCKET = "bazel-mirror" |
| BUILDKITE_ORG = "bazel" |
| BUILDKITE_PIPELINE = "bazel-bazel" |
| URL_RE = re.compile( # Matches URLs with optional URL-encoded characters |
| r"Download from (https?://mirror\.bazel\.build\S+)\s+failed: class java.io.FileNotFoundException GET returned 404 Not Found" |
| ) |
| |
| # A structured way to represent the result of a mirroring operation. |
| MirrorResult = namedtuple("MirrorResult", ["status", "url", "reason"]) |
| |
| |
| def setup_logging(level=logging.INFO): |
| """Configures basic logging for the script.""" |
| logging.basicConfig( |
| level=level, |
| format="%(asctime)s - %(levelname)-8s - %(message)s", |
| stream=sys.stdout, |
| ) |
| |
| |
| def get_latest_build(client: BuildkiteClient) -> Dict: |
| """Returns the latest finished build object for the master branch.""" |
| builds = client.get_build_info_list( |
| params=[("per_page", 1), ("branch", "master"), ("state", "finished")] |
| ) |
| if not builds: |
| raise RuntimeError( |
| f"No finished builds found for pipeline '{client.pipeline}' on branch 'master'" |
| ) |
| return builds[0] |
| |
| |
| def parse_urls_from_logs(logs: str) -> Set[str]: |
| """Parses failed download URLs from the given logs.""" |
| found_urls = URL_RE.findall(logs) |
| # URL-decode the found URLs to handle characters like %2B |
| decoded_urls = {requests.utils.unquote(url) for url in found_urls} |
| return decoded_urls |
| |
| |
| def mirror_url(url: str, bucket: str) -> MirrorResult: |
| """ |
| Mirrors a single URL to the GCS bucket and returns the result. |
| """ |
| logging.info(f"Processing URL: {url}") |
| source_url = url |
| mirror_prefix = "https://mirror.bazel.build/" |
| if source_url.startswith(mirror_prefix): |
| source_url = "https://" + source_url[len(mirror_prefix) :] |
| logging.debug(f"URL is on mirror; translating to source: {source_url}") |
| |
| target_path = source_url.split("://", 1)[1] |
| gcs_url = f"gs://{bucket}/{target_path}" |
| |
| try: |
| execute_command(["gsutil", "-q", "stat", gcs_url]) |
| return MirrorResult("SKIPPED", gcs_url, "Artifact already exists") |
| except subprocess.CalledProcessError: |
| logging.debug("Artifact not found in GCS, proceeding with mirror...") |
| except Exception as e: |
| return MirrorResult("FAILED", gcs_url, f"GCS check failed: {e}") |
| |
| temp_filename = None |
| try: |
| with tempfile.NamedTemporaryFile(delete=False) as temp_file: |
| temp_filename = temp_file.name |
| logging.debug(f"Downloading {source_url} to {temp_filename}...") |
| response = requests.get(source_url, stream=True, timeout=300) |
| response.raise_for_status() |
| |
| hasher = hashlib.sha256() |
| for chunk in response.iter_content(chunk_size=8192): |
| temp_file.write(chunk) |
| hasher.update(chunk) |
| logging.debug(f"Download complete. SHA256: {hasher.hexdigest()}") |
| |
| execute_command(["gsutil", "cp", temp_filename, gcs_url]) |
| execute_command( |
| ["gsutil", "setmeta", "-h", "Cache-Control:public, max-age=31536000", gcs_url] |
| ) |
| return MirrorResult("SUCCESS", gcs_url, "") |
| except (requests.exceptions.RequestException, subprocess.CalledProcessError) as e: |
| return MirrorResult("FAILED", source_url, str(e)) |
| finally: |
| if temp_filename and os.path.exists(temp_filename): |
| os.remove(temp_filename) |
| logging.debug(f"Cleaned up temporary file: {temp_filename}") |
| |
| |
| def get_urls_from_buildkite(client: BuildkiteClient) -> Set[str]: |
| """Fetches logs from the latest build and parses them for failed URLs.""" |
| latest_build = get_latest_build(client) |
| build_number = latest_build["number"] |
| logging.info(f"Found latest build: #{build_number} ({latest_build['web_url']})") |
| |
| logging.info(f"Fetching and parsing logs for build #{build_number}...") |
| all_urls_to_mirror: Set[str] = set() |
| for job in latest_build.get("jobs", []): |
| if job.get("raw_log_url"): |
| job_id = job.get("id", "N/A") |
| try: |
| log_content = client.get_build_log(job) |
| if not log_content: |
| logging.warning(f"Log content for job {job_id} is empty. Skipping.") |
| continue |
| |
| urls_in_job = parse_urls_from_logs(log_content) |
| if urls_in_job: |
| job_url = job.get("web_url", f"job_id: {job_id}") |
| logging.info(f"Found {len(urls_in_job)} failed URL(s) in job: {job_url}") |
| all_urls_to_mirror.update(urls_in_job) |
| |
| except BuildkiteException as e: |
| logging.error(f"Failed to fetch log for job ID {job_id}: {e}") |
| # Continue to next job instead of aborting all |
| |
| return all_urls_to_mirror |
| |
| |
| def mirror_artifacts(urls_to_mirror: Set[str], bucket: str): |
| """Mirrors a set of URLs and prints a final summary.""" |
| if not urls_to_mirror: |
| logging.info("No failed download URLs found. Nothing to do.") |
| return |
| |
| logging.info( |
| f"\nFound a total of {len(urls_to_mirror)} unique URLs to mirror." |
| ) |
| results = [mirror_url(url, bucket) for url in sorted(list(urls_to_mirror))] |
| |
| successes = [r for r in results if r.status == "SUCCESS"] |
| skips = [r for r in results if r.status == "SKIPPED"] |
| failures = [r for r in results if r.status == "FAILED"] |
| |
| # --- Final Summary --- |
| summary_message = ( |
| f"Mirroring complete. " |
| f"Success: {len(successes)}, Skipped: {len(skips)}, Failed: {len(failures)}" |
| ) |
| logging.info("\n" + "=" * len(summary_message)) |
| logging.info("Mirroring Summary") |
| logging.info("=" * len(summary_message)) |
| |
| for r in successes: |
| logging.info(f"{Colors.SUCCESS}SUCCESS: {r.url}{Colors.RESET}") |
| for r in skips: |
| logging.warning(f"{Colors.SKIPPED}SKIPPED: {r.url} ({r.reason}){Colors.RESET}") |
| for r in failures: |
| logging.error(f"{Colors.FAILED}FAILED: {r.url} - Reason: {r.reason}{Colors.RESET}") |
| |
| if failures: |
| logging.critical("Some artifacts failed to mirror. See errors above.") |
| sys.exit(1) |
| |
| |
| def main(): |
| """Main execution function.""" |
| setup_logging() |
| |
| try: |
| client = BuildkiteClient(org=BUILDKITE_ORG, pipeline=BUILDKITE_PIPELINE) |
| if "BUILDKITE_API_TOKEN" in os.environ: |
| client._token = os.environ["BUILDKITE_API_TOKEN"] |
| |
| urls = get_urls_from_buildkite(client) |
| mirror_artifacts(urls, GCS_BUCKET) |
| |
| except (RuntimeError, BuildkiteException) as e: |
| logging.critical(f"A critical error occurred: {e}") |
| sys.exit(1) |
| |
| |
| if __name__ == "__main__": |
| main() |