| # Copyright 2020 The Bazel Authors. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Tests for unittest.bash.""" |
| |
| import os |
| import re |
| import shutil |
| import stat |
| import subprocess |
| import tempfile |
| import textwrap |
| import unittest |
| |
| # The test setup for this external test is forwarded to the internal bash test. |
| # This allows the internal test to use the same runfiles to load unittest.bash. |
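# Note that write_file() strips the preamble's leading newline, so the shebang
# lands on the first line of each generated script.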
| _TEST_PREAMBLE = """ |
| #!/bin/bash |
| # --- begin runfiles.bash initialization --- |
| if [[ -f "${RUNFILES_DIR:-/dev/null}/bazel_tools/tools/bash/runfiles/runfiles.bash" ]]; then |
| source "${RUNFILES_DIR}/bazel_tools/tools/bash/runfiles/runfiles.bash" |
| else |
| echo >&2 "ERROR: cannot find @bazel_tools//tools/bash/runfiles:runfiles.bash" |
| exit 1 |
| fi |
| # --- end runfiles.bash initialization --- |
| |
| echo "Writing XML to ${XML_OUTPUT_FILE}" |
| |
| source "$(rlocation "io_bazel/src/test/shell/unittest.bash")" \ |
| || { echo "Could not source unittest.bash" >&2; exit 1; } |
| """ |
| |
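# Matches ANSI CSI escape sequences: the introducer (ESC-[ or the single-byte
# 0x9B form), any parameter bytes (0x30-0x3F), any intermediate bytes
# (0x20-0x2F), and one final byte (0x40-0x7E).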
| ANSI_ESCAPE = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]") |
| |
| |
| def remove_ansi(line): |
| """Remove ANSI-style escape sequences from the input.""" |
| return ANSI_ESCAPE.sub("", line) |
| |
| |
class TestResult:
| """Save test results for easy checking.""" |
| |
| def __init__(self, asserter, return_code, output, xmlfile): |
| self._asserter = asserter |
| self._return_code = return_code |
| self._output = remove_ansi(output) |
| |
| # Read in the XML result file. |
| if os.path.isfile(xmlfile): |
| with open(xmlfile, "r") as f: |
| self._xml = f.read() |
| else: |
      # Unable to read the file; errors will be reported later.
| self._xml = "" |
| |
| # Methods to assert on the state of the results. |
| |
| def assertLogMessage(self, message): |
| self.assertExactlyOneMatch(self._output, message) |
| |
| def assertNotLogMessage(self, message): |
| self._asserter.assertNotRegex(self._output, message) |
| |
| def assertXmlMessage(self, message): |
| self.assertExactlyOneMatch(self._xml, message) |
| |
| def assertNotXmlMessage(self, message): |
| self._asserter.assertNotRegex(self._xml, message) |
| |
| def assertSuccess(self, suite_name): |
| self._asserter.assertEqual( |
| 0, self._return_code, f"Script failed unexpectedly:\n{self._output}" |
| ) |
| self.assertLogMessage(suite_name) |
| self.assertXmlMessage('<testsuites [^/]*failures="0"') |
| self.assertXmlMessage('<testsuites [^/]*errors="0"') |
| |
| def assertNotSuccess(self, suite_name=None, failures=0, errors=0): |
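    # A failures/errors value of 0 means "don't check that attribute".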
| self._asserter.assertNotEqual(0, self._return_code) |
| if suite_name: |
| self.assertLogMessage(suite_name) |
| if failures: |
| self.assertXmlMessage(f'<testsuites [^/]*failures="{failures}"') |
| if errors: |
| self.assertXmlMessage(f'<testsuites [^/]*errors="{errors}"') |
| |
| def assertTestPassed(self, test_name): |
| self.assertLogMessage(f"PASSED: {test_name}") |
| |
| def assertTestFailed(self, test_name, message=""): |
| self.assertLogMessage(f"{test_name} FAILED: {message}") |
| |
| def assertExactlyOneMatch(self, text, pattern): |
| self._asserter.assertRegex(text, pattern) |
| self._asserter.assertEqual( |
| len(re.findall(pattern, text)), |
| 1, |
| msg=f"Found more than 1 match of '{pattern}' in '{text}'", |
| ) |
| |
| |
| class UnittestTest(unittest.TestCase): |
| |
| def setUp(self): |
| """Create a working directory under our temp dir.""" |
    super().setUp()
| self.work_dir = tempfile.mkdtemp(dir=os.environ["TEST_TMPDIR"]) |
| |
| def tearDown(self): |
| """Clean up the working directory.""" |
    super().tearDown()
| shutil.rmtree(self.work_dir) |
| |
| def write_file(self, filename, contents=""): |
| """Write the contents to a file in the workdir.""" |
| |
| filepath = os.path.join(self.work_dir, filename) |
| with open(filepath, "w") as f: |
| f.write(_TEST_PREAMBLE.strip()) |
| f.write(contents) |
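    # Make the script readable, writable, and executable by its owner.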
| os.chmod(filepath, stat.S_IEXEC | stat.S_IWRITE | stat.S_IREAD) |
| |
| def find_runfiles(self): |
| if "RUNFILES_DIR" in os.environ: |
| return os.environ["RUNFILES_DIR"] |
| |
    # Fall back to the test source directory.
| if "TEST_SRCDIR" in os.environ: |
| return os.environ["TEST_SRCDIR"] |
| |
    # Fall back to the parent of the current directory.
| return f"{os.getcwd()}/.." |
| |
| def execute_test(self, filename, env=None, args=()): |
| """Executes the file and stores the results.""" |
| |
| filepath = os.path.join(self.work_dir, filename) |
| xmlfile = os.path.join(self.work_dir, "dummy-testlog.xml") |
| test_env = { |
| "TEST_TMPDIR": self.work_dir, |
| "RUNFILES_DIR": self.find_runfiles(), |
| "TEST_SRCDIR": os.environ["TEST_SRCDIR"], |
| "XML_OUTPUT_FILE": xmlfile, |
| } |
| # Add in env, forcing everything to be a string. |
| if env: |
| for k, v in env.items(): |
| test_env[k] = str(v) |
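    # Interleave stderr with stdout so log assertions see a single stream.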
| completed = subprocess.run( |
| [filepath, *args], |
| env=test_env, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.STDOUT, |
| ) |
| return TestResult( |
| self, completed.returncode, completed.stdout.decode("utf-8"), xmlfile |
| ) |
| |
| # Actual test cases. |
| |
| def test_success(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| function test_success() { |
| echo foo >&${TEST_log} || fail "expected echo to succeed" |
| expect_log "foo" |
| } |
| |
| run_suite "success tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertSuccess("success tests") |
| result.assertTestPassed("test_success") |
| |
| def test_timestamp(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| function test_timestamp() { |
| local ts=$(timestamp) |
  [[ $ts =~ ^[0-9]{13}$ ]] || fail "timestamp wasn't valid: $ts"
| |
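  # get_run_time converts two millisecond timestamps into elapsed seconds:
  # 223456 - 100000 = 123456 ms, i.e. 123.456 s.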
| local time_diff=$(get_run_time 100000 223456) |
  assert_equals 123.456 "$time_diff"
| } |
| |
| run_suite "timestamp tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertSuccess("timestamp tests") |
| result.assertTestPassed("test_timestamp") |
| |
| def test_failure(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| function test_failure() { |
| fail "I'm a failure with <>&\\" escaped symbols" |
| } |
| |
| run_suite "failure tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertNotSuccess("failure tests", failures=0, errors=1) |
| result.assertTestFailed("test_failure") |
| result.assertXmlMessage( |
| 'message="I\'m a failure with <>&" escaped symbols"' |
| ) |
| result.assertXmlMessage("I'm a failure with <>&\" escaped symbols") |
| |
| def test_set_bash_errexit_prints_stack_trace(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| set -euo pipefail |
| |
| function helper() { |
| echo before |
| false |
| echo after |
| } |
| |
| function test_failure_in_helper() { |
| helper |
| } |
| |
| run_suite "bash errexit tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertNotSuccess("bash errexit tests") |
| result.assertTestFailed("test_failure_in_helper") |
| result.assertLogMessage(r"./thing.sh:\d*: in call to helper") |
| result.assertLogMessage( |
| r"./thing.sh:\d*: in call to test_failure_in_helper" |
| ) |
| |
| def test_set_bash_errexit_runs_tear_down(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| set -euo pipefail |
| |
| function tear_down() { |
| echo "Running tear_down" |
| } |
| |
| function testenv_tear_down() { |
| echo "Running testenv_tear_down" |
| } |
| |
| function test_failure_in_helper() { |
| wrong_command |
| } |
| |
| run_suite "bash errexit tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertNotSuccess("bash errexit tests") |
| result.assertTestFailed("test_failure_in_helper") |
| result.assertLogMessage("Running tear_down") |
| result.assertLogMessage("Running testenv_tear_down") |
| |
| def test_set_bash_errexit_pipefail_propagates_failure_through_pipe(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| set -euo pipefail |
| |
| function test_pipefail() { |
| wrong_command | cat |
| echo after |
| } |
| |
| run_suite "bash errexit tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertNotSuccess("bash errexit tests") |
| result.assertTestFailed("test_pipefail") |
| result.assertLogMessage("wrong_command: command not found") |
| result.assertNotLogMessage("after") |
| |
| def test_set_bash_errexit_no_pipefail_ignores_failure_before_pipe(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| set -eu |
| set +o pipefail |
| |
| function test_nopipefail() { |
| wrong_command | cat |
| echo after |
| } |
| |
| run_suite "bash errexit tests" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertSuccess("bash errexit tests") |
| result.assertTestPassed("test_nopipefail") |
| result.assertLogMessage("wrong_command: command not found") |
| result.assertLogMessage("after") |
| |
| def test_set_bash_errexit_pipefail_long_testname_succeeds(self): |
| test_name = "x" * 1000 |
| self.write_file( |
| "thing.sh", |
| """ |
| set -euo pipefail |
| |
| function test_%s() { |
| : |
| } |
| |
| run_suite "bash errexit tests" |
| """ % test_name, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertSuccess("bash errexit tests") |
| |
| def test_empty_test_fails(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| # No tests present. |
| |
| run_suite "empty test suite" |
| """, |
| ) |
| |
| result = self.execute_test("thing.sh") |
| result.assertNotSuccess("empty test suite") |
| result.assertLogMessage("No tests found.") |
| |
| def test_empty_test_succeeds_sharding(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| # Only one test. |
| function test_thing() { |
| echo |
| } |
| |
| run_suite "empty test suite" |
| """, |
| ) |
| |
| # First shard. |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TEST_TOTAL_SHARDS": 2, |
| "TEST_SHARD_INDEX": 0, |
| }, |
| ) |
| result.assertSuccess("empty test suite") |
| result.assertNotLogMessage("No tests") |
| |
| # Second shard. |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TEST_TOTAL_SHARDS": 2, |
| "TEST_SHARD_INDEX": 1, |
| }, |
| ) |
| result.assertSuccess("empty test suite") |
| result.assertLogMessage("No tests executed due to sharding") |
| |
| def test_default_sharding(self): |
| self.write_file( |
| "thing.sh", |
| """ |
| function test_x0() { |
| echo "running x0" |
| } |
| |
| function test_x1() { |
| echo "running x1" |
| } |
| |
| function test_x2() { |
| echo "running x2" |
| } |
| |
| function test_x3() { |
| echo "running x3" |
| } |
| |
| run_suite "default sharding" |
| """, |
| ) |
| |
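    # The expectations below encode round-robin assignment: test i runs on
    # shard (i % total_shards).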
| # 2 shards |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="default sharding", |
| total_shards=2, |
| shard_index=0, |
| expected_run=["x0", "x2"], |
| expected_not_run=["x1", "x3"], |
| ) |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="default sharding", |
| total_shards=2, |
| shard_index=1, |
| expected_run=["x1", "x3"], |
| expected_not_run=["x0", "x2"], |
| ) |
| |
| # 3 shards |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="default sharding", |
| total_shards=3, |
| shard_index=0, |
| expected_run=["x0", "x3"], |
| expected_not_run=["x1", "x2"], |
| ) |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="default sharding", |
| total_shards=3, |
| shard_index=1, |
| expected_run=["x1"], |
| expected_not_run=["x0", "x2", "x3"], |
| ) |
| |
| # 4 shards |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="default sharding", |
| total_shards=4, |
| shard_index=2, |
| expected_run=["x2"], |
| expected_not_run=["x0", "x1", "x3"], |
| ) |
| |
| def test_manual_sharding(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| define_test_shard test_x0 |
| define_test_shard test_x1 test_x2 |
| function test_x0() { |
| echo "running x0" |
| } |
| |
| function test_x1() { |
| echo "running x1" |
| } |
| |
| function test_x2() { |
| echo "running x2" |
| } |
| |
| function test_x3() { |
| echo "running x3" |
| } |
| |
| function test_x4() { |
| echo "running x4" |
| } |
| |
| function test_x5() { |
| echo "running x5" |
| } |
| |
| run_suite "manual sharding" |
| """), |
| ) |
| |
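    # Manually defined shards are scheduled first; tests without an explicit
    # shard fill the remaining shards.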
| # 3 shards |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="manual sharding", |
| total_shards=3, |
| shard_index=0, |
| expected_run=["x0"], |
| expected_not_run=["x1", "x2", "x3", "x4", "x5"], |
| ) |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="manual sharding", |
| total_shards=3, |
| shard_index=1, |
| expected_run=["x1", "x2"], |
| expected_not_run=["x0", "x3", "x4", "x5"], |
| ) |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="manual sharding", |
| total_shards=3, |
| shard_index=2, |
| expected_run=["x3", "x4", "x5"], |
| expected_not_run=["x0", "x1", "x2"], |
| ) |
| |
| # 4 shards |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="manual sharding", |
| total_shards=4, |
| shard_index=2, |
| expected_run=["x3", "x5"], |
| expected_not_run=["x0", "x1", "x2", "x4"], |
| ) |
| self.__run_sharded_test_assert_expected_tests_were_run( |
| test_file="thing.sh", |
| suite_name="manual sharding", |
| total_shards=4, |
| shard_index=3, |
| expected_run=["x4"], |
| expected_not_run=["x0", "x1", "x2", "x3", "x5"], |
| ) |
| |
| # 2 shards, no non-manual shard for remaining tests |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TEST_TOTAL_SHARDS": 2, |
| "TEST_SHARD_INDEX": 0, |
| }, |
| ) |
| result.assertNotSuccess("manual sharding") |
| result.assertLogMessage( |
| "There were tests with no explicit shards and " |
| + "every shard was manually defined." |
| ) |
| |
| # 1 shard, fewer than manual shards |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TEST_TOTAL_SHARDS": 1, |
| "TEST_SHARD_INDEX": 0, |
| }, |
| ) |
| result.assertNotSuccess("manual sharding") |
| result.assertLogMessage( |
| "More manual shards [(]2[)] defined than total shards [(]1[)]" |
| ) |
| |
| def __run_sharded_test_assert_expected_tests_were_run( |
| self, |
| *, |
| test_file, |
| suite_name, |
| total_shards, |
| shard_index, |
| expected_run, |
| expected_not_run, |
| ): |
| result = self.execute_test( |
| test_file, |
| env={ |
| "TEST_TOTAL_SHARDS": total_shards, |
| "TEST_SHARD_INDEX": shard_index, |
| }, |
| ) |
| result.assertSuccess(suite_name) |
| result.assertNotLogMessage("No tests") |
| for test in expected_run: |
| result.assertLogMessage(f"running {test}") |
| for test in expected_not_run: |
| result.assertNotLogMessage(f"running {test}") |
| |
| def test_manual_sharding_unknown_test(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| define_test_shard unknown_test |
| function test_x0() { |
| echo "running x0" |
| } |
| |
| run_suite "manual sharding unknown test" |
| """), |
| ) |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TEST_TOTAL_SHARDS": 2, |
| "TEST_SHARD_INDEX": 0, |
| }, |
| ) |
| result.assertNotSuccess("manual sharding unknown test") |
| result.assertLogMessage("not found in TESTS") |
| |
| def test_manual_sharding_double_defined_test(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| define_test_shard test_x0 |
| define_test_shard test_x0 |
| function test_x0() { |
| echo "running x0" |
| } |
| """), |
| ) |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TEST_TOTAL_SHARDS": 2, |
| "TEST_SHARD_INDEX": 0, |
| }, |
| ) |
| result.assertNotSuccess() |
| result.assertLogMessage("already in a shard") |
| |
| def test_filter_runs_only_matching_test(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_abc() { |
| : |
| } |
| |
| function test_def() { |
| echo "running def" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test( |
| "thing.sh", env={"TESTBRIDGE_TEST_ONLY": "test_a*"} |
| ) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_abc") |
| result.assertNotLogMessage("running def") |
| |
| def test_filter_prefix_match_only_skips_test(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_abc() { |
| echo "running abc" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test( |
| "thing.sh", env={"TESTBRIDGE_TEST_ONLY": "test_a"} |
| ) |
| |
| result.assertNotSuccess("tests to filter") |
| result.assertLogMessage("No tests found.") |
| |
| def test_filter_multiple_globs_runs_tests_matching_any(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_abc() { |
| echo "running abc" |
| } |
| |
| function test_def() { |
| echo "running def" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test( |
| "thing.sh", env={"TESTBRIDGE_TEST_ONLY": "donotmatch:*a*"} |
| ) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_abc") |
| result.assertNotLogMessage("running def") |
| |
| def test_filter_character_group_runs_only_matching_tests(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_aaa() { |
| : |
| } |
| |
| function test_daa() { |
| : |
| } |
| |
| function test_zaa() { |
| echo "running zaa" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test( |
| "thing.sh", env={"TESTBRIDGE_TEST_ONLY": "test_[a-f]aa"} |
| ) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_aaa") |
| result.assertTestPassed("test_daa") |
| result.assertNotLogMessage("running zaa") |
| |
| def test_filter_sharded_runs_subset_of_filtered_tests(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_a0() { |
| echo "running a0" |
| } |
| |
| function test_a1() { |
| echo "running a1" |
| } |
| |
| function test_bb() { |
| echo "running bb" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| for index in range(2): |
| with self.subTest(index=index): |
| self.__filter_sharded_runs_subset_of_filtered_tests(index) |
| |
| def __filter_sharded_runs_subset_of_filtered_tests(self, index): |
| result = self.execute_test( |
| "thing.sh", |
| env={ |
| "TESTBRIDGE_TEST_ONLY": "test_a*", |
| "TEST_TOTAL_SHARDS": 2, |
| "TEST_SHARD_INDEX": index, |
| }, |
| ) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_a" + str(index)) |
| result.assertLogMessage("running a" + str(index)) |
| result.assertNotLogMessage("running a" + str(index ^ 1)) |
| result.assertNotLogMessage("running bb") |
| |
| def test_arg_runs_only_matching_test_and_issues_warning(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_abc() { |
| : |
| } |
| |
| function test_def() { |
| echo "running def" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh", args=["test_abc"]) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_abc") |
| result.assertNotLogMessage("running def") |
| result.assertLogMessage( |
| r"WARNING: Passing test names in arguments \(--test_arg\) is " |
| "deprecated, please use --test_filter='test_abc' instead." |
| ) |
| |
| def test_arg_multiple_tests_issues_warning_with_test_filter_command(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_abc() { |
| : |
| } |
| |
| function test_def() { |
| : |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh", args=["test_abc", "test_def"]) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_abc") |
| result.assertTestPassed("test_def") |
| result.assertLogMessage( |
| r"WARNING: Passing test names in arguments \(--test_arg\) is " |
| "deprecated, please use --test_filter='test_abc:test_def' instead." |
| ) |
| |
| def test_arg_and_filter_ignores_arg(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(""" |
| function test_abc() { |
| : |
| } |
| |
| function test_def() { |
| echo "running def" |
| } |
| |
| run_suite "tests to filter" |
| """), |
| ) |
| |
| result = self.execute_test( |
| "thing.sh", args=["test_def"], env={"TESTBRIDGE_TEST_ONLY": "test_a*"} |
| ) |
| |
| result.assertSuccess("tests to filter") |
| result.assertTestPassed("test_abc") |
| result.assertNotLogMessage("running def") |
| result.assertLogMessage( |
| "WARNING: Both --test_arg and --test_filter specified, ignoring" |
| " --test_arg" |
| ) |
| |
| def test_custom_ifs_variable_finds_and_runs_test(self): |
| for sharded in (False, True): |
| for ifs in (r"\t", ":", ","): |
| with self.subTest(ifs=ifs, sharded=sharded): |
| self.__custom_ifs_variable_finds_and_runs_test(ifs, sharded) |
| |
| def __custom_ifs_variable_finds_and_runs_test(self, ifs, sharded): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(r""" |
| set -euo pipefail |
| IFS=$'%s' |
| function test_foo() { |
| : |
| } |
| |
| run_suite "custom IFS test" |
| """ % ifs), |
| ) |
| |
| result = self.execute_test( |
| "thing.sh", |
| env={} |
| if not sharded |
| else {"TEST_TOTAL_SHARDS": 2, "TEST_SHARD_INDEX": 0}, |
| ) |
| |
| result.assertSuccess("custom IFS test") |
| result.assertTestPassed("test_foo") |
| |
| def test_fail_in_teardown_reports_failure(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(r""" |
| function tear_down() { |
| echo "tear_down log" >"${TEST_log}" |
| fail "tear_down failure" |
| } |
| |
| function test_foo() { |
| : |
| } |
| |
| run_suite "Failure in tear_down test" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh") |
| |
| result.assertNotSuccess("Failure in tear_down test", errors=1) |
| result.assertTestFailed("test_foo", "tear_down failure") |
| result.assertXmlMessage('message="tear_down failure"') |
| result.assertLogMessage("tear_down log") |
| |
| def test_fail_in_teardown_after_test_failure_reports_both_failures(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(r""" |
| function tear_down() { |
| echo "tear_down log" >"${TEST_log}" |
| fail "tear_down failure" |
| } |
| |
| function test_foo() { |
| echo "test_foo log" >"${TEST_log}" |
| fail "Test failure" |
| } |
| |
| run_suite "Failure in tear_down test" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh") |
| |
| result.assertNotSuccess("Failure in tear_down test", errors=1) |
| result.assertTestFailed("test_foo", "Test failure") |
| result.assertTestFailed("test_foo", "tear_down failure") |
| result.assertXmlMessage('message="Test failure"') |
| result.assertNotXmlMessage('message="tear_down failure"') |
| result.assertXmlMessage("test_foo log") |
| result.assertXmlMessage("tear_down log") |
| result.assertLogMessage("Test failure") |
| result.assertLogMessage("tear_down failure") |
| result.assertLogMessage("test_foo log") |
| result.assertLogMessage("tear_down log") |
| |
| def test_errexit_in_teardown_reports_failure(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(r""" |
| set -euo pipefail |
| |
| function tear_down() { |
| invalid_command |
| } |
| |
| function test_foo() { |
| : |
| } |
| |
| run_suite "errexit in tear_down test" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh") |
| |
| result.assertNotSuccess("errexit in tear_down test") |
| result.assertLogMessage("invalid_command: command not found") |
| result.assertXmlMessage('message="No failure message"') |
| result.assertXmlMessage("invalid_command: command not found") |
| |
| def test_fail_in_tear_down_after_errexit_reports_both_failures(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(r""" |
| set -euo pipefail |
| |
| function tear_down() { |
| echo "tear_down log" >"${TEST_log}" |
| fail "tear_down failure" |
| } |
| |
| function test_foo() { |
| invalid_command |
| } |
| |
| run_suite "fail after failure" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh") |
| |
| result.assertNotSuccess("fail after failure") |
| result.assertTestFailed( |
| "test_foo", "terminated because this command returned a non-zero status" |
| ) |
| result.assertTestFailed("test_foo", "tear_down failure") |
| result.assertLogMessage("invalid_command: command not found") |
| result.assertLogMessage("tear_down log") |
| result.assertXmlMessage('message="No failure message"') |
| result.assertXmlMessage("invalid_command: command not found") |
| |
| def test_errexit_in_tear_down_after_errexit_reports_both_failures(self): |
| self.write_file( |
| "thing.sh", |
| textwrap.dedent(r""" |
| set -euo pipefail |
| |
| function tear_down() { |
| invalid_command_tear_down |
| } |
| |
| function test_foo() { |
| invalid_command_test |
| } |
| |
| run_suite "fail after failure" |
| """), |
| ) |
| |
| result = self.execute_test("thing.sh") |
| |
| result.assertNotSuccess("fail after failure") |
| result.assertTestFailed( |
| "test_foo", "terminated because this command returned a non-zero status" |
| ) |
| result.assertLogMessage("invalid_command_test: command not found") |
| result.assertLogMessage("invalid_command_tear_down: command not found") |
| result.assertXmlMessage('message="No failure message"') |
| result.assertXmlMessage("invalid_command_test: command not found") |
| result.assertXmlMessage("invalid_command_tear_down: command not found") |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |