A few improvements to bazel ci scripts (#2076)
`aggregate_incompatible_flags_test_result.py`:
- Stripped out timestamps added by BuildKite in job logs.
- Added support for collecting incompatible flag test result for
https://buildkite.com/bazel/bcr-bazel-compatibility-test
`bazelci.py`:
- Added support for overriding Bazel version in task config before task
expansion.
- Support `concurrency` and `concurrency_group` to limit CI resource
usage.
- Avoid hitting 429 Too Many Requests error while fetching a large
number of buildkite job logs.
diff --git a/buildkite/aggregate_incompatible_flags_test_result.py b/buildkite/aggregate_incompatible_flags_test_result.py
index e0fadd2..99e78c7 100755
--- a/buildkite/aggregate_incompatible_flags_test_result.py
+++ b/buildkite/aggregate_incompatible_flags_test_result.py
@@ -31,6 +31,26 @@
FLAG_LINE_PATTERN = re.compile(r"\s*(?P<flag>--\S+)\s*")
+MODULE_VERSION_PATTERN = re.compile(r'(?P<module_version>[a-z](?:[a-z0-9._-]*[a-z0-9])?@[^\s]+)')
+
+BAZEL_TEAM_OWNED_MODULES = frozenset([
+ "bazel-skylib",
+ "rules_android",
+ "rules_android_ndk",
+ "rules_cc",
+ "rules_java",
+ "rules_license",
+ "rules_pkg",
+ "rules_platform",
+ "rules_shell",
+ "rules_testing",
+])
+
+PROJECT = "module" if PIPELINE == "bcr-bazel-compatibility-test" else "project"
+
+MAX_LOG_FETCHER_THREADS = 30
+LOG_FETCHER_SEMAPHORE = threading.Semaphore(MAX_LOG_FETCHER_THREADS)
+
class LogFetcher(threading.Thread):
def __init__(self, job, client):
threading.Thread.__init__(self)
@@ -39,7 +59,8 @@
self.log = None
def run(self):
- self.log = self.client.get_build_log(self.job)
+ with LOG_FETCHER_SEMAPHORE:
+ self.log = self.client.get_build_log(self.job)
def process_build_log(failed_jobs_per_flag, already_failing_jobs, log, job):
@@ -59,6 +80,10 @@
if index_success == -1 or index_failure == -1:
raise bazelci.BuildkiteException("Cannot recognize log of " + job["web_url"])
for line in log[index_failure:].split("\n"):
+ # Strip out BuildKite timestamp prefix
+ line = re.sub(r'\x1b.*?\x07', '', line.strip())
+ if not line:
+ break
handle_failing_flags(line)
log = log[0 : log.rfind("+++ Result")]
@@ -67,6 +92,12 @@
already_failing_jobs.append(job)
+def extract_module_version(line):
+ match = MODULE_VERSION_PATTERN.search(line)
+ if match:
+ return match.group("module_version")
+
+
def extract_flag(line):
match = FLAG_LINE_PATTERN.match(line)
if match:
@@ -77,19 +108,28 @@
return f'<a href="{link}" target="_blank">{content}</a>'
+def is_project_owned_by_bazel_team(project):
+ if bazelci.is_downstream_pipeline() and project in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[project].get(
+ "owned_by_bazel"
+ ):
+ # Check the downstream projects definition.
+ return True
+ elif project.split("@")[0] in BAZEL_TEAM_OWNED_MODULES:
+ # Parse the module name and check if it's bazel team owned.
+ return True
+ return False
+
# Check if any of the given jobs needs to be migrated by the Bazel team
def needs_bazel_team_migrate(jobs):
for job in jobs:
- pipeline, _ = get_pipeline_and_platform(job)
- if pipeline in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[pipeline].get(
- "owned_by_bazel"
- ):
+ project = get_project_name(job)
+ if is_project_owned_by_bazel_team(project):
return True
return False
def print_flags_ready_to_flip(failed_jobs_per_flag, incompatible_flags):
- info_text1 = ["#### The following flags didn't break any passing projects"]
+ info_text1 = [f"#### The following flags didn't break any passing {PROJECT}s"]
for flag in sorted(list(incompatible_flags.keys())):
if flag not in failed_jobs_per_flag:
html_link_text = get_html_link_text(":github:", incompatible_flags[flag])
@@ -99,7 +139,7 @@
info_text1 = []
info_text2 = [
- "#### The following flags didn't break any passing Bazel team owned/co-owned projects"
+ f"#### The following flags didn't break any passing Bazel team owned/co-owned {PROJECT}s"
]
for flag, jobs in failed_jobs_per_flag.items():
if flag not in incompatible_flags:
@@ -128,7 +168,7 @@
def print_projects_need_to_migrate(failed_jobs_per_flag):
- info_text = ["#### The following projects need migration"]
+ info_text = [f"#### The following {PROJECT}s need migration"]
jobs_need_migration = {}
for jobs in failed_jobs_per_flag.values():
for job in jobs.values():
@@ -141,14 +181,14 @@
projects = set()
for job in job_list:
- project, _ = get_pipeline_and_platform(job)
+ project = get_project_name(job)
projects.add(project)
project_num = len(projects)
s1 = "" if project_num == 1 else "s"
s2 = "s" if project_num == 1 else ""
info_text.append(
- f"<details><summary>{project_num} project{s1} need{s2} migration, click to see details</summary><ul>"
+ f"<details><summary>{project_num} {PROJECT}{s1} need{s2} migration, click to see details</summary><ul>"
)
entries = merge_and_format_jobs(job_list, " <li><strong>{}</strong>: {}</li>")
@@ -179,62 +219,68 @@
if jobs:
github_url = incompatible_flags[flag]
info_text = [f"* **{flag}** " + get_html_link_text(":github:", github_url)]
- jobs_per_pipeline = merge_jobs(jobs.values())
- for pipeline, platforms in jobs_per_pipeline.items():
+ jobs_per_project = merge_jobs(jobs.values())
+ for project, platforms in jobs_per_project.items():
bazel_mark = ""
- if pipeline in bazelci.DOWNSTREAM_PROJECTS and bazelci.DOWNSTREAM_PROJECTS[
- pipeline
- ].get("owned_by_bazel"):
+ if is_project_owned_by_bazel_team(project):
bazel_mark = ":bazel:"
platforms_text = ", ".join(platforms)
- info_text.append(f" - {bazel_mark}**{pipeline}**: {platforms_text}")
+ info_text.append(f" - {bazel_mark}**{project}**: {platforms_text}")
# Use flag as the context so that each flag gets a different info box.
print_info(flag, "error", info_text)
printed_flag_boxes = True
if not printed_flag_boxes:
return
info_text = [
- "#### Downstream projects need to migrate for the following flags:",
+ "#### Projects need to migrate for the following flags:",
"Projects marked with :bazel: need to be migrated by the Bazel team.",
]
print_info("flags_need_to_migrate", "error", info_text)
def merge_jobs(jobs):
- jobs_per_pipeline = collections.defaultdict(list)
+ jobs_per_project = collections.defaultdict(list)
for job in sorted(jobs, key=lambda s: s["name"].lower()):
- pipeline, platform = get_pipeline_and_platform(job)
- jobs_per_pipeline[pipeline].append(get_html_link_text(platform, job["web_url"]))
- return jobs_per_pipeline
+ project = get_project_name(job)
+ platform_label = get_platform_emoji_name(job)
+ jobs_per_project[project].append(get_html_link_text(platform_label, job["web_url"]))
+ return jobs_per_project
def merge_and_format_jobs(jobs, line_pattern):
- # Merges all jobs for a single pipeline into one line.
+ # Merges all jobs for a single project into one line.
# Example:
- # pipeline (platform1)
- # pipeline (platform2)
- # pipeline (platform3)
+ # project (platform1)
+ # project (platform2)
+ # project (platform3)
# with line_pattern ">> {}: {}" becomes
- # >> pipeline: platform1, platform2, platform3
- jobs_per_pipeline = merge_jobs(jobs)
+ # >> project: platform1, platform2, platform3
+ jobs_per_project = merge_jobs(jobs)
return [
- line_pattern.format(pipeline, ", ".join(platforms))
- for pipeline, platforms in jobs_per_pipeline.items()
+ line_pattern.format(project, ", ".join(platforms))
+ for project, platforms in jobs_per_project.items()
]
-def get_pipeline_and_platform(job):
+def get_project_name(job):
+ # Strip out platform label from job name
+ name = job["name"].replace(get_platform_emoji_name(job), "")
+ if bazelci.is_downstream_pipeline():
+        # For the downstream pipeline, parse the pipeline name.
+ return name.partition("-")[0].partition("(")[0].strip()
+ else:
+ # This is for BCR compatibility test pipeline, parse the module name + version
+ return extract_module_version(name)
+
+
+def get_platform_emoji_name(job):
+    # Search for the platform label in the job name.
name = job["name"]
- platform = ""
for p in bazelci.PLATFORMS.values():
platform_label = p.get("emoji-name")
if platform_label in name:
- platform = platform_label
- name = name.replace(platform_label, "")
- break
-
- name = name.partition("-")[0].partition("(")[0].strip()
- return name, platform
+ return platform_label
+ raise bazelci.BuildkiteException("Cannot detect platform name for: " + job["web_url"])
def print_info(context, style, info):
@@ -264,8 +310,8 @@
threads = []
for job in build_info["jobs"]:
- # Some irrelevant job has no "state" field
- if "state" in job:
+        # Some irrelevant jobs lack a "state" or "raw_log_url" field.
+ if "state" in job and "raw_log_url" in job:
thread = LogFetcher(job, client)
threads.append(thread)
thread.start()
diff --git a/buildkite/bazelci.py b/buildkite/bazelci.py
index 5e4f82a..da9b2a2 100755
--- a/buildkite/bazelci.py
+++ b/buildkite/bazelci.py
@@ -699,16 +699,28 @@
project=("bazel-public" if THIS_IS_TRUSTED else "bazel-untrusted"),
)
- def _open_url(self, url, params=[]):
- try:
- params_str = "".join("&{}={}".format(k, v) for k, v in params)
- return (
- urllib.request.urlopen("{}?access_token={}{}".format(url, self._token, params_str))
- .read()
- .decode("utf-8", "ignore")
- )
- except urllib.error.HTTPError as ex:
- raise BuildkiteException("Failed to open {}: {} - {}".format(url, ex.code, ex.reason))
+ def _open_url(self, url, params=[], retries=5):
+ params_str = "".join("&{}={}".format(k, v) for k, v in params)
+ full_url = "{}?access_token={}{}".format(url, self._token, params_str)
+
+ for attempt in range(retries):
+ try:
+ response = urllib.request.urlopen(full_url)
+ return response.read().decode("utf-8", "ignore")
+ except urllib.error.HTTPError as ex:
+ # Handle specific error codes
+ if ex.code == 429: # Too Many Requests
+ retry_after = ex.headers.get("RateLimit-Reset")
+ if retry_after:
+ wait_time = int(retry_after)
+ else:
+ wait_time = (2 ** attempt) # Exponential backoff if no RateLimit-Reset header
+
+ time.sleep(wait_time)
+ else:
+ raise BuildkiteException("Failed to open {}: {} - {}".format(url, ex.code, ex.reason))
+
+ raise BuildkiteException(f"Failed to open {url} after {retries} retries.")
def get_pipeline_info(self):
"""Get details for a pipeline given its organization slug
@@ -984,7 +996,7 @@
return expanded_task
-def fetch_configs(http_url, file_config):
+def fetch_configs(http_url, file_config, bazel_version=None):
"""
If specified fetches the build configuration from file_config or http_url, else tries to
read it from .bazelci/presubmit.yml.
@@ -993,7 +1005,7 @@
if file_config is not None and http_url is not None:
raise BuildkiteException("file_config and http_url cannot be set at the same time")
- return load_config(http_url, file_config)
+ return load_config(http_url, file_config, bazel_version=bazel_version)
def expand_task_config(config):
@@ -1023,7 +1035,15 @@
config["tasks"].update(expanded_tasks)
-def load_config(http_url, file_config, allow_imports=True):
+def maybe_overwrite_bazel_version(bazel_version, config):
+ if not bazel_version:
+ return
+ for task in config.get("tasks", {}):
+ config["tasks"][task]["old_bazel"] = config["tasks"][task].get("bazel")
+ config["tasks"][task]["bazel"] = bazel_version
+
+
+def load_config(http_url, file_config, allow_imports=True, bazel_version=None):
if http_url:
config = load_remote_yaml_file(http_url)
else:
@@ -1041,6 +1061,7 @@
if "tasks" not in config:
config["tasks"] = {}
+ maybe_overwrite_bazel_version(bazel_version, config)
expand_task_config(config)
imports = config.pop("imports", None)
@@ -1049,7 +1070,7 @@
raise BuildkiteException("Nested imports are not allowed")
for i in imports:
- imported_tasks = load_imported_tasks(i, http_url, file_config)
+ imported_tasks = load_imported_tasks(i, http_url, file_config, bazel_version)
config["tasks"].update(imported_tasks)
if len(config["tasks"]) > MAX_TASK_NUMBER:
@@ -1066,7 +1087,7 @@
return yaml.safe_load(reader(resp))
-def load_imported_tasks(import_name, http_url, file_config):
+def load_imported_tasks(import_name, http_url, file_config, bazel_version):
if "/" in import_name:
raise BuildkiteException("Invalid import '%s'" % import_name)
@@ -1077,7 +1098,7 @@
else:
file_config = new_path
- imported_config = load_config(http_url=http_url, file_config=file_config, allow_imports=False)
+ imported_config = load_config(http_url=http_url, file_config=file_config, allow_imports=False, bazel_version=bazel_version)
namespace = import_name.partition(".")[0]
tasks = {}
@@ -2777,7 +2798,7 @@
process.kill()
-def create_step(label, commands, platform, shards=1, soft_fail=None):
+def create_step(label, commands, platform, shards=1, soft_fail=None, concurrency=None, concurrency_group=None):
if "docker-image" in PLATFORMS[platform]:
step = create_docker_step(
label,
@@ -2823,6 +2844,10 @@
step["retry"]["automatic"].append({"exit_status": 128, "limit": 1})
step["retry"]["automatic"].append({"exit_status": 1, "limit": 1})
+ if concurrency and concurrency_group:
+ step["concurrency"] = concurrency
+ step["concurrency_group"] = concurrency_group
+
return step
@@ -4455,6 +4480,7 @@
runner.add_argument("--task", action="store", type=str, default="")
runner.add_argument("--file_config", type=str)
runner.add_argument("--http_config", type=str)
+ runner.add_argument("--overwrite_bazel_version", type=str, help="Overwrite the bazel version in the config file.")
runner.add_argument("--git_repository", type=str)
runner.add_argument(
"--git_commit", type=str, help="Reset the git repository to this commit after cloning it"
@@ -4533,7 +4559,9 @@
elif args.git_repository:
clone_git_repository(args.git_repository, args.git_commit)
- configs = fetch_configs(args.http_config, args.file_config)
+    # Maybe overwrite the Bazel version for each task; this must happen before task config expansion.
+ bazel_version = args.overwrite_bazel_version
+ configs = fetch_configs(args.http_config, args.file_config, bazel_version)
tasks = configs.get("tasks", {})
task_config = tasks.get(args.task)
if not task_config:
@@ -4553,6 +4581,12 @@
if "BUILDKITE_MESSAGE" in os.environ:
os.environ["BUILDKITE_MESSAGE"] = os.environ["BUILDKITE_MESSAGE"][:1000]
+ # Give user a warning that the bazel version in the config file has been overridden.
+ old_bazel = task_config.get("old_bazel")
+ if old_bazel:
+ new_bazel = task_config.get("bazel")
+ print_collapsed_group(f":bazel: Bazel version overridden from {old_bazel} to {new_bazel}")
+
execute_commands(
task_config=task_config,
platform=platform,