Android, Python tools: bugfix in junction.py
junction.TempJunction can now create all parent
directories of the junction target if they don't
exist.
Change-Id: I3e9cf34e78a3eb1ef9415036b791843a3b37f7c1
PiperOrigin-RevId: 170176180
diff --git a/tools/android/junction.py b/tools/android/junction.py
index 7dc58b8..97fa8d2 100644
--- a/tools/android/junction.py
+++ b/tools/android/junction.py
@@ -63,7 +63,10 @@
...do something else...
"""
- def __init__(self, junction_target, testonly_mkdtemp=None):
+ def __init__(self,
+ junction_target,
+ testonly_mkdtemp=None,
+ testonly_maxpath=None):
"""Initialize this object.
Args:
@@ -72,14 +75,78 @@
testonly_mkdtemp: function(); for testing only; a custom function that
returns a temp directory path, you can use it to mock out
tempfile.mkdtemp
+ testonly_maxpath: int; for testing only; maximum path length before the
+ path is a "long path" (typically MAX_PATH on Windows)
"""
- self._target = os.path.normpath(junction_target)
+ self._target = os.path.abspath(junction_target)
self._junction = None
self._mkdtemp = testonly_mkdtemp or tempfile.mkdtemp
+ self._max_path = testonly_maxpath or 248
+
+ @staticmethod
+ def _Mklink(name, target):
+ proc = subprocess.Popen(
+ "cmd.exe /C mklink /J \"%s\" \"\\\\?\\%s\"" % (name, target),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ exitcode = proc.wait()
+ if exitcode != 0:
+ stdout = proc.communicate()[0]
+ raise JunctionCreationError(name, target, stdout)
+
+ @staticmethod
+ def _TryMkdir(path):
+ try:
+ os.mkdir(path)
+ except OSError as e:
+ # Another process may have already created this directory.
+ if not os.path.isdir(path):
+ raise IOError("Could not create directory at '%s': %s" % (path, str(e)))
+
+ @staticmethod
+ def _MakeLinks(target, mkdtemp, max_path):
+ """Creates a temp directory and a junction in it, pointing to `target`.
+
+ Creates all parent directories of `target` if they don't exist.
+
+ Args:
+ target: string; path to the directory that is the junction's target
+ mkdtemp: function():string; creates a temp directory and returns its
+ absolute path
+ max_path: int; maximum path length before the path is a "long path"
+ (typically MAX_PATH on Windows)
+ Returns:
+ The full path to the junction.
+ Raises:
+ JunctionCreationError: if `mklink` fails to create a junction
+ """
+ segments = []
+ dirpath = target
+ while not os.path.isdir(dirpath):
+ dirpath, child = os.path.split(dirpath)
+ if child:
+ segments.append(child)
+ tmp = mkdtemp()
+ juncpath = os.path.join(tmp, "j")
+ for child in reversed(segments):
+ childpath = os.path.join(dirpath, child)
+ if len(childpath) >= max_path:
+ try:
+ TempJunction._Mklink(juncpath, dirpath)
+ TempJunction._TryMkdir(os.path.join(juncpath, child))
+ finally:
+ os.rmdir(juncpath)
+ else:
+ TempJunction._TryMkdir(childpath)
+ dirpath = childpath
+ TempJunction._Mklink(juncpath, target)
+ return juncpath
def __enter__(self):
"""Creates a temp directory and a junction in it, pointing to self._target.
+ Creates all parent directories of self._target if they don't exist.
+
This method is automatically called upon entering a `with` statement's body.
Returns:
@@ -87,18 +154,9 @@
Raises:
JunctionCreationError: if `mklink` fails to create a junction
"""
- result = os.path.normpath(os.path.join(self._mkdtemp(), "j"))
- proc = subprocess.Popen(
- "cmd.exe /C mklink /J \"%s\" \"\\\\?\\%s\"" %
- (result, os.path.normpath(self._target)),
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- exitcode = proc.wait()
- if exitcode != 0:
- stdout = proc.communicate()[0]
- raise JunctionCreationError(result, self._target, stdout)
- self._junction = result
- return result
+ self._junction = TempJunction._MakeLinks(self._target, self._mkdtemp,
+ self._max_path)
+ return self._junction
def __exit__(self, unused_type, unused_value, unused_traceback):
"""Deletes the junction and its parent directory.
diff --git a/tools/android/junction_test.py b/tools/android/junction_test.py
index 0ee1b08..e3ff421 100644
--- a/tools/android/junction_test.py
+++ b/tools/android/junction_test.py
@@ -23,35 +23,48 @@
class JunctionTest(test_base.TestBase):
"""Unit tests for junction.py."""
- def testCreateJunction(self):
+ def _AssertCreateJunctionWhenTargetsParentsDontExist(self, max_path=None):
def tempdir():
return self.ScratchDir("junc temp")
- target = self.ScratchDir("junc target")
- # Make the `target` path a non-normalized Windows path with a space in it.
- # TempJunction should still work.
- target = os.path.dirname(target) + "/junc target"
+ target = self.Path("this directory/should not\\yet exist")
+ self.assertFalse(os.path.exists(os.path.dirname(os.path.dirname(target))))
+ # Make the `target` path a non-normalized Windows path with a space in it
+ # which doesn't even exist.
+ # TempJunction should still work; it should:
+ # - normalize the path, and
+ # - create all directories on the path
+ # target = os.path.dirname(target) + "/junc target"
juncpath = None
- with junction.TempJunction(target, testonly_mkdtemp=tempdir) as j:
+ with junction.TempJunction(
+ target, testonly_mkdtemp=tempdir, testonly_maxpath=max_path) as j:
juncpath = j
# Ensure that `j` created the junction.
self.assertTrue(os.path.exists(target))
self.assertTrue(os.path.exists(juncpath))
+ self.assertTrue(juncpath.endswith(os.path.join("junc temp", "j")))
+ self.assertTrue(os.path.isabs(juncpath))
# Create a file under the junction.
- filepath = os.path.join(juncpath, "file.txt")
+ filepath = os.path.join(juncpath, "some file.txt")
with open(filepath, "w") as f:
f.write("hello")
# Ensure we can reach the file via the junction and the target directory.
- self.assertTrue(os.path.exists(os.path.join(target, "file.txt")))
- self.assertTrue(os.path.exists(os.path.join(juncpath, "file.txt")))
+ self.assertTrue(os.path.exists(os.path.join(target, "some file.txt")))
+ self.assertTrue(os.path.exists(os.path.join(juncpath, "some file.txt")))
# Ensure that after the `with` block the junction and temp directories no
# longer exist, but we can still reach the file via the target directory.
- self.assertTrue(os.path.exists(os.path.join(target, "file.txt")))
- self.assertFalse(os.path.exists(os.path.join(juncpath, "file.txt")))
+ self.assertTrue(os.path.exists(os.path.join(target, "some file.txt")))
+ self.assertFalse(os.path.exists(os.path.join(juncpath, "some file.txt")))
self.assertFalse(os.path.exists(juncpath))
self.assertFalse(os.path.exists(os.path.dirname(juncpath)))
+ def testCreateJunctionWhenTargetsParentsDontExistAndPathIsShort(self):
+ self._AssertCreateJunctionWhenTargetsParentsDontExist()
+
+ def testCreateJunctionWhenTargetsParentsDontExistAndPathIsLong(self):
+ self._AssertCreateJunctionWhenTargetsParentsDontExist(1)
+
def testCannotCreateJunction(self):
def tempdir():