Collect CI metrics and push to BigQuery (#2509)
- Add hooks to capture boot time in every agent (Linux and Windows)
- Collect CI metrics from the BEP file and Buildkite
- The collection and push-to-BigQuery logic is called after the test
logic is done.
The code here has been running in the testing branch for a few days in the
[bazel pipeline in the testing org](https://buildkite.com/bazel-testing/bazel-bazel),
and the collected metrics can be found in
`bazel-public.bazel_ci_metrics.ci_builds`.
diff --git a/buildkite/bazelci.py b/buildkite/bazelci.py
index 91461ee..dde9f50 100755
--- a/buildkite/bazelci.py
+++ b/buildkite/bazelci.py
@@ -1602,6 +1602,24 @@
],
fail_if_nonzero=False,
)
+ if is_trueish(os.environ.get("ENABLE_METRICS_COLLECTION", "false")):
+ try:
+ from collect_metrics import collect_metrics_and_push_to_bigquery
+ collect_metrics_and_push_to_bigquery(test_bep_file)
+ except Exception as e:
+ eprint(f"Failed to upload metrics: {e}")
+ job_url = f"{os.getenv('BUILDKITE_BUILD_URL')}#{os.getenv('BUILDKITE_JOB_ID')}"
+ execute_command(
+ [
+ "buildkite-agent",
+ "annotate",
+ "--style=warning",
+ f"Failed to upload metrics from [this job]({job_url})",
+ "--context",
+ "ctx-metrics_upload_failed",
+ ],
+ fail_if_nonzero=False,
+ )
_ = future.result()
# TODO: print results
@@ -3006,7 +3024,7 @@
def create_docker_step(label, image, commands=None, additional_env_vars=None, queue="default"):
- env = ["ANDROID_HOME", "ANDROID_NDK_HOME", "BUILDKITE_ARTIFACT_UPLOAD_DESTINATION"]
+ env = ["ANDROID_HOME", "ANDROID_NDK_HOME", "BUILDKITE_ARTIFACT_UPLOAD_DESTINATION", "CHECKOUT_DURATION_S", "PREP_DURATION_S"]
if THIS_IS_TRUSTED:
# For the trusted Linux arm64 machine to upload artifacts
env += ["GOOGLE_APPLICATION_CREDENTIALS"]
diff --git a/buildkite/collect_metrics.py b/buildkite/collect_metrics.py
new file mode 100644
index 0000000..3a5da43
--- /dev/null
+++ b/buildkite/collect_metrics.py
@@ -0,0 +1,359 @@
+import os
+import json
+import base64
+import re
+import subprocess
+import urllib
+import urllib.request
+import tempfile
+
+import collections
+import dataclasses
+import datetime
+from typing import List
+
+import bazelci
+
+@dataclasses.dataclass
+class JobTimestamps:  # Timestamp strings for one Buildkite job; each stays None when unknown.
+ created_at: str = None  # the job's "created_at" value from the Buildkite build info
+ started_at: str = None  # the job's "started_at" value from the Buildkite build info
+ finished_at: str = None  # the job's "finished_at", or a current-UTC-time fallback
+
+@dataclasses.dataclass
+class TestTarget:  # Aggregated result for one test label parsed from the BEP file.
+ label: str  # test target label taken from the testResult event id
+ status: str  # last non-PASSED status seen for the label, otherwise "PASSED"
+ duration_s: float  # longest single testResult duration for the label, in seconds
+ shard_count: int = 1  # number of testResult events recorded for the label
+ shard_durations: List[float] = dataclasses.field(default_factory=list)  # per-event durations, seconds
+
+# --- BigQuery destination: rows are inserted into PROJECT_ID:DATASET_ID.TABLE_ID ---
+PROJECT_ID = "bazel-public"
+DATASET_ID = "bazel_ci_metrics"
+TABLE_ID = "ci_builds"
+
+@dataclasses.dataclass
+class BuildMetrics:  # Build-level values that parse_bep extracts from a BEP JSON file.
+ wall_time_ms: int = 0  # buildMetrics.timingMetrics.wallTimeInMs
+ critical_path_s: float = 0.0  # parsed from the "critical path" build tool log
+ remote_and_disk_cache_hits: int = 0  # sum of runnerCount entries whose name contains "remote cache hit" / "disk cache hit"
+ total_actions: int = 0  # buildMetrics.actionSummary.actionsExecuted
+ output_size_bytes: int = 0  # topLevelArtifacts.sizeInBytes, else outputArtifactsSeen.sizeInBytes
+ bytes_downloaded: int = 0  # networkMetrics.systemNetworkStats.bytesRecv
+ failed_test_count: int = 0  # number of testResult events whose status is not PASSED
+ exit_code: int = 0  # finished.exitCode.code of the buildFinished event
+ targets: List[TestTarget] = dataclasses.field(default_factory=list)  # one entry per test label
+
+def print_and_annotate_warning(message):
+    """Log a warning and surface it on the Buildkite build page as an annotation.
+
+    The message is always logged first. Annotating is best-effort: any exception
+    raised while invoking `buildkite-agent annotate` is logged and swallowed.
+    """
+    bazelci.eprint(message)
+    try:
+        job_url = "#".join(str(os.getenv(v)) for v in ("BUILDKITE_BUILD_URL", "BUILDKITE_JOB_ID"))
+        annotate_cmd = [
+            "buildkite-agent",
+            "annotate",
+            "--style=warning",
+            f"{message} (for [this job]({job_url}))",
+        ]
+        # Every metrics warning is posted under the same fixed annotation context.
+        annotate_cmd += ["--context", "ctx-metrics_upload_failed"]
+        bazelci.execute_command(annotate_cmd, fail_if_nonzero=False)
+    except Exception as e:
+        bazelci.eprint(f"Failed to annotate Buildkite: {e}")
+
+
+def fetch_job_timestamps(org_slug, pipeline_slug, build_number, job_id):
+    """Look up the current job's timestamps through the Buildkite API.
+
+    Args:
+        org_slug, pipeline_slug: Passed to bazelci.BuildkiteClient.
+        build_number, job_id: Select the build to fetch and, by "id", the job within it.
+
+    Returns:
+        JobTimestamps: The job's created_at, started_at and finished_at strings.
+        All fields are None when the job is not found or the lookup raises.
+    """
+    try:
+        build_data = bazelci.BuildkiteClient(org_slug, pipeline_slug).get_build_info(build_number)
+        job = next((j for j in build_data.get("jobs", []) if j.get("id") == job_id), None)
+        if job is not None:
+            # The job is typically still running when bazelci.py calls this, so finished_at
+            # is empty; fall back to "now" in UTC (aware now() replaces deprecated utcnow()).
+            utc_now = datetime.datetime.now(datetime.timezone.utc)
+            return JobTimestamps(
+                created_at=job.get("created_at"),
+                started_at=job.get("started_at"),
+                finished_at=job.get("finished_at") or utc_now.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+            )
+    except Exception as e:
+        bazelci.eprint(f"Warning: Failed to fetch job timestamps: {e}")
+    return JobTimestamps()
+
+
+def get_git_stats(target_dir="."):
+    """Return the number of files touched by the HEAD commit in `target_dir`.
+
+    The count is read from the "N file(s) changed" summary of `git show --shortstat`.
+    If git fails, or no such summary is found, the sentinel 9999 is returned.
+    """
+    # `git show` is used instead of `git diff HEAD~1` because PRs might be squashed or
+    # shallow cloned; the --shortstat summary also covers new and deleted files.
+    git_cmd = ["git", "show", "--shortstat", "--format="]
+    try:
+        raw = subprocess.check_output(git_cmd, cwd=target_dir, text=True, stderr=subprocess.STDOUT)
+        # Expected shape: " 1 file changed, 1 insertion(+)"
+        found = re.search(r"(\d+)\s+file[s]?\s+changed", raw.strip())
+        if found is not None:
+            return int(found.group(1))
+    except Exception as e:
+        bazelci.eprint(f"Warning: Git diff failed ({e}). Defaulting changed_files to large value (9999).")
+
+    # Large sentinel: per the original note, the cache-hit metric only considers small
+    # changes. Returning -1 and filtering it out in Grafana would be an alternative.
+    return 9999
+
+
+def extract_critical_path(build_tool_logs):
+    """Return the critical path duration, in seconds, from BEP build tool logs.
+
+    Decodes the Base64 "contents" of each "critical path" entry; returns 0.0 if none parses.
+    """
+    for entry in build_tool_logs:
+        if entry.get("name") != "critical path":
+            continue
+        try:
+            encoded = entry.get("contents")
+            decoded = base64.b64decode(encoded).decode("utf-8") if encoded else ""
+            found = re.search(r"Critical Path: ([\d\.]+)s", decoded)
+            if found:
+                return float(found.group(1))
+        except Exception as e:
+            bazelci.eprint(f"Error parsing critical path log: {e}")
+    return 0.0
+
+
+def parse_bep(filepath):
+    """Parse a Build Event Protocol (BEP) JSON file into build metrics and test targets.
+
+    The file is read as UTF-8 with one JSON event per line. Blank lines are ignored;
+    lines that are not valid JSON are logged and skipped.
+
+    Args:
+        filepath: Path of the BEP JSON file.
+
+    Returns:
+        BuildMetrics: Aggregated build metrics plus one TestTarget per test label.
+        None: If the file does not exist.
+    """
+    if not os.path.exists(filepath):
+        bazelci.eprint(f"Error: BEP file not found at {filepath}")
+        return None
+
+    build_metrics = BuildMetrics()
+    durations_by_label = collections.defaultdict(list)
+    status_by_label = {}
+
+    # Explicit encoding so parsing does not depend on the platform default (e.g. Windows).
+    with open(filepath, "r", encoding="utf-8") as f:
+        for line_num, line in enumerate(f, 1):
+            if not line.strip():
+                continue
+            try:
+                event = json.loads(line)
+            except json.JSONDecodeError:
+                bazelci.eprint(f"Skipping invalid JSON line at line {line_num}")
+                continue
+
+            event_id = event.get("id", {})
+
+            # --- 1. Test Results ---
+            if "testResult" in event:
+                data = event["testResult"]
+                label = event_id.get("testResult", {}).get("label")
+                if label:
+                    duration_ms = int(data.get("testAttemptDurationMillis", 0))
+                    durations_by_label[label].append(duration_ms / 1000.0)
+                    # Status (PASSED, FAILED, FLAKY): a non-PASSED status always replaces
+                    # the recorded one; PASSED is only recorded if nothing was seen yet.
+                    status = data.get("status", "UNKNOWN")
+                    if label not in status_by_label or status != "PASSED":
+                        status_by_label[label] = status
+                    if status != "PASSED":
+                        build_metrics.failed_test_count += 1
+
+            # --- 2. Build Metrics ---
+            elif "buildMetrics" in event:
+                metrics = event["buildMetrics"]
+                timing = metrics.get("timingMetrics", {})
+                build_metrics.wall_time_ms = int(timing.get("wallTimeInMs", 0))
+
+                action_summary = metrics.get("actionSummary", {})
+                build_metrics.total_actions = int(action_summary.get("actionsExecuted", 0))
+                for runner in action_summary.get("runnerCount", []):
+                    name = runner.get("name", "").lower()
+                    if "remote cache hit" in name or "disk cache hit" in name:
+                        build_metrics.remote_and_disk_cache_hits += int(runner.get("count", 0))
+
+                # Output size: top-level artifacts, falling back to all output artifacts seen.
+                artifacts = metrics.get("artifactMetrics", {})
+                size = int(artifacts.get("topLevelArtifacts", {}).get("sizeInBytes", 0))
+                if size == 0:
+                    size = int(artifacts.get("outputArtifactsSeen", {}).get("sizeInBytes", 0))
+                build_metrics.output_size_bytes = size
+
+                # Network
+                net = metrics.get("networkMetrics", {}).get("systemNetworkStats", {})
+                build_metrics.bytes_downloaded = int(net.get("bytesRecv", 0))
+
+            # --- 3. Build Tool Logs ---
+            elif "buildToolLogs" in event:
+                logs = event["buildToolLogs"].get("log", [])
+                build_metrics.critical_path_s = extract_critical_path(logs)
+
+            # --- Build Finished (Exit Code) ---
+            # Tolerate a buildFinished id without a "finished" payload: a bare
+            # event.get("finished").get(...) would raise AttributeError and abort the parse.
+            if "buildFinished" in event_id:
+                exit_data = (event.get("finished") or {}).get("exitCode") or {}
+                build_metrics.exit_code = int(exit_data.get("code", 0))
+
+    # --- 4. Post-Process Nested Targets ---
+    for label, shards in durations_by_label.items():
+        build_metrics.targets.append(
+            TestTarget(
+                label=label,
+                status=status_by_label.get(label, "UNKNOWN"),
+                duration_s=max(shards) if shards else 0.0,
+                shard_count=len(shards),
+                shard_durations=shards,
+            )
+        )
+
+    return build_metrics
+
+
+def publish_to_bigquery(row):
+    """Insert a single row into the BigQuery metrics table using the `bq` CLI.
+
+    The row is written as one line of JSON to a temporary file that is passed to
+    `bq insert` and removed afterwards. A non-zero exit code or an exception is
+    reported through print_and_annotate_warning.
+
+    Args:
+        row: JSON-serializable dict with the values of the row to insert.
+    """
+    bazelci.eprint("Publishing Metrics to BigQuery ...")
+    table_ref = f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}"
+    bq_cmd = "bq.cmd" if bazelci.is_windows() else "bq"
+
+    # Bound before the try block so the cleanup in `finally` can always refer to it.
+    temp_path = None
+    try:
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tf:
+            temp_path = tf.name  # recorded first so a failed dump is still cleaned up
+            json.dump(row, tf)
+            tf.write("\n")
+
+        cmd = [bq_cmd, "insert", table_ref, temp_path]
+        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
+        if result.returncode != 0:
+            print_and_annotate_warning(f"BigQuery CLI Insert Error:\nSTDOUT: {result.stdout}\nSTDERR: {result.stderr}")
+            return
+        bazelci.eprint("Success: Metrics pushed to BigQuery via CLI.")
+    except Exception as e:
+        print_and_annotate_warning(f"Failed to execute bq CLI: {e}")
+    finally:
+        if temp_path and os.path.exists(temp_path):
+            os.remove(temp_path)
+
+def collect_metrics_and_push_to_bigquery(bep_file_path):
+    """Assemble one CI metrics row for the current job and publish it to BigQuery.
+
+    Combines Buildkite environment variables, the parsed BEP file, the changed-file
+    count from git and the job timestamps from the Buildkite API. Called from
+    bazelci.py after the build finishes. If parse_bep returns None, a warning is
+    emitted and nothing is published.
+    """
+
+    def _env_float(name):
+        # An unset (None) or non-numeric variable yields None instead of raising.
+        try:
+            return float(os.getenv(name))
+        except (ValueError, TypeError):
+            return None
+
+    def _parse_iso(stamp):
+        return datetime.datetime.fromisoformat(stamp.replace("Z", "+00:00"))
+
+    bazelci.eprint("Collecting CI Metrics ...")
+
+    # --- Configuration (environment is read at call time, not at import) ---
+    build_id = os.getenv("BUILDKITE_BUILD_ID")
+    build_number = int(os.getenv("BUILDKITE_BUILD_NUMBER"))
+    job_id = os.getenv("BUILDKITE_JOB_ID")
+    job_label = os.getenv("BUILDKITE_LABEL")
+    pipeline = os.getenv("BUILDKITE_PIPELINE_SLUG")
+    org = os.getenv("BUILDKITE_ORGANIZATION_SLUG")
+    repo = os.getenv("BUILDKITE_REPO")
+    platform = os.getenv("BUILDKITE_AGENT_META_DATA_OS")
+    agent_id = os.getenv("BUILDKITE_AGENT_ID")
+    branch = os.getenv("BUILDKITE_BRANCH", "main")
+    commit_sha = os.getenv("BUILDKITE_COMMIT")
+    retry_count = int(os.getenv("BUILDKITE_RETRY_COUNT", "0"))
+
+    # Per the original note these two are injected via webhooks.
+    checkout_duration_s = _env_float("CHECKOUT_DURATION_S")
+    prep_duration_s = _env_float("PREP_DURATION_S")
+
+    changed_files_count = get_git_stats()
+
+    build_metrics = parse_bep(bep_file_path)
+    if build_metrics is None:
+        print_and_annotate_warning("Skipping BigQuery push due to BEP parsing failure.")
+        return
+
+    # Queue time is started_at - created_at; it stays 0.0 when either is missing or unparsable.
+    timestamps = fetch_job_timestamps(org, pipeline, build_number, job_id)
+    queue_duration = 0.0
+    if timestamps.created_at and timestamps.started_at:
+        try:
+            created_dt, started_dt = map(_parse_iso, (timestamps.created_at, timestamps.started_at))
+            queue_duration = (started_dt - created_dt).total_seconds()
+        except Exception as e:
+            bazelci.eprint(f"Warning: Could not parse timestamps: {e}")
+
+    row = {
+        "build_id": build_id,
+        "build_number": build_number,
+        "job_id": job_id,
+        "job_label": job_label,
+        "finished_at": timestamps.finished_at,
+        "created_at": timestamps.created_at,
+        "started_at": timestamps.started_at,
+        "pipeline": pipeline,
+        "platform": platform,
+        "agent_id": agent_id,
+        "branch": branch,
+        "repo": repo,
+        "commit_sha": commit_sha,
+        "exit_code": build_metrics.exit_code,
+        "failed_test_count": build_metrics.failed_test_count,
+        "retry_count": retry_count,
+        "wall_time_s": build_metrics.wall_time_ms / 1000.0,
+        "critical_path_s": build_metrics.critical_path_s,
+        "queue_duration_s": queue_duration,
+        "checkout_duration_s": checkout_duration_s,
+        "prep_duration_s": prep_duration_s,
+        "remote_and_disk_cache_hits": build_metrics.remote_and_disk_cache_hits,
+        "total_actions": build_metrics.total_actions,
+        "output_size_bytes": build_metrics.output_size_bytes,
+        "bytes_downloaded": build_metrics.bytes_downloaded,
+        "changed_files_count": changed_files_count,
+        "targets": [dataclasses.asdict(t) for t in build_metrics.targets],
+    }
+    publish_to_bigquery(row)
diff --git a/buildkite/collect_metrics_test.py b/buildkite/collect_metrics_test.py
new file mode 100644
index 0000000..33f622a
--- /dev/null
+++ b/buildkite/collect_metrics_test.py
@@ -0,0 +1,256 @@
+import unittest
+from unittest.mock import patch, MagicMock, mock_open
+import os
+import json
+import base64
+
+import collect_metrics
+
+
+class TestPublishMetrics(unittest.TestCase):
+    """Unit tests for the collect_metrics module."""
+
+    def setUp(self):
+        # Reset environment variables before each test
+        self.original_environ = os.environ.copy()
+        os.environ.clear()
+
+    def tearDown(self):
+        os.environ.clear()
+        os.environ.update(self.original_environ)
+
+    # --- Test 1: Git Stats Logic ---
+    @patch("subprocess.check_output")
+    def test_get_git_stats_success(self, mock_subprocess):
+        # Mock successful git output
+        mock_subprocess.return_value = " 5 files changed, 20 insertions(+), 5 deletions(-)"
+        count = collect_metrics.get_git_stats()
+        self.assertEqual(count, 5)
+
+    @patch("subprocess.check_output")
+    def test_get_git_stats_singular(self, mock_subprocess):
+        # Mock singular output
+        mock_subprocess.return_value = " 1 file changed, 1 insertion(+)"
+        count = collect_metrics.get_git_stats()
+        self.assertEqual(count, 1)
+
+    @patch("subprocess.check_output")
+    def test_get_git_stats_failure(self, mock_subprocess):
+        # Mock a git failure
+        mock_subprocess.side_effect = Exception("Git command not found")
+        count = collect_metrics.get_git_stats()
+        self.assertEqual(count, 9999)  # Fallback value
+
+    # --- Test 2: BEP Parsing ---
+    def test_parse_bep_valid(self):
+        # Create a mock BEP file content
+        mock_bep_content = [
+            # Test Result Event
+            json.dumps(
+                {
+                    "id": {"testResult": {"label": "//pkg:test1"}},
+                    "testResult": {"status": "PASSED", "testAttemptDurationMillis": "1500"},
+                }
+            ),
+            # Another Test Result (Failed)
+            json.dumps(
+                {
+                    "id": {"testResult": {"label": "//pkg:test2"}},
+                    "testResult": {"status": "FAILED", "testAttemptDurationMillis": "5000"},
+                }
+            ),
+            # Build Metrics
+            json.dumps(
+                {
+                    "id": {"buildMetrics": {}},
+                    "buildMetrics": {
+                        "timingMetrics": {"wallTimeInMs": "10000"},
+                        "actionSummary": {
+                            "actionsExecuted": "100",
+                            "runnerCount": [{"name": "remote cache hit", "count": "50"}],
+                        },
+                        "artifactMetrics": {"topLevelArtifacts": {"sizeInBytes": "2048"}},
+                        "networkMetrics": {"systemNetworkStats": {"bytesRecv": "1024"}},
+                    },
+                }
+            ),
+            # Build Tool Logs
+            json.dumps(
+                {
+                    "id": {"buildToolLogs": {}},
+                    "buildToolLogs": {
+                        "log": [
+                            {
+                                "name": "critical path",
+                                "contents": base64.b64encode(b"Critical Path: 15.0s\n").decode("utf-8"),
+                            }
+                        ]
+                    },
+                }
+            ),
+            # Build Finished
+            json.dumps(
+                {"id": {"buildFinished": {}}, "finished": {"exitCode": {"code": 0}}}
+            ),
+        ]
+
+        # Mock file reading
+        with patch("builtins.open", mock_open(read_data="\n".join(mock_bep_content))), \
+             patch("os.path.exists", return_value=True):
+            bep_metrics = collect_metrics.parse_bep("dummy.json")
+
+        # Verify Metrics
+        self.assertEqual(bep_metrics.wall_time_ms, 10000)
+        self.assertEqual(bep_metrics.total_actions, 100)
+        self.assertEqual(bep_metrics.remote_and_disk_cache_hits, 50)
+        self.assertEqual(bep_metrics.failed_test_count, 1)
+        self.assertEqual(bep_metrics.critical_path_s, 15.0)
+        self.assertEqual(bep_metrics.exit_code, 0)
+
+        # Verify Targets
+        self.assertEqual(len(bep_metrics.targets), 2)
+        target1 = next(t for t in bep_metrics.targets if t.label == "//pkg:test1")
+        self.assertEqual(target1.status, "PASSED")
+        self.assertEqual(target1.duration_s, 1.5)
+
+        # --- Second Run to fall back to outputArtifactsSeen ---
+        mock_bep_content_2 = [
+            json.dumps(
+                {
+                    "id": {"buildMetrics": {}},
+                    "buildMetrics": {
+                        "artifactMetrics": {
+                            "topLevelArtifacts": {"sizeInBytes": "0"},
+                            "outputArtifactsSeen": {"sizeInBytes": "4096"}
+                        },
+                    },
+                }
+            ),
+        ]
+
+        with patch("builtins.open", mock_open(read_data="\n".join(mock_bep_content_2))), \
+             patch("os.path.exists", return_value=True):  # parse_bep returns None otherwise
+            bep_metrics_2 = collect_metrics.parse_bep("dummy2.json")
+        self.assertEqual(bep_metrics_2.output_size_bytes, 4096)
+
+    def test_extract_critical_path(self):
+        # Mock a Base64 encoded critical path log
+        raw_log = "Critical Path: 12.5s\n Action A..."
+        b64_log = base64.b64encode(raw_log.encode("utf-8")).decode("utf-8")
+        logs = [{"name": "critical path", "contents": b64_log}]
+        duration = collect_metrics.extract_critical_path(logs)
+        self.assertEqual(duration, 12.5)
+
+    # --- Test 3: Main Logic & BigQuery Push ---
+    @patch("collect_metrics.subprocess.run")
+    def test_publish_to_bigquery(self, mock_run):
+        # Mock the subprocess run to succeed (return code 0)
+        mock_result = MagicMock()
+        mock_result.returncode = 0
+        mock_run.return_value = mock_result
+
+        test_row = {"build_number": 123, "pipeline": "test"}
+        collect_metrics.publish_to_bigquery(test_row)
+
+        # Verify it called subprocess.run with bq insert
+        mock_run.assert_called_once()
+        call_args = mock_run.call_args[0][0]
+        self.assertTrue(any("bq" in arg for arg in call_args))
+        self.assertIn("insert", call_args)
+
+    @patch("collect_metrics.publish_to_bigquery")
+    @patch("collect_metrics.parse_bep")
+    @patch("collect_metrics.get_git_stats")
+    def test_collect_metrics_end_to_end(self, mock_git, mock_parse, mock_publish):
+        # Setup Environment
+        os.environ["BUILDKITE_BUILD_NUMBER"] = "500"
+        os.environ["BUILDKITE_PIPELINE_SLUG"] = "test-pipeline"
+
+        # Setup Mocks
+        mock_git.return_value = 5  # 5 changed files
+
+        # Mock BEP Return
+        mock_bep_metrics = collect_metrics.BuildMetrics(
+            wall_time_ms=5000,
+            critical_path_s=4.0,
+            remote_and_disk_cache_hits=10,
+            total_actions=20,
+            output_size_bytes=100,
+            bytes_downloaded=50,
+            failed_test_count=0,
+            exit_code=0,
+        )
+        mock_parse.return_value = mock_bep_metrics
+
+        # Run Function (with mocked timestamps)
+        with patch("collect_metrics.fetch_job_timestamps") as mock_fetch:
+            mock_fetch.return_value = collect_metrics.JobTimestamps(
+                created_at="2023-10-25T10:00:00Z",
+                started_at="2023-10-25T10:05:00Z"
+            )
+            collect_metrics.collect_metrics_and_push_to_bigquery("dummy_path.json")
+
+        # Verify publish_to_bigquery was called
+        mock_publish.assert_called_once()
+
+        # Inspect the row payload that was generated
+        row = mock_publish.call_args[0][0]
+
+        self.assertEqual(row["build_number"], 500)
+        self.assertEqual(row["pipeline"], "test-pipeline")
+        self.assertEqual(row["changed_files_count"], 5)
+        self.assertEqual(row["failed_test_count"], 0)
+        self.assertEqual(row.get("queue_duration_s"), 300.0)
+
+    @patch("collect_metrics.subprocess.run")
+    def test_publish_to_bigquery_failure(self, mock_run):
+        mock_result = MagicMock()
+        mock_result.returncode = 1
+        mock_result.stdout = "out"
+        mock_result.stderr = "err"
+        mock_run.return_value = mock_result
+
+        with patch("collect_metrics.print_and_annotate_warning") as mock_annotate:
+            collect_metrics.publish_to_bigquery({"test": 1})
+            mock_annotate.assert_called_once()
+
+    def test_duration_parsing_error(self):
+        env = {"CHECKOUT_DURATION_S": "invalid", "PREP_DURATION_S": "invalid", "BUILDKITE_BUILD_NUMBER": "500"}
+        with patch.dict(os.environ, env), \
+             patch("collect_metrics.get_git_stats", return_value=0), \
+             patch("collect_metrics.fetch_job_timestamps", return_value=collect_metrics.JobTimestamps()), \
+             patch("collect_metrics.parse_bep", return_value=MagicMock()), \
+             patch("collect_metrics.publish_to_bigquery") as mock_publish:
+            collect_metrics.collect_metrics_and_push_to_bigquery("dummy")
+        row = mock_publish.call_args[0][0]  # unparsable durations must be published as None
+        self.assertIsNone(row["checkout_duration_s"])
+        self.assertIsNone(row["prep_duration_s"])
+
+    @patch("collect_metrics.parse_bep")
+    def test_collect_metrics_bep_failure(self, mock_parse):
+        mock_parse.return_value = None
+        with patch.dict(os.environ, {"BUILDKITE_BUILD_NUMBER": "500"}):
+            with patch("collect_metrics.print_and_annotate_warning") as mock_annotate:
+                collect_metrics.collect_metrics_and_push_to_bigquery("dummy")
+                mock_annotate.assert_called_once_with("Skipping BigQuery push due to BEP parsing failure.")
+
+    def test_parse_bep_file_not_found(self):
+        with patch("os.path.exists", return_value=False):
+            bep_metrics = collect_metrics.parse_bep("non_existent_file.json")
+            self.assertIsNone(bep_metrics)
+
+    @patch("collect_metrics.bazelci.BuildkiteClient")
+    def test_fetch_job_timestamps_success(self, mock_client_cls):
+        mock_client = MagicMock()
+        mock_client_cls.return_value = mock_client
+        mock_client.get_build_info.return_value = {
+            "jobs": [{"id": "123", "created_at": "C", "started_at": "S", "finished_at": "F"}]
+        }
+
+        ts = collect_metrics.fetch_job_timestamps("org", "pipe", 1, "123")
+        self.assertEqual(ts.created_at, "C")
+        self.assertEqual(ts.finished_at, "F")
+
+
+if __name__ == "__main__":  # run the test suite when this file is executed directly
+ unittest.main()