Collect CI metrics and push to BigQuery (#2509)

- Add hooks to capture boot time in every agent (Linux and Windows)
- Collect CI metrics from BEP file and BuildKite
- The collection and push to bigquery logic is called after the test
logic is done.

The code here had been running in the testing branch for a few days in
[bazel pipeline in the testing org
](https://buildkite.com/bazel-testing/bazel-bazel)
And the collected metrics can be found in
`bazel-public.bazel_ci_metrics.ci_builds`
diff --git a/buildkite/bazelci.py b/buildkite/bazelci.py
index 91461ee..dde9f50 100755
--- a/buildkite/bazelci.py
+++ b/buildkite/bazelci.py
@@ -1602,6 +1602,24 @@
                         ],
                         fail_if_nonzero=False,
                     )
+                if is_trueish(os.environ.get("ENABLE_METRICS_COLLECTION", "false")):
+                    try:
+                        from collect_metrics import collect_metrics_and_push_to_bigquery
+                        collect_metrics_and_push_to_bigquery(test_bep_file)
+                    except Exception as e:
+                        eprint(f"Failed to upload metrics: {e}")
+                        job_url = f"{os.getenv('BUILDKITE_BUILD_URL')}#{os.getenv('BUILDKITE_JOB_ID')}"
+                        execute_command(
+                        [
+                            "buildkite-agent",
+                            "annotate",
+                            "--style=warning",
+                            f"Failed to upload metrics from [this job]({job_url})",
+                            "--context",
+                            "ctx-metrics_upload_failed",
+                        ],
+                        fail_if_nonzero=False,
+                    )
 
             _ = future.result()
             # TODO: print results
@@ -3006,7 +3024,7 @@
 
 
 def create_docker_step(label, image, commands=None, additional_env_vars=None, queue="default"):
-    env = ["ANDROID_HOME", "ANDROID_NDK_HOME", "BUILDKITE_ARTIFACT_UPLOAD_DESTINATION"]
+    env = ["ANDROID_HOME", "ANDROID_NDK_HOME", "BUILDKITE_ARTIFACT_UPLOAD_DESTINATION", "CHECKOUT_DURATION_S", "PREP_DURATION_S"]
     if THIS_IS_TRUSTED:
         # For the trusted Linux arm64 machine to upload artifacts
         env += ["GOOGLE_APPLICATION_CREDENTIALS"]
diff --git a/buildkite/collect_metrics.py b/buildkite/collect_metrics.py
new file mode 100644
index 0000000..3a5da43
--- /dev/null
+++ b/buildkite/collect_metrics.py
@@ -0,0 +1,359 @@
+import os
+import json
+import base64
+import re
+import subprocess
+import urllib
+import urllib.request
+import tempfile
+
+import collections
+import dataclasses
+import datetime
+from typing import List
+
+import bazelci
+
+@dataclasses.dataclass
+class JobTimestamps:
+    created_at: str = None
+    started_at: str = None
+    finished_at: str = None
+
+@dataclasses.dataclass
+class TestTarget:
+    label: str
+    status: str
+    duration_s: float
+    shard_count: int = 1
+    shard_durations: List[float] = dataclasses.field(default_factory=list)
+
+# --- BigQuery Configuration constants ---
+PROJECT_ID = "bazel-public"
+DATASET_ID = "bazel_ci_metrics"
+TABLE_ID = "ci_builds"
+
+@dataclasses.dataclass
+class BuildMetrics:
+    wall_time_ms: int = 0
+    critical_path_s: float = 0.0
+    remote_and_disk_cache_hits: int = 0
+    total_actions: int = 0
+    output_size_bytes: int = 0
+    bytes_downloaded: int = 0
+    failed_test_count: int = 0
+    exit_code: int = 0
+    targets: List[TestTarget] = dataclasses.field(default_factory=list)
+
+def print_and_annotate_warning(message):
+    """
+    Prints a warning to the logs and annotates the Buildkite UI so it's visible on the build page.
+    """
+    bazelci.eprint(message)
+    try:
+        job_url = f"{os.getenv('BUILDKITE_BUILD_URL')}#{os.getenv('BUILDKITE_JOB_ID')}"
+        bazelci.execute_command(
+            [
+                "buildkite-agent",
+                "annotate",
+                "--style=warning",
+                f"{message} (for [this job]({job_url}))",
+                "--context",
+                "ctx-metrics_upload_failed",
+            ],
+            fail_if_nonzero=False,
+        )
+    except Exception as e:
+        bazelci.eprint(f"Failed to annotate Buildkite: {e}")
+
+
+def fetch_job_timestamps(org_slug, pipeline_slug, build_number, job_id):
+    """
+    Fetches real timestamps from Buildkite API for the current job.
+    Returns:
+        JobTimestamps: An object containing created_at, started_at, and finished_at strings.
+    """
+    try:
+        client = bazelci.BuildkiteClient(org_slug, pipeline_slug)
+        build_data = client.get_build_info(build_number)
+        for job in build_data.get("jobs", []):
+            if job.get("id") == job_id:
+                # If the job is still running when this script executes (which is typical
+                # when called from bazelci.py), finished_at is None. We fallback to the
+                # current time so Grafana queries have a valid timestamp for metrics.
+                finished_at = job.get("finished_at")
+                if not finished_at:
+                    finished_at = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+                return JobTimestamps(
+                    created_at=job.get("created_at"),
+                    started_at=job.get("started_at"),
+                    finished_at=finished_at,
+                )
+    except Exception as e:
+        bazelci.eprint(f"Warning: Failed to fetch job timestamps: {e}")
+
+    return JobTimestamps()
+
+
+def get_git_stats(target_dir="."):
+    """Gets the number of files changed between HEAD and HEAD~1."""
+    try:
+        # Use git show instead of diff HEAD~1 because PRs might be squashed or shallow cloned
+        # --shortstat gives a summary of "X files changed" includes NEW and DELETED files too
+        output = subprocess.check_output(
+            ["git", "show", "--shortstat", "--format="],
+            cwd=target_dir,
+            text=True,
+            stderr=subprocess.STDOUT,
+        ).strip()
+        # Output format: " 1 file changed, 1 insertion(+)"
+        match = re.search(r"(\d+)\s+file[s]?\s+changed", output)
+        if match:
+            return int(match.group(1))
+    except Exception as e:
+        bazelci.eprint(f"Warning: Git diff failed ({e}). Defaulting changed_files to large value (9999).")
+    
+    # This will be passed by the cache hit metric, as it only considers small changes.
+    # but we can also return -1 and update Grafana to skip these values.
+    return 9999
+
+
+def extract_critical_path(build_tool_logs):
+    """
+    Decodes the 'critical path' log from Base64 and extracts the duration.
+    """
+    for log in build_tool_logs:
+        if log.get("name") == "critical path":
+            try:
+                content_b64 = log.get("contents")
+                if not content_b64:
+                    continue
+                content_str = base64.b64decode(content_b64).decode("utf-8")
+                match = re.search(r"Critical Path: ([\d\.]+)s", content_str)
+                if match:
+                    return float(match.group(1))
+            except Exception as e:
+                bazelci.eprint(f"Error parsing critical path log: {e}")
+    return 0.0
+
+
+def parse_bep(filepath):
+    """
+    Parses the Build Event Protocol (BEP) JSON file to extract build metrics and targets.
+
+    Returns:
+        BuildMetrics: An object containing aggregated build metrics and test targets.
+        None: If the file does not exist.
+    """
+
+    if not os.path.exists(filepath):
+        bazelci.eprint(f"Error: BEP file not found at {filepath}")
+        return None
+
+    build_metrics = BuildMetrics()
+    target_map = collections.defaultdict(list)
+    target_status = {}
+
+    with open(filepath, "r") as f:
+        for line_num, line in enumerate(f, 1):
+            if not line.strip():
+                continue
+            try:
+                event = json.loads(line)
+            except json.JSONDecodeError:
+                bazelci.eprint(f"Skipping invalid JSON line at line {line_num}")
+                continue
+
+            event_id = event.get("id", {})
+
+            # --- 1. Test Results ---
+            if "testResult" in event:
+                data = event["testResult"]
+                label = event_id.get("testResult", {}).get("label")
+
+                if label:
+                    duration_ms = int(data.get("testAttemptDurationMillis", 0))
+                    duration_s = duration_ms / 1000.0
+                    target_map[label].append(duration_s)
+
+                    # Status (PASSED, FAILED, FLAKY)
+                    current_status = data.get("status", "UNKNOWN")
+                    if label not in target_status or current_status != "PASSED":
+                        target_status[label] = current_status
+
+                    if current_status != "PASSED":
+                        build_metrics.failed_test_count += 1
+
+            # --- 2. Build Metrics ---
+            elif "buildMetrics" in event:
+                buildMetrics = event["buildMetrics"]
+                build_metrics.wall_time_ms = int(
+                    buildMetrics.get("timingMetrics", {}).get("wallTimeInMs", 0)
+                )
+
+                action_summary = buildMetrics.get("actionSummary", {})
+                build_metrics.total_actions = int(action_summary.get("actionsExecuted", 0))
+
+                for runner in action_summary.get("runnerCount", []):
+                    name = runner.get("name", "").lower()
+                    if "remote cache hit" in name or "disk cache hit" in name:
+                        build_metrics.remote_and_disk_cache_hits += int(runner.get("count", 0))
+
+                artifacts = buildMetrics.get("artifactMetrics", {})
+                build_metrics.output_size_bytes = int(
+                    artifacts.get("topLevelArtifacts", {}).get("sizeInBytes", 0)
+                )
+                if build_metrics.output_size_bytes == 0:
+                    build_metrics.output_size_bytes = int(
+                        artifacts.get("outputArtifactsSeen", {}).get("sizeInBytes", 0)
+                    )
+
+                # Network
+                net = buildMetrics.get("networkMetrics", {}).get("systemNetworkStats", {})
+                build_metrics.bytes_downloaded = int(net.get("bytesRecv", 0))
+
+            # --- 3. Build Tool Logs ---
+            elif "buildToolLogs" in event:
+                logs = event["buildToolLogs"].get("log", [])
+                build_metrics.critical_path_s = extract_critical_path(logs)
+
+            # --- Build Finished (Exit Code) ---
+            if "buildFinished" in event_id:
+                exit_data = event.get("finished").get("exitCode", {})
+                build_metrics.exit_code = int(exit_data.get("code", 0))
+
+    # --- 4. Post-Process Nested Targets ---
+    for label, shards in target_map.items():
+        build_metrics.targets.append(
+            TestTarget(
+                label=label,
+                status=target_status.get(label, "UNKNOWN"),
+                duration_s=max(shards) if shards else 0.0,
+                shard_count=len(shards),
+                shard_durations=shards,
+            )
+        )
+
+    return build_metrics
+
+
+def publish_to_bigquery(row):
+    """
+    Pushes a single row to BigQuery using the 'bq' CLI tool via subprocess.
+    """
+
+    bazelci.eprint(f"Publishing Metrics to BigQuery ...")
+    table_ref = f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}"        
+    bq_cmd = "bq.cmd" if bazelci.is_windows() else "bq"
+
+    try:
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tf:
+            json.dump(row, tf)
+            tf.write("\n")
+            temp_path = tf.name
+
+        result = subprocess.run(
+            [bq_cmd, "insert", table_ref, temp_path],
+            capture_output=True,
+            text=True,
+            check=False
+        )
+        
+        if result.returncode != 0:
+            print_and_annotate_warning(f"BigQuery CLI Insert Error:\nSTDOUT: {result.stdout}\nSTDERR: {result.stderr}")
+            return
+            
+        bazelci.eprint("Success: Metrics pushed to BigQuery via CLI.")
+            
+    except Exception as e:
+        eprint_and_annotate_warning(f"Failed to execute bq CLI: {e}")
+    finally:
+        if 'temp_path' in locals() and os.path.exists(temp_path):
+            os.remove(temp_path)
+
+def collect_metrics_and_push_to_bigquery(bep_file_path):
+    """
+    Reads the BEP file, collects environment variables, and pushes metrics to BigQuery.
+    Called from bazelci.py after the build finishes.
+    """
+
+    bazelci.eprint(f"Collecting CI Metrics ...")
+
+    # --- Configuration (Read Env Vars inside function) ---
+    BUILDKITE_BUILD_ID = os.getenv("BUILDKITE_BUILD_ID")
+    BUILDKITE_BUILD_NUMBER = int(os.getenv("BUILDKITE_BUILD_NUMBER"))
+    BUILDKITE_JOB_ID = os.getenv("BUILDKITE_JOB_ID")
+    BUILDKITE_LABEL = os.getenv("BUILDKITE_LABEL")
+    PIPELINE = os.getenv("BUILDKITE_PIPELINE_SLUG")
+    ORG = os.getenv("BUILDKITE_ORGANIZATION_SLUG")
+    REPO = os.getenv("BUILDKITE_REPO")
+    PLATFORM = os.getenv("BUILDKITE_AGENT_META_DATA_OS")
+    AGENT_ID = os.getenv("BUILDKITE_AGENT_ID")
+    BRANCH = os.getenv("BUILDKITE_BRANCH", "main")
+    COMMIT_SHA = os.getenv("BUILDKITE_COMMIT")
+    RETRY_COUNT = int(os.getenv("BUILDKITE_RETRY_COUNT", "0"))
+
+    # Injected via webhooks
+    try:
+        CHECKOUT_DURATION_S = float(os.getenv("CHECKOUT_DURATION_S"))
+    except (ValueError, TypeError):
+        CHECKOUT_DURATION_S = None
+    try:
+        PREP_DURATION_S = float(os.getenv("PREP_DURATION_S"))
+    except (ValueError, TypeError):
+        PREP_DURATION_S = None
+
+    # Calculate Changed Files
+    CHANGED_FILES_COUNT = get_git_stats()
+
+    # Parse BEP Data
+    build_metrics = parse_bep(bep_file_path)
+    if build_metrics is None:
+        print_and_annotate_warning("Skipping BigQuery push due to BEP parsing failure.")
+        return
+
+    # Get Timestamps & calculate Queue time
+    timestamps = fetch_job_timestamps(
+        ORG, PIPELINE, BUILDKITE_BUILD_NUMBER, BUILDKITE_JOB_ID
+    )
+    queue_duration = 0.0
+    if timestamps.created_at and timestamps.started_at:
+        try:
+            created_dt = datetime.datetime.fromisoformat(timestamps.created_at.replace("Z", "+00:00"))
+            started_dt = datetime.datetime.fromisoformat(timestamps.started_at.replace("Z", "+00:00"))
+            queue_duration = (started_dt - created_dt).total_seconds()
+        except Exception as e:
+            bazelci.eprint(f"Warning: Could not parse timestamps: {e}")
+
+    # Construct BigQuery Row
+    row = {
+        "build_id": BUILDKITE_BUILD_ID,
+        "build_number": BUILDKITE_BUILD_NUMBER,
+        "job_id": BUILDKITE_JOB_ID,
+        "job_label": BUILDKITE_LABEL,
+        "finished_at": timestamps.finished_at,
+        "created_at": timestamps.created_at,
+        "started_at": timestamps.started_at,
+        "pipeline": PIPELINE,
+        "platform": PLATFORM,
+        "agent_id": AGENT_ID,
+        "branch": BRANCH,
+        "repo": REPO,
+        "commit_sha": COMMIT_SHA,
+        "exit_code": build_metrics.exit_code,
+        "failed_test_count": build_metrics.failed_test_count,
+        "retry_count": RETRY_COUNT,
+        "wall_time_s": build_metrics.wall_time_ms / 1000.0,
+        "critical_path_s": build_metrics.critical_path_s,
+        "queue_duration_s": queue_duration,
+        "checkout_duration_s": CHECKOUT_DURATION_S,
+        "prep_duration_s": PREP_DURATION_S,
+        "remote_and_disk_cache_hits": build_metrics.remote_and_disk_cache_hits,
+        "total_actions": build_metrics.total_actions,
+        "output_size_bytes": build_metrics.output_size_bytes,
+        "bytes_downloaded": build_metrics.bytes_downloaded,
+        "changed_files_count": CHANGED_FILES_COUNT,
+        "targets": [dataclasses.asdict(t) for t in build_metrics.targets],
+    }
+
+    publish_to_bigquery(row)
diff --git a/buildkite/collect_metrics_test.py b/buildkite/collect_metrics_test.py
new file mode 100644
index 0000000..33f622a
--- /dev/null
+++ b/buildkite/collect_metrics_test.py
@@ -0,0 +1,256 @@
+import unittest
+from unittest.mock import patch, MagicMock, mock_open
+import os
+import json
+import base64
+
+import collect_metrics
+
+
+class TestPublishMetrics(unittest.TestCase):
+
+    def setUp(self):
+        # Reset environment variables before each test
+        self.original_environ = os.environ.copy()
+        os.environ.clear()
+
+    def tearDown(self):
+        os.environ.clear()
+        os.environ.update(self.original_environ)
+
+    # --- Test 1: Git Stats Logic ---
+    @patch("subprocess.check_output")
+    def test_get_git_stats_success(self, mock_subprocess):
+        # Mock successful git output
+        mock_subprocess.return_value = " 5 files changed, 20 insertions(+), 5 deletions(-)"
+
+        count = collect_metrics.get_git_stats()
+        self.assertEqual(count, 5)
+
+    @patch("subprocess.check_output")
+    def test_get_git_stats_singular(self, mock_subprocess):
+        # Mock singular output
+        mock_subprocess.return_value = " 1 file changed, 1 insertion(+)"
+
+        count = collect_metrics.get_git_stats()
+        self.assertEqual(count, 1)
+
+    @patch("subprocess.check_output")
+    def test_get_git_stats_failure(self, mock_subprocess):
+        # Mock a git failure
+        mock_subprocess.side_effect = Exception("Git command not found")
+
+        count = collect_metrics.get_git_stats()
+        self.assertEqual(count, 9999)  # Fallback value
+
+    # --- Test 2: BEP Parsing ---
+    def test_parse_bep_valid(self):
+        # Create a mock BEP file content
+        mock_bep_content = [
+            # Test Result Event
+            json.dumps(
+                {
+                    "id": {"testResult": {"label": "//pkg:test1"}},
+                    "testResult": {"status": "PASSED", "testAttemptDurationMillis": "1500"},
+                }
+            ),
+            # Another Test Result (Failed)
+            json.dumps(
+                {
+                    "id": {"testResult": {"label": "//pkg:test2"}},
+                    "testResult": {"status": "FAILED", "testAttemptDurationMillis": "5000"},
+                }
+            ),
+            # Build Metrics
+            json.dumps(
+                {
+                    "id": {"buildMetrics": {}},
+                    "buildMetrics": {
+                        "timingMetrics": {"wallTimeInMs": "10000"},
+                        "actionSummary": {
+                            "actionsExecuted": "100",
+                            "runnerCount": [{"name": "remote cache hit", "count": "50"}],
+                        },
+                        "artifactMetrics": {"topLevelArtifacts": {"sizeInBytes": "2048"}},
+                        "networkMetrics": {"systemNetworkStats": {"bytesRecv": "1024"}},
+                    },
+                }
+            ),
+            # Build Tool Logs
+            json.dumps(
+                {
+                    "id": {"buildToolLogs": {}},
+                    "buildToolLogs": {
+                        "log": [
+                            {
+                                "name": "critical path",
+                                "contents": base64.b64encode(b"Critical Path: 15.0s\n").decode("utf-8"),
+                            }
+                        ]
+                    },
+                }
+            ),
+            # Build Finished
+            json.dumps(
+                {"id": {"buildFinished": {}}, "finished": {"exitCode": {"code": 0}}}
+            ),
+        ]
+
+        # Mock file reading
+        with patch("builtins.open", mock_open(read_data="\n".join(mock_bep_content))), \
+             patch("os.path.exists", return_value=True):
+            bep_metrics = collect_metrics.parse_bep("dummy.json")
+
+            # Verify Metrics
+            self.assertEqual(bep_metrics.wall_time_ms, 10000)
+            self.assertEqual(bep_metrics.total_actions, 100)
+            self.assertEqual(bep_metrics.remote_and_disk_cache_hits, 50)
+            self.assertEqual(bep_metrics.failed_test_count, 1)
+            self.assertEqual(bep_metrics.critical_path_s, 15.0)
+            self.assertEqual(bep_metrics.exit_code, 0)
+
+            # Verify Targets
+            self.assertEqual(len(bep_metrics.targets), 2)
+            target1 = next(t for t in bep_metrics.targets if t.label == "//pkg:test1")
+            self.assertEqual(target1.status, "PASSED")
+            self.assertEqual(target1.duration_s, 1.5)
+
+            # --- Second Run to fall back to outputArtifactsSeen ---
+            mock_bep_content_2 = [
+                json.dumps(
+                    {
+                        "id": {"buildMetrics": {}},
+                        "buildMetrics": {
+                            "artifactMetrics": {
+                                "topLevelArtifacts": {"sizeInBytes": "0"},
+                                "outputArtifactsSeen": {"sizeInBytes": "4096"}
+                            },
+                        },
+                    }
+                ),
+            ]
+            
+            with patch("builtins.open", mock_open(read_data="\n".join(mock_bep_content_2))):
+                bep_metrics_2 = collect_metrics.parse_bep("dummy2.json")
+                self.assertEqual(bep_metrics_2.output_size_bytes, 4096)
+
+    def test_extract_critical_path(self):
+        # Mock a Base64 encoded critical path log
+        raw_log = "Critical Path: 12.5s\n  Action A..."
+        b64_log = base64.b64encode(raw_log.encode("utf-8")).decode("utf-8")
+
+        logs = [{"name": "critical path", "contents": b64_log}]
+        duration = collect_metrics.extract_critical_path(logs)
+
+        self.assertEqual(duration, 12.5)
+
+    # --- Test 3: Main Logic & BigQuery Push ---
+    @patch("collect_metrics.subprocess.run")
+    def test_publish_to_bigquery(self, mock_run):
+        # Mock the subprocess run to succeed (return code 0)
+        mock_result = MagicMock()
+        mock_result.returncode = 0
+        mock_run.return_value = mock_result
+
+        test_row = {"build_number": 123, "pipeline": "test"}
+        collect_metrics.publish_to_bigquery(test_row)
+
+        # Verify it called subprocess.run with bq insert
+        mock_run.assert_called_once()
+        call_args = mock_run.call_args[0][0]
+        self.assertTrue(any("bq" in arg for arg in call_args))
+        self.assertIn("insert", call_args)
+
+    @patch("collect_metrics.publish_to_bigquery")
+    @patch("collect_metrics.parse_bep")
+    @patch("collect_metrics.get_git_stats")
+    def test_collect_metrics_end_to_end(self, mock_git, mock_parse, mock_publish):
+        # Setup Environment
+        os.environ["BUILDKITE_BUILD_NUMBER"] = "500"
+        os.environ["BUILDKITE_PIPELINE_SLUG"] = "test-pipeline"
+
+        # Setup Mocks
+        mock_git.return_value = 5  # 5 changed files
+
+        # Mock BEP Return
+        mock_bep_metrics = collect_metrics.BuildMetrics(
+            wall_time_ms=5000,
+            critical_path_s=4.0,
+            remote_and_disk_cache_hits=10,
+            total_actions=20,
+            output_size_bytes=100,
+            bytes_downloaded=50,
+            failed_test_count=0,
+            exit_code=0,
+        )
+        mock_parse.return_value = mock_bep_metrics
+
+        # Run Function (with mocked timestamps)
+        with patch("collect_metrics.fetch_job_timestamps") as mock_fetch:
+            mock_fetch.return_value = collect_metrics.JobTimestamps(
+                created_at="2023-10-25T10:00:00Z",
+                started_at="2023-10-25T10:05:00Z"
+            )
+            collect_metrics.collect_metrics_and_push_to_bigquery("dummy_path.json")
+
+        # Verify publish_to_bigquery was called
+        mock_publish.assert_called_once()
+
+        # Inspect the row payload that was generated
+        row = mock_publish.call_args[0][0]
+
+        self.assertEqual(row["build_number"], 500)
+        self.assertEqual(row["pipeline"], "test-pipeline")
+        self.assertEqual(row["changed_files_count"], 5)
+        self.assertEqual(row["failed_test_count"], 0)
+        self.assertEqual(row.get("queue_duration_s"), 300.0)
+
+    @patch("collect_metrics.subprocess.run")
+    def test_publish_to_bigquery_failure(self, mock_run):
+        mock_result = MagicMock()
+        mock_result.returncode = 1
+        mock_result.stdout = "out"
+        mock_result.stderr = "err"
+        mock_run.return_value = mock_result
+        
+        with patch("collect_metrics.print_and_annotate_warning") as mock_annotate:
+            collect_metrics.publish_to_bigquery({"test": 1})
+            mock_annotate.assert_called_once()
+
+    def test_duration_parsing_error(self):
+        with patch.dict(os.environ, {"CHECKOUT_DURATION_S": "invalid", "PREP_DURATION_S": "invalid", "BUILDKITE_BUILD_NUMBER": "500"}):
+            with patch("collect_metrics.parse_bep") as mock_parse, \
+                 patch("collect_metrics.publish_to_bigquery"):
+                mock_parse.return_value = MagicMock()
+                collect_metrics.collect_metrics_and_push_to_bigquery("dummy")
+
+    @patch("collect_metrics.parse_bep")
+    def test_collect_metrics_bep_failure(self, mock_parse):
+        mock_parse.return_value = None
+        with patch.dict(os.environ, {"BUILDKITE_BUILD_NUMBER": "500"}):
+            with patch("collect_metrics.print_and_annotate_warning") as mock_annotate:
+                collect_metrics.collect_metrics_and_push_to_bigquery("dummy")
+                mock_annotate.assert_called_once_with("Skipping BigQuery push due to BEP parsing failure.")
+
+
+    def test_parse_bep_file_not_found(self):
+        with patch("os.path.exists", return_value=False):
+            bep_metrics = collect_metrics.parse_bep("non_existent_file.json")
+            self.assertIsNone(bep_metrics)
+
+
+    @patch("collect_metrics.bazelci.BuildkiteClient")
+    def test_fetch_job_timestamps_success(self, mock_client_cls):
+        mock_client = MagicMock()
+        mock_client_cls.return_value = mock_client
+        mock_client.get_build_info.return_value = {
+            "jobs": [{"id": "123", "created_at": "C", "started_at": "S", "finished_at": "F"}]
+        }
+        
+        ts = collect_metrics.fetch_job_timestamps("org", "pipe", 1, "123")
+        self.assertEqual(ts.created_at, "C")
+        self.assertEqual(ts.finished_at, "F")
+
+
+if __name__ == "__main__":
+    unittest.main()