| # Copyright 2015 The Bazel Authors. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Testing for archive.""" |
| |
| import os |
| import os.path |
| import tarfile |
| import unittest |
| |
| from tools.build_defs.pkg import archive |
| from tools.build_defs.pkg import testenv |
| |
| |
| class SimpleArFileTest(unittest.TestCase): |
| """Testing for SimpleArFile class.""" |
| |
| def assertArFileContent(self, arfile, content): |
| """Assert that arfile contains exactly the entry described by `content`. |
| |
| Args: |
| arfile: the path to the AR file to test. |
| content: an array describing the expected content of the AR file. |
| Each entry in that list should be a dictionary where each field |
| is a field to test in the corresponding SimpleArFileEntry. For |
| testing the presence of a file "x", then the entry could simply |
| be `{"filename": "x"}`, the missing field will be ignored. |
| """ |
| with archive.SimpleArFile(arfile) as f: |
| current = f.next() |
| i = 0 |
| while current: |
| error_msg = "Extraneous file at end of archive %s: %s" % ( |
| arfile, |
| current.filename |
| ) |
| self.assertTrue(i < len(content), error_msg) |
| for k, v in content[i].items(): |
| value = getattr(current, k) |
| error_msg = " ".join([ |
| "Value `%s` for key `%s` of file" % (value, k), |
| "%s in archive %s does" % (current.filename, arfile), |
| "not match expected value `%s`" % v |
| ]) |
| self.assertEqual(value, v, error_msg) |
| current = f.next() |
| i += 1 |
| if i < len(content): |
| self.fail("Missing file %s in archive %s" % (content[i], arfile)) |
| |
| def testEmptyArFile(self): |
| self.assertArFileContent(os.path.join(testenv.TESTDATA_PATH, "empty.ar"), |
| []) |
| |
| def assertSimpleFileContent(self, names): |
| datafile = os.path.join(testenv.TESTDATA_PATH, "_".join(names) + ".ar") |
| content = [{"filename": n, |
| "size": len(n.encode("utf-8")), |
| "data": n.encode("utf-8")} |
| for n in names] |
| self.assertArFileContent(datafile, content) |
| |
| def testAFile(self): |
| self.assertSimpleFileContent(["a"]) |
| |
| def testBFile(self): |
| self.assertSimpleFileContent(["b"]) |
| |
| def testABFile(self): |
| self.assertSimpleFileContent(["ab"]) |
| |
| def testA_BFile(self): |
| self.assertSimpleFileContent(["a", "b"]) |
| |
| def testA_ABFile(self): |
| self.assertSimpleFileContent(["a", "ab"]) |
| |
| def testA_B_ABFile(self): |
| self.assertSimpleFileContent(["a", "b", "ab"]) |
| |
| |
| class TarFileWriterTest(unittest.TestCase): |
| """Testing for TarFileWriter class.""" |
| |
| def assertTarFileContent(self, tar, content): |
| """Assert that tarfile contains exactly the entry described by `content`. |
| |
| Args: |
| tar: the path to the TAR file to test. |
| content: an array describing the expected content of the TAR file. |
| Each entry in that list should be a dictionary where each field |
| is a field to test in the corresponding TarInfo. For |
| testing the presence of a file "x", then the entry could simply |
| be `{"name": "x"}`, the missing field will be ignored. To match |
| the content of a file entry, use the key "data". |
| """ |
| with tarfile.open(tar, "r:") as f: |
| i = 0 |
| for current in f: |
| error_msg = "Extraneous file at end of archive %s: %s" % ( |
| tar, |
| current.name |
| ) |
| self.assertTrue(i < len(content), error_msg) |
| for k, v in content[i].items(): |
| if k == "data": |
| value = f.extractfile(current).read() |
| else: |
| value = getattr(current, k) |
| error_msg = " ".join([ |
| "Value `%s` for key `%s` of file" % (value, k), |
| "%s in archive %s does" % (current.name, tar), |
| "not match expected value `%s`" % v |
| ]) |
| self.assertEqual(value, v, error_msg) |
| i += 1 |
| if i < len(content): |
| self.fail("Missing file %s in archive %s" % (content[i], tar)) |
| |
| def setUp(self): |
| self.tempfile = os.path.join(os.environ["TEST_TMPDIR"], "test.tar") |
| |
| def tearDown(self): |
| if os.path.exists(self.tempfile): |
| os.remove(self.tempfile) |
| |
| def testEmptyTarFile(self): |
| with archive.TarFileWriter(self.tempfile): |
| pass |
| self.assertTarFileContent(self.tempfile, []) |
| |
| def assertSimpleFileContent(self, names): |
| with archive.TarFileWriter(self.tempfile) as f: |
| for n in names: |
| f.add_file(n, content=n) |
| content = ([{"name": "."}] + |
| [{"name": n, |
| "size": len(n.encode("utf-8")), |
| "data": n.encode("utf-8")} |
| for n in names]) |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testAddFile(self): |
| self.assertSimpleFileContent(["./a"]) |
| self.assertSimpleFileContent(["./b"]) |
| self.assertSimpleFileContent(["./ab"]) |
| self.assertSimpleFileContent(["./a", "./b"]) |
| self.assertSimpleFileContent(["./a", "./ab"]) |
| self.assertSimpleFileContent(["./a", "./b", "./ab"]) |
| |
| def testDottedFiles(self): |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_file("a") |
| f.add_file("/b") |
| f.add_file("./c") |
| f.add_file("./.d") |
| f.add_file("..e") |
| f.add_file(".f") |
| content = [ |
| {"name": "."}, {"name": "./a"}, {"name": "/b"}, {"name": "./c"}, |
| {"name": "./.d"}, {"name": "./..e"}, {"name": "./.f"} |
| ] |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testAddDir(self): |
| # For some strange reason, ending slash is stripped by the test |
| content = [ |
| {"name": ".", "mode": 0o755}, |
| {"name": "./a", "mode": 0o755}, |
| {"name": "./a/b", "data": b"ab", "mode": 0o644}, |
| {"name": "./a/c", "mode": 0o755}, |
| {"name": "./a/c/d", "data": b"acd", "mode": 0o644}, |
| ] |
| tempdir = os.path.join(os.environ["TEST_TMPDIR"], "test_dir") |
| # Iterate over the `content` array to create the directory |
| # structure it describes. |
| for c in content: |
| if "data" in c: |
| p = os.path.join(tempdir, c["name"][2:]) |
| os.makedirs(os.path.dirname(p)) |
| with open(p, "wb") as f: |
| f.write(c["data"]) |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_dir("./", tempdir, mode=0o644) |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testMergeTar(self): |
| content = [ |
| {"name": "./a", "data": b"a"}, |
| {"name": "./ab", "data": b"ab"}, |
| ] |
| for ext in ["", ".gz", ".bz2", ".xz"]: |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_tar(os.path.join(testenv.TESTDATA_PATH, "tar_test.tar" + ext), |
| name_filter=lambda n: n != "./b") |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testMergeTarRelocated(self): |
| content = [ |
| {"name": ".", "mode": 0o755}, |
| {"name": "./foo", "mode": 0o755}, |
| {"name": "./foo/a", "data": b"a"}, |
| {"name": "./foo/ab", "data": b"ab"}, |
| ] |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_tar(os.path.join(testenv.TESTDATA_PATH, "tar_test.tar"), |
| name_filter=lambda n: n != "./b", root="/foo") |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testAddingDirectoriesForFile(self): |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_file("d/f") |
| content = [ |
| {"name": ".", |
| "mode": 0o755}, |
| {"name": "./d", |
| "mode": 0o755}, |
| {"name": "./d/f"}, |
| ] |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testAddingDirectoriesForFileSeparately(self): |
| d_dir = os.path.join(os.environ["TEST_TMPDIR"], "d_dir") |
| os.makedirs(d_dir) |
| with open(os.path.join(d_dir, "dir_file"), "w"): |
| pass |
| a_dir = os.path.join(os.environ["TEST_TMPDIR"], "a_dir") |
| os.makedirs(a_dir) |
| with open(os.path.join(a_dir, "dir_file"), "w"): |
| pass |
| |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_dir("d", d_dir) |
| f.add_file("d/f") |
| |
| f.add_dir("a", a_dir) |
| f.add_file("a/b/f") |
| content = [ |
| {"name": ".", |
| "mode": 0o755}, |
| {"name": "./d", |
| "mode": 0o755}, |
| {"name": "./d/dir_file"}, |
| {"name": "./d/f"}, |
| {"name": "./a", |
| "mode": 0o755}, |
| {"name": "./a/dir_file"}, |
| {"name": "./a/b", |
| "mode": 0o755}, |
| {"name": "./a/b/f"}, |
| ] |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testAddingDirectoriesForFileManually(self): |
| with archive.TarFileWriter(self.tempfile) as f: |
| f.add_file("d", tarfile.DIRTYPE) |
| f.add_file("d/f") |
| |
| f.add_file("a", tarfile.DIRTYPE) |
| f.add_file("a/b", tarfile.DIRTYPE) |
| f.add_file("a/b", tarfile.DIRTYPE) |
| f.add_file("a/b/", tarfile.DIRTYPE) |
| f.add_file("a/b/c/f") |
| |
| f.add_file("x/y/f") |
| f.add_file("x", tarfile.DIRTYPE) |
| content = [ |
| {"name": ".", |
| "mode": 0o755}, |
| {"name": "./d", |
| "mode": 0o755}, |
| {"name": "./d/f"}, |
| {"name": "./a", |
| "mode": 0o755}, |
| {"name": "./a/b", |
| "mode": 0o755}, |
| {"name": "./a/b/c", |
| "mode": 0o755}, |
| {"name": "./a/b/c/f"}, |
| {"name": "./x", |
| "mode": 0o755}, |
| {"name": "./x/y", |
| "mode": 0o755}, |
| {"name": "./x/y/f"}, |
| ] |
| self.assertTarFileContent(self.tempfile, content) |
| |
| def testChangingRootDirectory(self): |
| with archive.TarFileWriter(self.tempfile, root_directory="root") as f: |
| f.add_file("d", tarfile.DIRTYPE) |
| f.add_file("d/f") |
| |
| f.add_file("a", tarfile.DIRTYPE) |
| f.add_file("a/b", tarfile.DIRTYPE) |
| f.add_file("a/b", tarfile.DIRTYPE) |
| f.add_file("a/b/", tarfile.DIRTYPE) |
| f.add_file("a/b/c/f") |
| |
| f.add_file("x/y/f") |
| f.add_file("x", tarfile.DIRTYPE) |
| content = [ |
| {"name": "root", |
| "mode": 0o755}, |
| {"name": "root/d", |
| "mode": 0o755}, |
| {"name": "root/d/f"}, |
| {"name": "root/a", |
| "mode": 0o755}, |
| {"name": "root/a/b", |
| "mode": 0o755}, |
| {"name": "root/a/b/c", |
| "mode": 0o755}, |
| {"name": "root/a/b/c/f"}, |
| {"name": "root/x", |
| "mode": 0o755}, |
| {"name": "root/x/y", |
| "mode": 0o755}, |
| {"name": "root/x/y/f"}, |
| ] |
| self.assertTarFileContent(self.tempfile, content) |
| |
| if __name__ == "__main__": |
| unittest.main() |