A few improvements to bazel ci scripts (#2076)

`aggregate_incompatible_flags_test_result.py`:

- Stripped out timestamps added by BuildKite in job log.
- Added support for collecting incompatible flag test result for
https://buildkite.com/bazel/bcr-bazel-compatibility-test

`bazelci.py`:
- Added support for overriding Bazel version in task config before task
expansion.
- Support `concurrency` and `concurrency_group` to limit CI resource
usage.
- Avoid hitting 429 Too Many Requests error while fetching a large
number of buildkite job logs.
diff --git a/buildkite/aggregate_incompatible_flags_test_result.py b/buildkite/aggregate_incompatible_flags_test_result.py
index e0fadd2..99e78c7 100755
--- a/buildkite/aggregate_incompatible_flags_test_result.py
+++ b/buildkite/aggregate_incompatible_flags_test_result.py
@@ -31,6 +31,26 @@
 
 FLAG_LINE_PATTERN = re.compile(r"\s*(?P<flag>--\S+)\s*")
 
+MODULE_VERSION_PATTERN = re.compile(r'(?P<module_version>[a-z](?:[a-z0-9._-]*[a-z0-9])?@[^\s]+)')
+
+BAZEL_TEAM_OWNED_MODULES = frozenset([
+    "bazel-skylib",
+    "rules_android",
+    "rules_android_ndk",
+    "rules_cc",
+    "rules_java",
+    "rules_license",
+    "rules_pkg",
+    "rules_platform",
+    "rules_shell",
+    "rules_testing",
+])
+
+PROJECT = "module" if PIPELINE == "bcr-bazel-compatibility-test" else "project"
+
+MAX_LOG_FETCHER_THREADS = 30
+LOG_FETCHER_SEMAPHORE = threading.Semaphore(MAX_LOG_FETCHER_THREADS)
+
 class LogFetcher(threading.Thread):
     def __init__(self, job, client):
         threading.Thread.__init__(self)
@@ -39,7 +59,8 @@
         self.log = None
 
     def run(self):
-        self.log = self.client.get_build_log(self.job)
+        with LOG_FETCHER_SEMAPHORE:
+            self.log = self.client.get_build_log(self.job)
 
 
 def process_build_log(failed_jobs_per_flag, already_failing_jobs, log, job):
@@ -59,6 +80,10 @@
         if index_success == -1 or index_failure == -1:
             raise bazelci.BuildkiteException("Cannot recognize log of " + job["web_url"])
         for line in log[index_failure:].split("\n"):
+            # Strip out BuildKite timestamp prefix
+            line = re.sub(r'\x1b.*?\x07', '', line.strip())
+            if not line:
+                break
             handle_failing_flags(line)
         log = log[0 : log.rfind("+++ Result")]
 
@@ -67,6 +92,12 @@
         already_failing_jobs.append(job)
 
 
+def extract_module_version(line):
+    match = MODULE_VERSION_PATTERN.search(line)
+    if match:
+        return match.group("module_version")
+
+
 def extract_flag(line):
     match = FLAG_LINE_PATTERN.match(line)
     if match:
@@ -77,19 +108,28 @@
     return f'<a href="{link}" target="_blank">{content}</a>'
 
 
+def is_project_owned_by_bazel_team(project):
+    if bazelci.is_downstream_pipeline() and project in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[project].get(
+        "owned_by_bazel"
+    ):
+        # Check the downstream projects definition.
+        return True
+    elif project.split("@")[0] in BAZEL_TEAM_OWNED_MODULES:
+        # Parse the module name and check if it's bazel team owned.
+        return True
+    return False
+
 # Check if any of the given jobs needs to be migrated by the Bazel team
 def needs_bazel_team_migrate(jobs):
     for job in jobs:
-        pipeline, _ = get_pipeline_and_platform(job)
-        if pipeline in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[pipeline].get(
-            "owned_by_bazel"
-        ):
+        project = get_project_name(job)
+        if is_project_owned_by_bazel_team(project):
             return True
     return False
 
 
 def print_flags_ready_to_flip(failed_jobs_per_flag, incompatible_flags):
-    info_text1 = ["#### The following flags didn't break any passing projects"]
+    info_text1 = [f"#### The following flags didn't break any passing {PROJECT}s"]
     for flag in sorted(list(incompatible_flags.keys())):
         if flag not in failed_jobs_per_flag:
             html_link_text = get_html_link_text(":github:", incompatible_flags[flag])
@@ -99,7 +139,7 @@
         info_text1 = []
 
     info_text2 = [
-        "#### The following flags didn't break any passing Bazel team owned/co-owned projects"
+        f"#### The following flags didn't break any passing Bazel team owned/co-owned {PROJECT}s"
     ]
     for flag, jobs in failed_jobs_per_flag.items():
         if flag not in incompatible_flags:
@@ -128,7 +168,7 @@
 
 
 def print_projects_need_to_migrate(failed_jobs_per_flag):
-    info_text = ["#### The following projects need migration"]
+    info_text = [f"#### The following {PROJECT}s need migration"]
     jobs_need_migration = {}
     for jobs in failed_jobs_per_flag.values():
         for job in jobs.values():
@@ -141,14 +181,14 @@
 
     projects = set()
     for job in job_list:
-        project, _ = get_pipeline_and_platform(job)
+        project = get_project_name(job)
         projects.add(project)
     project_num = len(projects)
 
     s1 = "" if project_num == 1 else "s"
     s2 = "s" if project_num == 1 else ""
     info_text.append(
-        f"<details><summary>{project_num} project{s1} need{s2} migration, click to see details</summary><ul>"
+        f"<details><summary>{project_num} {PROJECT}{s1} need{s2} migration, click to see details</summary><ul>"
     )
 
     entries = merge_and_format_jobs(job_list, "    <li><strong>{}</strong>: {}</li>")
@@ -179,62 +219,68 @@
         if jobs:
             github_url = incompatible_flags[flag]
             info_text = [f"* **{flag}** " + get_html_link_text(":github:", github_url)]
-            jobs_per_pipeline = merge_jobs(jobs.values())
-            for pipeline, platforms in jobs_per_pipeline.items():
+            jobs_per_project = merge_jobs(jobs.values())
+            for project, platforms in jobs_per_project.items():
                 bazel_mark = ""
-                if pipeline in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[
-                    pipeline
-                ].get("owned_by_bazel"):
+                if is_project_owned_by_bazel_team(project):
                     bazel_mark = ":bazel:"
                 platforms_text = ", ".join(platforms)
-                info_text.append(f"  - {bazel_mark}**{pipeline}**: {platforms_text}")
+                info_text.append(f"  - {bazel_mark}**{project}**: {platforms_text}")
             # Use flag as the context so that each flag gets a different info box.
             print_info(flag, "error", info_text)
             printed_flag_boxes = True
     if not printed_flag_boxes:
         return
     info_text = [
-        "#### Downstream projects need to migrate for the following flags:",
+        "#### Projects need to migrate for the following flags:",
         "Projects marked with :bazel: need to be migrated by the Bazel team.",
     ]
     print_info("flags_need_to_migrate", "error", info_text)
 
 
 def merge_jobs(jobs):
-    jobs_per_pipeline = collections.defaultdict(list)
+    jobs_per_project = collections.defaultdict(list)
     for job in sorted(jobs, key=lambda s: s["name"].lower()):
-        pipeline, platform = get_pipeline_and_platform(job)
-        jobs_per_pipeline[pipeline].append(get_html_link_text(platform, job["web_url"]))
-    return jobs_per_pipeline
+        project = get_project_name(job)
+        platform_label = get_platform_emoji_name(job)
+        jobs_per_project[project].append(get_html_link_text(platform_label, job["web_url"]))
+    return jobs_per_project
 
 
 def merge_and_format_jobs(jobs, line_pattern):
-    # Merges all jobs for a single pipeline into one line.
+    # Merges all jobs for a single project into one line.
     # Example:
-    #   pipeline (platform1)
-    #   pipeline (platform2)
-    #   pipeline (platform3)
+    #   project (platform1)
+    #   project (platform2)
+    #   project (platform3)
     # with line_pattern ">> {}: {}" becomes
-    #   >> pipeline: platform1, platform2, platform3
-    jobs_per_pipeline = merge_jobs(jobs)
+    #   >> project: platform1, platform2, platform3
+    jobs_per_project = merge_jobs(jobs)
     return [
-        line_pattern.format(pipeline, ", ".join(platforms))
-        for pipeline, platforms in jobs_per_pipeline.items()
+        line_pattern.format(project, ", ".join(platforms))
+        for project, platforms in jobs_per_project.items()
     ]
 
 
-def get_pipeline_and_platform(job):
+def get_project_name(job):
+    # Strip out platform label from job name
+    name = job["name"].replace(get_platform_emoji_name(job), "")
+    if bazelci.is_downstream_pipeline():
+        # This is for downstream pipeline, parse the pipeline name
+        return name.partition("-")[0].partition("(")[0].strip()
+    else:
+        # This is for BCR compatibility test pipeline, parse the module name + version
+        return extract_module_version(name)
+
+
+def get_platform_emoji_name(job):
+    # By search for the platform label in the job name.
     name = job["name"]
-    platform = ""
     for p in bazelci.PLATFORMS.values():
         platform_label = p.get("emoji-name")
         if platform_label in name:
-            platform = platform_label
-            name = name.replace(platform_label, "")
-            break
-
-    name = name.partition("-")[0].partition("(")[0].strip()
-    return name, platform
+            return platform_label
+    raise bazelci.BuildkiteException("Cannot detect platform name for: " + job["web_url"])
 
 
 def print_info(context, style, info):
@@ -264,8 +310,8 @@
 
     threads = []
     for job in build_info["jobs"]:
-        # Some irrelevant job has no "state" field
-        if "state" in job:
+        # Some irrelevant job has no "state" or "raw_log_url" field
+        if "state" in job and "raw_log_url" in job:
             thread = LogFetcher(job, client)
             threads.append(thread)
             thread.start()
diff --git a/buildkite/bazelci.py b/buildkite/bazelci.py
index 5e4f82a..da9b2a2 100755
--- a/buildkite/bazelci.py
+++ b/buildkite/bazelci.py
@@ -699,16 +699,28 @@
             project=("bazel-public" if THIS_IS_TRUSTED else "bazel-untrusted"),
         )
 
-    def _open_url(self, url, params=[]):
-        try:
-            params_str = "".join("&{}={}".format(k, v) for k, v in params)
-            return (
-                urllib.request.urlopen("{}?access_token={}{}".format(url, self._token, params_str))
-                .read()
-                .decode("utf-8", "ignore")
-            )
-        except urllib.error.HTTPError as ex:
-            raise BuildkiteException("Failed to open {}: {} - {}".format(url, ex.code, ex.reason))
+    def _open_url(self, url, params=[], retries=5):
+        params_str = "".join("&{}={}".format(k, v) for k, v in params)
+        full_url = "{}?access_token={}{}".format(url, self._token, params_str)
+
+        for attempt in range(retries):
+            try:
+                response = urllib.request.urlopen(full_url)
+                return response.read().decode("utf-8", "ignore")
+            except urllib.error.HTTPError as ex:
+                # Handle specific error codes
+                if ex.code == 429:  # Too Many Requests
+                    retry_after = ex.headers.get("RateLimit-Reset")
+                    if retry_after:
+                        wait_time = int(retry_after)
+                    else:
+                        wait_time = (2 ** attempt)  # Exponential backoff if no RateLimit-Reset header
+
+                    time.sleep(wait_time)
+                else:
+                    raise BuildkiteException("Failed to open {}: {} - {}".format(url, ex.code, ex.reason))
+
+        raise BuildkiteException(f"Failed to open {url} after {retries} retries.")
 
     def get_pipeline_info(self):
         """Get details for a pipeline given its organization slug
@@ -984,7 +996,7 @@
     return expanded_task
 
 
-def fetch_configs(http_url, file_config):
+def fetch_configs(http_url, file_config, bazel_version=None):
     """
     If specified fetches the build configuration from file_config or http_url, else tries to
     read it from .bazelci/presubmit.yml.
@@ -993,7 +1005,7 @@
     if file_config is not None and http_url is not None:
         raise BuildkiteException("file_config and http_url cannot be set at the same time")
 
-    return load_config(http_url, file_config)
+    return load_config(http_url, file_config, bazel_version=bazel_version)
 
 
 def expand_task_config(config):
@@ -1023,7 +1035,15 @@
     config["tasks"].update(expanded_tasks)
 
 
-def load_config(http_url, file_config, allow_imports=True):
+def maybe_overwrite_bazel_version(bazel_version, config):
+    if not bazel_version:
+        return
+    for task in config.get("tasks", {}):
+        config["tasks"][task]["old_bazel"] = config["tasks"][task].get("bazel")
+        config["tasks"][task]["bazel"] = bazel_version
+
+
+def load_config(http_url, file_config, allow_imports=True, bazel_version=None):
     if http_url:
         config = load_remote_yaml_file(http_url)
     else:
@@ -1041,6 +1061,7 @@
     if "tasks" not in config:
         config["tasks"] = {}
 
+    maybe_overwrite_bazel_version(bazel_version, config)
     expand_task_config(config)
 
     imports = config.pop("imports", None)
@@ -1049,7 +1070,7 @@
             raise BuildkiteException("Nested imports are not allowed")
 
         for i in imports:
-            imported_tasks = load_imported_tasks(i, http_url, file_config)
+            imported_tasks = load_imported_tasks(i, http_url, file_config, bazel_version)
             config["tasks"].update(imported_tasks)
 
     if len(config["tasks"]) > MAX_TASK_NUMBER:
@@ -1066,7 +1087,7 @@
         return yaml.safe_load(reader(resp))
 
 
-def load_imported_tasks(import_name, http_url, file_config):
+def load_imported_tasks(import_name, http_url, file_config, bazel_version):
     if "/" in import_name:
         raise BuildkiteException("Invalid import '%s'" % import_name)
 
@@ -1077,7 +1098,7 @@
     else:
         file_config = new_path
 
-    imported_config = load_config(http_url=http_url, file_config=file_config, allow_imports=False)
+    imported_config = load_config(http_url=http_url, file_config=file_config, allow_imports=False, bazel_version=bazel_version)
 
     namespace = import_name.partition(".")[0]
     tasks = {}
@@ -2777,7 +2798,7 @@
             process.kill()
 
 
-def create_step(label, commands, platform, shards=1, soft_fail=None):
+def create_step(label, commands, platform, shards=1, soft_fail=None, concurrency=None, concurrency_group=None):
     if "docker-image" in PLATFORMS[platform]:
         step = create_docker_step(
             label,
@@ -2823,6 +2844,10 @@
         step["retry"]["automatic"].append({"exit_status": 128, "limit": 1})
         step["retry"]["automatic"].append({"exit_status": 1, "limit": 1})
 
+    if concurrency and concurrency_group:
+        step["concurrency"] = concurrency
+        step["concurrency_group"] = concurrency_group
+
     return step
 
 
@@ -4455,6 +4480,7 @@
     runner.add_argument("--task", action="store", type=str, default="")
     runner.add_argument("--file_config", type=str)
     runner.add_argument("--http_config", type=str)
+    runner.add_argument("--overwrite_bazel_version", type=str, help="Overwrite the bazel version in the config file.")
     runner.add_argument("--git_repository", type=str)
     runner.add_argument(
         "--git_commit", type=str, help="Reset the git repository to this commit after cloning it"
@@ -4533,7 +4559,9 @@
             elif args.git_repository:
                 clone_git_repository(args.git_repository, args.git_commit)
 
-            configs = fetch_configs(args.http_config, args.file_config)
+            # Maybe overwrite the bazel version for each task, we have to do it before the config expansion.
+            bazel_version = args.overwrite_bazel_version
+            configs = fetch_configs(args.http_config, args.file_config, bazel_version)
             tasks = configs.get("tasks", {})
             task_config = tasks.get(args.task)
             if not task_config:
@@ -4553,6 +4581,12 @@
             if "BUILDKITE_MESSAGE" in os.environ:
                 os.environ["BUILDKITE_MESSAGE"] = os.environ["BUILDKITE_MESSAGE"][:1000]
 
+            # Give user a warning that the bazel version in the config file has been overridden.
+            old_bazel = task_config.get("old_bazel")
+            if old_bazel:
+                new_bazel = task_config.get("bazel")
+                print_collapsed_group(f":bazel: Bazel version overridden from {old_bazel} to {new_bazel}")
+
             execute_commands(
                 task_config=task_config,
                 platform=platform,