|  | # Lint as: python3 | 
|  | # Copyright 2022 The Bazel Authors. All rights reserved. | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  | """Testing for archive.""" | 
|  |  | 
|  | import copy | 
|  | import os | 
|  | import tarfile | 
|  | import unittest | 
|  |  | 
|  | from tools.mini_tar import mini_tar | 
|  |  | 
|  |  | 
|  | class TarFileWriterTest(unittest.TestCase): | 
|  | """Testing for TarFileWriter class.""" | 
|  |  | 
|  | def assertTarFileContent(self, tar, content): | 
|  | """Assert that tarfile contains exactly the entry described by `content`. | 
|  |  | 
|  | Args: | 
|  | tar: the path to the TAR file to test. | 
|  | content: an array describing the expected content of the TAR file. Each | 
|  | entry in that list should be a dictionary where each field is a | 
|  | field to test in the corresponding TarInfo. For testing the | 
|  | presence of a file "x", then the entry could simply be | 
|  | `{"name": "x"}`, the missing field will be ignored. To match the | 
|  | content of a file entry, use the key "data". | 
|  | """ | 
|  | with tarfile.open(tar, "r:") as f: | 
|  | i = 0 | 
|  | for current in f: | 
|  | error_msg = "Extraneous file at end of archive %s: %s" % (tar, | 
|  | current.name) | 
|  | self.assertLess(i, len(content), error_msg) | 
|  | for k, v in content[i].items(): | 
|  | if k == "data": | 
|  | value = f.extractfile(current).read() | 
|  | else: | 
|  | value = getattr(current, k) | 
|  | error_msg = " ".join([ | 
|  | "Value `%s` for key `%s` of file" % (value, k), | 
|  | "%s in archive %s does" % (current.name, tar), | 
|  | "not match expected value `%s`" % v | 
|  | ]) | 
|  | self.assertEqual(value, v, error_msg) | 
|  | i += 1 | 
|  | if i < len(content): | 
|  | self.fail("Missing file %s in archive %s" % (content[i], tar)) | 
|  |  | 
|  | def setUp(self): | 
|  | super(TarFileWriterTest, self).setUp() | 
|  | self.tempfile = os.path.join(os.environ["TEST_TMPDIR"], "test.tar") | 
|  |  | 
|  | def tearDown(self): | 
|  | super(TarFileWriterTest, self).tearDown() | 
|  | if os.path.exists(self.tempfile): | 
|  | os.remove(self.tempfile) | 
|  |  | 
|  | def test_empty_tar_file(self): | 
|  | with mini_tar.TarFileWriter(self.tempfile): | 
|  | pass | 
|  | self.assertTarFileContent(self.tempfile, []) | 
|  |  | 
|  | def test_default_mtime_not_provided(self): | 
|  | with mini_tar.TarFileWriter(self.tempfile) as f: | 
|  | self.assertEqual(f.default_mtime, 0) | 
|  |  | 
|  | def test_default_mtime_provided(self): | 
|  | with mini_tar.TarFileWriter(self.tempfile, default_mtime=1234) as f: | 
|  | self.assertEqual(f.default_mtime, 1234) | 
|  |  | 
|  | def test_portable_mtime(self): | 
|  | with mini_tar.TarFileWriter(self.tempfile, default_mtime="portable") as f: | 
|  | self.assertEqual(f.default_mtime, 946684800) | 
|  |  | 
|  | def test_files_with_dots(self): | 
|  | with mini_tar.TarFileWriter(self.tempfile) as f: | 
|  | f.add_file_and_parents("a") | 
|  | f.add_file_and_parents("b/.c") | 
|  | f.add_file_and_parents("..d") | 
|  | f.add_file_and_parents(".e") | 
|  | content = [ | 
|  | { | 
|  | "name": "a" | 
|  | }, | 
|  | { | 
|  | "name": "b" | 
|  | }, | 
|  | { | 
|  | "name": "b/.c" | 
|  | }, | 
|  | { | 
|  | "name": "..d" | 
|  | }, | 
|  | { | 
|  | "name": ".e" | 
|  | }, | 
|  | ] | 
|  | self.assertTarFileContent(self.tempfile, content) | 
|  |  | 
|  | def test_add_parents(self): | 
|  | with mini_tar.TarFileWriter(self.tempfile) as f: | 
|  | f.add_parents("a/b/c/d/file") | 
|  | f.add_file_and_parents("a/b/foo") | 
|  | f.add_parents("a/b/e/file") | 
|  | content = [ | 
|  | { | 
|  | "name": "a", | 
|  | "mode": 0o755 | 
|  | }, | 
|  | { | 
|  | "name": "a/b", | 
|  | "mode": 0o755 | 
|  | }, | 
|  | { | 
|  | "name": "a/b/c", | 
|  | "mode": 0o755 | 
|  | }, | 
|  | { | 
|  | "name": "a/b/c/d", | 
|  | "mode": 0o755 | 
|  | }, | 
|  | { | 
|  | "name": "a/b/foo", | 
|  | "mode": 0o644 | 
|  | }, | 
|  | { | 
|  | "name": "a/b/e", | 
|  | "mode": 0o755 | 
|  | }, | 
|  | ] | 
|  | self.assertTarFileContent(self.tempfile, content) | 
|  |  | 
|  | def test_adding_tree(self): | 
|  | content = [ | 
|  | { | 
|  | "name": "./a", | 
|  | "mode": 0o750 | 
|  | }, | 
|  | { | 
|  | "name": "./a/b", | 
|  | "data": b"ab", | 
|  | "mode": 0o640 | 
|  | }, | 
|  | { | 
|  | "name": "./a/c", | 
|  | "mode": 0o750 | 
|  | }, | 
|  | { | 
|  | "name": "./a/c/d", | 
|  | "data": b"acd", | 
|  | "mode": 0o640 | 
|  | }, | 
|  | ] | 
|  | tempdir = os.path.join(os.environ["TEST_TMPDIR"], "test_dir") | 
|  | # Iterate over the `content` array to create the directory | 
|  | # structure it describes. | 
|  | for c in content: | 
|  | if "data" in c: | 
|  | p = os.path.join(tempdir, c["name"]) | 
|  | os.makedirs(os.path.dirname(p)) | 
|  | with open(p, "wb") as f: | 
|  | f.write(c["data"]) | 
|  | with mini_tar.TarFileWriter(self.tempfile) as f: | 
|  | f.add_file_at_dest(in_path=tempdir, dest_path=".", mode=0o640) | 
|  | self.assertTarFileContent(self.tempfile, content) | 
|  |  | 
|  | # Try it again, but re-rooted | 
|  | with mini_tar.TarFileWriter(self.tempfile, root_directory="foo") as f: | 
|  | f.add_file_at_dest(in_path=tempdir, dest_path="x", mode=0o640) | 
|  | n_content = [ | 
|  | { | 
|  | "name": "foo", | 
|  | "mode": 0o755 | 
|  | }, | 
|  | { | 
|  | "name": "foo/x", | 
|  | "mode": 0o750 | 
|  | }, | 
|  | ] | 
|  | for c in content: | 
|  | nc = copy.copy(c) | 
|  | nc["name"] = "foo/x/" + c["name"][2:] | 
|  | n_content.append(nc) | 
|  | self.assertTarFileContent(self.tempfile, n_content) | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | unittest.main() |