| # pylint: disable=g-direct-third-party-import | 
 | # Copyright 2017 The Bazel Authors. All rights reserved. | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); | 
 | # you may not use this file except in compliance with the License. | 
 | # You may obtain a copy of the License at | 
 | # | 
 | #    http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 | """A class that creates junctions in temp directories on Windows. | 
 |  | 
 | Only use this class on Windows, do not use on other platforms. Other platforms | 
 | support longer paths than Windows, and also support symlinks. Windows only | 
 | supports junctions (directory symlinks). | 
 |  | 
 | Junctions are useful if you need to shorten a long path. A long path is one that | 
 | is at least MAX_PATH (260) letters long. This is a constant in Windows, see | 
 | <windows.h> and API documentation for file-handling functions such as | 
 | CreateFileA. | 
 | """ | 
 |  | 
 | import os | 
 | import subprocess | 
 | import tempfile | 
 |  | 
 |  | 
 | class JunctionCreationError(Exception): | 
 |   """Raised when TempJunction fails to create an NTFS junction.""" | 
 |  | 
 |   def __init__(self, path, target, stdout): | 
 |     Exception.__init__( | 
 |         self, | 
 |         "Could not create junction \"%s\" -> \"%s\"\nError from mklink:\n%s" % | 
 |         (path, target, stdout)) | 
 |  | 
 |  | 
 | class TempJunction(object): | 
 |   r"""Junction in a temp directory. | 
 |  | 
 |   This object creates a temp directory and a junction under it. The junction | 
 |   points to a user-specified path. | 
 |  | 
 |   Use this object if you want to write files under long paths (absolute path at | 
 |   least MAX_PATH (260) chars long). Pass the directory you want to "shorten" as | 
 |   the initializer's argument. This object will create a junction under a shorter | 
 |   path, that points to the long directory. The combined path of the junction and | 
 |   files under it are more likely to be short than the original paths were. | 
 |  | 
 |   Usage example: | 
 |     with TempJunction("C:/some/long/path") as junc: | 
 |       # `junc` created a temp directory and a junction in it that points to | 
 |       # \\?\C:\some\long\path, and is itself shorter than that | 
 |       shortpath = os.path.join(junc, "file.txt") | 
 |       with open(shortpath, "w") as f: | 
 |         ...do something with f... | 
 |     # `junc` deleted the junction and its parent temp directory upon leaving | 
 |     # the `with` statement's body | 
 |     ...do something else... | 
 |   """ | 
 |  | 
 |   def __init__(self, | 
 |                junction_target, | 
 |                testonly_mkdtemp=None, | 
 |                testonly_maxpath=None): | 
 |     """Initialize this object. | 
 |  | 
 |     Args: | 
 |       junction_target: string; an absolute Windows path; the __enter__ method | 
 |         creates a junction that points to this path | 
 |       testonly_mkdtemp: function(); for testing only; a custom function that | 
 |         returns a temp directory path, you can use it to mock out | 
 |         tempfile.mkdtemp | 
 |       testonly_maxpath: int; for testing oly; maximum path length before the | 
 |         path is a "long path" (typically MAX_PATH on Windows) | 
 |     """ | 
 |     self._target = os.path.abspath(junction_target) | 
 |     self._junction = None | 
 |     self._mkdtemp = testonly_mkdtemp or tempfile.mkdtemp | 
 |     self._max_path = testonly_maxpath or 248 | 
 |  | 
 |   @staticmethod | 
 |   def _Mklink(name, target): | 
 |     proc = subprocess.Popen( | 
 |         "cmd.exe /C mklink /J \"%s\" \"\\\\?\\%s\"" % (name, target), | 
 |         stdout=subprocess.PIPE, | 
 |         stderr=subprocess.STDOUT) | 
 |     exitcode = proc.wait() | 
 |     if exitcode != 0: | 
 |       stdout = proc.communicate()[0] | 
 |       raise JunctionCreationError(name, target, stdout) | 
 |  | 
 |   @staticmethod | 
 |   def _TryMkdir(path): | 
 |     try: | 
 |       os.mkdir(path) | 
 |     except OSError as e: | 
 |       # Another process may have already created this directory. | 
 |       if not os.path.isdir(path): | 
 |         raise IOError("Could not create directory at '%s': %s" % (path, str(e))) | 
 |  | 
 |   @staticmethod | 
 |   def _MakeLinks(target, mkdtemp, max_path): | 
 |     """Creates a temp directory and a junction in it, pointing to `target`. | 
 |  | 
 |     Creates all parent directories of `target` if they don't exist. | 
 |  | 
 |     Args: | 
 |       target: string; path to the directory that is the junction's target | 
 |       mkdtemp: function():string; creates a temp directory and returns its | 
 |         absolute path | 
 |       max_path: int; maximum path length before the path is a "long path" | 
 |         (typically MAX_PATH on Windows) | 
 |     Returns: | 
 |       The full path to the junction. | 
 |     Raises: | 
 |       JunctionCreationError: if `mklink` fails to create a junction | 
 |     """ | 
 |     segments = [] | 
 |     dirpath = target | 
 |     while not os.path.isdir(dirpath): | 
 |       dirpath, child = os.path.split(dirpath) | 
 |       if child: | 
 |         segments.append(child) | 
 |     tmp = mkdtemp() | 
 |     juncpath = os.path.join(tmp, "j") | 
 |     for child in reversed(segments): | 
 |       childpath = os.path.join(dirpath, child) | 
 |       if len(childpath) >= max_path: | 
 |         try: | 
 |           TempJunction._Mklink(juncpath, dirpath) | 
 |           TempJunction._TryMkdir(os.path.join(juncpath, child)) | 
 |         finally: | 
 |           os.rmdir(juncpath) | 
 |       else: | 
 |         TempJunction._TryMkdir(childpath) | 
 |       dirpath = childpath | 
 |     TempJunction._Mklink(juncpath, target) | 
 |     return juncpath | 
 |  | 
 |   def __enter__(self): | 
 |     """Creates a temp directory and a junction in it, pointing to self._target. | 
 |  | 
 |     Creates all parent directories of self._target if they don't exist. | 
 |  | 
 |     This method is automatically called upon entering a `with` statement's body. | 
 |  | 
 |     Returns: | 
 |       The full path to the junction. | 
 |     Raises: | 
 |       JunctionCreationError: if `mklink` fails to create a junction | 
 |     """ | 
 |     self._junction = TempJunction._MakeLinks(self._target, self._mkdtemp, | 
 |                                              self._max_path) | 
 |     return self._junction | 
 |  | 
 |   def __exit__(self, unused_type, unused_value, unused_traceback): | 
 |     """Deletes the junction and its parent directory. | 
 |  | 
 |     This method is automatically called upon leaving a `with` statement's body. | 
 |  | 
 |     Args: | 
 |       unused_type: unused | 
 |       unused_value: unused | 
 |       unused_traceback: unused | 
 |     """ | 
 |     if self._junction: | 
 |       os.rmdir(self._junction) | 
 |       os.rmdir(os.path.dirname(self._junction)) |