| %shebang% |
| |
| # This script must retain compatibility with a wide variety of Python versions |
| # since it is run for every py_binary target. Currently we guarantee support |
| # going back to Python 2.7, and try to support even Python 2.6 on a best-effort |
| # basis. We might abandon 2.6 support once users have the ability to control the |
| # above shebang string via the Python toolchain (#8685). |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import sys |
| |
| # The Python interpreter unconditionally prepends the directory containing this |
| # script (following symlinks) to the import path. This is the cause of #9239, |
| # and is a special case of #7091. We therefore explicitly delete that entry. |
| # TODO(#7091): Remove this hack when no longer necessary. |
| del sys.path[0] |
| |
| import os |
| import subprocess |
| |
def IsRunningFromZip():
  """Returns whether this stub is being run from a zip file.

  The returned expression is a template placeholder; presumably it is replaced
  with a boolean literal when the stub is generated (TODO confirm).
  """
  return %is_zipfile%
| |
| if IsRunningFromZip(): |
| import shutil |
| import tempfile |
| import zipfile |
| else: |
| import re |
| |
def IsWindows():
  """Returns True if running on Windows."""
  on_windows = (os.name == 'nt')
  return on_windows
| |
def GetWindowsPathWithUNCPrefix(path):
  """Adds UNC prefix after getting a normalized absolute Windows path.

  No-op for non-Windows platforms or if running under python2.

  Args:
    path: The path to convert. Surrounding whitespace is stripped.

  Returns:
    The stripped path, prefixed with the long-path marker only when running
    under Python 3 on a Windows build older than 10.0.14393 that is not
    mingw and when the path is not already prefixed.
  """
  path = path.strip()

  # No need to add prefix for non-Windows platforms.
  # And \\?\ doesn't work in python 2 or on mingw
  if not IsWindows() or sys.version_info[0] < 3:
    return path

  # Starting in Windows 10, version 1607(OS build 14393), MAX_PATH limitations have been
  # removed from common Win32 file and directory functions.
  # Related doc: https://docs.microsoft.com/en-us/windows/win32/fileio/maximum-file-path-limitation?tabs=cmd#enable-long-paths-in-windows-10-version-1607-and-later
  import platform
  # Compare the version numerically. A plain string comparison is
  # lexicographic, so e.g. '6.1.7601' would sort after '10.0.14393' and older
  # Windows releases would wrongly skip the prefix.
  version_parts = []
  for component in platform.win32_ver()[1].split('.'):
    if not component.isdigit():
      break
    version_parts.append(int(component))
  if tuple(version_parts) >= (10, 0, 14393):
    return path

  # import sysconfig only now to maintain python 2.6 compatibility
  import sysconfig
  if sysconfig.get_platform() == 'mingw':
    return path

  # Lets start the unicode fun
  unicode_prefix = '\\\\?\\'
  if path.startswith(unicode_prefix):
    return path

  # os.path.abspath returns a normalized absolute path
  return unicode_prefix + os.path.abspath(path)
| |
def HasWindowsExecutableExtension(path):
  """Returns True if path ends with '.exe', '.com' or '.bat' (case-sensitive)."""
  executable_suffixes = ('.exe', '.com', '.bat')
  return path.endswith(executable_suffixes)
| |
# Interpreter path/name substituted into this template. On Windows an '.exe'
# suffix is appended unless an executable extension is already present.
PYTHON_BINARY = '%python_binary%'
if IsWindows():
  if not HasWindowsExecutableExtension(PYTHON_BINARY):
    PYTHON_BINARY += '.exe'
| |
def SearchPath(name):
  """Looks up an executable file called `name` in the PATH directories.

  Args:
    name: File name to look for in each directory of the search path.

  Returns:
    The first joined path that is a regular file with execute permission, or
    None if no directory contains one. Empty PATH entries are skipped.
  """
  for entry in os.getenv('PATH', os.defpath).split(os.pathsep):
    if not entry:
      continue
    candidate = os.path.join(entry, name)
    if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
      return candidate
  return None
| |
def FindPythonBinary(module_space):
  """Resolves PYTHON_BINARY to a concrete interpreter path via FindBinary.

  Args:
    module_space: Path of the runfiles tree, used for repo-relative binaries.

  Returns:
    Whatever FindBinary returns for PYTHON_BINARY (a path, or None).
  """
  interpreter = FindBinary(module_space, PYTHON_BINARY)
  return interpreter
| |
def PrintVerboseCoverage(*args):
  """Writes args to stderr, but only when VERBOSE_COVERAGE is set and non-empty."""
  if not os.environ.get("VERBOSE_COVERAGE"):
    return
  print(*args, file=sys.stderr)
| |
def FindCoverageEntryPoint(module_space):
  """Locates the coverage tool entry point, if one is configured.

  The tool name substituted into this template takes precedence; otherwise
  the PYTHON_COVERAGE environment variable is consulted.

  Args:
    module_space: Path of the runfiles tree, passed on to FindBinary.

  Returns:
    The result of FindBinary for the configured tool, or None when neither
    source names a tool.
  """
  toolchain_tool = '%coverage_tool%'
  if toolchain_tool:
    PrintVerboseCoverage('Using toolchain coverage_tool %r' % toolchain_tool)
    return FindBinary(module_space, toolchain_tool)

  env_tool = os.environ.get('PYTHON_COVERAGE')
  if not env_tool:
    return None
  PrintVerboseCoverage('PYTHON_COVERAGE: %r' % env_tool)
  return FindBinary(module_space, env_tool)
| |
def FindBinary(module_space, bin_name):
  """Resolves bin_name to the path of the real binary.

  Args:
    module_space: Path of the runfiles tree; base for repo-relative names.
    bin_name: Binary name or path. May be empty/None.

  Returns:
    None for an empty name; bin_name itself if it is absolute; bin_name joined
    onto module_space if it contains a path separator; otherwise the result
    of looking the bare name up with SearchPath.

  Raises:
    AssertionError: if bin_name looks like a label (starts with "//").
  """
  if not bin_name:
    return None

  # Labels are checked first: on POSIX "//foo" would also count as absolute.
  if bin_name.startswith("//"):
    raise AssertionError(
        "Bazel does not support execution of Python interpreters via labels yet"
    )

  if os.path.isabs(bin_name):
    return bin_name

  # normpath() converts slashes to os.sep on Windows, so a single separator
  # test covers both spellings of a repo-root-relative path.
  if os.sep in os.path.normpath(bin_name):
    return os.path.join(module_space, bin_name)

  # A bare name has to be looked up in the search path.
  return SearchPath(bin_name)
| |
def CreatePythonPathEntries(python_imports, module_space):
  """Builds the import path entries rooted at the runfiles tree.

  Args:
    python_imports: Colon-separated list of paths relative to module_space.
      May be empty.
    module_space: Path of the runfiles tree.

  Returns:
    A list starting with module_space, followed by one '<module_space>/<path>'
    entry per non-empty element of python_imports, in order.
  """
  entries = [module_space]
  for rel_path in python_imports.split(':'):
    # ''.split(':') yields [''], which would add a redundant
    # '<module_space>/' entry duplicating module_space itself.
    if rel_path:
      entries.append('%s/%s' % (module_space, rel_path))
  return entries
| |
def FindModuleSpace(main_rel_path):
  """Locates the runfiles tree for this stub.

  Args:
    main_rel_path: Runfiles-relative path of the main file; used to validate
      a runfiles directory suggested by the environment.

  Returns:
    The path of the runfiles directory.

  Raises:
    AssertionError: if no runfiles directory can be found.
  """
  # The caller may have resolved this stub through the runfiles manifest, in
  # which case argv[0] need not point inside the runfiles directory. Honour
  # RUNFILES_DIR and RUNFILES_MANIFEST_FILE from the caller first.
  env_runfiles = os.environ.get('RUNFILES_DIR', None)
  if not env_runfiles:
    manifest = os.environ.get('RUNFILES_MANIFEST_FILE', '')
    if manifest.endswith(('.runfiles_manifest', '.runfiles/MANIFEST')):
      # '_manifest' and '/MANIFEST' are both nine characters long, so the
      # same slice leaves the '.runfiles' directory in either case.
      env_runfiles = manifest[:-9]
  # Be defensive: only trust the directory if it holds our main entry point.
  if env_runfiles and os.path.exists(os.path.join(env_runfiles, main_rel_path)):
    return env_runfiles

  current = sys.argv[0]
  if not os.path.isabs(current):
    current = os.path.join(os.getcwd(), current)

  on_windows = IsWindows()
  exe_suffix = '.exe' if on_windows else ''
  runfiles_pattern = r'(.*\.runfiles)' + (r'\\' if on_windows else '/') + '.*'

  # Follow the symlink chain one hop at a time, checking at each step for a
  # sibling runfiles directory or for a path already inside one.
  while True:
    module_space = current + exe_suffix + '.runfiles'
    if os.path.isdir(module_space):
      return module_space

    matchobj = re.match(runfiles_pattern, current)
    if matchobj:
      return matchobj.group(1)

    if not os.path.islink(current):
      break
    target = os.readlink(current)
    if not os.path.isabs(target):
      target = os.path.join(os.path.dirname(current), target)
    current = target

  raise AssertionError('Cannot find .runfiles directory for %s' % sys.argv[0])
| |
def ExtractZip(zip_path, dest_dir):
  """Unpacks a zip archive and restores the unix mode bits of each member.

  The zipfile module does not apply the stored permission bits (notably the
  executable bit) on extraction; see https://bugs.python.org/issue15795.

  Args:
    zip_path: The path to the zip file to extract
    dest_dir: The path to the destination directory
  """
  zip_path = GetWindowsPathWithUNCPrefix(zip_path)
  dest_dir = GetWindowsPathWithUNCPrefix(dest_dir)
  with zipfile.ZipFile(zip_path) as archive:
    for member in archive.infolist():
      archive.extract(member, dest_dir)
      # UNC-prefixed paths must be absolute/normalized. See
      # https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file#maximum-path-length-limitation
      extracted = os.path.abspath(os.path.join(dest_dir, member.filename))
      # The upper 16 bits of external_attr carry the Unix st_mode. Only the
      # low 12 bits (the file mode bits) are applied, since chmod cannot
      # change the file type bits anyway.
      unix_mode = member.external_attr >> 16
      if unix_mode == 0:
        # Rumor has it these can be 0 for zips created on Windows.
        continue
      os.chmod(extracted, unix_mode & 0o7777)
| |
def CreateModuleSpace():
  """Creates the runfiles tree by extracting the zip file.

  The archive at the directory name of __file__ is unpacked into a fresh
  temporary directory.

  Returns:
    The path of the 'runfiles' directory inside that temporary directory.
  """
  extraction_root = tempfile.mkdtemp(suffix='', prefix='Bazel.runfiles_')
  ExtractZip(os.path.dirname(__file__), extraction_root)
  return os.path.join(extraction_root, 'runfiles')
| |
def GetRepositoriesImports(module_space, import_all):
  """Returns repository roots to add to the import path.

  Args:
    module_space: Path of the runfiles tree.
    import_all: If true, every directory directly under module_space is
      returned; otherwise only the workspace directory is.

  Returns:
    A list of directory paths (sorted when import_all is true).
  """
  if not import_all:
    return [os.path.join(module_space, '%workspace_name%')]

  children = sorted(
      os.path.join(module_space, name) for name in os.listdir(module_space))
  return [child for child in children if os.path.isdir(child)]
| |
def RunfilesEnvvar(module_space):
  """Finds the runfiles manifest or the runfiles directory.

  Args:
    module_space: Path of the runfiles tree for this binary.

  Returns:
    A (name, value) pair giving the environment variable to export, either
    'RUNFILES_MANIFEST_FILE' with a manifest file path or 'RUNFILES_DIR' with
    a directory path. (None, None) if nothing suitable is found.
  """
  # If this binary is the data-dependency of another one, the other sets
  # RUNFILES_MANIFEST_FILE or RUNFILES_DIR for our sake.
  runfiles = os.environ.get('RUNFILES_MANIFEST_FILE', None)
  if runfiles:
    return ('RUNFILES_MANIFEST_FILE', runfiles)

  runfiles = os.environ.get('RUNFILES_DIR', None)
  if runfiles:
    return ('RUNFILES_DIR', runfiles)

  # If running from a zip, there's no manifest file.
  if IsRunningFromZip():
    return ('RUNFILES_DIR', module_space)

  # Look for the runfiles "output" manifest, argv[0] + ".runfiles_manifest"
  runfiles = module_space + '_manifest'
  if os.path.exists(runfiles):
    return ('RUNFILES_MANIFEST_FILE', runfiles)

  # Look for the runfiles "input" manifest, argv[0] + ".runfiles/MANIFEST".
  # This is a manifest *file*, so it must be exported under
  # RUNFILES_MANIFEST_FILE; RUNFILES_DIR is reserved for directory paths.
  runfiles = os.path.join(module_space, 'MANIFEST')
  if os.path.exists(runfiles):
    return ('RUNFILES_MANIFEST_FILE', runfiles)

  # If running in a sandbox and no environment variables are set, then
  # Look for the runfiles next to the binary.
  if module_space.endswith('.runfiles') and os.path.isdir(module_space):
    return ('RUNFILES_DIR', module_space)

  return (None, None)
| |
def Deduplicate(items):
  """Yields each distinct item once, in order of first appearance."""
  already_yielded = set()
  for candidate in items:
    if candidate in already_yielded:
      continue
    already_yielded.add(candidate)
    yield candidate
| |
def InstrumentedFilePaths():
  """Yields (realpath, listed path) pairs for files named in COVERAGE_MANIFEST.

  Nothing is yielded when COVERAGE_MANIFEST is unset or empty. Blank manifest
  lines are ignored, and a pair is yielded only when the real path differs
  from the path as listed.
  """
  manifest_filename = os.environ.get('COVERAGE_MANIFEST')
  if not manifest_filename:
    return
  with open(manifest_filename, "r") as manifest:
    for listed in (raw_line.strip() for raw_line in manifest):
      if not listed:
        continue
      try:
        resolved = os.path.realpath(listed)
      except OSError:
        print(
            "Could not find instrumented file {}".format(listed),
            file=sys.stderr)
        continue
      if resolved == listed:
        continue
      PrintVerboseCoverage("Fixing up {} -> {}".format(resolved, listed))
      yield (resolved, listed)
| |
def UnresolveSymlinks(output_filename):
  # type: (str) -> None
  """Rewrites real paths in an lcov report back to their listed paths.

  coveragepy is asked to use relative file names but currently ignores that
  when generating the lcov report (and other non-XML reports), so the report
  is fixed up afterwards. This is a workaround that should be removed once
  the upstream issue is fixed and the fixed version is widely in use.

  See https://github.com/nedbat/coveragepy/issues/963.

  Args:
    output_filename: Path of the lcov report to rewrite in place. Only lines
      starting with 'SF:' are modified.
  """
  substitutions = list(InstrumentedFilePaths())
  if not substitutions:
    return

  # Move the report aside, then stream it back with the paths substituted.
  unfixed_file = output_filename + '.tmp'
  os.rename(output_filename, unfixed_file)
  with open(unfixed_file, "r") as unfixed:
    with open(output_filename, "w") as fixed:
      for record in unfixed:
        if record.startswith('SF:'):
          for resolved, listed in substitutions:
            record = record.replace(resolved, listed)
        fixed.write(record)
  os.unlink(unfixed_file)
| |
def ExecuteFile(python_program, main_filename, args, env, module_space,
                coverage_entrypoint, workspace):
  # type: (str, str, list[str], dict[str, str], str, str|None, str|None) -> ...
  """Runs the given Python file and exits with its return code.

  This does not return. It behaves much like os.execv, but is more
  restricted and handles Bazel-related edge cases.

  Args:
    python_program: (str) Path to the Python binary to use for execution
    main_filename: (str) The Python file to execute
    args: (list[str]) Additional args to pass to the Python file
    env: (dict[str, str]) A dict of environment variables to set for the execution
    module_space: (str) Path to the module space/runfiles tree directory
    coverage_entrypoint: (str|None) Path to the coverage tool entry point file.
    workspace: (str|None) Name of the workspace to execute in. This is expected to be a
        directory under the runfiles tree, and will recursively delete the
        runfiles directory if set.
  """
  # os.execv is preferred over subprocess.call, because a subprocess causes
  # problems with signal passing (making it difficult to kill Bazel). A
  # subprocess is still required when:
  #
  #  - running on Windows, where os.execv doesn't handle arguments with
  #    spaces correctly and actually starts a subprocess anyway;
  #  - running in a workspace (i.e. from a zip), which has to be cleaned up
  #    after the process finishes, so control must return here;
  #  - collecting coverage, since coveragepy needs two invocations, which
  #    also requires control to return here.
  must_return_here = IsWindows() or workspace or coverage_entrypoint
  if not must_return_here:
    _RunExecv(python_program, main_filename, args, env)

  if coverage_entrypoint is None:
    ret_code = subprocess.call(
        [python_program, main_filename] + args,
        env=env,
        cwd=workspace
    )
  else:
    ret_code = _RunForCoverage(python_program, main_filename, args, env,
                               coverage_entrypoint, workspace)

  if workspace:
    # Best-effort removal (errors ignored) of the parent of the runfiles tree.
    shutil.rmtree(os.path.dirname(module_space), True)
  sys.exit(ret_code)
| |
def _RunExecv(python_program, main_filename, args, env):
  # type: (str, str, list[str], dict[str, str]) -> ...
  """Replaces the current process with the interpreter running the main file.

  Args:
    python_program: (str) Path to the Python binary to use for execution
    main_filename: (str) The Python file to execute
    args: (list[str]) Additional args to pass to the Python file
    env: (dict[str, str]) Environment variables merged into os.environ first
  """
  os.environ.update(env)
  argv = [python_program, main_filename] + args
  os.execv(python_program, argv)
| |
def _RunForCoverage(python_program, main_filename, args, env,
                    coverage_entrypoint, workspace):
  # type: (str, str, list[str], dict[str, str], str, str|None) -> int
  """Collects coverage information for the given Python file.

  Runs the file under coveragepy, converts the resulting database to lcov
  in $COVERAGE_DIR/pylcov.dat, and fixes up the paths in that report.

  Args:
    python_program: (str) Path to the Python binary to use for execution
    main_filename: (str) The Python file to execute
    args: (list[str]) Additional args to pass to the Python file
    env: (dict[str, str]) A dict of environment variables to set for the execution
    coverage_entrypoint: (str|None) Path to the coverage entry point to execute with.
    workspace: (str|None) Directory to run both coverage invocations in.

  Returns:
    The exit code of the lcov conversion if it is non-zero, otherwise the
    exit code of the coverage run itself.
  """
  coverage_dir = os.environ['COVERAGE_DIR']

  # coveragepy must use relative paths, and that can only be configured via
  # an rc file, so write one.
  rcfile_name = os.path.join(coverage_dir, '.coveragerc')
  with open(rcfile_name, "w") as rcfile:
    rcfile.write('[run]\nrelative_files = True\n')
  PrintVerboseCoverage('Coverage entrypoint:', coverage_entrypoint)

  # Step 1: run the target under coveragepy, producing a .coverage database.
  run_command = [
      python_program,
      coverage_entrypoint,
      "run",
      "--rcfile=" + rcfile_name,
      "--append",
      "--branch",
      main_filename
  ] + args
  ret_code = subprocess.call(run_command, env=env, cwd=workspace)

  output_filename = os.path.join(coverage_dir, 'pylcov.dat')
  PrintVerboseCoverage('Converting coveragepy database to lcov:', output_filename)

  # Step 2: run coveragepy again to export that database as lcov.
  lcov_command = [
      python_program,
      coverage_entrypoint,
      "lcov",
      "--rcfile=" + rcfile_name,
      "-o",
      output_filename
  ]
  lcov_code = subprocess.call(lcov_command, env=env, cwd=workspace)
  if lcov_code:
    ret_code = lcov_code

  try:
    os.unlink(rcfile_name)
  except OSError as err:
    # The profiled program might itself run another Python binary through a
    # wrapper that deletes the rcfile. Nothing to do but ignore the failure.
    PrintVerboseCoverage('Error removing temporary coverage rc file:', err)

  if os.path.isfile(output_filename):
    UnresolveSymlinks(output_filename)
  return ret_code
| |
| def Main(): |
| args = sys.argv[1:] |
| |
| new_env = {} |
| |
| # The main Python source file. |
| # The magic string percent-main-percent is replaced with the runfiles-relative |
| # filename of the main file of the Python binary in BazelPythonSemantics.java. |
| main_rel_path = '%main%' |
| if IsWindows(): |
| main_rel_path = main_rel_path.replace('/', os.sep) |
| |
| if IsRunningFromZip(): |
| module_space = CreateModuleSpace() |
| else: |
| module_space = FindModuleSpace(main_rel_path) |
| |
| python_imports = '%imports%' |
| python_path_entries = CreatePythonPathEntries(python_imports, module_space) |
| python_path_entries += GetRepositoriesImports(module_space, %import_all%) |
| # Remove duplicates to avoid overly long PYTHONPATH (#10977). Preserve order, |
| # keep first occurrence only. |
| python_path_entries = [ |
| GetWindowsPathWithUNCPrefix(d) |
| for d in python_path_entries |
| ] |
| |
| old_python_path = os.environ.get('PYTHONPATH') |
| if old_python_path: |
| python_path_entries += old_python_path.split(os.pathsep) |
| |
| python_path = os.pathsep.join(Deduplicate(python_path_entries)) |
| |
| if IsWindows(): |
| python_path = python_path.replace('/', os.sep) |
| |
| new_env['PYTHONPATH'] = python_path |
| runfiles_envkey, runfiles_envvalue = RunfilesEnvvar(module_space) |
| if runfiles_envkey: |
| new_env[runfiles_envkey] = runfiles_envvalue |
| |
| # Don't prepend a potentially unsafe path to sys.path |
| # See: https://docs.python.org/3.11/using/cmdline.html#envvar-PYTHONSAFEPATH |
| new_env['PYTHONSAFEPATH'] = '1' |
| |
| main_filename = os.path.join(module_space, main_rel_path) |
| main_filename = GetWindowsPathWithUNCPrefix(main_filename) |
| assert os.path.exists(main_filename), \ |
| 'Cannot exec() %r: file not found.' % main_filename |
| assert os.access(main_filename, os.R_OK), \ |
| 'Cannot exec() %r: file not readable.' % main_filename |
| |
| program = python_program = FindPythonBinary(module_space) |
| if python_program is None: |
| raise AssertionError('Could not find python binary: ' + PYTHON_BINARY) |
| |
| # COVERAGE_DIR is set if coverage is enabled and instrumentation is configured |
| # for something, though it could be another program executing this one or |
| # one executed by this one (e.g. an extension module). |
| if os.environ.get('COVERAGE_DIR'): |
| cov_tool = FindCoverageEntryPoint(module_space) |
| if cov_tool is None: |
| PrintVerboseCoverage('Coverage was enabled, but python coverage tool was not configured.') |
| else: |
| # Inhibit infinite recursion: |
| if 'PYTHON_COVERAGE' in os.environ: |
| del os.environ['PYTHON_COVERAGE'] |
| |
| if not os.path.exists(cov_tool): |
| raise EnvironmentError( |
| 'Python coverage tool %r not found. ' |
| 'Try running with VERBOSE_COVERAGE=1 to collect more information.' |
| % cov_tool |
| ) |
| |
| # coverage library expects sys.path[0] to contain the library, and replaces |
| # it with the directory of the program it starts. Our actual sys.path[0] is |
| # the runfiles directory, which must not be replaced. |
| # CoverageScript.do_execute() undoes this sys.path[0] setting. |
| # |
| # Update sys.path such that python finds the coverage package. The coverage |
| # entry point is coverage.coverage_main, so we need to do twice the dirname. |
| python_path_entries = new_env['PYTHONPATH'].split(os.pathsep) |
| python_path_entries.append(os.path.dirname(os.path.dirname(cov_tool))) |
| new_env['PYTHONPATH'] = os.pathsep.join(Deduplicate(python_path_entries)) |
| else: |
| cov_tool = None |
| |
| new_env.update((key, val) for key, val in os.environ.items() if key not in new_env) |
| |
| workspace = None |
| if IsRunningFromZip(): |
| # If RUN_UNDER_RUNFILES equals 1, it means we need to |
| # change directory to the right runfiles directory. |
| # (So that the data files are accessible) |
| if os.environ.get('RUN_UNDER_RUNFILES') == '1': |
| workspace = os.path.join(module_space, '%workspace_name%') |
| |
| try: |
| sys.stdout.flush() |
| ExecuteFile( |
| python_program, main_filename, args, new_env, module_space, |
| cov_tool, workspace |
| ) |
| |
| except EnvironmentError: |
| # This works from Python 2.4 all the way to 3.x. |
| e = sys.exc_info()[1] |
| # This exception occurs when os.execv() fails for some reason. |
| if not getattr(e, 'filename', None): |
| e.filename = program # Add info to error message |
| raise |
| |
# Run the launcher only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
  Main()