diff options
author | Brian Harring <ferringb@gmail.com> | 2022-12-24 13:14:53 -0800 |
---|---|---|
committer | Arthur Zamarin <arthurzam@gentoo.org> | 2022-12-25 19:49:11 +0200 |
commit | d6a7c2e44b4f497357f8569d423104232a58f384 (patch) | |
tree | 625ac52169356714a9f5e69e11f2b6cc2d72355a /tests | |
parent | compression: prefer gtar over tar if available (diff) | |
download | snakeoil-d6a7c2e44b4f497357f8569d423104232a58f384.tar.gz snakeoil-d6a7c2e44b4f497357f8569d423104232a58f384.tar.bz2 snakeoil-d6a7c2e44b4f497357f8569d423104232a58f384.zip |
Reformat w/ black 22.12.0 for consistency.
Signed-off-by: Brian Harring <ferringb@gmail.com>
Signed-off-by: Arthur Zamarin <arthurzam@gentoo.org>
Diffstat (limited to 'tests')
33 files changed, 1567 insertions, 1307 deletions
diff --git a/tests/cli/test_arghparse.py b/tests/cli/test_arghparse.py index ccaa65e7..4741d861 100644 --- a/tests/cli/test_arghparse.py +++ b/tests/cli/test_arghparse.py @@ -11,56 +11,60 @@ from snakeoil.test import argparse_helpers class TestArgparseDocs: - def test_add_argument_docs(self): # force using an unpatched version of argparse reload(argparse) parser = argparse.ArgumentParser() - parser.add_argument('--foo', action='store_true') + parser.add_argument("--foo", action="store_true") # vanilla argparse doesn't support docs kwargs with pytest.raises(TypeError): parser.add_argument( - '-b', '--blah', action='store_true', docs='Blah blah blah') + "-b", "--blah", action="store_true", docs="Blah blah blah" + ) with pytest.raises(TypeError): - parser.add_argument_group('fa', description='fa la la', docs='fa la la la') + parser.add_argument_group("fa", description="fa la la", docs="fa la la la") with pytest.raises(TypeError): - parser.add_mutually_exclusive_group('fee', description='fi', docs='fo fum') + parser.add_mutually_exclusive_group("fee", description="fi", docs="fo fum") # forcibly monkey-patch argparse to allow docs kwargs reload(arghparse) - default = 'baz baz' - docs = 'blah blah' + default = "baz baz" + docs = "blah blah" for enable_docs, expected_txt in ((False, default), (True, docs)): arghparse._generate_docs = enable_docs parser = argparse.ArgumentParser() subparsers = parser.add_subparsers(description=default, docs=docs) - subparser = subparsers.add_parser('foo', description=default, docs=docs) + subparser = subparsers.add_parser("foo", description=default, docs=docs) action = parser.add_argument( - '-b', '--blah', action='store_true', help=default, docs=docs) - arg_group = parser.add_argument_group('fa', description=default, docs=docs) + "-b", "--blah", action="store_true", help=default, docs=docs + ) + arg_group = parser.add_argument_group("fa", description=default, docs=docs) mut_arg_group = parser.add_mutually_exclusive_group() mut_action = mut_arg_group.add_argument( - '-f', '--fee', action='store_true', help=default, docs=docs) + "-f", "--fee", action="store_true", help=default, docs=docs + ) - assert getattr(parser._subparsers, 'description', None) == expected_txt - assert getattr(subparser, 'description', None) == expected_txt - assert getattr(action, 'help', None) == expected_txt - assert getattr(arg_group, 'description', None) == expected_txt - assert getattr(mut_action, 'help', None) == expected_txt + assert getattr(parser._subparsers, "description", None) == expected_txt + assert getattr(subparser, "description", None) == expected_txt + assert getattr(action, "help", None) == expected_txt + assert getattr(arg_group, "description", None) == expected_txt + assert getattr(mut_action, "help", None) == expected_txt # list/tuple-based docs arghparse._generate_docs = True - docs = 'foo bar' + docs = "foo bar" parser = argparse.ArgumentParser() list_action = parser.add_argument( - '-b', '--blah', action='store_true', help=default, docs=list(docs.split())) + "-b", "--blah", action="store_true", help=default, docs=list(docs.split()) + ) tuple_action = parser.add_argument( - '-c', '--cat', action='store_true', help=default, docs=tuple(docs.split())) - assert getattr(list_action, 'help', None) == 'foo\nbar' - assert getattr(tuple_action, 'help', None) == 'foo\nbar' + "-c", "--cat", action="store_true", help=default, docs=tuple(docs.split()) + ) + assert getattr(list_action, "help", None) == "foo\nbar" + assert getattr(tuple_action, "help", None) == "foo\nbar" class TestOptionalsParser: @@ -68,7 +72,9 @@ class TestOptionalsParser: # TODO: move this to a generic argparse fixture @pytest.fixture(autouse=True) def __setup_optionals_parser(self): - self.optionals_parser = argparse_helpers.mangle_parser(arghparse.OptionalsParser()) + self.optionals_parser = argparse_helpers.mangle_parser( + arghparse.OptionalsParser() + ) def test_no_args(self): args, unknown = self.optionals_parser.parse_known_optionals([]) @@ -76,14 +82,14 @@ class TestOptionalsParser: assert unknown == [] def test_only_positionals(self): - self.optionals_parser.add_argument('args') + self.optionals_parser.add_argument("args") args, unknown = self.optionals_parser.parse_known_optionals([]) - assert vars(args) == {'args': None} + assert vars(args) == {"args": None} assert unknown == [] def test_optionals(self): - self.optionals_parser.add_argument('--opt1') - self.optionals_parser.add_argument('args') + self.optionals_parser.add_argument("--opt1") + self.optionals_parser.add_argument("args") parse = self.optionals_parser.parse_known_optionals # no args @@ -92,37 +98,37 @@ class TestOptionalsParser: assert unknown == [] # only known optional - args, unknown = parse(['--opt1', 'yes']) - assert args.opt1 == 'yes' + args, unknown = parse(["--opt1", "yes"]) + assert args.opt1 == "yes" assert unknown == [] # unknown optional - args, unknown = parse(['--foo']) + args, unknown = parse(["--foo"]) assert args.opt1 is None - assert unknown == ['--foo'] + assert unknown == ["--foo"] # unknown optional and positional - args, unknown = parse(['--foo', 'arg']) + args, unknown = parse(["--foo", "arg"]) assert args.opt1 is None - assert unknown == ['--foo', 'arg'] + assert unknown == ["--foo", "arg"] # known optional with unknown optional - args, unknown = parse(['--opt1', 'yes', '--foo']) - assert args.opt1 == 'yes' - assert unknown == ['--foo'] + args, unknown = parse(["--opt1", "yes", "--foo"]) + assert args.opt1 == "yes" + assert unknown == ["--foo"] # different order - args, unknown = parse(['--foo', '--opt1', 'yes']) - assert args.opt1 == 'yes' - assert unknown == ['--foo'] + args, unknown = parse(["--foo", "--opt1", "yes"]) + assert args.opt1 == "yes" + assert unknown == ["--foo"] # known optional with unknown positional - args, unknown = parse(['--opt1', 'yes', 'arg']) - assert args.opt1 == 'yes' - assert unknown == ['arg'] + args, unknown = parse(["--opt1", "yes", "arg"]) + assert args.opt1 == "yes" + assert unknown == ["arg"] # known optionals parsing stops at the first positional arg - args, unknown = parse(['arg', '--opt1', 'yes']) + args, unknown = parse(["arg", "--opt1", "yes"]) assert args.opt1 is None - assert unknown == ['arg', '--opt1', 'yes'] + assert unknown == ["arg", "--opt1", "yes"] class TestCsvActionsParser: @@ -134,20 +140,19 @@ class TestCsvActionsParser: def test_bad_action(self): with pytest.raises(ValueError) as excinfo: - self.csv_parser.add_argument('--arg1', action='unknown') + self.csv_parser.add_argument("--arg1", action="unknown") assert 'unknown action "unknown"' == str(excinfo.value) def test_csv_actions(self): - self.csv_parser.add_argument('--arg1', action='csv') - self.csv_parser.add_argument('--arg2', action='csv_append') - self.csv_parser.add_argument('--arg3', action='csv_negations') - self.csv_parser.add_argument('--arg4', action='csv_negations_append') - self.csv_parser.add_argument('--arg5', action='csv_elements') - self.csv_parser.add_argument('--arg6', action='csv_elements_append') + self.csv_parser.add_argument("--arg1", action="csv") + self.csv_parser.add_argument("--arg2", action="csv_append") + self.csv_parser.add_argument("--arg3", action="csv_negations") + self.csv_parser.add_argument("--arg4", action="csv_negations_append") + self.csv_parser.add_argument("--arg5", action="csv_elements") + self.csv_parser.add_argument("--arg6", action="csv_elements_append") class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser): - def test_debug(self): # debug passed parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser(debug=True)) @@ -161,8 +166,10 @@ class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser): assert namespace.debug is False # debug passed in sys.argv -- early debug attr on the parser instance is set - with mock.patch('sys.argv', ['script', '--debug']): - parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser(debug=True)) + with mock.patch("sys.argv", ["script", "--debug"]): + parser = argparse_helpers.mangle_parser( + arghparse.ArgumentParser(debug=True) + ) assert parser.debug is True def test_debug_disabled(self): @@ -176,34 +183,36 @@ class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser): # parser attribute still exists assert parser.debug is False # but namespace attribute doesn't - assert not hasattr(namespace, 'debug') + assert not hasattr(namespace, "debug") def test_verbosity(self): values = ( ([], 0), - (['-q'], -1), - (['--quiet'], -1), - (['-v'], 1), - (['--verbose'], 1), - (['-q', '-v'], 0), - (['--quiet', '--verbose'], 0), - (['-q', '-q'], -2), - (['-v', '-v'], 2), + (["-q"], -1), + (["--quiet"], -1), + (["-v"], 1), + (["--verbose"], 1), + (["-q", "-v"], 0), + (["--quiet", "--verbose"], 0), + (["-q", "-q"], -2), + (["-v", "-v"], 2), ) for args, val in values: - with mock.patch('sys.argv', ['script'] + args): + with mock.patch("sys.argv", ["script"] + args): parser = argparse_helpers.mangle_parser( - arghparse.ArgumentParser(quiet=True, verbose=True)) + arghparse.ArgumentParser(quiet=True, verbose=True) + ) namespace = parser.parse_args(args) - assert parser.verbosity == val, '{} failed'.format(args) - assert namespace.verbosity == val, '{} failed'.format(args) + assert parser.verbosity == val, "{} failed".format(args) + assert namespace.verbosity == val, "{} failed".format(args) def test_verbosity_disabled(self): parser = argparse_helpers.mangle_parser( - arghparse.ArgumentParser(quiet=False, verbose=False)) + arghparse.ArgumentParser(quiet=False, verbose=False) + ) # ensure the options aren't there if disabled - for args in ('-q', '--quiet', '-v', '--verbose'): + for args in ("-q", "--quiet", "-v", "--verbose"): with pytest.raises(argparse_helpers.Error): namespace = parser.parse_args([args]) @@ -211,17 +220,15 @@ class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser): # parser attribute still exists assert parser.verbosity == 0 # but namespace attribute doesn't - assert not hasattr(namespace, 'verbosity') + assert not hasattr(namespace, "verbosity") class BaseArgparseOptions: - def setup_method(self, method): self.parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser()) class TestStoreBoolAction(BaseArgparseOptions): - def setup_method(self, method): super().setup_method(method) self.parser.add_argument("--testing", action=arghparse.StoreBool, default=None) @@ -229,13 +236,13 @@ class TestStoreBoolAction(BaseArgparseOptions): def test_bool_disabled(self): for raw_val in ("n", "no", "false"): for allowed in (raw_val.upper(), raw_val.lower()): - namespace = self.parser.parse_args(['--testing=' + allowed]) + namespace = self.parser.parse_args(["--testing=" + allowed]) assert namespace.testing is False def test_bool_enabled(self): for raw_val in ("y", "yes", "true"): for allowed in (raw_val.upper(), raw_val.lower()): - namespace = self.parser.parse_args(['--testing=' + allowed]) + namespace = self.parser.parse_args(["--testing=" + allowed]) assert namespace.testing is True def test_bool_invalid(self): @@ -244,249 +251,244 @@ class TestStoreBoolAction(BaseArgparseOptions): class ParseStdinTest(BaseArgparseOptions): - def setup_method(self, method): super().setup_method(method) - self.parser.add_argument( - "testing", nargs='+', action=arghparse.ParseStdin) + self.parser.add_argument("testing", nargs="+", action=arghparse.ParseStdin) def test_none_invalid(self): with pytest.raises(argparse_helpers.Error): self.parser.parse_args([]) def test_non_stdin(self): - namespace = self.parser.parse_args(['foo']) - assert namespace.testing == ['foo'] + namespace = self.parser.parse_args(["foo"]) + assert namespace.testing == ["foo"] def test_non_stdin_multiple(self): - namespace = self.parser.parse_args(['foo', 'bar']) - assert namespace.testing == ['foo', 'bar'] + namespace = self.parser.parse_args(["foo", "bar"]) + assert namespace.testing == ["foo", "bar"] def test_stdin(self): # stdin is an interactive tty - with mock.patch('sys.stdin.isatty', return_value=True): + with mock.patch("sys.stdin.isatty", return_value=True): with pytest.raises(argparse_helpers.Error) as excinfo: - namespace = self.parser.parse_args(['-']) - assert 'only valid when piping data in' in str(excinfo.value) + namespace = self.parser.parse_args(["-"]) + assert "only valid when piping data in" in str(excinfo.value) # fake piping data in for readlines, expected in ( - ([], []), - ([' '], []), - (['\n'], []), - (['\n', '\n'], []), - (['foo'], ['foo']), - (['foo '], ['foo']), - (['foo\n'], ['foo']), - (['foo', 'bar', 'baz'], ['foo', 'bar', 'baz']), - (['\nfoo\n', ' bar ', '\nbaz'], ['\nfoo', ' bar', '\nbaz']), + ([], []), + ([" "], []), + (["\n"], []), + (["\n", "\n"], []), + (["foo"], ["foo"]), + (["foo "], ["foo"]), + (["foo\n"], ["foo"]), + (["foo", "bar", "baz"], ["foo", "bar", "baz"]), + (["\nfoo\n", " bar ", "\nbaz"], ["\nfoo", " bar", "\nbaz"]), ): - with mock.patch('sys.stdin') as stdin, \ - mock.patch("builtins.open", mock.mock_open()) as mock_file: + with mock.patch("sys.stdin") as stdin, mock.patch( + "builtins.open", mock.mock_open() + ) as mock_file: stdin.readlines.return_value = readlines stdin.isatty.return_value = False - namespace = self.parser.parse_args(['-']) + namespace = self.parser.parse_args(["-"]) mock_file.assert_called_once_with("/dev/tty") assert namespace.testing == expected class TestCommaSeparatedValuesAction(BaseArgparseOptions): - def setup_method(self, method): super().setup_method(method) self.test_values = ( - ('', []), - (',', []), - (',,', []), - ('a', ['a']), - ('a,b,-c', ['a', 'b', '-c']), + ("", []), + (",", []), + (",,", []), + ("a", ["a"]), + ("a,b,-c", ["a", "b", "-c"]), ) - self.action = 'csv' + self.action = "csv" self.single_expected = lambda x: x self.multi_expected = lambda x: x def test_parse_args(self): - self.parser.add_argument('--testing', action=self.action) + self.parser.add_argument("--testing", action=self.action) for raw_val, expected in self.test_values: - namespace = self.parser.parse_args(['--testing=' + raw_val]) + namespace = self.parser.parse_args(["--testing=" + raw_val]) assert namespace.testing == self.single_expected(expected) def test_parse_multi_args(self): - self.parser.add_argument('--testing', action=self.action) + self.parser.add_argument("--testing", action=self.action) for raw_val, expected in self.test_values: - namespace = self.parser.parse_args([ - '--testing=' + raw_val, '--testing=' + raw_val, - ]) + namespace = self.parser.parse_args( + [ + "--testing=" + raw_val, + "--testing=" + raw_val, + ] + ) assert namespace.testing == self.multi_expected(expected) class TestCommaSeparatedValuesAppendAction(TestCommaSeparatedValuesAction): - def setup_method(self, method): super().setup_method(method) - self.action = 'csv_append' + self.action = "csv_append" self.multi_expected = lambda x: x + x class TestCommaSeparatedNegationsAction(TestCommaSeparatedValuesAction): - def setup_method(self, method): super().setup_method(method) self.test_values = ( - ('', ([], [])), - (',', ([], [])), - (',,', ([], [])), - ('a', ([], ['a'])), - ('-a', (['a'], [])), - ('a,-b,-c,d', (['b', 'c'], ['a', 'd'])), + ("", ([], [])), + (",", ([], [])), + (",,", ([], [])), + ("a", ([], ["a"])), + ("-a", (["a"], [])), + ("a,-b,-c,d", (["b", "c"], ["a", "d"])), ) - self.bad_args = ('-',) - self.action = 'csv_negations' + self.bad_args = ("-",) + self.action = "csv_negations" def test_parse_bad_args(self): - self.parser.add_argument('--testing', action=self.action) + self.parser.add_argument("--testing", action=self.action) for arg in self.bad_args: with pytest.raises(argparse.ArgumentTypeError) as excinfo: - namespace = self.parser.parse_args(['--testing=' + arg]) - assert 'without a token' in str(excinfo.value) + namespace = self.parser.parse_args(["--testing=" + arg]) + assert "without a token" in str(excinfo.value) class TestCommaSeparatedNegationsAppendAction(TestCommaSeparatedNegationsAction): - def setup_method(self, method): super().setup_method(method) - self.action = 'csv_negations_append' + self.action = "csv_negations_append" self.multi_expected = lambda x: tuple(x + y for x, y in zip(x, x)) class TestCommaSeparatedElementsAction(TestCommaSeparatedNegationsAction): - def setup_method(self, method): super().setup_method(method) self.test_values = ( - ('', ([], [], [])), - (',', ([], [], [])), - (',,', ([], [], [])), - ('-a', (['a'], [], [])), - ('a', ([], ['a'], [])), - ('+a', ([], [], ['a'])), - ('a,-b,-c,d', (['b', 'c'], ['a', 'd'], [])), - ('a,-b,+c,-d,+e,f', (['b', 'd'], ['a', 'f'], ['c', 'e'])), + ("", ([], [], [])), + (",", ([], [], [])), + (",,", ([], [], [])), + ("-a", (["a"], [], [])), + ("a", ([], ["a"], [])), + ("+a", ([], [], ["a"])), + ("a,-b,-c,d", (["b", "c"], ["a", "d"], [])), + ("a,-b,+c,-d,+e,f", (["b", "d"], ["a", "f"], ["c", "e"])), ) - self.bad_values = ('-', '+') - self.action = 'csv_elements' + self.bad_values = ("-", "+") + self.action = "csv_elements" class TestCommaSeparatedElementsAppendAction(TestCommaSeparatedElementsAction): - def setup_method(self, method): super().setup_method(method) - self.action = 'csv_elements_append' + self.action = "csv_elements_append" self.multi_expected = lambda x: tuple(x + y for x, y in zip(x, x)) class TestExistentPathType(BaseArgparseOptions): - def setup_method(self, method): super().setup_method(method) - self.parser.add_argument('--path', type=arghparse.existent_path) + self.parser.add_argument("--path", type=arghparse.existent_path) def test_nonexistent(self): # nonexistent path arg raises an error with pytest.raises(argparse_helpers.Error): - self.parser.parse_args(['--path=/path/to/nowhere']) + self.parser.parse_args(["--path=/path/to/nowhere"]) def test_os_errors(self, tmpdir): # random OS/FS issues raise errors - with mock.patch('os.path.realpath') as realpath: - realpath.side_effect = OSError(19, 'Random OS error') + with mock.patch("os.path.realpath") as realpath: + realpath.side_effect = OSError(19, "Random OS error") with pytest.raises(argparse_helpers.Error): - self.parser.parse_args(['--path=%s' % tmpdir]) + self.parser.parse_args(["--path=%s" % tmpdir]) def test_regular_usage(self, tmpdir): - namespace = self.parser.parse_args(['--path=%s' % tmpdir]) + namespace = self.parser.parse_args(["--path=%s" % tmpdir]) assert namespace.path == str(tmpdir) class TestExistentDirType(BaseArgparseOptions): - def setup_method(self, method): super().setup_method(method) - self.parser.add_argument('--path', type=arghparse.existent_dir) + self.parser.add_argument("--path", type=arghparse.existent_dir) def test_nonexistent(self): # nonexistent path arg raises an error with pytest.raises(argparse_helpers.Error): - self.parser.parse_args(['--path=/path/to/nowhere']) + self.parser.parse_args(["--path=/path/to/nowhere"]) def test_os_errors(self, tmp_path): # random OS/FS issues raise errors - with mock.patch('os.path.realpath') as realpath: - realpath.side_effect = OSError(19, 'Random OS error') + with mock.patch("os.path.realpath") as realpath: + realpath.side_effect = OSError(19, "Random OS error") with pytest.raises(argparse_helpers.Error): - self.parser.parse_args([f'--path={tmp_path}']) + self.parser.parse_args([f"--path={tmp_path}"]) def test_file_path(self, tmp_path): - f = tmp_path / 'file' + f = tmp_path / "file" f.touch() with pytest.raises(argparse_helpers.Error): - self.parser.parse_args([f'--path={f}']) + self.parser.parse_args([f"--path={f}"]) def test_regular_usage(self, tmp_path): - namespace = self.parser.parse_args([f'--path={tmp_path}']) + namespace = self.parser.parse_args([f"--path={tmp_path}"]) assert namespace.path == str(tmp_path) class TestNamespace: - def setup_method(self, method): self.parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser()) def test_pop(self): - self.parser.set_defaults(test='test') + self.parser.set_defaults(test="test") namespace = self.parser.parse_args([]) - assert namespace.pop('test') == 'test' + assert namespace.pop("test") == "test" # re-popping raises an exception since the attr has been removed with pytest.raises(AttributeError): - namespace.pop('test') + namespace.pop("test") # popping a nonexistent attr with a fallback returns the fallback - assert namespace.pop('nonexistent', 'foo') == 'foo' + assert namespace.pop("nonexistent", "foo") == "foo" def test_collapse_delayed(self): def _delayed_val(namespace, attr, val): setattr(namespace, attr, val) - self.parser.set_defaults(delayed=arghparse.DelayedValue(partial(_delayed_val, val=42))) + + self.parser.set_defaults( + delayed=arghparse.DelayedValue(partial(_delayed_val, val=42)) + ) namespace = self.parser.parse_args([]) assert namespace.delayed == 42 def test_bool(self): namespace = arghparse.Namespace() assert not namespace - namespace.arg = 'foo' + namespace.arg = "foo" assert namespace class TestManHelpAction: - def test_help(self, capsys): parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser()) - with mock.patch('subprocess.Popen') as popen: + with mock.patch("subprocess.Popen") as popen: # --help long option tries man page first before falling back to help output with pytest.raises(argparse_helpers.Exit): - namespace = parser.parse_args(['--help']) + namespace = parser.parse_args(["--help"]) popen.assert_called_once() - assert popen.call_args[0][0][0] == 'man' + assert popen.call_args[0][0][0] == "man" captured = capsys.readouterr() - assert captured.out.strip().startswith('usage: ') + assert captured.out.strip().startswith("usage: ") popen.reset_mock() # -h short option just displays the regular help output with pytest.raises(argparse_helpers.Exit): - namespace = parser.parse_args(['-h']) + namespace = parser.parse_args(["-h"]) popen.assert_not_called() captured = capsys.readouterr() - assert captured.out.strip().startswith('usage: ') + assert captured.out.strip().startswith("usage: ") popen.reset_mock() diff --git a/tests/cli/test_input.py b/tests/cli/test_input.py index 8efb1f59..2f15f256 100644 --- a/tests/cli/test_input.py +++ b/tests/cli/test_input.py @@ -9,12 +9,11 @@ from snakeoil.test.argparse_helpers import FakeStreamFormatter @pytest.fixture def mocked_input(): - with mock.patch('builtins.input') as mocked_input: + with mock.patch("builtins.input") as mocked_input: yield mocked_input class TestUserQuery: - @pytest.fixture(autouse=True) def __setup(self): self.out = FakeStreamFormatter() @@ -22,98 +21,104 @@ class TestUserQuery: self.query = partial(userquery, out=self.out, err=self.err) def test_default_answer(self, mocked_input): - mocked_input.return_value = '' - assert self.query('foo') == True + mocked_input.return_value = "" + assert self.query("foo") == True def test_tuple_prompt(self, mocked_input): - mocked_input.return_value = '' - prompt = 'perhaps a tuple' + mocked_input.return_value = "" + prompt = "perhaps a tuple" assert self.query(tuple(prompt.split())) == True - output = ''.join(prompt.split()) - assert self.out.get_text_stream().strip().split('\n')[0][:len(output)] == output + output = "".join(prompt.split()) + assert ( + self.out.get_text_stream().strip().split("\n")[0][: len(output)] == output + ) def test_no_default_answer(self, mocked_input): responses = { - 'a': ('z', 'Yes'), - 'b': ('y', 'No'), + "a": ("z", "Yes"), + "b": ("y", "No"), } # no default answer returns None for empty input - mocked_input.return_value = '' - assert self.query('foo', responses=responses) == None - mocked_input.return_value = 'a' - assert self.query('foo', responses=responses) == 'z' - mocked_input.return_value = 'b' - assert self.query('foo', responses=responses) == 'y' + mocked_input.return_value = "" + assert self.query("foo", responses=responses) == None + mocked_input.return_value = "a" + assert self.query("foo", responses=responses) == "z" + mocked_input.return_value = "b" + assert self.query("foo", responses=responses) == "y" def test_ambiguous_input(self, mocked_input): responses = { - 'a': ('z', 'Yes'), - 'A': ('y', 'No'), + "a": ("z", "Yes"), + "A": ("y", "No"), } - mocked_input.return_value = 'a' + mocked_input.return_value = "a" with pytest.raises(NoChoice): - self.query('foo', responses=responses) - error_output = self.err.get_text_stream().strip().split('\n')[1] - expected = 'Response %r is ambiguous (%s)' % ( - mocked_input.return_value, ', '.join(sorted(responses.keys()))) + self.query("foo", responses=responses) + error_output = self.err.get_text_stream().strip().split("\n")[1] + expected = "Response %r is ambiguous (%s)" % ( + mocked_input.return_value, + ", ".join(sorted(responses.keys())), + ) assert error_output == expected def test_default_correct_input(self, mocked_input): - for input, output in (('no', False), - ('No', False), - ('yes', True), - ('Yes', True)): + for input, output in ( + ("no", False), + ("No", False), + ("yes", True), + ("Yes", True), + ): mocked_input.return_value = input - assert self.query('foo') == output + assert self.query("foo") == output def test_default_answer_no_matches(self, mocked_input): - mocked_input.return_value = '' + mocked_input.return_value = "" with pytest.raises(ValueError): - self.query('foo', default_answer='foo') + self.query("foo", default_answer="foo") assert self.out.stream == [] def test_custom_default_answer(self, mocked_input): - mocked_input.return_value = '' - assert self.query('foo', default_answer=False) == False + mocked_input.return_value = "" + assert self.query("foo", default_answer=False) == False def test_eof_nochoice(self, mocked_input): # user hits ctrl-d mocked_input.side_effect = EOFError with pytest.raises(NoChoice): - self.query('foo') - output = self.out.get_text_stream().strip().split('\n')[1] - expected = 'Not answerable: EOF on STDIN' + self.query("foo") + output = self.out.get_text_stream().strip().split("\n")[1] + expected = "Not answerable: EOF on STDIN" assert output == expected def test_stdin_closed_nochoice(self, mocked_input): - mocked_input.side_effect = IOError(errno.EBADF, '') + mocked_input.side_effect = IOError(errno.EBADF, "") with pytest.raises(NoChoice): - self.query('foo') - output = self.out.get_text_stream().strip().split('\n')[1] - expected = 'Not answerable: STDIN is either closed, or not readable' + self.query("foo") + output = self.out.get_text_stream().strip().split("\n")[1] + expected = "Not answerable: STDIN is either closed, or not readable" assert output == expected def test_unhandled_ioerror(self, mocked_input): - mocked_input.side_effect = IOError(errno.ENODEV, '') + mocked_input.side_effect = IOError(errno.ENODEV, "") with pytest.raises(IOError): - self.query('foo') + self.query("foo") def test_bad_choice_limit(self, mocked_input): # user hits enters a bad choice 3 times in a row - mocked_input.return_value = 'bad' + mocked_input.return_value = "bad" with pytest.raises(NoChoice): - self.query('foo') + self.query("foo") assert mocked_input.call_count == 3 - output = self.err.get_text_stream().strip().split('\n')[1] + output = self.err.get_text_stream().strip().split("\n")[1] expected = "Sorry, response %r not understood." % (mocked_input.return_value,) assert output == expected def test_custom_choice_limit(self, mocked_input): # user hits enters a bad choice 5 times in a row - mocked_input.return_value = 'haha' + mocked_input.return_value = "haha" with pytest.raises(NoChoice): - self.query('foo', limit=5) + self.query("foo", limit=5) assert mocked_input.call_count == 5 - output = self.err.get_text_stream().strip().split('\n')[1] + output = self.err.get_text_stream().strip().split("\n")[1] expected = "Sorry, response %r not understood." % (mocked_input.return_value,) assert output == expected diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py index 3b70dcba..0bf26d04 100644 --- a/tests/compression/__init__.py +++ b/tests/compression/__init__.py @@ -4,78 +4,100 @@ import pytest from snakeoil import compression from snakeoil.process import CommandNotFound, find_binary + def hide_binary(*binaries: str): def mock_find_binary(name): if name in binaries: raise CommandNotFound(name) return find_binary(name) - return patch('snakeoil.process.find_binary', side_effect=mock_find_binary) + return patch("snakeoil.process.find_binary", side_effect=mock_find_binary) class Base: - module: str = '' - decompressed_test_data: bytes = b'' - compressed_test_data: bytes = b'' + module: str = "" + decompressed_test_data: bytes = b"" + compressed_test_data: bytes = b"" def decompress(self, data: bytes) -> bytes: - raise NotImplementedError(self, 'decompress') + raise NotImplementedError(self, "decompress") - @pytest.mark.parametrize('parallelize', (True, False)) - @pytest.mark.parametrize('level', (1, 9)) + @pytest.mark.parametrize("parallelize", (True, False)) + @pytest.mark.parametrize("level", (1, 9)) def test_compress_data(self, level, parallelize): - compressed = compression.compress_data(self.module, self.decompressed_test_data, level=level, parallelize=parallelize) + compressed = compression.compress_data( + self.module, + self.decompressed_test_data, + level=level, + parallelize=parallelize, + ) assert compressed assert self.decompress(compressed) == self.decompressed_test_data - @pytest.mark.parametrize('parallelize', (True, False)) + @pytest.mark.parametrize("parallelize", (True, False)) def test_decompress_data(self, parallelize): - assert self.decompressed_test_data == compression.decompress_data(self.module, self.compressed_test_data, parallelize=parallelize) + assert self.decompressed_test_data == compression.decompress_data( + self.module, self.compressed_test_data, parallelize=parallelize + ) - @pytest.mark.parametrize('parallelize', (True, False)) - @pytest.mark.parametrize('level', (1, 9)) + @pytest.mark.parametrize("parallelize", (True, False)) + @pytest.mark.parametrize("level", (1, 9)) def test_compress_handle(self, tmp_path, level, parallelize): - path = tmp_path / f'test.{self.module}' + path = tmp_path / f"test.{self.module}" - stream = compression.compress_handle(self.module, str(path), level=level, parallelize=parallelize) + stream = compression.compress_handle( + self.module, str(path), level=level, parallelize=parallelize + ) stream.write(self.decompressed_test_data) stream.close() assert self.decompress(path.read_bytes()) == self.decompressed_test_data with path.open("wb") as file: - stream = compression.compress_handle(self.module, file, level=level, parallelize=parallelize) + stream = compression.compress_handle( + self.module, file, level=level, parallelize=parallelize + ) stream.write(self.decompressed_test_data) stream.close() assert self.decompress(path.read_bytes()) == self.decompressed_test_data with path.open("wb") as file: - stream = compression.compress_handle(self.module, file.fileno(), level=level, parallelize=parallelize) + stream = compression.compress_handle( + self.module, file.fileno(), level=level, parallelize=parallelize + ) stream.write(self.decompressed_test_data) stream.close() assert self.decompress(path.read_bytes()) == self.decompressed_test_data with pytest.raises(TypeError): - compression.compress_handle(self.module, b'', level=level, parallelize=parallelize) + compression.compress_handle( + self.module, b"", level=level, parallelize=parallelize + ) - @pytest.mark.parametrize('parallelize', (True, False)) + @pytest.mark.parametrize("parallelize", (True, False)) def test_decompress_handle(self, tmp_path, parallelize): - path = tmp_path / f'test.{self.module}' + path = tmp_path / f"test.{self.module}" path.write_bytes(self.compressed_test_data) - stream = compression.decompress_handle(self.module, str(path), parallelize=parallelize) + stream = compression.decompress_handle( + self.module, str(path), parallelize=parallelize + ) assert stream.read() == self.decompressed_test_data stream.close() with path.open("rb") as file: - stream = compression.decompress_handle(self.module, file, parallelize=parallelize) + stream = compression.decompress_handle( + self.module, file, parallelize=parallelize + ) assert stream.read() == self.decompressed_test_data stream.close() with path.open("rb") as file: - stream = compression.decompress_handle(self.module, file.fileno(), parallelize=parallelize) + stream = compression.decompress_handle( + self.module, file.fileno(), parallelize=parallelize + ) assert stream.read() == self.decompressed_test_data stream.close() with pytest.raises(TypeError): - compression.decompress_handle(self.module, b'', parallelize=parallelize) + compression.decompress_handle(self.module, b"", parallelize=parallelize) diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py index f3093d09..9fdffd9a 100644 --- a/tests/compression/test_bzip2.py +++ b/tests/compression/test_bzip2.py @@ -10,28 +10,29 @@ from . import Base, hide_binary def test_no_native(): - with hide_imports('bz2'): + with hide_imports("bz2"): importlib.reload(_bzip2) assert not _bzip2.native def test_missing_bzip2_binary(): - with hide_binary('bzip2'): - with pytest.raises(CommandNotFound, match='bzip2'): + with hide_binary("bzip2"): + with pytest.raises(CommandNotFound, match="bzip2"): importlib.reload(_bzip2) def test_missing_lbzip2_binary(): - with hide_binary('lbzip2'): + with hide_binary("lbzip2"): importlib.reload(_bzip2) assert not _bzip2.parallelizable + class Bzip2Base(Base): - module = 'bzip2' - decompressed_test_data = b'Some text here\n' + module = "bzip2" + decompressed_test_data = b"Some text here\n" compressed_test_data = ( - b'BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02' + b"BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02" b'B\x94@ \x00"\r\x03\xd4\x0c \t!\x1b\xb7\x80u/\x17rE8P\x90\x1bM\x00\x02' ) @@ -40,37 +41,36 @@ class Bzip2Base(Base): class TestStdlib(Bzip2Base): - - @pytest.fixture(autouse=True, scope='class') + @pytest.fixture(autouse=True, scope="class") def _setup(self): try: - find_binary('bzip2') + find_binary("bzip2") except CommandNotFound: - pytest.skip('bzip2 binary not found') - with hide_binary('lbzip2'): + pytest.skip("bzip2 binary not found") + with hide_binary("lbzip2"): importlib.reload(_bzip2) yield class TestBzip2(Bzip2Base): - - @pytest.fixture(autouse=True, scope='class') + @pytest.fixture(autouse=True, scope="class") def _setup(self): - with hide_binary('lbzip2'): + with hide_binary("lbzip2"): importlib.reload(_bzip2) yield class TestLbzip2(Bzip2Base): - - @pytest.fixture(autouse=True, scope='class') + @pytest.fixture(autouse=True, scope="class") def _setup(self): try: - find_binary('lbzip2') + find_binary("lbzip2") except CommandNotFound: - pytest.skip('lbzip2 binary not found') + pytest.skip("lbzip2 binary not found") importlib.reload(_bzip2) def test_bad_level(self): with pytest.raises(ValueError, match='unknown option "-0"'): - _bzip2.compress_data(self.decompressed_test_data, level=90, parallelize=True) + _bzip2.compress_data( + self.decompressed_test_data, level=90, parallelize=True + ) diff --git a/tests/compression/test_init.py b/tests/compression/test_init.py index f3a40270..f1fe5bda 100644 --- a/tests/compression/test_init.py +++ b/tests/compression/test_init.py @@ -11,78 +11,77 @@ from . import hide_binary @pytest.mark.skipif(sys.platform == "darwin", reason="darwin fails with bzip2") class TestArComp: - - @pytest.fixture(scope='class') + @pytest.fixture(scope="class") def tar_file(self, tmp_path_factory): data = tmp_path_factory.mktemp("data") - (data / 'file1').write_text('Hello world') - (data / 'file2').write_text('Larry the Cow') - path = data / 'test 1.tar' - subprocess.run(['tar', 'cf', str(path), 'file1', 'file2'], cwd=data, check=True) - (data / 'file1').unlink() - (data / 'file2').unlink() + (data / "file1").write_text("Hello world") + (data / "file2").write_text("Larry the Cow") + path = data / "test 1.tar" + subprocess.run(["tar", "cf", str(path), "file1", "file2"], cwd=data, check=True) + (data / "file1").unlink() + (data / "file2").unlink() return str(path) - @pytest.fixture(scope='class') + @pytest.fixture(scope="class") def tar_bz2_file(self, tar_file): - subprocess.run(['bzip2', '-z', '-k', tar_file], check=True) + subprocess.run(["bzip2", "-z", "-k", tar_file], check=True) return tar_file + ".bz2" - @pytest.fixture(scope='class') + @pytest.fixture(scope="class") def tbz2_file(self, tar_bz2_file): - new_path = tar_bz2_file.replace('.tar.bz2', '.tbz2') + new_path = tar_bz2_file.replace(".tar.bz2", ".tbz2") shutil.copyfile(tar_bz2_file, new_path) return new_path - @pytest.fixture(scope='class') + @pytest.fixture(scope="class") def lzma_file(self, tmp_path_factory): - data = (tmp_path_factory.mktemp("data") / 'test 2.lzma') - with data.open('wb') as f: - subprocess.run(['lzma'], check=True, input=b'Hello world', stdout=f) + data = tmp_path_factory.mktemp("data") / "test 2.lzma" + with data.open("wb") as f: + subprocess.run(["lzma"], check=True, input=b"Hello world", stdout=f) return str(data) def test_unknown_extenstion(self, tmp_path): - file = tmp_path / 'test.file' - with pytest.raises(ArCompError, match='unknown compression file extension'): - ArComp(file, ext='.foo') + file = tmp_path / "test.file" + with pytest.raises(ArCompError, match="unknown compression file extension"): + ArComp(file, ext=".foo") def test_missing_tar(self, tmp_path, tar_file): - with hide_binary('tar'), chdir(tmp_path): - with pytest.raises(ArCompError, match='required binary not found'): - ArComp(tar_file, ext='.tar').unpack(dest=tmp_path) + with hide_binary("tar"), chdir(tmp_path): + with pytest.raises(ArCompError, match="required binary not found"): + ArComp(tar_file, ext=".tar").unpack(dest=tmp_path) def test_tar(self, tmp_path, tar_file): with chdir(tmp_path): - ArComp(tar_file, ext='.tar').unpack(dest=tmp_path) - assert (tmp_path / 'file1').read_text() == 'Hello world' - assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + ArComp(tar_file, ext=".tar").unpack(dest=tmp_path) + assert (tmp_path / "file1").read_text() == "Hello world" + assert (tmp_path / "file2").read_text() == "Larry the Cow" def test_tar_bz2(self, tmp_path, tar_bz2_file): with chdir(tmp_path): - ArComp(tar_bz2_file, ext='.tar.bz2').unpack(dest=tmp_path) - assert (tmp_path / 'file1').read_text() == 'Hello world' - assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + ArComp(tar_bz2_file, ext=".tar.bz2").unpack(dest=tmp_path) + assert (tmp_path / "file1").read_text() == "Hello world" + assert (tmp_path / "file2").read_text() == "Larry the Cow" def test_tbz2(self, tmp_path, tbz2_file): with chdir(tmp_path): - ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path) - assert (tmp_path / 'file1').read_text() == 'Hello world' - assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + ArComp(tbz2_file, ext=".tbz2").unpack(dest=tmp_path) + assert (tmp_path / "file1").read_text() == "Hello world" + assert (tmp_path / "file2").read_text() == "Larry the Cow" def test_fallback_tbz2(self, tmp_path, tbz2_file): with hide_binary(*next(zip(*_TarBZ2.compress_binary[:-1]))): with chdir(tmp_path): - ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path) - assert (tmp_path / 'file1').read_text() == 'Hello world' - assert (tmp_path / 'file2').read_text() == 'Larry the Cow' + ArComp(tbz2_file, ext=".tbz2").unpack(dest=tmp_path) + assert (tmp_path / "file1").read_text() == "Hello world" + assert (tmp_path / "file2").read_text() == "Larry the Cow" def test_no_fallback_tbz2(self, tmp_path, tbz2_file): with hide_binary(*next(zip(*_TarBZ2.compress_binary))), chdir(tmp_path): - with pytest.raises(ArCompError, match='no compression binary'): - ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path) + with pytest.raises(ArCompError, match="no compression binary"): + ArComp(tbz2_file, ext=".tbz2").unpack(dest=tmp_path) def test_lzma(self, tmp_path, lzma_file): - dest = tmp_path / 'file' + dest = tmp_path / "file" with chdir(tmp_path): - ArComp(lzma_file, ext='.lzma').unpack(dest=dest) - assert (dest).read_bytes() == b'Hello world' + ArComp(lzma_file, ext=".lzma").unpack(dest=dest) + assert (dest).read_bytes() == b"Hello world" diff --git a/tests/compression/test_xz.py b/tests/compression/test_xz.py index f8417b30..0af7c645 100644 --- a/tests/compression/test_xz.py +++ b/tests/compression/test_xz.py @@ -10,26 +10,26 @@ from . import Base, hide_binary def test_no_native(): - with hide_imports('lzma'): + with hide_imports("lzma"): importlib.reload(_xz) assert not _xz.native def test_missing_xz_binary(): - with hide_binary('xz'): - with pytest.raises(CommandNotFound, match='xz'): + with hide_binary("xz"): + with pytest.raises(CommandNotFound, match="xz"): importlib.reload(_xz) class XzBase(Base): - module = 'xz' - decompressed_test_data = b'Some text here\n' * 2 + module = "xz" + decompressed_test_data = b"Some text here\n" * 2 compressed_test_data = ( - b'\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x04\xc0\x1e\x1e!\x01\x16\x00\x00\x00' - b'\x00\x00\x00\x00\x00\x00j\xf6\x947\xe0\x00\x1d\x00\x16]\x00)\x9b\xc9\xa6g' - b'Bw\x8c\xb3\x9eA\x9a\xbeT\xc9\xfa\xe3\x19\x8f(\x00\x00\x00\x00\x00\x96N' - b'\xa8\x8ed\xa2WH\x00\x01:\x1e1V \xff\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ' + b"\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x04\xc0\x1e\x1e!\x01\x16\x00\x00\x00" + b"\x00\x00\x00\x00\x00\x00j\xf6\x947\xe0\x00\x1d\x00\x16]\x00)\x9b\xc9\xa6g" + b"Bw\x8c\xb3\x9eA\x9a\xbeT\xc9\xfa\xe3\x19\x8f(\x00\x00\x00\x00\x00\x96N" + b"\xa8\x8ed\xa2WH\x00\x01:\x1e1V \xff\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ" ) def decompress(self, data: bytes) -> bytes: @@ -37,20 +37,18 @@ class XzBase(Base): class TestStdlib(XzBase): - - @pytest.fixture(autouse=True, scope='class') + @pytest.fixture(autouse=True, scope="class") def _setup(self): try: - find_binary('xz') + find_binary("xz") except CommandNotFound: - pytest.skip('xz binary not found') + pytest.skip("xz binary not found") importlib.reload(_xz) class TestXz(XzBase): - - @pytest.fixture(autouse=True, scope='class') + @pytest.fixture(autouse=True, scope="class") def _setup(self): - with hide_imports('lzma'): + with hide_imports("lzma"): importlib.reload(_xz) yield diff --git a/tests/test_bash.py b/tests/test_bash.py index ec9df537..3d2157b8 100644 --- a/tests/test_bash.py +++ b/tests/test_bash.py @@ -1,128 +1,138 @@ from io import StringIO import pytest -from snakeoil.bash import (BashParseError, iter_read_bash, read_bash, - read_bash_dict, read_dict) +from snakeoil.bash import ( + BashParseError, + iter_read_bash, + read_bash, + read_bash_dict, + read_dict, +) class TestBashCommentStripping: - def test_iter_read_bash(self): - output = iter_read_bash(StringIO( - '\n' - '# hi I am a comment\n' - 'I am not \n' - ' asdf # inline comment\n')) - assert list(output) == ['I am not', 'asdf'] + output = iter_read_bash( + StringIO( + "\n" "# hi I am a comment\n" "I am not \n" " asdf # inline comment\n" + ) + ) + assert list(output) == ["I am not", "asdf"] - output = iter_read_bash(StringIO( - 'inline # comment '), allow_inline_comments=False) - assert list(output) == ['inline # comment'] + output = iter_read_bash( + StringIO("inline # comment "), allow_inline_comments=False + ) + assert list(output) == ["inline # comment"] def test_iter_read_bash_line_cont(self): - output = iter_read_bash(StringIO( - '\n' - '# hi I am a comment\\\n' - 'I am not \\\n' - 'a comment \n' - ' asdf # inline comment\\\n'), - allow_line_cont=True) - assert list(output) == ['I am not a comment', 'asdf'] + output = iter_read_bash( + StringIO( + "\n" + "# hi I am a comment\\\n" + "I am not \\\n" + "a comment \n" + " asdf # inline comment\\\n" + ), + allow_line_cont=True, + ) + assert list(output) == ["I am not a comment", "asdf"] # continuation into inline comment - output = iter_read_bash(StringIO( - '\n' - '# hi I am a comment\n' - 'I am \\\n' - 'not a \\\n' - 'comment # inline comment\n'), - allow_line_cont=True) - assert list(output) == ['I am not a comment'] + output = iter_read_bash( + StringIO( + "\n" + "# hi I am a comment\n" + "I am \\\n" + "not a \\\n" + "comment # inline comment\n" + ), + allow_line_cont=True, + ) + assert list(output) == ["I am not a comment"] # ends with continuation - output = iter_read_bash(StringIO( - '\n' - '# hi I am a comment\n' - 'I am \\\n' - '\\\n' - 'not a \\\n' - 'comment\\\n' - '\\\n'), - allow_line_cont=True) - assert list(output) == ['I am not a comment'] + output = iter_read_bash( + StringIO( + "\n" + "# hi I am a comment\n" + "I am \\\n" + "\\\n" + "not a \\\n" + "comment\\\n" + "\\\n" + ), + allow_line_cont=True, + ) + assert list(output) == ["I am not a comment"] # embedded comment prefix via continued lines - output = iter_read_bash(StringIO( - '\\\n' - '# comment\\\n' - ' not a comment\n' - '\\\n' - ' # inner comment\n' - 'also not\\\n' - '#\\\n' - 'a comment\n'), - allow_line_cont=True) - assert list(output) == ['not a comment', 'also not#a comment'] + output = iter_read_bash( + StringIO( + "\\\n" + "# comment\\\n" + " not a comment\n" + "\\\n" + " # inner comment\n" + "also not\\\n" + "#\\\n" + "a comment\n" + ), + allow_line_cont=True, + ) + assert list(output) == ["not a comment", "also not#a comment"] # Line continuations have to end with \<newline> without any backslash # before the pattern. - output = iter_read_bash(StringIO( - 'I am \\ \n' - 'not a comment'), - allow_line_cont=True) - assert list(output) == ['I am \\', 'not a comment'] - output = iter_read_bash(StringIO( - '\\\n' - 'I am \\\\\n' - 'not a comment'), - allow_line_cont=True) - assert list(output) == ['I am \\\\', 'not a comment'] + output = iter_read_bash( + StringIO("I am \\ \n" "not a comment"), allow_line_cont=True + ) + assert list(output) == ["I am \\", "not a comment"] + output = iter_read_bash( + StringIO("\\\n" "I am \\\\\n" "not a comment"), allow_line_cont=True + ) + assert list(output) == ["I am \\\\", "not a comment"] def test_read_bash(self): - output = read_bash(StringIO( - '\n' - '# hi I am a comment\n' - 'I am not\n')) - assert output == ['I am not'] + output = read_bash(StringIO("\n" "# hi I am a comment\n" "I am not\n")) + assert output == ["I am not"] class TestReadDictConfig: - def test_read_dict(self): - bash_dict = read_dict(StringIO( - '\n' - '# hi I am a comment\n' - 'foo1=bar\n' - 'foo2="bar"\n' - 'foo3=\'bar"\n')) + bash_dict = read_dict( + StringIO( + "\n" "# hi I am a comment\n" "foo1=bar\n" 'foo2="bar"\n' "foo3='bar\"\n" + ) + ) assert bash_dict == { - 'foo1': 'bar', - 'foo2': 'bar', - 'foo3': '\'bar"', - } - assert read_dict(['foo=bar'], source_isiter=True) == {'foo': 'bar'} + "foo1": "bar", + "foo2": "bar", + "foo3": "'bar\"", + } + assert read_dict(["foo=bar"], source_isiter=True) == {"foo": "bar"} with pytest.raises(BashParseError): - read_dict(['invalid'], source_isiter=True) + read_dict(["invalid"], source_isiter=True) - bash_dict = read_dict(StringIO("foo bar\nfoo2 bar\nfoo3\tbar\n"), splitter=None) - assert bash_dict == dict.fromkeys(('foo', 'foo2', 'foo3'), 'bar') - bash_dict = read_dict(['foo = blah', 'foo2= blah ', 'foo3=blah'], strip=True) - assert bash_dict == dict.fromkeys(('foo', 'foo2', 'foo3'), 'blah') + bash_dict = read_dict( + StringIO("foo bar\nfoo2 bar\nfoo3\tbar\n"), splitter=None + ) + assert bash_dict == dict.fromkeys(("foo", "foo2", "foo3"), "bar") + bash_dict = read_dict(["foo = blah", "foo2= blah ", "foo3=blah"], strip=True) + assert bash_dict == dict.fromkeys(("foo", "foo2", "foo3"), "blah") class TestReadBashDict: - @pytest.fixture(autouse=True) def _setup(self, tmp_path): self.valid_file = tmp_path / "valid" self.valid_file.write_text( - '# hi I am a comment\n' - 'foo1=bar\n' + "# hi I am a comment\n" + "foo1=bar\n" "foo2='bar'\n" 'foo3="bar"\n' - 'foo4=-/:j4\n' - 'foo5=\n' + "foo4=-/:j4\n" + "foo5=\n" 'export foo6="bar"\n' ) self.sourcing_file = tmp_path / "sourcing" @@ -131,18 +141,13 @@ class TestReadBashDict: self.sourcing_file2.write_text(f'source "{self.valid_file}"\n') self.advanced_file = tmp_path / "advanced" self.advanced_file.write_text( - 'one1=1\n' - 'one_=$one1\n' - 'two1=2\n' - 'two_=${two1}\n' + "one1=1\n" "one_=$one1\n" "two1=2\n" "two_=${two1}\n" ) self.env_file = tmp_path / "env" - self.env_file.write_text('imported=${external}\n') + self.env_file.write_text("imported=${external}\n") self.escaped_file = tmp_path / "escaped" self.escaped_file.write_text( - 'end=bye\n' - 'quoteddollar="\\${dollar}"\n' - 'quotedexpansion="\\${${end}}"\n' + "end=bye\n" 'quoteddollar="\\${dollar}"\n' 'quotedexpansion="\\${${end}}"\n' ) self.unclosed_file = tmp_path / "unclosed" self.unclosed_file.write_text('foo="bar') @@ -151,19 +156,19 @@ class TestReadBashDict: try: return read_bash_dict(handle, *args, **kwds) finally: - if hasattr(handle, 'close'): + if hasattr(handle, "close"): handle.close() def test_read_bash_dict(self): # TODO this is not even close to complete bash_dict = self.invoke_and_close(str(self.valid_file)) d = { - 'foo1': 'bar', - 'foo2': 'bar', - 'foo3': 'bar', - 'foo4': '-/:j4', - 'foo5': '', - 'foo6': 'bar', + "foo1": "bar", + "foo2": "bar", + "foo3": "bar", + "foo4": "-/:j4", + "foo5": "", + "foo6": "bar", } assert bash_dict == d @@ -171,59 +176,81 @@ class TestReadBashDict: self.invoke_and_close(StringIO("a=b\ny='")) def test_var_read(self): - assert self.invoke_and_close(StringIO("x=y@a\n")) == {'x': 'y@a'} - assert self.invoke_and_close(StringIO("x=y~a\n")) == {'x': 'y~a'} - assert self.invoke_and_close(StringIO("x=y^a\n")) == {'x': 'y^a'} - assert self.invoke_and_close(StringIO('x="\nasdf\nfdsa"')) == {'x': '\nasdf\nfdsa'} + assert self.invoke_and_close(StringIO("x=y@a\n")) == {"x": "y@a"} + assert self.invoke_and_close(StringIO("x=y~a\n")) == {"x": "y~a"} + assert self.invoke_and_close(StringIO("x=y^a\n")) == {"x": "y^a"} + assert self.invoke_and_close(StringIO('x="\nasdf\nfdsa"')) == { + "x": "\nasdf\nfdsa" + } def test_empty_assign(self): self.valid_file.write_text("foo=\ndar=blah\n") - assert self.invoke_and_close(str(self.valid_file)) == {'foo': '', 'dar': 'blah'} + assert self.invoke_and_close(str(self.valid_file)) == {"foo": "", "dar": "blah"} self.valid_file.write_text("foo=\ndar=\n") - assert self.invoke_and_close(str(self.valid_file)) == {'foo': '', 'dar': ''} + assert self.invoke_and_close(str(self.valid_file)) == {"foo": "", "dar": ""} self.valid_file.write_text("foo=blah\ndar=\n") - assert self.invoke_and_close(str(self.valid_file)) == {'foo': 'blah', 'dar': ''} + assert self.invoke_and_close(str(self.valid_file)) == {"foo": "blah", "dar": ""} def test_quoting(self): - assert self.invoke_and_close(StringIO("x='y \\\na'")) == {'x': 'y \\\na'} - assert self.invoke_and_close(StringIO("x='y'a\n")) == {'x': "ya"} - assert self.invoke_and_close(StringIO('x="y \\\nasdf"')) == {'x': 'y asdf'} + assert self.invoke_and_close(StringIO("x='y \\\na'")) == {"x": "y \\\na"} + assert self.invoke_and_close(StringIO("x='y'a\n")) == {"x": "ya"} + assert self.invoke_and_close(StringIO('x="y \\\nasdf"')) == {"x": "y asdf"} def test_eof_without_newline(self): - assert self.invoke_and_close(StringIO("x=y")) == {'x': 'y'} - assert self.invoke_and_close(StringIO("x='y'a")) == {'x': 'ya'} + assert self.invoke_and_close(StringIO("x=y")) == {"x": "y"} + assert self.invoke_and_close(StringIO("x='y'a")) == {"x": "ya"} def test_sourcing(self): - output = self.invoke_and_close(str(self.sourcing_file), sourcing_command='source') - expected = {'foo1': 'bar', 'foo2': 'bar', 'foo3': 'bar', 'foo4': '-/:j4', 'foo5': '', 'foo6': 'bar'} + output = self.invoke_and_close( + str(self.sourcing_file), sourcing_command="source" + ) + expected = { + "foo1": "bar", + "foo2": "bar", + "foo3": "bar", + "foo4": "-/:j4", + "foo5": "", + "foo6": "bar", + } assert output == expected - output = self.invoke_and_close(str(self.sourcing_file2), sourcing_command='source') - expected = {'foo1': 'bar', 'foo2': 'bar', 'foo3': 'bar', 'foo4': '-/:j4', 'foo5': '', 'foo6': 'bar'} + output = self.invoke_and_close( + str(self.sourcing_file2), sourcing_command="source" + ) + expected = { + "foo1": "bar", + "foo2": "bar", + "foo3": "bar", + "foo4": "-/:j4", + "foo5": "", + "foo6": "bar", + } assert output == expected def test_read_advanced(self): output = self.invoke_and_close(str(self.advanced_file)) expected = { - 'one1': '1', - 'one_': '1', - 'two1': '2', - 'two_': '2', + "one1": "1", + "one_": "1", + "two1": "2", + "two_": "2", } assert output == expected def test_env(self): - assert self.invoke_and_close(str(self.env_file)) == {'imported': ''} - env = {'external': 'imported foo'} + assert self.invoke_and_close(str(self.env_file)) == {"imported": ""} + env = {"external": "imported foo"} env_backup = env.copy() - assert self.invoke_and_close(str(self.env_file), env) == {'imported': 'imported foo'} + assert self.invoke_and_close(str(self.env_file), env) == { + "imported": "imported foo" + } assert env_backup == env def test_escaping(self): output = self.invoke_and_close(str(self.escaped_file)) expected = { - 'end': 'bye', - 'quoteddollar': '${dollar}', - 'quotedexpansion': '${bye}', + "end": "bye", + "quoteddollar": "${dollar}", + "quotedexpansion": "${bye}", } assert output == expected diff --git a/tests/test_caching.py b/tests/test_caching.py index eaa5014c..06615d38 100644 --- a/tests/test_caching.py +++ b/tests/test_caching.py @@ -5,17 +5,20 @@ from snakeoil.caching import WeakInstMeta class weak_slotted(metaclass=WeakInstMeta): __inst_caching__ = True - __slots__ = ('one',) + __slots__ = ("one",) class weak_inst(metaclass=WeakInstMeta): __inst_caching__ = True counter = 0 + def __new__(cls, *args, **kwargs): cls.counter += 1 return object.__new__(cls) + def __init__(self, *args, **kwargs): pass + @classmethod def reset(cls): cls.counter = 0 @@ -34,7 +37,6 @@ class reenabled_weak_inst(automatic_disabled_weak_inst): class TestWeakInstMeta: - def test_reuse(self, kls=weak_inst): kls.reset() o = kls() @@ -99,8 +101,8 @@ class TestWeakInstMeta: # (RaisingHashFor...). # UserWarning is ignored and everything other warning is an error. - @pytest.mark.filterwarnings('ignore::UserWarning') - @pytest.mark.filterwarnings('error') + @pytest.mark.filterwarnings("ignore::UserWarning") + @pytest.mark.filterwarnings("error") def test_uncachable(self): weak_inst.reset() @@ -108,21 +110,24 @@ class TestWeakInstMeta: class RaisingHashForTestUncachable: def __init__(self, error): self.error = error + def __hash__(self): raise self.error assert weak_inst([]) is not weak_inst([]) assert weak_inst.counter == 2 for x in (TypeError, NotImplementedError): - assert weak_inst(RaisingHashForTestUncachable(x)) is not \ - weak_inst(RaisingHashForTestUncachable(x)) + assert weak_inst(RaisingHashForTestUncachable(x)) is not weak_inst( + RaisingHashForTestUncachable(x) + ) - @pytest.mark.filterwarnings('error::UserWarning') + @pytest.mark.filterwarnings("error::UserWarning") def test_uncachable_warning_msg(self): # This name is *important*, see above. class RaisingHashForTestUncachableWarnings: def __init__(self, error): self.error = error + def __hash__(self): raise self.error @@ -134,6 +139,7 @@ class TestWeakInstMeta: class BrokenHash: def __hash__(self): return 1 + assert weak_inst(BrokenHash()) is not weak_inst(BrokenHash()) def test_weak_slot(self): @@ -148,7 +154,7 @@ class TestWeakInstMeta: # The actual test is that the class definition works. class ExistingWeakrefSlot: __inst_caching__ = True - __slots__ = ('one', '__weakref__') + __slots__ = ("one", "__weakref__") assert ExistingWeakrefSlot() diff --git a/tests/test_chksum.py b/tests/test_chksum.py index 016e3f73..b4c1ab24 100644 --- a/tests/test_chksum.py +++ b/tests/test_chksum.py @@ -3,15 +3,16 @@ from snakeoil import chksum class Test_funcs: - def setup_method(self, method): chksum.__inited__ = False chksum.chksum_types.clear() self._saved_init = chksum.init self._inited_count = 0 + def f(): self._inited_count += 1 chksum.__inited__ = True + chksum.init = f # ensure we aren't mangling chksum state for other tests. @@ -41,4 +42,3 @@ class Test_funcs: assert chksum.get_handler("x") == 1 assert chksum.get_handler("y") == 2 assert self._inited_count == 1 - diff --git a/tests/test_chksum_defaults.py b/tests/test_chksum_defaults.py index 7f867d8a..a22d339c 100644 --- a/tests/test_chksum_defaults.py +++ b/tests/test_chksum_defaults.py @@ -14,14 +14,15 @@ def require_chf(func): def subfunc(self): if self.chf is None: pytest.skip( - 'no handler for %s, do you need to install PyCrypto or mhash?' - % (self.chf_type,)) + "no handler for %s, do you need to install PyCrypto or mhash?" + % (self.chf_type,) + ) func(self) + return subfunc class base: - def get_chf(self): try: self.chf = chksum.get_handler(self.chf_type) @@ -53,14 +54,17 @@ class base: @require_chf def test_data_source_check(self): assert self.chf(local_source(self.fn)) == self.expected_long - assert self.chf(data_source(fileutils.readfile_ascii(self.fn))) == self.expected_long + assert ( + self.chf(data_source(fileutils.readfile_ascii(self.fn))) + == self.expected_long + ) -class ChksumTest(base): +class ChksumTest(base): @require_chf def test_str2long(self): assert self.chf.str2long(self.expected_str) == self.expected_long - if self.chf_type == 'size': + if self.chf_type == "size": return for x in extra_chksums.get(self.chf_type, ()): assert self.chf.str2long(x) == int(x, 16) @@ -68,11 +72,12 @@ class ChksumTest(base): @require_chf def test_long2str(self): assert self.chf.long2str(self.expected_long) == self.expected_str - if self.chf_type == 'size': + if self.chf_type == "size": return for x in extra_chksums.get(self.chf_type, ()): assert self.chf.long2str(int(x == 16)), x + checksums = { "rmd160": "b83ad488d624e7911f886420ab230f78f6368b9f", "sha1": "63cd8cce8a1773dffb400ee184be3ec7d89791f5", @@ -87,22 +92,22 @@ checksums = { checksums.update((k, (int(v, 16), v)) for k, v in checksums.items()) checksums["size"] = (int(len(data) * multi), str(int(len(data) * multi))) -extra_chksums = { - "md5": - ["2dfd84279314a178d0fa842af3a40e25577e1bc"] -} +extra_chksums = {"md5": ["2dfd84279314a178d0fa842af3a40e25577e1bc"]} for k, v in checksums.items(): - extra_chksums.setdefault(k, []).extend((''.rjust(len(v[1]), '0'), '01'.rjust(len(v[1]), '0'))) + extra_chksums.setdefault(k, []).extend( + ("".rjust(len(v[1]), "0"), "01".rjust(len(v[1]), "0")) + ) # trick: create subclasses for each checksum with a useful class name. for chf_type, expected in checksums.items(): expectedsum = expected[0] expectedstr = expected[1] - globals()['TestChksum' + chf_type.capitalize()] = type( - 'TestChksum' + chf_type.capitalize(), + globals()["TestChksum" + chf_type.capitalize()] = type( + "TestChksum" + chf_type.capitalize(), (ChksumTest,), - dict(chf_type=chf_type, expected_long=expectedsum, expected_str=expectedstr)) + dict(chf_type=chf_type, expected_long=expectedsum, expected_str=expectedstr), + ) # pylint: disable=undefined-loop-variable del chf_type, expected @@ -110,7 +115,7 @@ del chf_type, expected class TestGetChksums(base): - chfs = [k for k in sorted(checksums) if k in ('md5', 'sha1')] + chfs = [k for k in sorted(checksums) if k in ("md5", "sha1")] expected_long = [checksums[k][0] for k in chfs] def get_chf(self): diff --git a/tests/test_constraints.py b/tests/test_constraints.py index 5e938a9e..15d360c2 100644 --- a/tests/test_constraints.py +++ b/tests/test_constraints.py @@ -2,61 +2,75 @@ import pytest from snakeoil.constraints import Problem + def any_of(**kwargs): return any(kwargs.values()) + def all_of(**kwargs): return all(kwargs.values()) + def test_readd_variables(): p = Problem() - p.add_variable((True, False), 'x', 'y') + p.add_variable((True, False), "x", "y") with pytest.raises(AssertionError, match="variable 'y' was already added"): - p.add_variable((True, False), 'y', 'z') + p.add_variable((True, False), "y", "z") + def test_constraint_unknown_variable(): p = Problem() - p.add_variable((True, False), 'x', 'y') + p.add_variable((True, False), "x", "y") with pytest.raises(AssertionError, match="unknown variable 'z'"): - p.add_constraint(any_of, ('y', 'z')) + p.add_constraint(any_of, ("y", "z")) + def test_empty_problem(): p = Problem() - assert tuple(p) == ({}, ) + assert tuple(p) == ({},) + def test_empty_constraints(): p = Problem() - p.add_variable((True, False), 'x', 'y') - p.add_variable((True, ), 'z') + p.add_variable((True, False), "x", "y") + p.add_variable((True,), "z") assert len(tuple(p)) == 4 + def test_domain_prefer_later(): p = Problem() - p.add_variable((False, True), 'x', 'y') - p.add_constraint(any_of, ('x', 'y')) - assert next(iter(p)) == {'x': True, 'y': True} + p.add_variable((False, True), "x", "y") + p.add_constraint(any_of, ("x", "y")) + assert next(iter(p)) == {"x": True, "y": True} + def test_constraint_single_variable(): p = Problem() - p.add_variable((True, False), 'x', 'y') - p.add_constraint(lambda x: x, ('x', )) - p.add_constraint(lambda y: not y, ('y', )) - assert tuple(p) == ({'x': True, 'y': False}, ) + p.add_variable((True, False), "x", "y") + p.add_constraint(lambda x: x, ("x",)) + p.add_constraint(lambda y: not y, ("y",)) + assert tuple(p) == ({"x": True, "y": False},) + def test_no_solution(): p = Problem() - p.add_variable((True, ), 'x') - p.add_variable((True, False), 'y', 'z') - p.add_constraint(lambda x, y: not x or y, ('x', 'y')) - p.add_constraint(lambda y, z: not y or not z, ('y', 'z')) - p.add_constraint(lambda x, z: not x or z, ('x', 'z')) + p.add_variable((True,), "x") + p.add_variable((True, False), "y", "z") + p.add_constraint(lambda x, y: not x or y, ("x", "y")) + p.add_constraint(lambda y, z: not y or not z, ("y", "z")) + p.add_constraint(lambda x, z: not x or z, ("x", "z")) assert not tuple(p) + def test_forward_check(): p = Problem() - p.add_variable(range(2, 10), 'x', 'y', 'z') - p.add_constraint(lambda x, y: (x + y) % 2 == 0, ('x', 'y')) - p.add_constraint(lambda x, y, z: (x * y * z) % 2 != 0, ('x', 'y', 'z')) - p.add_constraint(lambda y, z: y < z, ('y', 'z')) - p.add_constraint(lambda z, x: x ** 2 <= z, ('x', 'z')) - assert tuple(p) == ({'x': 3, 'y': 7, 'z': 9}, {'x': 3, 'y': 5, 'z': 9}, {'x': 3, 'y': 3, 'z': 9}) + p.add_variable(range(2, 10), "x", "y", "z") + p.add_constraint(lambda x, y: (x + y) % 2 == 0, ("x", "y")) + p.add_constraint(lambda x, y, z: (x * y * z) % 2 != 0, ("x", "y", "z")) + p.add_constraint(lambda y, z: y < z, ("y", "z")) + p.add_constraint(lambda z, x: x**2 <= z, ("x", "z")) + assert tuple(p) == ( + {"x": 3, "y": 7, "z": 9}, + {"x": 3, "y": 5, "z": 9}, + {"x": 3, "y": 3, "z": 9}, + ) diff --git a/tests/test_containers.py b/tests/test_containers.py index f6940c90..9df32588 100644 --- a/tests/test_containers.py +++ b/tests/test_containers.py @@ -5,7 +5,6 @@ from snakeoil import containers class TestInvertedContains: - def setup_method(self, method): self.set = containers.InvertedContains(range(12)) @@ -17,7 +16,7 @@ class TestInvertedContains: class BasicSet(containers.SetMixin): - __slots__ = ('_data',) + __slots__ = ("_data",) def __init__(self, data): self._data = set(data) @@ -28,7 +27,7 @@ class BasicSet(containers.SetMixin): def __contains__(self, other): return other in self._data - #def __str__(self): + # def __str__(self): # return 'BasicSet([%s])' % ', '.join((str(x) for x in self._data)) def __eq__(self, other): @@ -43,7 +42,6 @@ class BasicSet(containers.SetMixin): class TestSetMethods: - def test_and(self): c = BasicSet(range(100)) s = set(range(25, 75)) @@ -80,8 +78,8 @@ class TestSetMethods: assert c - s == r1 assert s - c == r2 -class TestLimitedChangeSet: +class TestLimitedChangeSet: def setup_method(self, method): self.set = containers.LimitedChangeSet(range(12)) @@ -89,17 +87,18 @@ class TestLimitedChangeSet: def f(val): assert isinstance(val, int) return val + self.set = containers.LimitedChangeSet(range(12), key_validator=f) self.set.add(13) self.set.add(14) self.set.remove(11) assert 5 in self.set with pytest.raises(AssertionError): - self.set.add('2') + self.set.add("2") with pytest.raises(AssertionError): - self.set.remove('2') + self.set.remove("2") with pytest.raises(AssertionError): - self.set.__contains__('2') + self.set.__contains__("2") def test_basic(self, changes=0): # this should be a no-op @@ -188,7 +187,7 @@ class TestLimitedChangeSet: assert sorted(list(self.set)) == list(range(-1, 13)) def test_str(self): - assert str(containers.LimitedChangeSet([7])) == 'LimitedChangeSet([7])' + assert str(containers.LimitedChangeSet([7])) == "LimitedChangeSet([7])" def test__eq__(self): c = containers.LimitedChangeSet(range(99)) @@ -199,7 +198,6 @@ class TestLimitedChangeSet: class TestLimitedChangeSetWithBlacklist: - def setup_method(self, method): self.set = containers.LimitedChangeSet(range(12), [3, 13]) @@ -222,7 +220,6 @@ class TestLimitedChangeSetWithBlacklist: class TestProtectedSet: - def setup_method(self, method): self.set = containers.ProtectedSet(set(range(12))) diff --git a/tests/test_contexts.py b/tests/test_contexts.py index 219d5ee8..be212a23 100644 --- a/tests/test_contexts.py +++ b/tests/test_contexts.py @@ -44,9 +44,10 @@ def test_syspath(tmpdir): assert mangled_syspath == tuple(sys.path) -@pytest.mark.skip(reason='this currently breaks on github ci, https://github.com/pkgcore/snakeoil/issues/68') +@pytest.mark.skip( + reason="this currently breaks on github ci, https://github.com/pkgcore/snakeoil/issues/68" +) class TestSplitExec: - def test_context_process(self): # code inside the with statement is run in a separate process pid = os.getpid() @@ -77,9 +78,9 @@ class TestSplitExec: b = 3 # changes to locals aren't propagated back assert a == 1 - assert 'b' not in locals() + assert "b" not in locals() # but they're accessible via the 'locals' attr - expected = {'a': 2, 'b': 3} + expected = {"a": 2, "b": 3} for k, v in expected.items(): assert c.locals[k] == v @@ -87,20 +88,21 @@ class TestSplitExec: with SplitExec() as c: func = lambda x: x from sys import implementation + a = 4 - assert c.locals == {'a': 4} + assert c.locals == {"a": 4} def test_context_exceptions(self): # exceptions in the child process are sent back to the parent and re-raised with pytest.raises(IOError) as e: with SplitExec() as c: - raise IOError(errno.EBUSY, 'random error') + raise IOError(errno.EBUSY, "random error") assert e.value.errno == errno.EBUSY def test_child_setup_raises_exception(self): class ChildSetupException(SplitExec): def _child_setup(self): - raise IOError(errno.EBUSY, 'random error') + raise IOError(errno.EBUSY, "random error") with pytest.raises(IOError) as e: with ChildSetupException() as c: @@ -108,26 +110,33 @@ class TestSplitExec: assert e.value.errno == errno.EBUSY -@pytest.mark.skipif(not sys.platform.startswith('linux'), reason='supported on Linux only') -@pytest.mark.xfail(platform.python_implementation() == "PyPy", reason='Fails on PyPy') +@pytest.mark.skipif( + not sys.platform.startswith("linux"), reason="supported on Linux only" +) +@pytest.mark.xfail(platform.python_implementation() == "PyPy", reason="Fails on PyPy") class TestNamespace: - - @pytest.mark.skipif(not os.path.exists('/proc/self/ns/user'), - reason='user namespace support required') + @pytest.mark.skipif( + not os.path.exists("/proc/self/ns/user"), + reason="user namespace support required", + ) def test_user_namespace(self): try: with Namespace(user=True) as ns: assert os.getuid() == 0 except PermissionError: - pytest.skip('No permission to use user namespace') - - @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/user') and os.path.exists('/proc/self/ns/uts')), - reason='user and uts namespace support required') + pytest.skip("No permission to use user namespace") + + @pytest.mark.skipif( + not ( + os.path.exists("/proc/self/ns/user") and os.path.exists("/proc/self/ns/uts") + ), + reason="user and uts namespace support required", + ) def test_uts_namespace(self): try: - with Namespace(user=True, uts=True, hostname='host') as ns: - ns_hostname, _, ns_domainname = socket.getfqdn().partition('.') - assert ns_hostname == 'host' - assert ns_domainname == '' + with Namespace(user=True, uts=True, hostname="host") as ns: + ns_hostname, _, ns_domainname = socket.getfqdn().partition(".") + assert ns_hostname == "host" + assert ns_domainname == "" except PermissionError: - pytest.skip('No permission to use user and uts namespace') + pytest.skip("No permission to use user and uts namespace") diff --git a/tests/test_currying.py b/tests/test_currying.py index d14f0fcc..7f8618fc 100644 --- a/tests/test_currying.py +++ b/tests/test_currying.py @@ -5,8 +5,10 @@ from snakeoil import currying def passthrough(*args, **kwargs): return args, kwargs + # docstring is part of the test + def documented(): """original docstring""" @@ -18,36 +20,37 @@ class TestPreCurry: def test_pre_curry(self): noop = self.pre_curry(passthrough) assert noop() == ((), {}) - assert noop('foo', 'bar') == (('foo', 'bar'), {}) - assert noop(foo='bar') == ((), {'foo': 'bar'}) - assert noop('foo', bar='baz') == (('foo',), {'bar': 'baz'}) + assert noop("foo", "bar") == (("foo", "bar"), {}) + assert noop(foo="bar") == ((), {"foo": "bar"}) + assert noop("foo", bar="baz") == (("foo",), {"bar": "baz"}) one_arg = self.pre_curry(passthrough, 42) assert one_arg() == ((42,), {}) - assert one_arg('foo', 'bar') == ((42, 'foo', 'bar'), {}) - assert one_arg(foo='bar') == ((42,), {'foo': 'bar'}) - assert one_arg('foo', bar='baz') == ((42, 'foo'), {'bar': 'baz'}) + assert one_arg("foo", "bar") == ((42, "foo", "bar"), {}) + assert one_arg(foo="bar") == ((42,), {"foo": "bar"}) + assert one_arg("foo", bar="baz") == ((42, "foo"), {"bar": "baz"}) keyword_arg = self.pre_curry(passthrough, foo=42) - assert keyword_arg() == ((), {'foo': 42}) - assert keyword_arg('foo', 'bar') == (('foo', 'bar'), {'foo': 42}) - assert keyword_arg(foo='bar') == ((), {'foo': 'bar'}) - assert keyword_arg('foo', bar='baz') == (('foo',), {'bar': 'baz', 'foo': 42}) + assert keyword_arg() == ((), {"foo": 42}) + assert keyword_arg("foo", "bar") == (("foo", "bar"), {"foo": 42}) + assert keyword_arg(foo="bar") == ((), {"foo": "bar"}) + assert keyword_arg("foo", bar="baz") == (("foo",), {"bar": "baz", "foo": 42}) both = self.pre_curry(passthrough, 42, foo=42) - assert both() == ((42,), {'foo': 42}) - assert both('foo', 'bar') == ((42, 'foo', 'bar'), {'foo': 42}) - assert both(foo='bar') == ((42,), {'foo': 'bar'}) - assert both('foo', bar='baz') == ((42, 'foo'), {'bar': 'baz', 'foo': 42}) + assert both() == ((42,), {"foo": 42}) + assert both("foo", "bar") == ((42, "foo", "bar"), {"foo": 42}) + assert both(foo="bar") == ((42,), {"foo": "bar"}) + assert both("foo", bar="baz") == ((42, "foo"), {"bar": "baz", "foo": 42}) def test_curry_original(self): assert self.pre_curry(passthrough).func is passthrough def test_instancemethod(self): class Test: - method = self.pre_curry(passthrough, 'test') + method = self.pre_curry(passthrough, "test") + test = Test() - assert (('test', test), {}) == test.method() + assert (("test", test), {}) == test.method() class Test_pretty_docs: @@ -56,58 +59,63 @@ class Test_pretty_docs: def test_module_magic(self): for target in self.currying_targets: - assert currying.pretty_docs(target(passthrough)).__module__ is \ - passthrough.__module__ + assert ( + currying.pretty_docs(target(passthrough)).__module__ + is passthrough.__module__ + ) # test is kinda useless if they are identical without pretty_docs - assert getattr(target(passthrough), '__module__', None) is not \ - passthrough.__module__ + assert ( + getattr(target(passthrough), "__module__", None) + is not passthrough.__module__ + ) def test_pretty_docs(self): for target in self.currying_targets: for func in (passthrough, documented): - assert currying.pretty_docs(target(func), 'new doc').__doc__ == 'new doc' + assert ( + currying.pretty_docs(target(func), "new doc").__doc__ == "new doc" + ) assert currying.pretty_docs(target(func)).__doc__ is func.__doc__ class TestPostCurry: - def test_post_curry(self): noop = currying.post_curry(passthrough) assert noop() == ((), {}) - assert noop('foo', 'bar') == (('foo', 'bar'), {}) - assert noop(foo='bar') == ((), {'foo': 'bar'}) - assert noop('foo', bar='baz') == (('foo',), {'bar': 'baz'}) + assert noop("foo", "bar") == (("foo", "bar"), {}) + assert noop(foo="bar") == ((), {"foo": "bar"}) + assert noop("foo", bar="baz") == (("foo",), {"bar": "baz"}) one_arg = currying.post_curry(passthrough, 42) assert one_arg() == ((42,), {}) - assert one_arg('foo', 'bar') == (('foo', 'bar', 42), {}) - assert one_arg(foo='bar') == ((42,), {'foo': 'bar'}) - assert one_arg('foo', bar='baz') == (('foo', 42), {'bar': 'baz'}) + assert one_arg("foo", "bar") == (("foo", "bar", 42), {}) + assert one_arg(foo="bar") == ((42,), {"foo": "bar"}) + assert one_arg("foo", bar="baz") == (("foo", 42), {"bar": "baz"}) keyword_arg = currying.post_curry(passthrough, foo=42) - assert keyword_arg() == ((), {'foo': 42}) - assert keyword_arg('foo', 'bar') == (('foo', 'bar'), {'foo': 42}) - assert keyword_arg(foo='bar') == ((), {'foo': 42}) - assert keyword_arg('foo', bar='baz') == (('foo',), {'bar': 'baz', 'foo': 42}) + assert keyword_arg() == ((), {"foo": 42}) + assert keyword_arg("foo", "bar") == (("foo", "bar"), {"foo": 42}) + assert keyword_arg(foo="bar") == ((), {"foo": 42}) + assert keyword_arg("foo", bar="baz") == (("foo",), {"bar": "baz", "foo": 42}) both = currying.post_curry(passthrough, 42, foo=42) - assert both() == ((42,), {'foo': 42}) - assert both('foo', 'bar') == (('foo', 'bar', 42), {'foo': 42}) - assert both(foo='bar') == ((42,), {'foo': 42}) - assert both('foo', bar='baz') == (('foo', 42), {'bar': 'baz', 'foo': 42}) + assert both() == ((42,), {"foo": 42}) + assert both("foo", "bar") == (("foo", "bar", 42), {"foo": 42}) + assert both(foo="bar") == ((42,), {"foo": 42}) + assert both("foo", bar="baz") == (("foo", 42), {"bar": "baz", "foo": 42}) def test_curry_original(self): assert currying.post_curry(passthrough).func is passthrough def test_instancemethod(self): class Test: - method = currying.post_curry(passthrough, 'test') + method = currying.post_curry(passthrough, "test") + test = Test() - assert ((test, 'test'), {}) == test.method() + assert ((test, "test"), {}) == test.method() class Test_wrap_exception: - def test_wrap_exception_complex(self): inner, outer = [], [] @@ -118,33 +126,33 @@ class Test_wrap_exception: assert isinstance(exception, inner_exception) assert functor is throwing_func assert fargs == (False,) - assert fkwds == {'monkey': 'bone'} + assert fkwds == {"monkey": "bone"} outer.append(True) raise wrapping_exception() def throwing_func(*args, **kwds): assert args == (False,) - assert kwds == {'monkey': 'bone'} + assert kwds == {"monkey": "bone"} inner.append(True) raise inner_exception() func = currying.wrap_exception_complex(f, IndexError)(throwing_func) # basic behaviour - pytest.raises(IndexError, func, False, monkey='bone') + pytest.raises(IndexError, func, False, monkey="bone") assert len(inner) == 1 assert len(outer) == 1 # ensure pass thru if it's an allowed exception inner_exception = IndexError - pytest.raises(IndexError, func, False, monkey='bone') + pytest.raises(IndexError, func, False, monkey="bone") assert len(inner) == 2 assert len(outer) == 1 # finally, ensure it doesn't intercept, and passes thru for # exceptions it shouldn't handle inner_exception = MemoryError - pytest.raises(MemoryError, func, False, monkey='bone') + pytest.raises(MemoryError, func, False, monkey="bone") assert len(inner) == 3 assert len(outer) == 1 @@ -159,9 +167,10 @@ class Test_wrap_exception: self.args = args self.kwds = kwds - func = currying.wrap_exception(my_exception, 1, 3, 2, monkey='bone', - ignores=ValueError)(throwing_func) - assert func.__name__ == 'throwing_func' + func = currying.wrap_exception( + my_exception, 1, 3, 2, monkey="bone", ignores=ValueError + )(throwing_func) + assert func.__name__ == "throwing_func" pytest.raises(ValueError, func) throw_kls = IndexError pytest.raises(my_exception, func) @@ -170,17 +179,23 @@ class Test_wrap_exception: raise AssertionError("shouldn't have been able to reach here") except my_exception as e: assert e.args == (1, 3, 2) - assert e.kwds == {'monkey': 'bone'} + assert e.kwds == {"monkey": "bone"} # finally, verify that the exception can be pased in. func = currying.wrap_exception( - my_exception, 1, 3, 2, monkey='bone', - ignores=ValueError, pass_error="the_exception")(throwing_func) - assert func.__name__ == 'throwing_func' + my_exception, + 1, + 3, + 2, + monkey="bone", + ignores=ValueError, + pass_error="the_exception", + )(throwing_func) + assert func.__name__ == "throwing_func" pytest.raises(my_exception, func) try: func() raise AssertionError("shouldn't have been able to reach here") except my_exception as e: assert e.args == (1, 3, 2) - assert e.kwds == {'monkey': 'bone', 'the_exception': e.__cause__} + assert e.kwds == {"monkey": "bone", "the_exception": e.__cause__} diff --git a/tests/test_data_source.py b/tests/test_data_source.py index ddd3eee6..1ede9aa0 100644 --- a/tests/test_data_source.py +++ b/tests/test_data_source.py @@ -53,15 +53,15 @@ class TestDataSource: assert reader_data == writer_data def _mk_data(self, size=(100000)): - return ''.join(str(x % 10) for x in range(size)) + return "".join(str(x % 10) for x in range(size)) def test_transfer_to_data_source(self): data = self._mk_data() reader = self.get_obj(data=data) if self.supports_mutable: - writer = self.get_obj(data='', mutable=True) + writer = self.get_obj(data="", mutable=True) else: - writer = data_source.data_source('', mutable=True) + writer = data_source.data_source("", mutable=True) reader.transfer_to_data_source(writer) self.assertContents(reader, writer) @@ -70,9 +70,11 @@ class TestDataSource: data = self._mk_data() reader = self.get_obj(data=data) if isinstance(reader, data_source.bz2_source): - writer = data_source.bz2_source(tmp_path / 'transfer_to_path', mutable=True) + writer = data_source.bz2_source(tmp_path / "transfer_to_path", mutable=True) else: - writer = data_source.local_source(tmp_path / 'transfer_to_path', mutable=True) + writer = data_source.local_source( + tmp_path / "transfer_to_path", mutable=True + ) reader.transfer_to_path(writer.path) @@ -82,9 +84,9 @@ class TestDataSource: data = self._mk_data() reader = self.get_obj(data=data) if self.supports_mutable: - writer = self.get_obj(data='', mutable=True) + writer = self.get_obj(data="", mutable=True) else: - writer = data_source.data_source('', mutable=True) + writer = data_source.data_source("", mutable=True) with reader.bytes_fileobj() as reader_f, writer.bytes_fileobj(True) as writer_f: data_source.transfer_between_files(reader_f, writer_f) @@ -93,15 +95,14 @@ class TestDataSource: class TestLocalSource(TestDataSource): - def get_obj(self, data="foonani", mutable=False, test_creation=False): self.fp = self.dir / "localsource.test" if not test_creation: mode = None if isinstance(data, bytes): - mode = 'wb' + mode = "wb" elif mode is None: - mode = 'w' + mode = "w" with open(self.fp, mode) as f: f.write(data) return data_source.local_source(self.fp, mutable=mutable) @@ -118,21 +119,20 @@ class TestLocalSource(TestDataSource): obj = self.get_obj(test_creation=True, mutable=True) # this will blow up if tries to ascii decode it. with obj.bytes_fileobj(True) as f: - assert f.read() == b'' + assert f.read() == b"" f.write(data) with obj.bytes_fileobj() as f: assert f.read() == data class TestBz2Source(TestDataSource): - def get_obj(self, data="foonani", mutable=False, test_creation=False): self.fp = self.dir / "bz2source.test.bz2" if not test_creation: if isinstance(data, str): data = data.encode() - with open(self.fp, 'wb') as f: - f.write(compression.compress_data('bzip2', data)) + with open(self.fp, "wb") as f: + f.write(compression.compress_data("bzip2", data)) return data_source.bz2_source(self.fp, mutable=mutable) def test_bytes_fileobj(self): @@ -150,8 +150,7 @@ class Test_invokable_data_source(TestDataSource): def get_obj(self, data="foonani", mutable=False): if isinstance(data, str): data = data.encode("utf8") - return data_source.invokable_data_source( - partial(self._get_data, data)) + return data_source.invokable_data_source(partial(self._get_data, data)) @staticmethod def _get_data(data, is_text=False): @@ -168,10 +167,10 @@ class Test_invokable_data_source_wrapper_text(Test_invokable_data_source): def get_obj(self, mutable=False, data="foonani"): return data_source.invokable_data_source.wrap_function( - partial(self._get_data, data), - self.text_mode) + partial(self._get_data, data), self.text_mode + ) - def _get_data(self, data='foonani'): + def _get_data(self, data="foonani"): if isinstance(data, str): if not self.text_mode: return data.encode("utf8") diff --git a/tests/test_decorators.py b/tests/test_decorators.py index ce00ac4b..92c0fb8c 100644 --- a/tests/test_decorators.py +++ b/tests/test_decorators.py @@ -8,7 +8,6 @@ from snakeoil.decorators import coroutine, namespace, splitexec class TestSplitExecDecorator: - def setup_method(self, method): self.pid = os.getpid() @@ -18,11 +17,14 @@ class TestSplitExecDecorator: assert self.pid != os.getpid() -@pytest.mark.skipif(not sys.platform.startswith('linux'), reason='supported on Linux only') +@pytest.mark.skipif( + not sys.platform.startswith("linux"), reason="supported on Linux only" +) class TestNamespaceDecorator: - - @pytest.mark.skipif(not os.path.exists('/proc/self/ns/user'), - reason='user namespace support required') + @pytest.mark.skipif( + not os.path.exists("/proc/self/ns/user"), + reason="user namespace support required", + ) def test_user_namespace(self): @namespace(user=True) def do_test(): @@ -31,31 +33,34 @@ class TestNamespaceDecorator: try: do_test() except PermissionError: - pytest.skip('No permission to use user namespace') - - @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/user') and os.path.exists('/proc/self/ns/uts')), - reason='user and uts namespace support required') + pytest.skip("No permission to use user namespace") + + @pytest.mark.skipif( + not ( + os.path.exists("/proc/self/ns/user") and os.path.exists("/proc/self/ns/uts") + ), + reason="user and uts namespace support required", + ) def test_uts_namespace(self): - @namespace(user=True, uts=True, hostname='host') + @namespace(user=True, uts=True, hostname="host") def do_test(): - ns_hostname, _, ns_domainname = socket.getfqdn().partition('.') - assert ns_hostname == 'host' - assert ns_domainname == '' + ns_hostname, _, ns_domainname = socket.getfqdn().partition(".") + assert ns_hostname == "host" + assert ns_domainname == "" try: do_test() except PermissionError: - pytest.skip('No permission to use user and uts namespace') + pytest.skip("No permission to use user and uts namespace") class TestCoroutineDecorator: - def test_coroutine(self): @coroutine def count(): i = 0 while True: - val = (yield i) + val = yield i i = val if val is not None else i + 1 cr = count() diff --git a/tests/test_demandload.py b/tests/test_demandload.py index ec2661c9..843cf801 100644 --- a/tests/test_demandload.py +++ b/tests/test_demandload.py @@ -9,6 +9,7 @@ from snakeoil import demandload # setup is what the test expects. # it also explicitly resets the state on the way out. + def reset_globals(functor): def f(*args, **kwds): orig_demandload = demandload.demandload @@ -22,60 +23,61 @@ def reset_globals(functor): demandload.demand_compile_regexp = orig_demand_compile demandload._protection_enabled = orig_protection demandload._noisy_protection = orig_noisy + return f class TestParser: - @reset_globals def test_parse(self): for input, output in [ - ('foo', [('foo', 'foo')]), - ('foo:bar', [('foo.bar', 'bar')]), - ('foo:bar,baz@spork', [('foo.bar', 'bar'), ('foo.baz', 'spork')]), - ('foo@bar', [('foo', 'bar')]), - ('foo_bar', [('foo_bar', 'foo_bar')]), - ]: + ("foo", [("foo", "foo")]), + ("foo:bar", [("foo.bar", "bar")]), + ("foo:bar,baz@spork", [("foo.bar", "bar"), ("foo.baz", "spork")]), + ("foo@bar", [("foo", "bar")]), + ("foo_bar", [("foo_bar", "foo_bar")]), + ]: assert output == list(demandload.parse_imports([input])) - pytest.raises(ValueError, list, demandload.parse_imports(['a.b'])) - pytest.raises(ValueError, list, demandload.parse_imports(['a:,'])) - pytest.raises(ValueError, list, demandload.parse_imports(['a:b,x@'])) - pytest.raises(ValueError, list, demandload.parse_imports(['b-x'])) - pytest.raises(ValueError, list, demandload.parse_imports([' b_x'])) + pytest.raises(ValueError, list, demandload.parse_imports(["a.b"])) + pytest.raises(ValueError, list, demandload.parse_imports(["a:,"])) + pytest.raises(ValueError, list, demandload.parse_imports(["a:b,x@"])) + pytest.raises(ValueError, list, demandload.parse_imports(["b-x"])) + pytest.raises(ValueError, list, demandload.parse_imports([" b_x"])) class TestPlaceholder: - @reset_globals def test_getattr(self): scope = {} - placeholder = demandload.Placeholder(scope, 'foo', list) - assert scope == object.__getattribute__(placeholder, '_scope') + placeholder = demandload.Placeholder(scope, "foo", list) + assert scope == object.__getattribute__(placeholder, "_scope") assert placeholder.__doc__ == [].__doc__ - assert scope['foo'] == [] + assert scope["foo"] == [] demandload._protection_enabled = lambda: True with pytest.raises(ValueError): - getattr(placeholder, '__doc__') + getattr(placeholder, "__doc__") @reset_globals def test__str__(self): scope = {} - placeholder = demandload.Placeholder(scope, 'foo', list) - assert scope == object.__getattribute__(placeholder, '_scope') + placeholder = demandload.Placeholder(scope, "foo", list) + assert scope == object.__getattribute__(placeholder, "_scope") assert str(placeholder) == str([]) - assert scope['foo'] == [] + assert scope["foo"] == [] @reset_globals def test_call(self): def passthrough(*args, **kwargs): return args, kwargs + def get_func(): return passthrough + scope = {} - placeholder = demandload.Placeholder(scope, 'foo', get_func) - assert scope == object.__getattribute__(placeholder, '_scope') - assert (('arg',), {'kwarg': 42}) == placeholder('arg', kwarg=42) - assert passthrough is scope['foo'] + placeholder = demandload.Placeholder(scope, "foo", get_func) + assert scope == object.__getattribute__(placeholder, "_scope") + assert (("arg",), {"kwarg": 42}) == placeholder("arg", kwarg=42) + assert passthrough is scope["foo"] @reset_globals def test_setattr(self): @@ -83,45 +85,43 @@ class TestPlaceholder: pass scope = {} - placeholder = demandload.Placeholder(scope, 'foo', Struct) + placeholder = demandload.Placeholder(scope, "foo", Struct) placeholder.val = 7 demandload._protection_enabled = lambda: True with pytest.raises(ValueError): - getattr(placeholder, 'val') - assert 7 == scope['foo'].val + getattr(placeholder, "val") + assert 7 == scope["foo"].val class TestImport: - @reset_globals def test_demandload(self): scope = {} - demandload.demandload('snakeoil:demandload', scope=scope) - assert demandload is not scope['demandload'] - assert demandload.demandload is scope['demandload'].demandload - assert demandload is scope['demandload'] + demandload.demandload("snakeoil:demandload", scope=scope) + assert demandload is not scope["demandload"] + assert demandload.demandload is scope["demandload"].demandload + assert demandload is scope["demandload"] @reset_globals def test_disabled_demandload(self): scope = {} - demandload.disabled_demandload('snakeoil:demandload', scope=scope) - assert demandload is scope['demandload'] + demandload.disabled_demandload("snakeoil:demandload", scope=scope) + assert demandload is scope["demandload"] class TestDemandCompileRegexp: - @reset_globals def test_demand_compile_regexp(self): scope = {} - demandload.demand_compile_regexp('foo', 'frob', scope=scope) - assert list(scope.keys()) == ['foo'] - assert 'frob' == scope['foo'].pattern - assert 'frob' == scope['foo'].pattern + demandload.demand_compile_regexp("foo", "frob", scope=scope) + assert list(scope.keys()) == ["foo"] + assert "frob" == scope["foo"].pattern + assert "frob" == scope["foo"].pattern # verify it's delayed via a bad regex. - demandload.demand_compile_regexp('foo', 'f(', scope=scope) - assert list(scope.keys()) == ['foo'] + demandload.demand_compile_regexp("foo", "f(", scope=scope) + assert list(scope.keys()) == ["foo"] # should blow up on accessing an attribute. - obj = scope['foo'] + obj = scope["foo"] with pytest.raises(sre_constants.error): - getattr(obj, 'pattern') + getattr(obj, "pattern") diff --git a/tests/test_demandload_usage.py b/tests/test_demandload_usage.py index 79ccfe03..ddde0563 100644 --- a/tests/test_demandload_usage.py +++ b/tests/test_demandload_usage.py @@ -4,7 +4,7 @@ from snakeoil.test import mixins class TestDemandLoadTargets(mixins.PythonNamespaceWalker): - target_namespace = 'snakeoil' + target_namespace = "snakeoil" ignore_all_import_failures = False @pytest.fixture(autouse=True) @@ -16,8 +16,8 @@ class TestDemandLoadTargets(mixins.PythonNamespaceWalker): def test_demandload_targets(self): for x in self.walk_namespace( - self.target_namespace, - ignore_failed_imports=self.ignore_all_import_failures): + self.target_namespace, ignore_failed_imports=self.ignore_all_import_failures + ): self.check_space(x) def check_space(self, mod): diff --git a/tests/test_dependant_methods.py b/tests/test_dependant_methods.py index ffd6d363..186e175a 100644 --- a/tests/test_dependant_methods.py +++ b/tests/test_dependant_methods.py @@ -8,7 +8,6 @@ def func(self, seq, data, val=True): class TestDependantMethods: - @staticmethod def generate_instance(methods, dependencies): class Class(metaclass=dm.ForcedDepends): @@ -25,13 +24,15 @@ class TestDependantMethods: results = [] o = self.generate_instance( {str(x): currying.post_curry(func, results, x) for x in range(10)}, - {str(x): str(x - 1) for x in range(1, 10)}) + {str(x): str(x - 1) for x in range(1, 10)}, + ) getattr(o, "9")() assert results == list(range(10)) results = [] o = self.generate_instance( {str(x): currying.post_curry(func, results, x, False) for x in range(10)}, - {str(x): str(x - 1) for x in range(1, 10)}) + {str(x): str(x - 1) for x in range(1, 10)}, + ) getattr(o, "9")() assert results == [0] getattr(o, "9")() @@ -41,7 +42,8 @@ class TestDependantMethods: results = [] o = self.generate_instance( {str(x): currying.post_curry(func, results, x) for x in range(10)}, - {str(x): str(x - 1) for x in range(1, 10)}) + {str(x): str(x - 1) for x in range(1, 10)}, + ) getattr(o, "1")() assert results == [0, 1] getattr(o, "2")() @@ -71,14 +73,15 @@ class TestDependantMethods: results = [] o = self.generate_instance( {str(x): currying.post_curry(func, results, x) for x in range(10)}, - {str(x): str(x - 1) for x in range(1, 10)}) - getattr(o, '2')(ignore_deps=True) + {str(x): str(x - 1) for x in range(1, 10)}, + ) + getattr(o, "2")(ignore_deps=True) assert [2] == results def test_no_deps(self): results = [] o = self.generate_instance( - {str(x): currying.post_curry(func, results, x) for x in range(10)}, - {}) - getattr(o, '2')() + {str(x): currying.post_curry(func, results, x) for x in range(10)}, {} + ) + getattr(o, "2")() assert [2] == results diff --git a/tests/test_fileutils.py b/tests/test_fileutils.py index a4555f89..356eb74d 100644 --- a/tests/test_fileutils.py +++ b/tests/test_fileutils.py @@ -13,7 +13,6 @@ from snakeoil.test import random_str class TestTouch: - @pytest.fixture def random_path(self, tmp_path): return tmp_path / random_str(10) @@ -124,19 +123,19 @@ class TestAtomicWriteFile: def cpy_setup_class(scope, func_name): - if getattr(fileutils, 'native_%s' % func_name) \ - is getattr(fileutils, func_name): - scope['skip'] = 'extensions disabled' + if getattr(fileutils, "native_%s" % func_name) is getattr(fileutils, func_name): + scope["skip"] = "extensions disabled" else: - scope['func'] = staticmethod(getattr(fileutils, func_name)) + scope["func"] = staticmethod(getattr(fileutils, func_name)) + class Test_readfile: func = staticmethod(fileutils.readfile) - test_cases = ['asdf\nfdasswer\1923', '', '987234'] + test_cases = ["asdf\nfdasswer\1923", "", "987234"] - default_encoding = 'ascii' - none_on_missing_ret_data = 'dar' + default_encoding = "ascii" + none_on_missing_ret_data = "dar" @staticmethod def convert_data(data, encoding): @@ -147,7 +146,7 @@ class Test_readfile: return data def test_it(self, tmp_path): - fp = tmp_path / 'testfile' + fp = tmp_path / "testfile" for expected in self.test_cases: raised = None encoding = self.default_encoding @@ -168,16 +167,16 @@ class Test_readfile: assert self.func(path) == expected def test_none_on_missing(self, tmp_path): - fp = tmp_path / 'nonexistent' + fp = tmp_path / "nonexistent" with pytest.raises(FileNotFoundError): self.func(fp) assert self.func(fp, True) is None - fp.write_bytes(self.convert_data('dar', 'ascii')) + fp.write_bytes(self.convert_data("dar", "ascii")) assert self.func(fp, True) == self.none_on_missing_ret_data # ensure it handles paths that go through files- # still should be suppress - assert self.func(fp / 'extra', True) is None + assert self.func(fp / "extra", True) is None class Test_readfile_ascii(Test_readfile): @@ -186,85 +185,86 @@ class Test_readfile_ascii(Test_readfile): class Test_readfile_utf8(Test_readfile): func = staticmethod(fileutils.readfile_utf8) - default_encoding = 'utf8' + default_encoding = "utf8" class Test_readfile_bytes(Test_readfile): func = staticmethod(fileutils.readfile_bytes) default_encoding = None - test_cases = list(map( - currying.post_curry(Test_readfile.convert_data, 'ascii'), - Test_readfile.test_cases)) - test_cases.append('\ua000fa'.encode("utf8")) + test_cases = list( + map( + currying.post_curry(Test_readfile.convert_data, "ascii"), + Test_readfile.test_cases, + ) + ) + test_cases.append("\ua000fa".encode("utf8")) none_on_missing_ret_data = Test_readfile.convert_data( - Test_readfile.none_on_missing_ret_data, 'ascii') + Test_readfile.none_on_missing_ret_data, "ascii" + ) class readlines_mixin: - def assertFunc(self, path, expected): expected = tuple(expected.split()) - if expected == ('',): + if expected == ("",): expected = () - if 'utf8' not in self.encoding_mode: + if "utf8" not in self.encoding_mode: assert tuple(self.func(path)) == expected return assert tuple(self.func(path)) == expected def test_none_on_missing(self, tmp_path): - fp = tmp_path / 'nonexistent' + fp = tmp_path / "nonexistent" with pytest.raises(FileNotFoundError): self.func(fp) assert not tuple(self.func(fp, False, True)) - fp.write_bytes(self.convert_data('dar', 'ascii')) + fp.write_bytes(self.convert_data("dar", "ascii")) assert tuple(self.func(fp, True)) == (self.none_on_missing_ret_data,) - assert not tuple(self.func(fp / 'missing', False, True)) + assert not tuple(self.func(fp / "missing", False, True)) def test_strip_whitespace(self, tmp_path): - fp = tmp_path / 'data' + fp = tmp_path / "data" - fp.write_bytes(self.convert_data(' dar1 \ndar2 \n dar3\n', - 'ascii')) + fp.write_bytes(self.convert_data(" dar1 \ndar2 \n dar3\n", "ascii")) results = tuple(self.func(fp, True)) - expected = ('dar1', 'dar2', 'dar3') - if self.encoding_mode == 'bytes': + expected = ("dar1", "dar2", "dar3") + if self.encoding_mode == "bytes": expected = tuple(x.encode("ascii") for x in expected) assert results == expected # this time without the trailing newline... - fp.write_bytes(self.convert_data(' dar1 \ndar2 \n dar3', - 'ascii')) + fp.write_bytes(self.convert_data(" dar1 \ndar2 \n dar3", "ascii")) results = tuple(self.func(fp, True)) assert results == expected # test a couple of edgecases; underly c extension has gotten these # wrong before. - fp.write_bytes(self.convert_data('0', 'ascii')) + fp.write_bytes(self.convert_data("0", "ascii")) results = tuple(self.func(fp, True)) - expected = ('0',) - if self.encoding_mode == 'bytes': + expected = ("0",) + if self.encoding_mode == "bytes": expected = tuple(x.encode("ascii") for x in expected) assert results == expected - fp.write_bytes(self.convert_data('0\n', 'ascii')) + fp.write_bytes(self.convert_data("0\n", "ascii")) results = tuple(self.func(fp, True)) - expected = ('0',) - if self.encoding_mode == 'bytes': + expected = ("0",) + if self.encoding_mode == "bytes": expected = tuple(x.encode("ascii") for x in expected) assert results == expected - fp.write_bytes(self.convert_data('0 ', 'ascii')) + fp.write_bytes(self.convert_data("0 ", "ascii")) results = tuple(self.func(fp, True)) - expected = ('0',) - if self.encoding_mode == 'bytes': + expected = ("0",) + if self.encoding_mode == "bytes": expected = tuple(x.encode("ascii") for x in expected) assert results == expected def mk_readlines_test(scope, mode): - func_name = 'readlines_%s' % mode - base = globals()['Test_readfile_%s' % mode] + func_name = "readlines_%s" % mode + base = globals()["Test_readfile_%s" % mode] class kls(readlines_mixin, base): func = staticmethod(getattr(fileutils, func_name)) @@ -273,14 +273,15 @@ def mk_readlines_test(scope, mode): kls.__name__ = "Test_%s" % func_name scope["Test_%s" % func_name] = kls + for case in ("ascii", "bytes", "utf8"): - name = 'readlines_%s' % case + name = "readlines_%s" % case mk_readlines_test(locals(), case) class TestBrokenStats: - test_cases = ['/proc/crypto', '/sys/devices/system/cpu/present'] + test_cases = ["/proc/crypto", "/sys/devices/system/cpu/present"] def test_readfile(self): for path in self.test_cases: @@ -292,7 +293,7 @@ class TestBrokenStats: def _check_path(self, path, func, split_it=False): try: - with open(path, 'r') as handle: + with open(path, "r") as handle: data = handle.read() except EnvironmentError as e: if e.errno not in (errno.ENOENT, errno.EPERM): @@ -302,7 +303,7 @@ class TestBrokenStats: func_data = func(path) if split_it: func_data = list(func_data) - data = [x for x in data.split('\n') if x] + data = [x for x in data.split("\n") if x] func_data = [x for x in func_data if x] assert func_data == data @@ -313,13 +314,13 @@ class Test_mmap_or_open_for_read: func = staticmethod(fileutils.mmap_or_open_for_read) def test_zero_length(self, tmp_path): - (path := tmp_path / "target").write_text('') + (path := tmp_path / "target").write_text("") m, f = self.func(path) assert m is None - assert f.read() == b'' + assert f.read() == b"" f.close() - def test_mmap(self, tmp_path, data=b'foonani'): + def test_mmap(self, tmp_path, data=b"foonani"): (path := tmp_path / "target").write_bytes(data) m, f = self.func(path) assert len(m) == len(data) @@ -329,14 +330,14 @@ class Test_mmap_or_open_for_read: class Test_mmap_and_close: - def test_it(self, tmp_path): - (path := tmp_path / "target").write_bytes(data := b'asdfasdf') + (path := tmp_path / "target").write_bytes(data := b"asdfasdf") fd, m = None, None try: fd = os.open(path, os.O_RDONLY) m = _fileutils.mmap_and_close( - fd, len(data), mmap.MAP_PRIVATE, mmap.PROT_READ) + fd, len(data), mmap.MAP_PRIVATE, mmap.PROT_READ + ) # and ensure it closed the fd... with pytest.raises(EnvironmentError): os.read(fd, 1) diff --git a/tests/test_formatters.py b/tests/test_formatters.py index 266ef1e0..549f2adc 100644 --- a/tests/test_formatters.py +++ b/tests/test_formatters.py @@ -18,16 +18,16 @@ class TestPlainTextFormatter: def test_basics(self): # As many sporks as fit in 20 chars. - sporks = ' '.join(3 * ('spork',)) + sporks = " ".join(3 * ("spork",)) for inputs, output in [ - (('\N{SNOWMAN}',), '?'), - ((7 * 'spork ',), '%s\n%s\n%s' % (sporks, sporks, 'spork ')), - (7 * ('spork ',), '%s \n%s \n%s' % (sporks, sporks, 'spork ')), - ((30 * 'a'), 20 * 'a' + '\n' + 10 * 'a'), - (30 * ('a',), 20 * 'a' + '\n' + 10 * 'a'), - ]: + (("\N{SNOWMAN}",), "?"), + ((7 * "spork ",), "%s\n%s\n%s" % (sporks, sporks, "spork ")), + (7 * ("spork ",), "%s \n%s \n%s" % (sporks, sporks, "spork ")), + ((30 * "a"), 20 * "a" + "\n" + 10 * "a"), + (30 * ("a",), 20 * "a" + "\n" + 10 * "a"), + ]: stream = BytesIO() - formatter = self.kls(stream, encoding='ascii') + formatter = self.kls(stream, encoding="ascii") formatter.width = 20 formatter.write(autoline=False, wrap=True, *inputs) assert output.encode() == stream.getvalue() @@ -35,69 +35,70 @@ class TestPlainTextFormatter: def test_first_prefix(self): # As many sporks as fit in 20 chars. for inputs, output in [ - (('\N{SNOWMAN}',), 'foon:?'), - ((7 * 'spork ',), - 'foon:spork spork\n' - 'spork spork spork\n' - 'spork spork '), - (7 * ('spork ',), - 'foon:spork spork \n' - 'spork spork spork \n' - 'spork spork '), - ((30 * 'a'), 'foon:' + 15 * 'a' + '\n' + 15 * 'a'), - (30 * ('a',), 'foon:' + 15 * 'a' + '\n' + 15 * 'a'), - ]: + (("\N{SNOWMAN}",), "foon:?"), + ( + (7 * "spork ",), + "foon:spork spork\n" "spork spork spork\n" "spork spork ", + ), + ( + 7 * ("spork ",), + "foon:spork spork \n" "spork spork spork \n" "spork spork ", + ), + ((30 * "a"), "foon:" + 15 * "a" + "\n" + 15 * "a"), + (30 * ("a",), "foon:" + 15 * "a" + "\n" + 15 * "a"), + ]: stream = BytesIO() - formatter = self.kls(stream, encoding='ascii') + formatter = self.kls(stream, encoding="ascii") formatter.width = 20 - formatter.write(autoline=False, wrap=True, first_prefix='foon:', *inputs) + formatter.write(autoline=False, wrap=True, first_prefix="foon:", *inputs) assert output.encode() == stream.getvalue() def test_later_prefix(self): for inputs, output in [ - (('\N{SNOWMAN}',), '?'), - ((7 * 'spork ',), - 'spork spork spork\n' - 'foon:spork spork\n' - 'foon:spork spork '), - (7 * ('spork ',), - 'spork spork spork \n' - 'foon:spork spork \n' - 'foon:spork spork '), - ((30 * 'a'), 20 * 'a' + '\n' + 'foon:' + 10 * 'a'), - (30 * ('a',), 20 * 'a' + '\n' + 'foon:' + 10 * 'a'), - ]: + (("\N{SNOWMAN}",), "?"), + ( + (7 * "spork ",), + "spork spork spork\n" "foon:spork spork\n" "foon:spork spork ", + ), + ( + 7 * ("spork ",), + "spork spork spork \n" "foon:spork spork \n" "foon:spork spork ", + ), + ((30 * "a"), 20 * "a" + "\n" + "foon:" + 10 * "a"), + (30 * ("a",), 20 * "a" + "\n" + "foon:" + 10 * "a"), + ]: stream = BytesIO() - formatter = self.kls(stream, encoding='ascii') + formatter = self.kls(stream, encoding="ascii") formatter.width = 20 - formatter.later_prefix = ['foon:'] + formatter.later_prefix = ["foon:"] formatter.write(wrap=True, autoline=False, *inputs) assert output.encode() == stream.getvalue() def test_complex(self): stream = BytesIO() - formatter = self.kls(stream, encoding='ascii') + formatter = self.kls(stream, encoding="ascii") formatter.width = 9 - formatter.first_prefix = ['foo', None, ' d'] - formatter.later_prefix = ['dorkey'] + formatter.first_prefix = ["foo", None, " d"] + formatter.later_prefix = ["dorkey"] formatter.write("dar bl", wrap=True, autoline=False) assert "foo ddar\ndorkeybl".encode() == stream.getvalue() - formatter.write(" "*formatter.width, wrap=True, autoline=True) + formatter.write(" " * formatter.width, wrap=True, autoline=True) formatter.stream = stream = BytesIO() formatter.write("dar", " b", wrap=True, autoline=False) assert "foo ddar\ndorkeyb".encode() == stream.getvalue() - output = \ -""" rdepends: >=dev-lang/python-2.3 >=sys-apps/sed-4.0.5 + output = """ rdepends: >=dev-lang/python-2.3 >=sys-apps/sed-4.0.5 dev-python/python-fchksum """ stream = BytesIO() - formatter = self.kls(stream, encoding='ascii', width=80) + formatter = self.kls(stream, encoding="ascii", width=80) formatter.wrap = True assert formatter.autoline assert formatter.width == 80 - formatter.later_prefix = [' '] - formatter.write(" rdepends: >=dev-lang/python-2.3 " - ">=sys-apps/sed-4.0.5 dev-python/python-fchksum") + formatter.later_prefix = [" "] + formatter.write( + " rdepends: >=dev-lang/python-2.3 " + ">=sys-apps/sed-4.0.5 dev-python/python-fchksum" + ) assert len(formatter.first_prefix) == 0 assert len(formatter.later_prefix) == 1 assert output.encode() == stream.getvalue() @@ -105,148 +106,176 @@ class TestPlainTextFormatter: formatter.stream = stream = BytesIO() # push it right up to the limit. formatter.width = 82 - formatter.write(" rdepends: >=dev-lang/python-2.3 " - ">=sys-apps/sed-4.0.5 dev-python/python-fchksum") + formatter.write( + " rdepends: >=dev-lang/python-2.3 " + ">=sys-apps/sed-4.0.5 dev-python/python-fchksum" + ) assert output.encode() == stream.getvalue() formatter.first_prefix = [] - formatter.later_prefix = [' '] + formatter.later_prefix = [" "] formatter.width = 28 formatter.autoline = False formatter.wrap = True formatter.stream = stream = BytesIO() input = (" description: ", "The Portage") formatter.write(*input) - output = ''.join(input).rsplit(" ", 1) - output[1] = ' %s' % output[1] - assert '\n'.join(output).encode() == stream.getvalue() - + output = "".join(input).rsplit(" ", 1) + output[1] = " %s" % output[1] + assert "\n".join(output).encode() == stream.getvalue() def test_wrap_autoline(self): for inputs, output in [ - ((3 * ('spork',)), 'spork\nspork\nspork\n'), - (3 * (('spork',),), 'spork\nspork\nspork\n'), - (((3 * 'spork',),), - '\n' - 'foonsporks\n' - 'foonporksp\n' - 'foonork\n'), - ((('fo',), (2 * 'spork',),), 'fo\nsporkspork\n'), - ((('fo',), (3 * 'spork',),), - 'fo\n' - '\n' - 'foonsporks\n' - 'foonporksp\n' - 'foonork\n'), - ]: + ((3 * ("spork",)), "spork\nspork\nspork\n"), + (3 * (("spork",),), "spork\nspork\nspork\n"), + (((3 * "spork",),), "\n" "foonsporks\n" "foonporksp\n" "foonork\n"), + ( + ( + ("fo",), + (2 * "spork",), + ), + "fo\nsporkspork\n", + ), + ( + ( + ("fo",), + (3 * "spork",), + ), + "fo\n" "\n" "foonsporks\n" "foonporksp\n" "foonork\n", + ), + ]: stream = BytesIO() - formatter = self.kls(stream, encoding='ascii') + formatter = self.kls(stream, encoding="ascii") formatter.width = 10 for input in inputs: - formatter.write(wrap=True, later_prefix='foon', *input) + formatter.write(wrap=True, later_prefix="foon", *input) assert output.encode() == stream.getvalue() class TerminfoFormatterTest: - def _test_stream(self, stream, formatter, inputs, output): stream.seek(0) stream.truncate() formatter.write(*inputs) stream.seek(0) result = stream.read() - output = ''.join(output) - assert output.encode() == result, \ - "given(%r), expected(%r), got(%r)" % (inputs, output, result) + output = "".join(output) + assert output.encode() == result, "given(%r), expected(%r), got(%r)" % ( + inputs, + output, + result, + ) @issue7567 def test_terminfo(self): - esc = '\x1b[' + esc = "\x1b[" stream = TemporaryFile() - f = formatters.TerminfoFormatter(stream, 'ansi', True, 'ascii') + f = formatters.TerminfoFormatter(stream, "ansi", True, "ascii") f.autoline = False for inputs, output in ( - ((f.bold, 'bold'), (esc, '1m', 'bold', esc, '0;10m')), - ((f.underline, 'underline'), - (esc, '4m', 'underline', esc, '0;10m')), - ((f.fg('red'), 'red'), (esc, '31m', 'red', esc, '39;49m')), - ((f.fg('red'), 'red', f.bold, 'boldred', f.fg(), 'bold', - f.reset, 'done'), - (esc, '31m', 'red', esc, '1m', 'boldred', esc, '39;49m', 'bold', - esc, '0;10m', 'done')), - ((42,), ('42',)), - (('\N{SNOWMAN}',), ('?',)) - ): + ((f.bold, "bold"), (esc, "1m", "bold", esc, "0;10m")), + ((f.underline, "underline"), (esc, "4m", "underline", esc, "0;10m")), + ((f.fg("red"), "red"), (esc, "31m", "red", esc, "39;49m")), + ( + ( + f.fg("red"), + "red", + f.bold, + "boldred", + f.fg(), + "bold", + f.reset, + "done", + ), + ( + esc, + "31m", + "red", + esc, + "1m", + "boldred", + esc, + "39;49m", + "bold", + esc, + "0;10m", + "done", + ), + ), + ((42,), ("42",)), + (("\N{SNOWMAN}",), ("?",)), + ): self._test_stream(stream, f, inputs, output) f.autoline = True - self._test_stream( - stream, f, ('lala',), ('lala', '\n')) + self._test_stream(stream, f, ("lala",), ("lala", "\n")) def test_unsupported_term(self): stream = TemporaryFile() with pytest.raises(formatters.TerminfoUnsupported): - formatters.TerminfoFormatter(stream, term='dumb') + formatters.TerminfoFormatter(stream, term="dumb") @issue7567 def test_title(self): stream = TemporaryFile() try: - f = formatters.TerminfoFormatter(stream, 'xterm+sl', True, 'ascii') + f = formatters.TerminfoFormatter(stream, "xterm+sl", True, "ascii") except curses.error: pytest.skip("xterm+sl not in terminfo db") - f.title('TITLE') + f.title("TITLE") stream.seek(0) - assert b'\x1b]0;TITLE\x07' == stream.read() + assert b"\x1b]0;TITLE\x07" == stream.read() def _with_term(term, func, *args, **kwargs): - orig_term = os.environ.get('TERM') + orig_term = os.environ.get("TERM") try: - os.environ['TERM'] = term + os.environ["TERM"] = term return func(*args, **kwargs) finally: if orig_term is None: - del os.environ['TERM'] + del os.environ["TERM"] else: - os.environ['TERM'] = orig_term + os.environ["TERM"] = orig_term + # XXX ripped from pkgcore's test_commandline -def _get_pty_pair(encoding='ascii'): +def _get_pty_pair(encoding="ascii"): master_fd, slave_fd = pty.openpty() - master = os.fdopen(master_fd, 'rb', 0) - out = os.fdopen(slave_fd, 'wb', 0) + master = os.fdopen(master_fd, "rb", 0) + out = os.fdopen(slave_fd, "wb", 0) return master, out -@pytest.mark.skip(reason='this currently breaks on github ci due to the issue7567 workaround') +@pytest.mark.skip( + reason="this currently breaks on github ci due to the issue7567 workaround" +) class TestGetFormatter: - @issue7567 def test_dumb_terminal(self): master, _out = _get_pty_pair() - formatter = _with_term('dumb', formatters.get_formatter, master) + formatter = _with_term("dumb", formatters.get_formatter, master) assert isinstance(formatter, formatters.PlainTextFormatter) @issue7567 def test_vt100_terminal(self): master, _out = _get_pty_pair() - formatter = _with_term('vt100', formatters.get_formatter, master) + formatter = _with_term("vt100", formatters.get_formatter, master) assert isinstance(formatter, formatters.PlainTextFormatter) @issue7567 def test_smart_terminal(self): master, _out = _get_pty_pair() - formatter = _with_term('xterm', formatters.get_formatter, master) + formatter = _with_term("xterm", formatters.get_formatter, master) assert isinstance(formatter, formatters.TerminfoFormatter) @issue7567 def test_not_a_tty(self): stream = TemporaryFile() - formatter = _with_term('xterm', formatters.get_formatter, stream) + formatter = _with_term("xterm", formatters.get_formatter, stream) assert isinstance(formatter, formatters.PlainTextFormatter) @issue7567 def test_no_fd(self): stream = BytesIO() - formatter = _with_term('xterm', formatters.get_formatter, stream) + formatter = _with_term("xterm", formatters.get_formatter, stream) assert isinstance(formatter, formatters.PlainTextFormatter) diff --git a/tests/test_iterables.py b/tests/test_iterables.py index 3345c5e8..d0d57683 100644 --- a/tests/test_iterables.py +++ b/tests/test_iterables.py @@ -1,12 +1,10 @@ import operator import pytest -from snakeoil.iterables import (caching_iter, expandable_chain, iter_sort, - partition) +from snakeoil.iterables import caching_iter, expandable_chain, iter_sort, partition class TestPartition: - def test_empty(self): a, b = partition(()) assert list(a) == [] @@ -23,19 +21,18 @@ class TestPartition: class TestExpandableChain: - def test_normal_function(self): i = [iter(range(100)) for x in range(3)] e = expandable_chain() e.extend(i) - assert list(e) == list(range(100))*3 + assert list(e) == list(range(100)) * 3 for x in i + [e]: pytest.raises(StopIteration, x.__next__) def test_extend(self): e = expandable_chain() e.extend(range(100) for i in (1, 2)) - assert list(e) == list(range(100))*2 + assert list(e) == list(range(100)) * 2 with pytest.raises(StopIteration): e.extend([[]]) @@ -62,7 +59,6 @@ class TestExpandableChain: class TestCachingIter: - def test_iter_consumption(self): i = iter(range(100)) c = caching_iter(i) @@ -147,6 +143,7 @@ class Test_iter_sort: def test_ordering(self): def f(l): return sorted(l, key=operator.itemgetter(0)) + result = list(iter_sort(f, *[iter(range(x, x + 10)) for x in (30, 20, 0, 10)])) expected = list(range(40)) assert result == expected diff --git a/tests/test_klass.py b/tests/test_klass.py index 773925d2..25728fa5 100644 --- a/tests/test_klass.py +++ b/tests/test_klass.py @@ -14,7 +14,8 @@ class Test_GetAttrProxy: class foo1: def __init__(self, obj): self.obj = obj - __getattr__ = self.kls('obj') + + __getattr__ = self.kls("obj") class foo2: pass @@ -27,18 +28,18 @@ class Test_GetAttrProxy: o2.foon = "dar" assert o.foon == "dar" o.foon = "foo" - assert o.foon == 'foo' + assert o.foon == "foo" def test_attrlist(self): def make_class(attr_list=None): class foo(metaclass=self.kls): if attr_list is not None: - locals()['__attr_comparison__'] = attr_list + locals()["__attr_comparison__"] = attr_list with pytest.raises(TypeError): make_class() with pytest.raises(TypeError): - make_class(['foon']) + make_class(["foon"]) with pytest.raises(TypeError): make_class([None]) @@ -47,38 +48,39 @@ class Test_GetAttrProxy: bar = "baz" class Test: - method = self.kls('test') + method = self.kls("test") test = foo() test = Test() - assert test.method('bar') == foo.bar + assert test.method("bar") == foo.bar class TestDirProxy: - @staticmethod def noninternal_attrs(obj): - return sorted(x for x in dir(obj) if not re.match(r'__\w+__', x)) + return sorted(x for x in dir(obj) if not re.match(r"__\w+__", x)) def test_combined(self): class foo1: def __init__(self, obj): self.obj = obj - __dir__ = klass.DirProxy('obj') + + __dir__ = klass.DirProxy("obj") class foo2: def __init__(self): - self.attr = 'foo' + self.attr = "foo" o2 = foo2() o = foo1(o2) - assert self.noninternal_attrs(o) == ['attr', 'obj'] + assert self.noninternal_attrs(o) == ["attr", "obj"] def test_empty(self): class foo1: def __init__(self, obj): self.obj = obj - __dir__ = klass.DirProxy('obj') + + __dir__ = klass.DirProxy("obj") class foo2: pass @@ -86,23 +88,26 @@ class TestDirProxy: o2 = foo2() o = foo1(o2) assert self.noninternal_attrs(o2) == [] - assert self.noninternal_attrs(o) == ['obj'] + assert self.noninternal_attrs(o) == ["obj"] def test_slots(self): class foo1: - __slots__ = ('obj',) + __slots__ = ("obj",) + def __init__(self, obj): self.obj = obj - __dir__ = klass.DirProxy('obj') + + __dir__ = klass.DirProxy("obj") class foo2: - __slots__ = ('attr',) + __slots__ = ("attr",) + def __init__(self): - self.attr = 'foo' + self.attr = "foo" o2 = foo2() o = foo1(o2) - assert self.noninternal_attrs(o) == ['attr', 'obj'] + assert self.noninternal_attrs(o) == ["attr", "obj"] class Test_contains: @@ -111,6 +116,7 @@ class Test_contains: def test_it(self): class c(dict): __contains__ = self.func + d = c({"1": 2}) assert "1" in d assert 1 not in d @@ -122,6 +128,7 @@ class Test_get: def test_it(self): class c(dict): get = self.func + d = c({"1": 2}) assert d.get("1") == 2 assert d.get("1", 3) == 2 @@ -142,11 +149,13 @@ class Test_chained_getter: assert id(self.kls("fa2341fa")) == l[0] def test_eq(self): - assert self.kls("asdf", disable_inst_caching=True) == \ - self.kls("asdf", disable_inst_caching=True) + assert self.kls("asdf", disable_inst_caching=True) == self.kls( + "asdf", disable_inst_caching=True + ) - assert self.kls("asdf2", disable_inst_caching=True) != \ - self.kls("asdf", disable_inst_caching=True) + assert self.kls("asdf2", disable_inst_caching=True) != self.kls( + "asdf", disable_inst_caching=True + ) def test_it(self): class maze: @@ -159,13 +168,13 @@ class Test_chained_getter: d = {} m = maze(d) f = self.kls - assert f('foon')(m) == m + assert f("foon")(m) == m d["foon"] = 1 - assert f('foon')(m) == 1 - assert f('dar.foon')(m) == 1 - assert f('.'.join(['blah']*10))(m) == m + assert f("foon")(m) == 1 + assert f("dar.foon")(m) == 1 + assert f(".".join(["blah"] * 10))(m) == m with pytest.raises(AttributeError): - f('foon.dar')(m) + f("foon.dar")(m) class Test_jit_attr: @@ -184,23 +193,28 @@ class Test_jit_attr: def jit_attr_ext_method(self): return partial(klass.jit_attr_ext_method, kls=self.kls) - def mk_inst(self, attrname='_attr', method_lookup=False, - use_cls_setattr=False, func=None, - singleton=klass._uncached_singleton): + def mk_inst( + self, + attrname="_attr", + method_lookup=False, + use_cls_setattr=False, + func=None, + singleton=klass._uncached_singleton, + ): f = func if not func: + def f(self): self._invokes.append(self) return 54321 class cls: - def __init__(self): sf = partial(object.__setattr__, self) - sf('_sets', []) - sf('_reflects', []) - sf('_invokes', []) + sf("_sets", []) + sf("_reflects", []) + sf("_invokes", []) attr = self.kls(f, attrname, singleton, use_cls_setattr) @@ -219,13 +233,22 @@ class Test_jit_attr: sets = [instance] * sets reflects = [instance] * reflects invokes = [instance] * invokes - msg = ("checking %s: got(%r), expected(%r); state was sets=%r, " - "reflects=%r, invokes=%r" % ( - "%s", "%s", "%s", instance._sets, instance._reflects, - instance._invokes)) + msg = ( + "checking %s: got(%r), expected(%r); state was sets=%r, " + "reflects=%r, invokes=%r" + % ("%s", "%s", "%s", instance._sets, instance._reflects, instance._invokes) + ) assert instance._sets == sets, msg % ("sets", instance._sets, sets) - assert instance._reflects == reflects, msg % ("reflects", instance._reflects, reflects) - assert instance._invokes == invokes, msg % ("invokes", instance._invokes, invokes) + assert instance._reflects == reflects, msg % ( + "reflects", + instance._reflects, + reflects, + ) + assert instance._invokes == invokes, msg % ( + "invokes", + instance._invokes, + invokes, + ) def test_implementation(self): obj = self.mk_inst() @@ -298,7 +321,7 @@ class Test_jit_attr: object.__setattr__(self, attr, value) o = cls() - assert not hasattr(o, 'invoked') + assert not hasattr(o, "invoked") assert o.my_attr == now assert o._blah2 == now assert o.invoked @@ -315,34 +338,34 @@ class Test_jit_attr: return now2 def __setattr__(self, attr, value): - if not getattr(self, '_setattr_allowed', False): + if not getattr(self, "_setattr_allowed", False): raise TypeError("setattr isn't allowed for %s" % attr) object.__setattr__(self, attr, value) - base.attr = self.jit_attr_ext_method('f1', '_attr') + base.attr = self.jit_attr_ext_method("f1", "_attr") o = base() assert o.attr == now assert o._attr == now assert o.attr == now - base.attr = self.jit_attr_ext_method('f1', '_attr', use_cls_setattr=True) + base.attr = self.jit_attr_ext_method("f1", "_attr", use_cls_setattr=True) o = base() with pytest.raises(TypeError): - getattr(o, 'attr') + getattr(o, "attr") base._setattr_allowed = True assert o.attr == now - base.attr = self.jit_attr_ext_method('f2', '_attr2') + base.attr = self.jit_attr_ext_method("f2", "_attr2") o = base() assert o.attr == now2 assert o._attr2 == now2 # finally, check that it's doing lookups rather then storing the func. - base.attr = self.jit_attr_ext_method('func', '_attr2') + base.attr = self.jit_attr_ext_method("func", "_attr2") o = base() # no func... with pytest.raises(AttributeError): - getattr(o, 'attr') + getattr(o, "attr") base.func = base.f1 assert o.attr == now assert o._attr2 == now @@ -354,7 +377,13 @@ class Test_jit_attr: def test_check_singleton_is_compare(self): def throw_assert(*args, **kwds): - raise AssertionError("I shouldn't be invoked: %s, %s" % (args, kwds,)) + raise AssertionError( + "I shouldn't be invoked: %s, %s" + % ( + args, + kwds, + ) + ) class puker: __eq__ = throw_assert @@ -369,11 +398,13 @@ class Test_jit_attr: def test_cached_property(self): l = [] + class foo: @klass.cached_property def blah(self, l=l, i=iter(range(5))): l.append(None) return next(i) + f = foo() assert f.blah == 0 assert len(l) == 1 @@ -413,15 +444,15 @@ class Test_aliased_attr: o = cls() with pytest.raises(AttributeError): - getattr(o, 'attr') + getattr(o, "attr") o.dar = "foon" with pytest.raises(AttributeError): - getattr(o, 'attr') + getattr(o, "attr") o.dar = o o.blah = "monkey" - assert o.attr == 'monkey' + assert o.attr == "monkey" # verify it'll cross properties... class blah: @@ -431,6 +462,7 @@ class Test_aliased_attr: @property def foon(self): return blah() + alias = self.func("foon.target") o = cls() @@ -442,12 +474,15 @@ class Test_cached_hash: def test_it(self): now = int(time()) + class cls: invoked = [] + @self.func def __hash__(self): self.invoked.append(self) return now + o = cls() assert hash(o) == now assert o.invoked == [o] @@ -462,7 +497,7 @@ class Test_reflective_hash: def test_it(self): class cls: - __hash__ = self.func('_hash') + __hash__ = self.func("_hash") obj = cls() with pytest.raises(AttributeError): @@ -477,7 +512,8 @@ class Test_reflective_hash: hash(obj) class cls2: - __hash__ = self.func('_dar') + __hash__ = self.func("_dar") + obj = cls2() with pytest.raises(AttributeError): hash(obj) @@ -486,7 +522,6 @@ class Test_reflective_hash: class TestImmutableInstance: - def test_metaclass(self): self.common_test(lambda x: x, metaclass=klass.immutable_instance) @@ -506,7 +541,7 @@ class TestImmutableInstance: with pytest.raises(AttributeError): delattr(o, "dar") - object.__setattr__(o, 'dar', 'foon') + object.__setattr__(o, "dar", "foon") with pytest.raises(AttributeError): delattr(o, "dar") @@ -541,7 +576,6 @@ class TestAliasMethod: class TestPatch: - def setup_method(self, method): # cache original methods self._math_ceil = math.ceil @@ -556,7 +590,7 @@ class TestPatch: n = 0.1 assert math.ceil(n) == 1 - @klass.patch('math.ceil') + @klass.patch("math.ceil") def ceil(orig_ceil, n): return math.floor(n) @@ -567,8 +601,8 @@ class TestPatch: assert math.ceil(n) == 2 assert math.floor(n) == 1 - @klass.patch('math.ceil') - @klass.patch('math.floor') + @klass.patch("math.ceil") + @klass.patch("math.floor") def zero(orig_func, n): return 0 diff --git a/tests/test_mappings.py b/tests/test_mappings.py index b1aef254..1ffe7801 100644 --- a/tests/test_mappings.py +++ b/tests/test_mappings.py @@ -10,7 +10,6 @@ def a_dozen(): class BasicDict(mappings.DictMixin): - def __init__(self, i=None, **kwargs): self._d = {} mappings.DictMixin.__init__(self, i, **kwargs) @@ -20,7 +19,6 @@ class BasicDict(mappings.DictMixin): class MutableDict(BasicDict): - def __setitem__(self, key, val): self._d[key] = val @@ -36,7 +34,6 @@ class ImmutableDict(BasicDict): class TestDictMixin: - def test_immutability(self): d = ImmutableDict() pytest.raises(AttributeError, d.__setitem__, "spork", "foon") @@ -59,12 +56,12 @@ class TestDictMixin: pytest.raises(KeyError, d.pop, "spork") assert d.pop("spork", "bat") == "bat" assert d.pop("foo") == "bar" - assert d.popitem(), ("baz" == "cat") + assert d.popitem(), "baz" == "cat" pytest.raises(KeyError, d.popitem) assert d.pop("nonexistent", None) == None def test_init(self): - d = MutableDict((('foo', 'bar'), ('spork', 'foon')), baz="cat") + d = MutableDict((("foo", "bar"), ("spork", "foon")), baz="cat") assert d["foo"] == "bar" assert d["baz"] == "cat" d.clear() @@ -73,19 +70,20 @@ class TestDictMixin: def test_bool(self): d = MutableDict() assert not d - d['x'] = 1 + d["x"] = 1 assert d - del d['x'] + del d["x"] assert not d class RememberingNegateMixin: - def setup_method(self, method): self.negate_calls = [] + def negate(i): self.negate_calls.append(i) return -i + self.negate = negate def teardown_method(self, method): @@ -94,7 +92,6 @@ class RememberingNegateMixin: class LazyValDictTestMixin: - def test_invalid_operations(self): pytest.raises(AttributeError, operator.setitem, self.dict, 7, 7) pytest.raises(AttributeError, operator.delitem, self.dict, 7) @@ -118,6 +115,7 @@ class LazyValDictTestMixin: # missing key def get(): return self.dict[42] + pytest.raises(KeyError, get) def test_caching(self): @@ -129,7 +127,6 @@ class LazyValDictTestMixin: class TestLazyValDictWithList(LazyValDictTestMixin, RememberingNegateMixin): - def setup_method(self, method): super().setup_method(method) self.dict = mappings.LazyValDict(list(range(12)), self.negate) @@ -148,14 +145,12 @@ class TestLazyValDictWithList(LazyValDictTestMixin, RememberingNegateMixin): class TestLazyValDictWithFunc(LazyValDictTestMixin, RememberingNegateMixin): - def setup_method(self, method): super().setup_method(method) self.dict = mappings.LazyValDict(a_dozen, self.negate) class TestLazyValDict: - def test_invalid_init_args(self): pytest.raises(TypeError, mappings.LazyValDict, [1], 42) pytest.raises(TypeError, mappings.LazyValDict, 42, a_dozen) @@ -164,36 +159,43 @@ class TestLazyValDict: # TODO check for valid values for dict.new, since that seems to be # part of the interface? class TestProtectedDict: - def setup_method(self, method): self.orig = {1: -1, 2: -2} self.dict = mappings.ProtectedDict(self.orig) def test_basic_operations(self): assert self.dict[1] == -1 + def get(i): return self.dict[i] + pytest.raises(KeyError, get, 3) assert sorted(self.dict.keys()) == [1, 2] assert -1 not in self.dict assert 2 in self.dict + def remove(i): del self.dict[i] + pytest.raises(KeyError, remove, 50) def test_basic_mutating(self): # add something self.dict[7] = -7 + def check_after_adding(): assert self.dict[7] == -7 assert 7 in self.dict assert sorted(self.dict.keys()) == [1, 2, 7] + check_after_adding() # remove it again del self.dict[7] assert 7 not in self.dict + def get(i): return self.dict[i] + pytest.raises(KeyError, get, 7) assert sorted(self.dict.keys()) == [1, 2] # add it back @@ -214,7 +216,6 @@ class TestProtectedDict: class TestImmutableDict: - def test_init_iterator(self): d = mappings.ImmutableDict((x, x) for x in range(3)) assert dict(d) == {0: 0, 1: 1, 2: 2} @@ -239,7 +240,7 @@ class TestImmutableDict: def test_init_dictmixin(self): d = MutableDict(baz="cat") e = mappings.ImmutableDict(d) - assert dict(d) == {'baz': 'cat'} + assert dict(d) == {"baz": "cat"} def test_init_bad_data(self): for data in (range(10), list(range(10)), [([], 1)]): @@ -288,7 +289,6 @@ class TestImmutableDict: class TestOrderedFrozenSet: - def test_magic_methods(self): s = mappings.OrderedFrozenSet(range(9)) for x in range(9): @@ -299,7 +299,7 @@ class TestOrderedFrozenSet: for i in range(9): assert s[i] == i assert list(s[1:]) == list(range(1, 9)) - with pytest.raises(IndexError, match='index out of range'): + with pytest.raises(IndexError, match="index out of range"): s[9] assert s == set(range(9)) @@ -308,12 +308,12 @@ class TestOrderedFrozenSet: assert hash(s) def test_ordering(self): - s = mappings.OrderedFrozenSet('set') - assert 'set' == ''.join(s) - assert 'tes' == ''.join(reversed(s)) - s = mappings.OrderedFrozenSet('setordered') - assert 'setord' == ''.join(s) - assert 'drotes' == ''.join(reversed(s)) + s = mappings.OrderedFrozenSet("set") + assert "set" == "".join(s) + assert "tes" == "".join(reversed(s)) + s = mappings.OrderedFrozenSet("setordered") + assert "setord" == "".join(s) + assert "drotes" == "".join(reversed(s)) def test_immmutability(self): s = mappings.OrderedFrozenSet(range(9)) @@ -355,41 +355,40 @@ class TestOrderedFrozenSet: class TestOrderedSet(TestOrderedFrozenSet): - def test_hash(self): with pytest.raises(TypeError): - assert hash(mappings.OrderedSet('set')) + assert hash(mappings.OrderedSet("set")) def test_add(self): s = mappings.OrderedSet() - s.add('a') - assert 'a' in s + s.add("a") + assert "a" in s s.add(1) assert 1 in s - assert list(s) == ['a', 1] + assert list(s) == ["a", 1] def test_discard(self): s = mappings.OrderedSet() - s.discard('a') - s.add('a') + s.discard("a") + s.add("a") assert s - s.discard('a') + s.discard("a") assert not s def test_remove(self): s = mappings.OrderedSet() with pytest.raises(KeyError): - s.remove('a') - s.add('a') - assert 'a' in s - s.remove('a') - assert 'a' not in s + s.remove("a") + s.add("a") + assert "a" in s + s.remove("a") + assert "a" not in s def test_clear(self): s = mappings.OrderedSet() s.clear() assert len(s) == 0 - s.add('a') + s.add("a") assert len(s) == 1 s.clear() assert len(s) == 0 @@ -425,8 +424,9 @@ class TestStackedDict: assert x in std def test_len(self): - assert sum(map(len, (self.orig_dict, self.new_dict))) == \ - len(mappings.StackedDict(self.orig_dict, self.new_dict)) + assert sum(map(len, (self.orig_dict, self.new_dict))) == len( + mappings.StackedDict(self.orig_dict, self.new_dict) + ) def test_setattr(self): pytest.raises(TypeError, mappings.StackedDict().__setitem__, (1, 2)) @@ -447,24 +447,28 @@ class TestStackedDict: assert len(s) == 0 def test_keys(self): - assert sorted(mappings.StackedDict(self.orig_dict, self.new_dict)) == \ - sorted(list(self.orig_dict.keys()) + list(self.new_dict.keys())) + assert sorted(mappings.StackedDict(self.orig_dict, self.new_dict)) == sorted( + list(self.orig_dict.keys()) + list(self.new_dict.keys()) + ) class TestIndeterminantDict: - def test_disabled_methods(self): d = mappings.IndeterminantDict(lambda *a: None) for x in ( - "clear", - ("update", {}), - ("setdefault", 1), - "__iter__", "__len__", "__hash__", - ("__delitem__", 1), - ("__setitem__", 2), - ("popitem", 2), - "keys", "items", "values", - ): + "clear", + ("update", {}), + ("setdefault", 1), + "__iter__", + "__len__", + "__hash__", + ("__delitem__", 1), + ("__setitem__", 2), + ("popitem", 2), + "keys", + "items", + "values", + ): if isinstance(x, tuple): pytest.raises(TypeError, getattr(d, x[0]), x[1]) else: @@ -472,7 +476,8 @@ class TestIndeterminantDict: def test_starter_dict(self): d = mappings.IndeterminantDict( - lambda key: False, starter_dict={}.fromkeys(range(100), True)) + lambda key: False, starter_dict={}.fromkeys(range(100), True) + ) for x in range(100): assert d[x] == True for x in range(100, 110): @@ -481,21 +486,24 @@ class TestIndeterminantDict: def test_behaviour(self): val = [] d = mappings.IndeterminantDict( - lambda key: val.append(key), {}.fromkeys(range(10), True)) + lambda key: val.append(key), {}.fromkeys(range(10), True) + ) assert d[0] == True assert d[11] == None assert val == [11] + def func(*a): raise KeyError + with pytest.raises(KeyError): mappings.IndeterminantDict(func).__getitem__(1) - def test_get(self): def func(key): if key == 2: raise KeyError return True + d = mappings.IndeterminantDict(func, {1: 1}) assert d.get(1, 1) == 1 assert d.get(1, 2) == 1 @@ -505,41 +513,42 @@ class TestIndeterminantDict: class TestFoldingDict: - def test_preserve(self): dct = mappings.PreservingFoldingDict( - str.lower, list({'Foo': 'bar', 'fnz': 'donkey'}.items())) - assert dct['fnz'] == 'donkey' - assert dct['foo'] == 'bar' - assert sorted(['bar' == 'donkey']), sorted(dct.values()) + str.lower, list({"Foo": "bar", "fnz": "donkey"}.items()) + ) + assert dct["fnz"] == "donkey" + assert dct["foo"] == "bar" + assert sorted(["bar" == "donkey"]), sorted(dct.values()) assert dct.copy() == dct - assert dct['foo'] == dct.get('Foo') - assert 'foo' in dct - keys = ['Foo', 'fnz'] + assert dct["foo"] == dct.get("Foo") + assert "foo" in dct + keys = ["Foo", "fnz"] keysList = list(dct) for key in keys: assert key in list(dct.keys()) assert key in keysList assert (key, dct[key]) in list(dct.items()) assert len(keys) == len(dct) - assert dct.pop('foo') == 'bar' - assert 'foo' not in dct - del dct['fnz'] - assert 'fnz' not in dct - dct['Foo'] = 'bar' + assert dct.pop("foo") == "bar" + assert "foo" not in dct + del dct["fnz"] + assert "fnz" not in dct + dct["Foo"] = "bar" dct.refold(lambda _: _) - assert 'foo' not in dct - assert 'Foo' in dct - assert list(dct.items()) == [('Foo', 'bar')] + assert "foo" not in dct + assert "Foo" in dct + assert list(dct.items()) == [("Foo", "bar")] dct.clear() assert {} == dict(dct) def test_no_preserve(self): dct = mappings.NonPreservingFoldingDict( - str.lower, list({'Foo': 'bar', 'fnz': 'monkey'}.items())) - assert sorted(['bar', 'monkey']) == sorted(dct.values()) + str.lower, list({"Foo": "bar", "fnz": "monkey"}.items()) + ) + assert sorted(["bar", "monkey"]) == sorted(dct.values()) assert dct.copy() == dct - keys = ['foo', 'fnz'] + keys = ["foo", "fnz"] keysList = [key for key in dct] for key in keys: assert key in list(dct.keys()) @@ -547,8 +556,8 @@ class TestFoldingDict: assert key in keysList assert (key, dct[key]) in list(dct.items()) assert len(keys) == len(dct) - assert dct.pop('foo') == 'bar' - del dct['fnz'] + assert dct.pop("foo") == "bar" + del dct["fnz"] assert list(dct.keys()) == [] dct.clear() assert {} == dict(dct) @@ -580,20 +589,20 @@ class Test_attr_to_item_mapping: if kls is None: kls = self.kls o = kls(f=2, g=3) - assert ['f', 'g'] == sorted(o) - self.assertBoth(o, 'g', 3) + assert ["f", "g"] == sorted(o) + self.assertBoth(o, "g", 3) o.g = 4 - self.assertBoth(o, 'g', 4) + self.assertBoth(o, "g", 4) del o.g with pytest.raises(KeyError): - operator.__getitem__(o, 'g') + operator.__getitem__(o, "g") with pytest.raises(AttributeError): - getattr(o, 'g') - del o['f'] + getattr(o, "g") + del o["f"] with pytest.raises(KeyError): - operator.__getitem__(o, 'f') + operator.__getitem__(o, "f") with pytest.raises(AttributeError): - getattr(o, 'f') + getattr(o, "f") def test_inject(self): class foon(dict): @@ -611,30 +620,31 @@ class Test_ProxiedAttrs: def __init__(self, **kwargs): for attr, val in kwargs.items(): setattr(self, attr, val) + obj = foo() d = self.kls(obj) with pytest.raises(KeyError): - operator.__getitem__(d, 'x') + operator.__getitem__(d, "x") with pytest.raises(KeyError): - operator.__delitem__(d, 'x') - assert 'x' not in d - d['x'] = 1 - assert d['x'] == 1 - assert 'x' in d - assert ['x'] == list(x for x in d if not x.startswith("__")) - del d['x'] - assert 'x' not in d + operator.__delitem__(d, "x") + assert "x" not in d + d["x"] = 1 + assert d["x"] == 1 + assert "x" in d + assert ["x"] == list(x for x in d if not x.startswith("__")) + del d["x"] + assert "x" not in d with pytest.raises(KeyError): - operator.__delitem__(d, 'x') + operator.__delitem__(d, "x") with pytest.raises(KeyError): - operator.__getitem__(d, 'x') + operator.__getitem__(d, "x") # Finally, verify that immutable attribute errors are handled correctly. d = self.kls(object()) with pytest.raises(KeyError): - operator.__setitem__(d, 'x', 1) + operator.__setitem__(d, "x", 1) with pytest.raises(KeyError): - operator.__delitem__(d, 'x') + operator.__delitem__(d, "x") class TestSlottedDict: @@ -642,9 +652,9 @@ class TestSlottedDict: kls = staticmethod(mappings.make_SlottedDict_kls) def test_exceptions(self): - d = self.kls(['spork'])() + d = self.kls(["spork"])() for op in (operator.getitem, operator.delitem): with pytest.raises(KeyError): - op(d, 'spork') + op(d, "spork") with pytest.raises(KeyError): - op(d, 'foon') + op(d, "foon") diff --git a/tests/test_modules.py b/tests/test_modules.py index f4174979..da20f4f8 100644 --- a/tests/test_modules.py +++ b/tests/test_modules.py @@ -5,19 +5,18 @@ from snakeoil import modules class TestModules: - @pytest.fixture(autouse=True) def _setup(self, tmp_path): # set up some test modules for our use - packdir = tmp_path / 'mod_testpack' + packdir = tmp_path / "mod_testpack" packdir.mkdir() # create an empty file - (packdir / '__init__.py').touch() + (packdir / "__init__.py").touch() for directory in (tmp_path, packdir): for i in range(3): - (directory / f'mod_test{i}.py').write_text('def foo(): pass\n') - (directory / 'mod_horked.py').write_text('1/0\n') + (directory / f"mod_test{i}.py").write_text("def foo(): pass\n") + (directory / "mod_horked.py").write_text("1/0\n") # append them to path sys.path.insert(0, str(tmp_path)) @@ -27,89 +26,93 @@ class TestModules: sys.path.pop(0) # make sure we don't keep the sys.modules entries around for i in range(3): - sys.modules.pop('mod_test%s' % i, None) - sys.modules.pop('mod_testpack.mod_test%s' % i, None) - sys.modules.pop('mod_testpack', None) - sys.modules.pop('mod_horked', None) - sys.modules.pop('mod_testpack.mod_horked', None) + sys.modules.pop("mod_test%s" % i, None) + sys.modules.pop("mod_testpack.mod_test%s" % i, None) + sys.modules.pop("mod_testpack", None) + sys.modules.pop("mod_horked", None) + sys.modules.pop("mod_testpack.mod_horked", None) def test_load_module(self): # import an already-imported module - assert modules.load_module('snakeoil.modules') is modules + assert modules.load_module("snakeoil.modules") is modules # and a system one, just for kicks - assert modules.load_module('sys') is sys + assert modules.load_module("sys") is sys # non-existing module from an existing package with pytest.raises(modules.FailedImport): - modules.load_module('snakeoil.__not_there') + modules.load_module("snakeoil.__not_there") # (hopefully :) non-existing top-level module/package with pytest.raises(modules.FailedImport): - modules.load_module('__not_there') + modules.load_module("__not_there") # "Unable to import" # pylint: disable=F0401 # unimported toplevel module - modtest1 = modules.load_module('mod_test1') + modtest1 = modules.load_module("mod_test1") import mod_test1 + assert mod_test1 is modtest1 # unimported in-package module - packtest2 = modules.load_module('mod_testpack.mod_test2') + packtest2 = modules.load_module("mod_testpack.mod_test2") from mod_testpack import mod_test2 + assert mod_test2 is packtest2 def test_load_attribute(self): # already imported - assert modules.load_attribute('sys.path') is sys.path + assert modules.load_attribute("sys.path") is sys.path # unimported - myfoo = modules.load_attribute('mod_testpack.mod_test2.foo') + myfoo = modules.load_attribute("mod_testpack.mod_test2.foo") # "Unable to import" # pylint: disable=F0401 from mod_testpack.mod_test2 import foo + assert foo is myfoo # nonexisting attribute with pytest.raises(modules.FailedImport): - modules.load_attribute('snakeoil.froznicator') + modules.load_attribute("snakeoil.froznicator") # nonexisting top-level with pytest.raises(modules.FailedImport): - modules.load_attribute('spork_does_not_exist.foo') + modules.load_attribute("spork_does_not_exist.foo") # not an attr with pytest.raises(modules.FailedImport): - modules.load_attribute('sys') + modules.load_attribute("sys") # not imported yet with pytest.raises(modules.FailedImport): - modules.load_attribute('mod_testpack.mod_test3') + modules.load_attribute("mod_testpack.mod_test3") def test_load_any(self): # import an already-imported module - assert modules.load_any('snakeoil.modules') is modules + assert modules.load_any("snakeoil.modules") is modules # attribute of an already imported module - assert modules.load_any('sys.path') is sys.path + assert modules.load_any("sys.path") is sys.path # already imported toplevel. - assert sys is modules.load_any('sys') + assert sys is modules.load_any("sys") # unimported - myfoo = modules.load_any('mod_testpack.mod_test2.foo') + myfoo = modules.load_any("mod_testpack.mod_test2.foo") # "Unable to import" # pylint: disable=F0401 from mod_testpack.mod_test2 import foo + assert foo is myfoo # nonexisting attribute with pytest.raises(modules.FailedImport): - modules.load_any('snakeoil.froznicator') + modules.load_any("snakeoil.froznicator") # nonexisting top-level with pytest.raises(modules.FailedImport): - modules.load_any('spork_does_not_exist.foo') + modules.load_any("spork_does_not_exist.foo") with pytest.raises(modules.FailedImport): - modules.load_any('spork_does_not_exist') + modules.load_any("spork_does_not_exist") # not imported yet with pytest.raises(modules.FailedImport): - modules.load_any('mod_testpack.mod_test3') + modules.load_any("mod_testpack.mod_test3") def test_broken_module(self): for func in [modules.load_module, modules.load_any]: with pytest.raises(modules.FailedImport): - func('mod_testpack.mod_horked') - assert 'mod_testpack.mod_horked' not in sys.modules + func("mod_testpack.mod_horked") + assert "mod_testpack.mod_horked" not in sys.modules diff --git a/tests/test_obj.py b/tests/test_obj.py index 83f9f776..78083d8b 100644 --- a/tests/test_obj.py +++ b/tests/test_obj.py @@ -7,7 +7,6 @@ make_DIkls = obj.DelayedInstantiation_kls class TestDelayedInstantiation: - def test_simple(self): t = tuple([1, 2, 3]) o = make_DI(tuple, lambda: t) @@ -19,19 +18,34 @@ class TestDelayedInstantiation: assert t >= o def test_descriptor_awareness(self): - def assertKls(cls, ignores=(), - default_ignores=("__new__", "__init__", "__init_subclass__", - "__getattribute__", "__class__", - "__getnewargs__", "__getstate__", - "__doc__", "__class_getitem__")): - required = set(x for x in dir(cls) - if x.startswith("__") and x.endswith("__")) + def assertKls( + cls, + ignores=(), + default_ignores=( + "__new__", + "__init__", + "__init_subclass__", + "__getattribute__", + "__class__", + "__getnewargs__", + "__getstate__", + "__doc__", + "__class_getitem__", + ), + ): + required = set( + x for x in dir(cls) if x.startswith("__") and x.endswith("__") + ) missing = required.difference(obj.kls_descriptors) missing.difference_update(obj.base_kls_descriptors) missing.difference_update(default_ignores) missing.difference_update(ignores) - assert not missing, ("object %r potentially has unsupported special " - "attributes: %s" % (cls, ', '.join(missing))) + assert ( + not missing + ), "object %r potentially has unsupported special " "attributes: %s" % ( + cls, + ", ".join(missing), + ) assertKls(object) assertKls(1) @@ -43,25 +57,38 @@ class TestDelayedInstantiation: def test_BaseDelayedObject(self): # assert that all methods/descriptors of object # are covered via the base. - o = set(dir(object)).difference(f"__{x}__" for x in ( - "class", "getattribute", "new", "init", "init_subclass", "getstate", "doc")) + o = set(dir(object)).difference( + f"__{x}__" + for x in ( + "class", + "getattribute", + "new", + "init", + "init_subclass", + "getstate", + "doc", + ) + ) diff = o.difference(obj.base_kls_descriptors) - assert not diff, ("base delayed instantiation class should cover all of object, but " - "%r was spotted" % (",".join(sorted(diff)),)) + assert not diff, ( + "base delayed instantiation class should cover all of object, but " + "%r was spotted" % (",".join(sorted(diff)),) + ) assert obj.DelayedInstantiation_kls(int, "1") + 2 == 3 - def test_klass_choice_optimization(self): """ensure that BaseDelayedObject is used whenever possible""" # note object is an odd one- it actually has a __doc__, thus # it must always be a custom o = make_DI(object, object) - assert object.__getattribute__(o, '__class__') is not obj.BaseDelayedObject + assert object.__getattribute__(o, "__class__") is not obj.BaseDelayedObject + class foon: pass + o = make_DI(foon, foon) - cls = object.__getattribute__(o, '__class__') + cls = object.__getattribute__(o, "__class__") assert cls is obj.BaseDelayedObject # now ensure we always get the same kls back for derivatives @@ -70,39 +97,43 @@ class TestDelayedInstantiation: return True o = make_DI(foon, foon) - cls = object.__getattribute__(o, '__class__') + cls = object.__getattribute__(o, "__class__") assert cls is not obj.BaseDelayedObject o = make_DI(foon, foon) - cls2 = object.__getattribute__(o, '__class__') + cls2 = object.__getattribute__(o, "__class__") assert cls is cls2 def test__class__(self): l = [] + def f(): l.append(False) return True + o = make_DI(bool, f) assert isinstance(o, bool) assert not l, "accessing __class__ shouldn't trigger instantiation" def test__doc__(self): l = [] + def f(): l.append(True) return foon() + class foon: __doc__ = "monkey" o = make_DI(foon, f) - assert o.__doc__ == 'monkey' + assert o.__doc__ == "monkey" assert not l, ( "in accessing __doc__, the instance was generated- " "this is a class level attribute, thus shouldn't " - "trigger instantiation") + "trigger instantiation" + ) class TestPopattr: - class Object: pass @@ -113,21 +144,21 @@ class TestPopattr: def test_no_attrs(self): # object without any attrs with pytest.raises(AttributeError): - obj.popattr(object(), 'nonexistent') + obj.popattr(object(), "nonexistent") def test_nonexistent_attr(self): # object with attr trying to get nonexistent attr with pytest.raises(AttributeError): - obj.popattr(self.o, 'nonexistent') + obj.popattr(self.o, "nonexistent") def test_fallback(self): # object with attr trying to get nonexistent attr using fallback - value = obj.popattr(self.o, 'nonexistent', 2) + value = obj.popattr(self.o, "nonexistent", 2) assert value == 2 def test_removed_attr(self): - value = obj.popattr(self.o, 'test') + value = obj.popattr(self.o, "test") assert value == 1 # verify that attr was removed from the object with pytest.raises(AttributeError): - obj.popattr(self.o, 'test') + obj.popattr(self.o, "test") diff --git a/tests/test_osutils.py b/tests/test_osutils.py index 18092823..264d670d 100644 --- a/tests/test_osutils.py +++ b/tests/test_osutils.py @@ -16,44 +16,45 @@ from snakeoil.osutils.mount import MNT_DETACH, MS_BIND, mount, umount class ReaddirCommon: - @pytest.fixture def subdir(self, tmp_path): - subdir = tmp_path / 'dir' + subdir = tmp_path / "dir" subdir.mkdir() - (tmp_path / 'file').touch() - os.mkfifo((tmp_path / 'fifo')) + (tmp_path / "file").touch() + os.mkfifo((tmp_path / "fifo")) return subdir def _test_missing(self, tmp_path, funcs): for func in funcs: - pytest.raises(OSError, func, tmp_path / 'spork') + pytest.raises(OSError, func, tmp_path / "spork") class TestNativeListDir(ReaddirCommon): - def test_listdir(self, tmp_path, subdir): - assert set(native_readdir.listdir(tmp_path)) == {'dir', 'fifo', 'file'} + assert set(native_readdir.listdir(tmp_path)) == {"dir", "fifo", "file"} assert native_readdir.listdir(subdir) == [] def test_listdir_dirs(self, tmp_path, subdir): - assert native_readdir.listdir_dirs(tmp_path) == ['dir'] + assert native_readdir.listdir_dirs(tmp_path) == ["dir"] assert native_readdir.listdir_dirs(subdir) == [] def test_listdir_files(self, tmp_path, subdir): - assert native_readdir.listdir_files(tmp_path) == ['file'] + assert native_readdir.listdir_files(tmp_path) == ["file"] assert native_readdir.listdir_dirs(subdir) == [] def test_missing(self, tmp_path, subdir): - return self._test_missing(tmp_path, ( - native_readdir.listdir, - native_readdir.listdir_dirs, - native_readdir.listdir_files, - )) + return self._test_missing( + tmp_path, + ( + native_readdir.listdir, + native_readdir.listdir_dirs, + native_readdir.listdir_files, + ), + ) def test_dangling_sym(self, tmp_path, subdir): (tmp_path / "monkeys").symlink_to("foon") - assert native_readdir.listdir_files(tmp_path) == ['file'] + assert native_readdir.listdir_files(tmp_path) == ["file"] class TestNativeReaddir(ReaddirCommon): @@ -78,36 +79,37 @@ class TestNativeReaddir(ReaddirCommon): class TestEnsureDirs: - def check_dir(self, path, uid, gid, mode): assert path.is_dir() st = os.stat(path) - assert stat.S_IMODE(st.st_mode) == mode, \ - '0%o != 0%o' % (stat.S_IMODE(st.st_mode), mode) + assert stat.S_IMODE(st.st_mode) == mode, "0%o != 0%o" % ( + stat.S_IMODE(st.st_mode), + mode, + ) assert st.st_uid == uid assert st.st_gid == gid def test_ensure_dirs(self, tmp_path): # default settings - path = tmp_path / 'foo' / 'bar' + path = tmp_path / "foo" / "bar" assert osutils.ensure_dirs(path) self.check_dir(path, os.geteuid(), os.getegid(), 0o777) def test_minimal_nonmodifying(self, tmp_path): - path = tmp_path / 'foo' / 'bar' + path = tmp_path / "foo" / "bar" assert osutils.ensure_dirs(path, mode=0o755) os.chmod(path, 0o777) assert osutils.ensure_dirs(path, mode=0o755, minimal=True) self.check_dir(path, os.geteuid(), os.getegid(), 0o777) def test_minimal_modifying(self, tmp_path): - path = tmp_path / 'foo' / 'bar' + path = tmp_path / "foo" / "bar" assert osutils.ensure_dirs(path, mode=0o750) assert osutils.ensure_dirs(path, mode=0o005, minimal=True) self.check_dir(path, os.geteuid(), os.getegid(), 0o755) def test_create_unwritable_subdir(self, tmp_path): - path = tmp_path / 'restricted' / 'restricted' + path = tmp_path / "restricted" / "restricted" # create the subdirs without 020 first assert osutils.ensure_dirs(path.parent) assert osutils.ensure_dirs(path, mode=0o020) @@ -118,38 +120,39 @@ class TestEnsureDirs: def test_path_is_a_file(self, tmp_path): # fail if passed a path to an existing file - path = tmp_path / 'file' + path = tmp_path / "file" touch(path) assert path.is_file() assert not osutils.ensure_dirs(path, mode=0o700) def test_non_dir_in_path(self, tmp_path): # fail if one of the parts of the path isn't a dir - path = tmp_path / 'file' / 'dir' - (tmp_path / 'file').touch() + path = tmp_path / "file" / "dir" + (tmp_path / "file").touch() assert not osutils.ensure_dirs(path, mode=0o700) def test_mkdir_failing(self, tmp_path): # fail if os.mkdir fails - with mock.patch('snakeoil.osutils.os.mkdir') as mkdir: - mkdir.side_effect = OSError(30, 'Read-only file system') - path = tmp_path / 'dir' + with mock.patch("snakeoil.osutils.os.mkdir") as mkdir: + mkdir.side_effect = OSError(30, "Read-only file system") + path = tmp_path / "dir" assert not osutils.ensure_dirs(path, mode=0o700) # force temp perms assert not osutils.ensure_dirs(path, mode=0o400) - mkdir.side_effect = OSError(17, 'File exists') + mkdir.side_effect = OSError(17, "File exists") assert not osutils.ensure_dirs(path, mode=0o700) def test_chmod_or_chown_failing(self, tmp_path): # fail if chmod or chown fails - path = tmp_path / 'dir' + path = tmp_path / "dir" path.mkdir() path.chmod(0o750) - with mock.patch('snakeoil.osutils.os.chmod') as chmod, \ - mock.patch('snakeoil.osutils.os.chown') as chown: - chmod.side_effect = OSError(5, 'Input/output error') + with mock.patch("snakeoil.osutils.os.chmod") as chmod, mock.patch( + "snakeoil.osutils.os.chown" + ) as chown: + chmod.side_effect = OSError(5, "Input/output error") # chmod failure when file exists and trying to reset perms to match # the specified mode @@ -163,13 +166,13 @@ class TestEnsureDirs: # chown failure when resetting perms on parents chmod.side_effect = None - chown.side_effect = OSError(5, 'Input/output error') + chown.side_effect = OSError(5, "Input/output error") assert not osutils.ensure_dirs(path, uid=1000, gid=1000, mode=0o400) def test_reset_sticky_parent_perms(self, tmp_path): # make sure perms are reset after traversing over sticky parents - sticky_parent = tmp_path / 'dir' - path = sticky_parent / 'dir' + sticky_parent = tmp_path / "dir" + path = sticky_parent / "dir" sticky_parent.mkdir() sticky_parent.chmod(0o2755) pre_sticky_parent = os.stat(sticky_parent) @@ -178,7 +181,7 @@ class TestEnsureDirs: assert pre_sticky_parent.st_mode == post_sticky_parent.st_mode def test_mode(self, tmp_path): - path = tmp_path / 'mode' / 'mode' + path = tmp_path / "mode" / "mode" assert osutils.ensure_dirs(path, mode=0o700) self.check_dir(path, os.geteuid(), os.getegid(), 0o700) # unrestrict it @@ -188,12 +191,12 @@ class TestEnsureDirs: def test_gid(self, tmp_path): # abuse the portage group as secondary group try: - portage_gid = grp.getgrnam('portage').gr_gid + portage_gid = grp.getgrnam("portage").gr_gid except KeyError: - pytest.skip('the portage group does not exist') + pytest.skip("the portage group does not exist") if portage_gid not in os.getgroups(): - pytest.skip('you are not in the portage group') - path = tmp_path / 'group' / 'group' + pytest.skip("you are not in the portage group") + path = tmp_path / "group" / "group" assert osutils.ensure_dirs(path, gid=portage_gid) self.check_dir(path, os.geteuid(), portage_gid, 0o777) assert osutils.ensure_dirs(path) @@ -203,12 +206,11 @@ class TestEnsureDirs: class TestAbsSymlink: - def test_abssymlink(self, tmp_path): - target = tmp_path / 'target' - linkname = tmp_path / 'link' + target = tmp_path / "target" + linkname = tmp_path / "link" target.mkdir() - linkname.symlink_to('target') + linkname.symlink_to("target") assert osutils.abssymlink(linkname) == str(target) @@ -223,28 +225,30 @@ class Test_Native_NormPath: got = f(src) assert got == val, f"{src!r}: expected {val!r}, got {got!r}" - check('/foo/', '/foo') - check('//foo/', '/foo') - check('//foo/.', '/foo') - check('//..', '/') - check('//..//foo', '/foo') - check('/foo/..', '/') - check('..//foo', '../foo') - check('../foo/../', '..') - check('../', '..') - check('../foo/..', '..') - check('../foo/../dar', '../dar') - check('.//foo', 'foo') - check('/foo/../../', '/') - check('/foo/../../..', '/') - check('/tmp/foo/../dar/', '/tmp/dar') - check('/tmp/foo/../dar', '/tmp/dar') + check("/foo/", "/foo") + check("//foo/", "/foo") + check("//foo/.", "/foo") + check("//..", "/") + check("//..//foo", "/foo") + check("/foo/..", "/") + check("..//foo", "../foo") + check("../foo/../", "..") + check("../", "..") + check("../foo/..", "..") + check("../foo/../dar", "../dar") + check(".//foo", "foo") + check("/foo/../../", "/") + check("/foo/../../..", "/") + check("/tmp/foo/../dar/", "/tmp/dar") + check("/tmp/foo/../dar", "/tmp/dar") # explicit unicode and bytes - check('/tmṕ/föo//../dár', '/tmṕ/dár') - check(b'/tm\xe1\xb9\x95/f\xc3\xb6o//../d\xc3\xa1r', b'/tm\xe1\xb9\x95/d\xc3\xa1r') - check('/föó/..', '/') - check(b'/f\xc3\xb6\xc3\xb3/..', b'/') + check("/tmṕ/föo//../dár", "/tmṕ/dár") + check( + b"/tm\xe1\xb9\x95/f\xc3\xb6o//../d\xc3\xa1r", b"/tm\xe1\xb9\x95/d\xc3\xa1r" + ) + check("/föó/..", "/") + check(b"/f\xc3\xb6\xc3\xb3/..", b"/") @pytest.mark.skipif(os.getuid() != 0, reason="these tests must be ran as root") @@ -253,7 +257,7 @@ class TestAccess: func = staticmethod(osutils.fallback_access) def test_fallback(self, tmp_path): - fp = tmp_path / 'file' + fp = tmp_path / "file" # create the file fp.touch() fp.chmod(0o000) @@ -270,9 +274,9 @@ class Test_unlink_if_exists: def test_it(self, tmp_path): f = self.func - path = tmp_path / 'target' + path = tmp_path / "target" f(path) - path.write_text('') + path.write_text("") f(path) assert not path.exists() # and once more for good measure... @@ -280,18 +284,17 @@ class Test_unlink_if_exists: class TestSupportedSystems: - def test_supported_system(self): - @supported_systems('supported') + @supported_systems("supported") def func(): return True - with mock.patch('snakeoil.osutils.sys') as _sys: - _sys.configure_mock(platform='supported') + with mock.patch("snakeoil.osutils.sys") as _sys: + _sys.configure_mock(platform="supported") assert func() def test_unsupported_system(self): - @supported_systems('unsupported') + @supported_systems("unsupported") def func(): return True @@ -299,39 +302,39 @@ class TestSupportedSystems: func() # make sure we're iterating through the system params correctly - with mock.patch('snakeoil.osutils.sys') as _sys: - _sys.configure_mock(platform='u') + with mock.patch("snakeoil.osutils.sys") as _sys: + _sys.configure_mock(platform="u") with pytest.raises(NotImplementedError): func() def test_multiple_systems(self): - @supported_systems('darwin', 'linux') + @supported_systems("darwin", "linux") def func(): return True - with mock.patch('snakeoil.osutils.sys') as _sys: - _sys.configure_mock(platform='nonexistent') + with mock.patch("snakeoil.osutils.sys") as _sys: + _sys.configure_mock(platform="nonexistent") with pytest.raises(NotImplementedError): func() - for platform in ('linux2', 'darwin'): + for platform in ("linux2", "darwin"): _sys.configure_mock(platform=platform) assert func() -@pytest.mark.skipif(not sys.platform.startswith('linux'), - reason='supported on Linux only') +@pytest.mark.skipif( + not sys.platform.startswith("linux"), reason="supported on Linux only" +) class TestMount: - @pytest.fixture def source(self, tmp_path): - source = tmp_path / 'source' + source = tmp_path / "source" source.mkdir() return source @pytest.fixture def target(self, tmp_path): - target = tmp_path / 'target' + target = tmp_path / "target" target.mkdir() return target @@ -340,21 +343,25 @@ class TestMount: # byte strings; if they are unicode strings the arguments get mangled # leading to errors when the syscall is run. This confirms mount() from # snakeoil.osutils always converts the arguments into byte strings. - for source, target, fstype in ((b'source', b'target', b'fstype'), - ('source', 'target', 'fstype')): - with mock.patch('snakeoil.osutils.mount.ctypes') as mock_ctypes: + for source, target, fstype in ( + (b"source", b"target", b"fstype"), + ("source", "target", "fstype"), + ): + with mock.patch("snakeoil.osutils.mount.ctypes") as mock_ctypes: with pytest.raises(OSError): mount(str(source), str(target), fstype, MS_BIND) - mount_call = next(x for x in mock_ctypes.mock_calls if x[0] == 'CDLL().mount') + mount_call = next( + x for x in mock_ctypes.mock_calls if x[0] == "CDLL().mount" + ) for arg in mount_call[1][0:3]: assert isinstance(arg, bytes) def test_missing_dirs(self): with pytest.raises(OSError) as cm: - mount('source', 'target', None, MS_BIND) + mount("source", "target", None, MS_BIND) assert cm.value.errno in (errno.EPERM, errno.ENOENT) - @pytest.mark.skipif(os.getuid() == 0, reason='this test must be run as non-root') + @pytest.mark.skipif(os.getuid() == 0, reason="this test must be run as non-root") def test_no_perms(self, source, target): with pytest.raises(OSError) as cm: mount(str(source), str(target), None, MS_BIND) @@ -363,11 +370,15 @@ class TestMount: umount(str(target)) assert cm.value.errno in (errno.EPERM, errno.EINVAL) - @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/mnt') and os.path.exists('/proc/self/ns/user')), - reason='user and mount namespace support required') + @pytest.mark.skipif( + not ( + os.path.exists("/proc/self/ns/mnt") and os.path.exists("/proc/self/ns/user") + ), + reason="user and mount namespace support required", + ) def test_bind_mount(self, source, target): - src_file = source / 'file' - bind_file = target / 'file' + src_file = source / "file" + bind_file = target / "file" src_file.touch() try: @@ -378,15 +389,19 @@ class TestMount: umount(str(target)) assert not bind_file.exists() except PermissionError: - pytest.skip('No permission to use user and mount namespace') - - @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/mnt') and os.path.exists('/proc/self/ns/user')), - reason='user and mount namespace support required') + pytest.skip("No permission to use user and mount namespace") + + @pytest.mark.skipif( + not ( + os.path.exists("/proc/self/ns/mnt") and os.path.exists("/proc/self/ns/user") + ), + reason="user and mount namespace support required", + ) def test_lazy_unmount(self, source, target): - src_file = source / 'file' - bind_file = target / 'file' + src_file = source / "file" + bind_file = target / "file" src_file.touch() - src_file.write_text('foo') + src_file.write_text("foo") try: with Namespace(user=True, mount=True): @@ -403,14 +418,14 @@ class TestMount: # confirm the file doesn't exist in the bind mount anymore assert not bind_file.exists() # but the file is still accessible to the process - assert f.read() == 'foo' + assert f.read() == "foo" # trying to reopen causes IOError with pytest.raises(IOError) as cm: f = bind_file.open() assert cm.value.errno == errno.ENOENT except PermissionError: - pytest.skip('No permission to use user and mount namespace') + pytest.skip("No permission to use user and mount namespace") class TestSizeofFmt: diff --git a/tests/test_process.py b/tests/test_process.py index bb45712e..488b7b03 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -30,8 +30,10 @@ class TestFindBinary: process.find_binary(self.script) def test_fallback(self): - fallback = process.find_binary(self.script, fallback=os.path.join('bin', self.script)) - assert fallback == os.path.join('bin', self.script) + fallback = process.find_binary( + self.script, fallback=os.path.join("bin", self.script) + ) + assert fallback == os.path.join("bin", self.script) def test_not_executable(self, tmp_path): fp = tmp_path / self.script diff --git a/tests/test_process_spawn.py b/tests/test_process_spawn.py index 8981c6e7..556b34cc 100644 --- a/tests/test_process_spawn.py +++ b/tests/test_process_spawn.py @@ -6,11 +6,11 @@ from snakeoil import process from snakeoil.contexts import chdir from snakeoil.process import spawn -BASH_BINARY = process.find_binary("bash", fallback='') +BASH_BINARY = process.find_binary("bash", fallback="") -@pytest.mark.skipif(not BASH_BINARY, reason='missing bash binary') -class TestSpawn: +@pytest.mark.skipif(not BASH_BINARY, reason="missing bash binary") +class TestSpawn: @pytest.fixture(autouse=True) def _setup(self, tmp_path): orig_path = os.environ["PATH"] @@ -37,21 +37,25 @@ class TestSpawn: def test_get_output(self, tmp_path, dev_null): filename = "spawn-getoutput.sh" for r, s, text, args in ( - [0, ["dar\n"], "echo dar\n", {}], - [0, ["dar"], "echo -n dar", {}], - [1, ["blah\n", "dar\n"], "echo blah\necho dar\nexit 1", {}], - [0, [], "echo dar 1>&2", {"fd_pipes": {1: 1, 2: dev_null}}]): + [0, ["dar\n"], "echo dar\n", {}], + [0, ["dar"], "echo -n dar", {}], + [1, ["blah\n", "dar\n"], "echo blah\necho dar\nexit 1", {}], + [0, [], "echo dar 1>&2", {"fd_pipes": {1: 1, 2: dev_null}}], + ): fp = self.generate_script(tmp_path, filename, text) - assert (r, s) == spawn.spawn_get_output(str(fp), spawn_type=spawn.spawn_bash, **args) + assert (r, s) == spawn.spawn_get_output( + str(fp), spawn_type=spawn.spawn_bash, **args + ) os.unlink(fp) @pytest.mark.skipif(not spawn.is_sandbox_capable(), reason="missing sandbox binary") def test_sandbox(self, tmp_path): - fp = self.generate_script( - tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD") + fp = self.generate_script(tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD") ret = spawn.spawn_get_output(str(fp), spawn_type=spawn.spawn_sandbox) assert ret[1], "no output; exit code was %s; script location %s" % (ret[0], fp) - assert "libsandbox.so" in [os.path.basename(x.strip()) for x in ret[1][0].split()] + assert "libsandbox.so" in [ + os.path.basename(x.strip()) for x in ret[1][0].split() + ] os.unlink(fp) @pytest.mark.skipif(not spawn.is_sandbox_capable(), reason="missing sandbox binary") @@ -60,15 +64,17 @@ class TestSpawn: this verifies our fix works. """ - fp = self.generate_script( - tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD") + fp = self.generate_script(tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD") dpath = tmp_path / "dar" dpath.mkdir() with chdir(dpath): dpath.rmdir() - assert "libsandbox.so" in \ - [os.path.basename(x.strip()) for x in spawn.spawn_get_output( - str(fp), spawn_type=spawn.spawn_sandbox, cwd='/')[1][0].split()] + assert "libsandbox.so" in [ + os.path.basename(x.strip()) + for x in spawn.spawn_get_output( + str(fp), spawn_type=spawn.spawn_sandbox, cwd="/" + )[1][0].split() + ] fp.unlink() def test_process_exit_code(self): @@ -98,13 +104,12 @@ class TestSpawn: def test_spawn_bash(self, capfd): # bash builtin for true without exec'ing true (eg, no path lookup) - assert 0 == spawn.spawn_bash('echo bash') + assert 0 == spawn.spawn_bash("echo bash") out, _err = capfd.readouterr() - assert out.strip() == 'bash' + assert out.strip() == "bash" def test_umask(self, tmp_path): - fp = self.generate_script( - tmp_path, "spawn-umask.sh", f"#!{BASH_BINARY}\numask") + fp = self.generate_script(tmp_path, "spawn-umask.sh", f"#!{BASH_BINARY}\numask") try: old_umask = os.umask(0) if old_umask == 0: @@ -113,7 +118,8 @@ class TestSpawn: os.umask(desired) else: desired = 0 - assert str(desired).lstrip("0") == \ - spawn.spawn_get_output(str(fp))[1][0].strip().lstrip("0") + assert str(desired).lstrip("0") == spawn.spawn_get_output(str(fp))[1][ + 0 + ].strip().lstrip("0") finally: os.umask(old_umask) diff --git a/tests/test_sequences.py b/tests/test_sequences.py index edbaa5a0..0d8c5a62 100644 --- a/tests/test_sequences.py +++ b/tests/test_sequences.py @@ -8,13 +8,11 @@ from snakeoil.sequences import split_elements, split_negations class UnhashableComplex(complex): - def __hash__(self): raise TypeError class TestStableUnique: - def common_check(self, func): # silly assert func(()) == [] @@ -23,9 +21,10 @@ class TestStableUnique: # neither def test_stable_unique(self, func=sequences.stable_unique): - assert list(set([1, 2, 3])) == [1, 2, 3], \ - "this test is reliant on the interpreter hasing 1,2,3 into a specific ordering- " \ + assert list(set([1, 2, 3])) == [1, 2, 3], ( + "this test is reliant on the interpreter hasing 1,2,3 into a specific ordering- " "for whatever reason, ordering differs, thus this test can't verify it" + ) assert func([3, 2, 1]) == [3, 2, 1] def test_iter_stable_unique(self): @@ -43,20 +42,19 @@ class TestStableUnique: uc = UnhashableComplex res = sequences.unstable_unique([uc(1, 0), uc(0, 1), uc(1, 0)]) # sortable - assert sorted(sequences.unstable_unique( - [[1, 2], [1, 3], [1, 2], [1, 3]])) == [[1, 2], [1, 3]] + assert sorted(sequences.unstable_unique([[1, 2], [1, 3], [1, 2], [1, 3]])) == [ + [1, 2], + [1, 3], + ] assert res == [uc(1, 0), uc(0, 1)] or res == [uc(0, 1), uc(1, 0)] assert sorted(sequences.unstable_unique(self._generator())) == sorted(range(6)) class TestChainedLists: - @staticmethod def gen_cl(): return sequences.ChainedLists( - list(range(3)), - list(range(3, 6)), - list(range(6, 100)) + list(range(3)), list(range(3, 6)), list(range(6, 100)) ) def test_contains(self): @@ -72,7 +70,7 @@ class TestChainedLists: def test_str(self): l = sequences.ChainedLists(list(range(3)), list(range(3, 5))) - assert str(l) == '[ [0, 1, 2], [3, 4] ]' + assert str(l) == "[ [0, 1, 2], [3, 4] ]" def test_getitem(self): cl = self.gen_cl() @@ -108,15 +106,18 @@ class Test_iflatten_instance: def test_it(self): o = OrderedDict((k, None) for k in range(10)) for l, correct, skip in ( - (["asdf", ["asdf", "asdf"], 1, None], - ["asdf", "asdf", "asdf", 1, None], str), - ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)), - ([o, 1, "fds"], list(range(10)) + [1, "fds"], str), - ("fds", ["fds"], str), - ("fds", ["f", "d", "s"], int), - ('', [''], str), - (1, [1], int), - ): + ( + ["asdf", ["asdf", "asdf"], 1, None], + ["asdf", "asdf", "asdf", 1, None], + str, + ), + ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)), + ([o, 1, "fds"], list(range(10)) + [1, "fds"], str), + ("fds", ["fds"], str), + ("fds", ["f", "d", "s"], int), + ("", [""], str), + (1, [1], int), + ): iterator = self.func(l, skip) assert list(iterator) == correct assert list(iterator) == [] @@ -126,6 +127,7 @@ class Test_iflatten_instance: # have to iterate. def fail(): return list(self.func(None)) + with pytest.raises(TypeError): fail() @@ -148,13 +150,16 @@ class Test_iflatten_func: def test_it(self): o = OrderedDict((k, None) for k in range(10)) for l, correct, skip in ( - (["asdf", ["asdf", "asdf"], 1, None], - ["asdf", "asdf", "asdf", 1, None], str), - ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)), - ([o, 1, "fds"], list(range(10)) + [1, "fds"], str), - ("fds", ["fds"], str), - (1, [1], int), - ): + ( + ["asdf", ["asdf", "asdf"], 1, None], + ["asdf", "asdf", "asdf", 1, None], + str, + ), + ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)), + ([o, 1, "fds"], list(range(10)) + [1, "fds"], str), + ("fds", ["fds"], str), + (1, [1], int), + ): iterator = self.func(l, lambda x: isinstance(x, skip)) assert list(iterator) == correct assert list(iterator) == [] @@ -164,6 +169,7 @@ class Test_iflatten_func: # have to iterate. def fail(): return list(self.func(None, lambda x: False)) + with pytest.raises(TypeError): fail() @@ -189,25 +195,24 @@ class Test_predicate_split: assert true_l == list(range(0, 100, 2)) def test_key(self): - false_l, true_l = self.kls(lambda x: x % 2 == 0, - ([0, x] for x in range(100)), - key=itemgetter(1)) + false_l, true_l = self.kls( + lambda x: x % 2 == 0, ([0, x] for x in range(100)), key=itemgetter(1) + ) assert false_l == [[0, x] for x in range(1, 100, 2)] assert true_l == [[0, x] for x in range(0, 100, 2)] class TestSplitNegations: - def test_empty(self): # empty input - seq = '' + seq = "" assert split_negations(seq) == ((), ()) def test_bad_value(self): # no-value negation should raise a ValueError bad_values = ( - '-', - 'a b c - d f e', + "-", + "a b c - d f e", ) for s in bad_values: @@ -216,7 +221,7 @@ class TestSplitNegations: def test_negs(self): # all negs - seq = ('-' + str(x) for x in range(100)) + seq = ("-" + str(x) for x in range(100)) assert split_negations(seq) == (tuple(map(str, range(100))), ()) def test_pos(self): @@ -226,31 +231,33 @@ class TestSplitNegations: def test_neg_pos(self): # both - seq = (('-' + str(x), str(x)) for x in range(100)) + seq = (("-" + str(x), str(x)) for x in range(100)) seq = chain.from_iterable(seq) - assert split_negations(seq) == (tuple(map(str, range(100))), tuple(map(str, range(100)))) + assert split_negations(seq) == ( + tuple(map(str, range(100))), + tuple(map(str, range(100))), + ) def test_converter(self): # converter method - seq = (('-' + str(x), str(x)) for x in range(100)) + seq = (("-" + str(x), str(x)) for x in range(100)) seq = chain.from_iterable(seq) assert split_negations(seq, int) == (tuple(range(100)), tuple(range(100))) class TestSplitElements: - def test_empty(self): # empty input - seq = '' + seq = "" assert split_elements(seq) == ((), (), ()) def test_bad_value(self): # no-value neg/pos should raise ValueErrors bad_values = ( - '-', - '+', - 'a b c - d f e', - 'a b c + d f e', + "-", + "+", + "a b c - d f e", + "a b c + d f e", ) for s in bad_values: @@ -259,7 +266,7 @@ class TestSplitElements: def test_negs(self): # all negs - seq = ('-' + str(x) for x in range(100)) + seq = ("-" + str(x) for x in range(100)) assert split_elements(seq) == (tuple(map(str, range(100))), (), ()) def test_neutral(self): @@ -269,12 +276,12 @@ class TestSplitElements: def test_pos(self): # all pos - seq = ('+' + str(x) for x in range(100)) + seq = ("+" + str(x) for x in range(100)) assert split_elements(seq) == ((), (), tuple(map(str, range(100)))) def test_neg_pos(self): # both negative and positive values - seq = (('-' + str(x), '+' + str(x)) for x in range(100)) + seq = (("-" + str(x), "+" + str(x)) for x in range(100)) seq = chain.from_iterable(seq) assert split_elements(seq) == ( tuple(map(str, range(100))), @@ -284,7 +291,7 @@ class TestSplitElements: def test_neg_neu_pos(self): # all three value types - seq = (('-' + str(x), str(x), '+' + str(x)) for x in range(100)) + seq = (("-" + str(x), str(x), "+" + str(x)) for x in range(100)) seq = chain.from_iterable(seq) assert split_elements(seq) == ( tuple(map(str, range(100))), @@ -294,7 +301,10 @@ class TestSplitElements: def test_converter(self): # converter method - seq = (('-' + str(x), str(x), '+' + str(x)) for x in range(100)) + seq = (("-" + str(x), str(x), "+" + str(x)) for x in range(100)) seq = chain.from_iterable(seq) assert split_elements(seq, int) == ( - tuple(range(100)), tuple(range(100)), tuple(range(100))) + tuple(range(100)), + tuple(range(100)), + tuple(range(100)), + ) diff --git a/tests/test_stringio.py b/tests/test_stringio.py index 4fb7c78b..1e6d1e5d 100644 --- a/tests/test_stringio.py +++ b/tests/test_stringio.py @@ -34,6 +34,7 @@ class readonly_mixin: class Test_text_readonly(readonly_mixin): kls = stringio.text_readonly -class Test_bytes_readonly(readonly_mixin ): + +class Test_bytes_readonly(readonly_mixin): kls = stringio.bytes_readonly - encoding = 'utf8' + encoding = "utf8" diff --git a/tests/test_strings.py b/tests/test_strings.py index b55c2306..19305708 100644 --- a/tests/test_strings.py +++ b/tests/test_strings.py @@ -3,38 +3,36 @@ from snakeoil.strings import doc_dedent, pluralism class TestPluralism: - def test_none(self): # default - assert pluralism([]) == 's' + assert pluralism([]) == "s" # different suffix for nonexistence - assert pluralism([], none='') == '' + assert pluralism([], none="") == "" def test_singular(self): # default - assert pluralism([1]) == '' + assert pluralism([1]) == "" # different suffix for singular existence - assert pluralism([1], singular='o') == 'o' + assert pluralism([1], singular="o") == "o" def test_plural(self): # default - assert pluralism([1, 2]) == 's' + assert pluralism([1, 2]) == "s" # different suffix for plural existence - assert pluralism([1, 2], plural='ies') == 'ies' + assert pluralism([1, 2], plural="ies") == "ies" def test_int(self): - assert pluralism(0) == 's' - assert pluralism(1) == '' - assert pluralism(2) == 's' + assert pluralism(0) == "s" + assert pluralism(1) == "" + assert pluralism(2) == "s" class TestDocDedent: - def test_empty(self): - s = '' + s = "" assert s == doc_dedent(s) def test_non_string(self): @@ -42,20 +40,20 @@ class TestDocDedent: doc_dedent(None) def test_line(self): - s = 'line' + s = "line" assert s == doc_dedent(s) def test_indented_line(self): - for indent in ('\t', ' '): - s = f'{indent}line' - assert 'line' == doc_dedent(s) + for indent in ("\t", " "): + s = f"{indent}line" + assert "line" == doc_dedent(s) def test_docstring(self): s = """Docstring to test. foo bar """ - assert 'Docstring to test.\n\nfoo bar\n' == doc_dedent(s) + assert "Docstring to test.\n\nfoo bar\n" == doc_dedent(s) def test_all_indented(self): s = """\ @@ -63,4 +61,4 @@ class TestDocDedent: foo bar """ - assert 'Docstring to test.\n\nfoo bar\n' == doc_dedent(s) + assert "Docstring to test.\n\nfoo bar\n" == doc_dedent(s) diff --git a/tests/test_version.py b/tests/test_version.py index 7dad73e4..09927542 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -7,7 +7,6 @@ from snakeoil import __version__, version class TestVersion: - def setup_method(self, method): # reset the cached version in the module reload(version) @@ -17,124 +16,142 @@ class TestVersion: def test_get_version_unknown(self): with pytest.raises(ValueError): - version.get_version('snakeoilfoo', __file__) + version.get_version("snakeoilfoo", __file__) def test_get_version_api(self): - v = version.get_version('snakeoil', __file__, '9.9.9') - assert v.startswith('snakeoil 9.9.9') + v = version.get_version("snakeoil", __file__, "9.9.9") + assert v.startswith("snakeoil 9.9.9") def test_get_version_git_dev(self): - with mock.patch('snakeoil.version.import_module') as import_module, \ - mock.patch('snakeoil.version.get_git_version') as get_git_version: + with mock.patch("snakeoil.version.import_module") as import_module, mock.patch( + "snakeoil.version.get_git_version" + ) as get_git_version: import_module.side_effect = ImportError verinfo = { - 'rev': '1ff76b021d208f7df38ac524537b6419404f1c64', - 'date': 'Mon Sep 25 13:50:24 2017 -0400', - 'tag': None + "rev": "1ff76b021d208f7df38ac524537b6419404f1c64", + "date": "Mon Sep 25 13:50:24 2017 -0400", + "tag": None, } get_git_version.return_value = verinfo - result = version.get_version('snakeoil', __file__, __version__) - assert result == f"snakeoil {__version__}-g{verinfo['rev'][:7]} -- {verinfo['date']}" + result = version.get_version("snakeoil", __file__, __version__) + assert ( + result + == f"snakeoil {__version__}-g{verinfo['rev'][:7]} -- {verinfo['date']}" + ) def test_get_version_git_release(self): verinfo = { - 'rev': 'ab38751890efa8be96b7f95938d6b868b769bab6', - 'date': 'Thu Sep 21 15:57:38 2017 -0400', - 'tag': '2.3.4', + "rev": "ab38751890efa8be96b7f95938d6b868b769bab6", + "date": "Thu Sep 21 15:57:38 2017 -0400", + "tag": "2.3.4", } # fake snakeoil._verinfo module object class Verinfo: version_info = verinfo - with mock.patch('snakeoil.version.import_module') as import_module: + with mock.patch("snakeoil.version.import_module") as import_module: import_module.return_value = Verinfo() - result = version.get_version('snakeoil', __file__, verinfo['tag']) + result = version.get_version("snakeoil", __file__, verinfo["tag"]) assert result == f"snakeoil {verinfo['tag']} -- released {verinfo['date']}" def test_get_version_no_git_version(self): - with mock.patch('snakeoil.version.import_module') as import_module, \ - mock.patch('snakeoil.version.get_git_version') as get_git_version: + with mock.patch("snakeoil.version.import_module") as import_module, mock.patch( + "snakeoil.version.get_git_version" + ) as get_git_version: import_module.side_effect = ImportError get_git_version.return_value = None - result = version.get_version('snakeoil', 'nonexistent', __version__) - assert result == f'snakeoil {__version__}' + result = version.get_version("snakeoil", "nonexistent", __version__) + assert result == f"snakeoil {__version__}" def test_get_version_caching(self): # retrieved version info is cached in a module attr - v = version.get_version('snakeoil', __file__) - assert v.startswith(f'snakeoil {__version__}') + v = version.get_version("snakeoil", __file__) + assert v.startswith(f"snakeoil {__version__}") # re-running get_version returns the cached attr instead of reprocessing - with mock.patch('snakeoil.version.import_module') as import_module: - v = version.get_version('snakeoil', __file__) + with mock.patch("snakeoil.version.import_module") as import_module: + v = version.get_version("snakeoil", __file__) assert not import_module.called class TestGitVersion: - def test_get_git_version_not_available(self): - with mock.patch('snakeoil.version._run_git') as run_git: - run_git.side_effect = EnvironmentError(errno.ENOENT, 'git not found') - assert version.get_git_version('nonexistent') is None + with mock.patch("snakeoil.version._run_git") as run_git: + run_git.side_effect = EnvironmentError(errno.ENOENT, "git not found") + assert version.get_git_version("nonexistent") is None def test_get_git_version_error(self): - with mock.patch('snakeoil.version._run_git') as run_git: - run_git.return_value = (b'foo', 1) - assert version.get_git_version('nonexistent') is None + with mock.patch("snakeoil.version._run_git") as run_git: + run_git.return_value = (b"foo", 1) + assert version.get_git_version("nonexistent") is None def test_get_git_version_non_repo(self, tmpdir): assert version.get_git_version(str(tmpdir)) is None def test_get_git_version_exc(self): with pytest.raises(OSError): - with mock.patch('snakeoil.version._run_git') as run_git: - run_git.side_effect = OSError(errno.EIO, 'Input/output error') - version.get_git_version('nonexistent') + with mock.patch("snakeoil.version._run_git") as run_git: + run_git.side_effect = OSError(errno.EIO, "Input/output error") + version.get_git_version("nonexistent") def test_get_git_version_good_dev(self): - with mock.patch('snakeoil.version._run_git') as run_git: + with mock.patch("snakeoil.version._run_git") as run_git: # dev version run_git.return_value = ( - b'1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400', 0) - result = version.get_git_version('nonexistent') + b"1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400", + 0, + ) + result = version.get_git_version("nonexistent") expected = { - 'rev': '1ff76b021d208f7df38ac524537b6419404f1c64', - 'date': 'Mon Sep 25 13:50:24 2017 -0400', - 'tag': None, - 'commits': 2, + "rev": "1ff76b021d208f7df38ac524537b6419404f1c64", + "date": "Mon Sep 25 13:50:24 2017 -0400", + "tag": None, + "commits": 2, } assert result == expected def test_get_git_version_good_tag(self): - with mock.patch('snakeoil.version._run_git') as run_git, \ - mock.patch('snakeoil.version._get_git_tag') as get_git_tag: + with mock.patch("snakeoil.version._run_git") as run_git, mock.patch( + "snakeoil.version._get_git_tag" + ) as get_git_tag: # tagged, release version run_git.return_value = ( - b'1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400', 0) - get_git_tag.return_value = '1.1.1' - result = version.get_git_version('nonexistent') + b"1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400", + 0, + ) + get_git_tag.return_value = "1.1.1" + result = version.get_git_version("nonexistent") expected = { - 'rev': '1ff76b021d208f7df38ac524537b6419404f1c64', - 'date': 'Mon Sep 25 13:50:24 2017 -0400', - 'tag': '1.1.1', - 'commits': 2, + "rev": "1ff76b021d208f7df38ac524537b6419404f1c64", + "date": "Mon Sep 25 13:50:24 2017 -0400", + "tag": "1.1.1", + "commits": 2, } assert result == expected def test_get_git_tag_bad_output(self): - with mock.patch('snakeoil.version._run_git') as run_git: + with mock.patch("snakeoil.version._run_git") as run_git: # unknown git tag rev output - run_git.return_value = (b'a', 1) - assert version._get_git_tag('foo', 'bar') is None - run_git.return_value = (b'a foo/v0.7.2', 0) - assert version._get_git_tag('foo', 'bar') is None + run_git.return_value = (b"a", 1) + assert version._get_git_tag("foo", "bar") is None + run_git.return_value = (b"a foo/v0.7.2", 0) + assert version._get_git_tag("foo", "bar") is None # expected output formats - run_git.return_value = (b'ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1^0', 0) - assert version._get_git_tag('foo', 'bar') == '1.1.1' - run_git.return_value = (b'ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1', 0) - assert version._get_git_tag('foo', 'bar') == '1.1.1' - run_git.return_value = (b'ab38751890efa8be96b7f95938d6b868b769bab6 tags/1.1.1', 0) - assert version._get_git_tag('foo', 'bar') == '1.1.1' + run_git.return_value = ( + b"ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1^0", + 0, + ) + assert version._get_git_tag("foo", "bar") == "1.1.1" + run_git.return_value = ( + b"ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1", + 0, + ) + assert version._get_git_tag("foo", "bar") == "1.1.1" + run_git.return_value = ( + b"ab38751890efa8be96b7f95938d6b868b769bab6 tags/1.1.1", + 0, + ) + assert version._get_git_tag("foo", "bar") == "1.1.1" |