Implement sharding of build and test targets (#618)
This commit implements optional sharding. The algorithm resolves all test target patterns via `bazel query`, then distributes the resulting test targets and the configured build targets across shards. Currently the sharding protocol sorts the targets and assigns every N-th target (starting at offset I) to a shard, with N being the number of shards and I being the zero-based shard ID.
Example run: https://buildkite.com/bazel/fwe-test/builds/44 (with debug output)
The distribution of shell tests is not yet ideal, so test times are still quite uneven across shards.
This feature is still experimental, hence it's not mentioned in the documentation.
diff --git a/buildkite/bazelci.py b/buildkite/bazelci.py
index ceab73b..59af243 100644
--- a/buildkite/bazelci.py
+++ b/buildkite/bazelci.py
@@ -620,8 +620,6 @@
if incompatible_flags:
bazel_version = None
- build_only = build_only or "test_targets" not in task_config
- test_only = test_only or "build_targets" not in task_config
if build_only and test_only:
raise BuildkiteException("build_only and test_only cannot be true at the same time")
@@ -737,20 +735,26 @@
if needs_clean:
execute_bazel_clean(bazel_binary, platform)
- if not test_only:
+ build_targets, test_targets = calculate_targets(
+ task_config, platform, bazel_binary, build_only, test_only
+ )
+ if not build_targets and not test_targets:
+ raise BuildkiteException("There are neither build nor test targets")
+
+ if build_targets:
execute_bazel_build(
bazel_version,
bazel_binary,
platform,
task_config.get("build_flags", []),
- task_config.get("build_targets", None),
+ build_targets,
None,
incompatible_flags,
)
if save_but:
upload_bazel_binary(platform)
- if not build_only:
+ if test_targets:
test_flags = task_config.get("test_flags", [])
if test_env_vars:
test_flags += ["--test_env={}".format(v) for v in test_env_vars]
@@ -772,7 +776,7 @@
bazel_binary,
platform,
test_flags,
- task_config.get("test_targets", None),
+ test_targets,
test_bep_file,
monitor_flaky_tests,
incompatible_flags,
@@ -1275,6 +1279,73 @@
handle_bazel_failure(e, "build")
+def calculate_targets(task_config, platform, bazel_binary, build_only, test_only):
+    """Return the (build_targets, test_targets) pair that this job should handle.
+
+    Targets are read from the "build_targets" and "test_targets" entries of
+    task_config. test_only discards the build targets and build_only discards
+    the test targets.
+
+    If both BUILDKITE_PARALLEL_JOB and BUILDKITE_PARALLEL_JOB_COUNT are set to
+    non-negative integers, the test target patterns are expanded via
+    expand_test_target_patterns and only the targets selected by
+    get_targets_for_shard for this shard are returned.
+    """
+    build_targets = [] if test_only else task_config.get("build_targets", [])
+    test_targets = [] if build_only else task_config.get("test_targets", [])
+
+    # Both variables default to -1, which means "sharding is disabled".
+    shard_id = int(os.getenv("BUILDKITE_PARALLEL_JOB", "-1"))
+    shard_count = int(os.getenv("BUILDKITE_PARALLEL_JOB_COUNT", "-1"))
+    if shard_id > -1 and shard_count > -1:
+        print_collapsed_group(
+            ":female-detective: Calculating targets for shard {}/{}".format(
+                shard_id + 1, shard_count
+            )
+        )
+        # Without test targets there is nothing to resolve, so skip the
+        # (comparatively slow) bazel query altogether.
+        expanded_test_targets = (
+            expand_test_target_patterns(bazel_binary, platform, test_targets)
+            if test_targets
+            else []
+        )
+        build_targets, test_targets = get_targets_for_shard(
+            build_targets, expanded_test_targets, shard_id, shard_count
+        )
+
+    return build_targets, test_targets
+
+
+def expand_test_target_patterns(bazel_binary, platform, test_targets):
+    """Resolve test target patterns into individual test targets via bazel query.
+
+    Patterns starting with "-" are treated as exclusions: the tests they match
+    are subtracted from the tests matched by the remaining patterns.
+
+    Returns a list with one entry per non-blank line of the query output.
+    """
+    included_targets, excluded_targets = partition_test_targets(test_targets)
+    excluded_string = (
+        " except tests(set({}))".format(" ".join("'{}'".format(t) for t in excluded_targets))
+        if excluded_targets
+        else ""
+    )
+
+    eprint("Resolving test targets via bazel query")
+    output = execute_command_and_get_output(
+        [bazel_binary]
+        + common_startup_flags(platform)
+        + [
+            "--nomaster_bazelrc",
+            "--bazelrc=/dev/null",
+            "query",
+            "tests(set({})){}".format(
+                " ".join("'{}'".format(t) for t in included_targets), excluded_string
+            ),
+        ],
+        print_output=False,
+        capture_stderr=False,
+    )
+    # Drop blank lines (e.g. the one produced by a trailing newline) so that no
+    # empty string is handed out to a shard as if it were a target.
+    return [target for target in map(str.strip, output.splitlines()) if target]
+
+
+def partition_test_targets(test_targets):
+    """Split test target patterns into an (included, excluded) pair of lists.
+
+    A bare "--" entry is dropped. Entries starting with "-" are exclusions and
+    are returned without the leading dash; all other entries are inclusions.
+    """
+    patterns = [pattern for pattern in test_targets if pattern != "--"]
+    included = [pattern for pattern in patterns if not pattern.startswith("-")]
+    excluded = [pattern[1:] for pattern in patterns if pattern.startswith("-")]
+    return included, excluded
+
+
+def get_targets_for_shard(build_targets, test_targets, shard_id, shard_count):
+    """Return the (build_targets, test_targets) pair assigned to a single shard.
+
+    Each list is sorted first; the shard then receives every shard_count-th
+    entry, starting at index shard_id.
+    """
+    # TODO(fweikert): implement a more sophisticated algorithm
+
+    def select(targets):
+        ordered = list(targets)
+        ordered.sort()
+        return ordered[shard_id::shard_count]
+
+    return select(build_targets), select(test_targets)
+
+
def execute_bazel_test(
bazel_version,
bazel_binary,
@@ -1393,7 +1464,9 @@
return targets
-def execute_command_and_get_output(args, shell=False, fail_if_nonzero=True):
+def execute_command_and_get_output(
+ args, shell=False, fail_if_nonzero=True, print_output=True, capture_stderr=True
+):
eprint(" ".join(args))
process = subprocess.run(
args,
@@ -1401,11 +1474,13 @@
check=fail_if_nonzero,
env=os.environ,
stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
+ stderr=subprocess.STDOUT if capture_stderr else None,
errors="replace",
universal_newlines=True,
)
- eprint(process.stdout)
+ if print_output:
+ eprint(process.stdout)
+
return process.stdout
@@ -1419,14 +1494,14 @@
return subprocess.Popen(args, env=os.environ)
-def create_step(label, commands, platform=DEFAULT_PLATFORM):
+def create_step(label, commands, platform=DEFAULT_PLATFORM, shards=1):
host_platform = PLATFORMS[platform].get("host-platform", platform)
if "docker-image" in PLATFORMS[platform]:
- return create_docker_step(
+ step = create_docker_step(
label, image=PLATFORMS[platform]["docker-image"], commands=commands
)
else:
- return {
+ step = {
"label": label,
"command": commands,
"agents": {
@@ -1436,6 +1511,12 @@
},
}
+ if shards > 1:
+ step["label"] += " (shard %n)"
+ step["parallelism"] = shards
+
+ return step
+
def create_docker_step(label, image, commands=None, additional_env_vars=None):
env = ["BUILDKITE_ARTIFACT_UPLOAD_DESTINATION"]
@@ -1535,6 +1616,12 @@
git_commit = get_last_green_commit(last_green_commit_url)
for task, task_config in task_configs.items():
+ shards = task_config.get("shards", "1")
+ try:
+ shards = int(shards)
+ except ValueError:
+ raise BuildkiteException("Task {} has invalid shard value '{}'".format(task, shards))
+
step = runner_step(
platform=get_platform_for_task(task, task_config),
task=task,
@@ -1547,6 +1634,7 @@
monitor_flaky_tests=monitor_flaky_tests,
use_but=use_but,
incompatible_flags=incompatible_flags,
+ shards=shards,
)
pipeline_steps.append(step)
@@ -1636,6 +1724,7 @@
monitor_flaky_tests=False,
use_but=False,
incompatible_flags=None,
+ shards=1,
):
host_platform = PLATFORMS[platform].get("host-platform", platform)
command = python_binary(host_platform) + " bazelci.py runner --task=" + task
@@ -1655,7 +1744,7 @@
command += " --incompatible_flag=" + flag
label = create_label(platform, project_name, task_name=task_name)
return create_step(
- label=label, commands=[fetch_bazelcipy_command(), command], platform=platform
+ label=label, commands=[fetch_bazelcipy_command(), command], platform=platform, shards=shards
)