blob: 31e06ce540e04e44e56176d6d699cb4538ffccbc [file] [log] [blame] [edit]
#!/usr/bin/env python3
#
# Copyright 2025 The Bazel Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import logging
import os
import re
import subprocess
import sys
import tempfile
from collections import namedtuple
from typing import Dict, Set
import requests
from bazelci import BuildkiteClient, BuildkiteException, execute_command
# --- Color Constants for Terminal Output ---
class Colors:
    """ANSI escape sequences wrapped around the per-URL summary lines.

    These work best in terminals that support ANSI escape codes.
    """

    # Green.
    SUCCESS = "\033[92m"
    # Yellow.
    SKIPPED = "\033[93m"
    # Red.
    FAILED = "\033[91m"
    # Switches the terminal back to its default colour.
    RESET = "\033[0m"
# --- Constants ---
# GCS bucket the mirrored artifacts are uploaded to.
GCS_BUCKET = "bazel-mirror"
# Buildkite organization and pipeline whose latest master build is scanned.
BUILDKITE_ORG = "bazel"
BUILDKITE_PIPELINE = "bazel-bazel"
# Matches a log line reporting that a download from mirror.bazel.build returned
# 404, capturing the mirror URL (which may still contain percent-encoded
# characters). The dots of the Java class name are escaped so that they match
# literal dots only.
URL_RE = re.compile(
    r"Download from (https?://mirror\.bazel\.build\S+)\s+failed: "
    r"class java\.io\.FileNotFoundException GET returned 404 Not Found"
)
# Outcome of mirroring a single URL: a status string ("SUCCESS", "SKIPPED" or
# "FAILED"), the URL the outcome refers to, and a reason (empty on success).
MirrorResult = namedtuple("MirrorResult", "status url reason")
def setup_logging(level=logging.INFO):
    """Point the root logger at stdout with a timestamp/level/message format.

    Args:
        level: logging threshold handed to ``logging.basicConfig``.
    """
    line_format = "%(asctime)s - %(levelname)-8s - %(message)s"
    logging.basicConfig(stream=sys.stdout, format=line_format, level=level)
def get_latest_build(client: BuildkiteClient) -> Dict:
    """Return the first finished master-branch build the client reports.

    The query asks for a single result (``per_page=1``) on branch ``master``
    in state ``finished``.

    Raises:
        RuntimeError: if the query yields no builds.
    """
    query = [("per_page", 1), ("branch", "master"), ("state", "finished")]
    builds = client.get_build_info_list(params=query)
    if builds:
        return builds[0]
    raise RuntimeError(
        f"No finished builds found for pipeline '{client.pipeline}' on branch 'master'"
    )
def parse_urls_from_logs(logs: str) -> Set[str]:
    """Extract the failed mirror download URLs from a build log.

    Args:
        logs: raw text of a job log.

    Returns:
        The distinct URLs captured by ``URL_RE``, percent-decoded (so that,
        for example, ``%2B`` becomes ``+``). Empty if nothing matches.
    """
    # Use the documented standard-library function directly rather than the
    # copy that happens to be importable from requests.utils.
    from urllib.parse import unquote

    return {unquote(encoded_url) for encoded_url in URL_RE.findall(logs)}
def mirror_url(url: str, bucket: str) -> MirrorResult:
    """Mirror a single URL to the GCS bucket and return the result.

    A ``mirror.bazel.build`` URL (http or https) is first translated to the
    ``https://`` URL of the upstream host encoded in its path. The artifact is
    stored at ``gs://<bucket>/<source URL without its scheme>`` and given a
    one-year public ``Cache-Control`` header.

    Args:
        url: URL of the artifact; either a mirror URL or the upstream URL.
        bucket: name of the destination GCS bucket.

    Returns:
        A ``MirrorResult`` whose status is ``"SKIPPED"`` when ``gsutil stat``
        finds the object already present, ``"SUCCESS"`` after a download and
        upload, or ``"FAILED"`` together with the reason.
    """
    logging.info(f"Processing URL: {url}")

    source_url = url
    # URL_RE accepts both http:// and https:// mirror URLs, so both must be
    # mapped back to the upstream host; otherwise the download below would hit
    # the mirror again and get the same 404.
    for mirror_prefix in ("https://mirror.bazel.build/", "http://mirror.bazel.build/"):
        if source_url.startswith(mirror_prefix):
            source_url = "https://" + source_url[len(mirror_prefix):]
            logging.debug(f"URL is on mirror; translating to source: {source_url}")
            break

    _, scheme_separator, target_path = source_url.partition("://")
    if not scheme_separator or not target_path:
        # Report malformed input like any other failure instead of raising.
        return MirrorResult("FAILED", url, "URL has no scheme or no path")
    gcs_url = f"gs://{bucket}/{target_path}"

    try:
        execute_command(["gsutil", "-q", "stat", gcs_url])
    except subprocess.CalledProcessError:
        # A failing `gsutil stat` is taken to mean the object is missing.
        logging.debug("Artifact not found in GCS, proceeding with mirror...")
    except Exception as e:
        return MirrorResult("FAILED", gcs_url, f"GCS check failed: {e}")
    else:
        return MirrorResult("SKIPPED", gcs_url, "Artifact already exists")

    temp_filename = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_filename = temp_file.name
            logging.debug(f"Downloading {source_url} to {temp_filename}...")
            hasher = hashlib.sha256()
            # The context manager releases the streamed connection even when
            # the download fails part-way through.
            with requests.get(source_url, stream=True, timeout=300) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
                    hasher.update(chunk)
            logging.debug(f"Download complete. SHA256: {hasher.hexdigest()}")
        # The temporary file is closed (and therefore flushed) at this point,
        # so gsutil reads the complete artifact.
        execute_command(["gsutil", "cp", temp_filename, gcs_url])
        execute_command(
            ["gsutil", "setmeta", "-h", "Cache-Control:public, max-age=31536000", gcs_url]
        )
        return MirrorResult("SUCCESS", gcs_url, "")
    except (
        requests.exceptions.RequestException,
        subprocess.CalledProcessError,
        OSError,
    ) as e:
        # OSError covers local failures (e.g. writing the temporary file) so
        # that one bad artifact is reported instead of aborting the whole run.
        return MirrorResult("FAILED", source_url, str(e))
    finally:
        # delete=False above means the file must be removed by hand.
        if temp_filename and os.path.exists(temp_filename):
            os.remove(temp_filename)
            logging.debug(f"Cleaned up temporary file: {temp_filename}")
def get_urls_from_buildkite(client: BuildkiteClient) -> Set[str]:
    """Gather failed download URLs from the job logs of the latest build.

    Jobs without a ``raw_log_url`` are ignored. A job whose log cannot be
    fetched is logged as an error and skipped; an empty log is logged as a
    warning and skipped.

    Returns:
        The union of the URLs parsed from every readable job log.
    """
    latest_build = get_latest_build(client)
    build_number = latest_build["number"]
    logging.info(f"Found latest build: #{build_number} ({latest_build['web_url']})")
    logging.info(f"Fetching and parsing logs for build #{build_number}...")

    collected: Set[str] = set()
    for job in latest_build.get("jobs", []):
        if not job.get("raw_log_url"):
            continue

        job_id = job.get("id", "N/A")
        try:
            log_text = client.get_build_log(job)
        except BuildkiteException as e:
            # One unreadable log must not abort the scan of the other jobs.
            logging.error(f"Failed to fetch log for job ID {job_id}: {e}")
            continue

        if not log_text:
            logging.warning(f"Log content for job {job_id} is empty. Skipping.")
            continue

        urls_in_job = parse_urls_from_logs(log_text)
        if not urls_in_job:
            continue
        job_url = job.get("web_url", f"job_id: {job_id}")
        logging.info(f"Found {len(urls_in_job)} failed URL(s) in job: {job_url}")
        collected |= urls_in_job

    return collected
def mirror_artifacts(urls_to_mirror: Set[str], bucket: str):
    """Mirror a set of URLs and log a final summary.

    URLs are processed in sorted order. The summary lists every URL under its
    outcome and then the success/skipped/failed counts.

    Args:
        urls_to_mirror: URLs to mirror; nothing is done when it is empty.
        bucket: name of the destination GCS bucket.

    Raises:
        SystemExit: with exit code 1 when at least one URL failed to mirror.
    """
    if not urls_to_mirror:
        logging.info("No failed download URLs found. Nothing to do.")
        return

    logging.info(f"\nFound a total of {len(urls_to_mirror)} unique URLs to mirror.")

    results = [mirror_url(url, bucket) for url in sorted(urls_to_mirror)]
    successes = [r for r in results if r.status == "SUCCESS"]
    skips = [r for r in results if r.status == "SKIPPED"]
    failures = [r for r in results if r.status == "FAILED"]

    # --- Final Summary ---
    summary_message = (
        f"Mirroring complete. "
        f"Success: {len(successes)}, Skipped: {len(skips)}, Failed: {len(failures)}"
    )
    # The rule is as wide as the summary line it frames.
    rule = "=" * len(summary_message)
    logging.info("\n" + rule)
    logging.info("Mirroring Summary")
    logging.info(rule)
    for r in successes:
        logging.info(f"{Colors.SUCCESS}SUCCESS: {r.url}{Colors.RESET}")
    for r in skips:
        logging.warning(f"{Colors.SKIPPED}SKIPPED: {r.url} ({r.reason}){Colors.RESET}")
    for r in failures:
        logging.error(f"{Colors.FAILED}FAILED: {r.url} - Reason: {r.reason}{Colors.RESET}")
    # Emit the counts themselves; previously the message was built but only
    # its length was ever used.
    logging.info(summary_message)

    if failures:
        logging.critical("Some artifacts failed to mirror. See errors above.")
        sys.exit(1)
def main():
    """Script entry point: scan the latest build and mirror what it is missing.

    Configures logging, creates the Buildkite client (using the token from the
    ``BUILDKITE_API_TOKEN`` environment variable when that variable is set),
    collects the failed URLs and mirrors them into ``GCS_BUCKET``. A
    ``RuntimeError`` or ``BuildkiteException`` is logged and turned into exit
    code 1.
    """
    setup_logging()
    try:
        client = BuildkiteClient(org=BUILDKITE_ORG, pipeline=BUILDKITE_PIPELINE)
        api_token = os.environ.get("BUILDKITE_API_TOKEN")
        if api_token is not None:
            client._token = api_token
        mirror_artifacts(get_urls_from_buildkite(client), GCS_BUCKET)
    except (RuntimeError, BuildkiteException) as e:
        logging.critical(f"A critical error occurred: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()