aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorBrian Harring <ferringb@gmail.com>2022-12-24 13:14:53 -0800
committerArthur Zamarin <arthurzam@gentoo.org>2022-12-25 19:49:11 +0200
commitd6a7c2e44b4f497357f8569d423104232a58f384 (patch)
tree625ac52169356714a9f5e69e11f2b6cc2d72355a /tests
parentcompression: prefer gtar over tar if available (diff)
downloadsnakeoil-d6a7c2e44b4f497357f8569d423104232a58f384.tar.gz
snakeoil-d6a7c2e44b4f497357f8569d423104232a58f384.tar.bz2
snakeoil-d6a7c2e44b4f497357f8569d423104232a58f384.zip
Reformat w/ black 22.12.0 for consistency.
Signed-off-by: Brian Harring <ferringb@gmail.com> Signed-off-by: Arthur Zamarin <arthurzam@gentoo.org>
Diffstat (limited to 'tests')
-rw-r--r--tests/cli/test_arghparse.py336
-rw-r--r--tests/cli/test_input.py99
-rw-r--r--tests/compression/__init__.py68
-rw-r--r--tests/compression/test_bzip2.py40
-rw-r--r--tests/compression/test_init.py77
-rw-r--r--tests/compression/test_xz.py30
-rw-r--r--tests/test_bash.py283
-rw-r--r--tests/test_caching.py22
-rw-r--r--tests/test_chksum.py4
-rw-r--r--tests/test_chksum_defaults.py37
-rw-r--r--tests/test_constraints.py64
-rw-r--r--tests/test_containers.py19
-rw-r--r--tests/test_contexts.py51
-rw-r--r--tests/test_currying.py119
-rw-r--r--tests/test_data_source.py37
-rw-r--r--tests/test_decorators.py37
-rw-r--r--tests/test_demandload.py86
-rw-r--r--tests/test_demandload_usage.py6
-rw-r--r--tests/test_dependant_methods.py21
-rw-r--r--tests/test_fileutils.py105
-rw-r--r--tests/test_formatters.py243
-rw-r--r--tests/test_iterables.py11
-rw-r--r--tests/test_klass.py154
-rw-r--r--tests/test_mappings.py208
-rw-r--r--tests/test_modules.py67
-rw-r--r--tests/test_obj.py85
-rw-r--r--tests/test_osutils.py221
-rw-r--r--tests/test_process.py6
-rw-r--r--tests/test_process_spawn.py50
-rw-r--r--tests/test_sequences.py110
-rw-r--r--tests/test_stringio.py5
-rw-r--r--tests/test_strings.py34
-rw-r--r--tests/test_version.py139
33 files changed, 1567 insertions, 1307 deletions
diff --git a/tests/cli/test_arghparse.py b/tests/cli/test_arghparse.py
index ccaa65e7..4741d861 100644
--- a/tests/cli/test_arghparse.py
+++ b/tests/cli/test_arghparse.py
@@ -11,56 +11,60 @@ from snakeoil.test import argparse_helpers
class TestArgparseDocs:
-
def test_add_argument_docs(self):
# force using an unpatched version of argparse
reload(argparse)
parser = argparse.ArgumentParser()
- parser.add_argument('--foo', action='store_true')
+ parser.add_argument("--foo", action="store_true")
# vanilla argparse doesn't support docs kwargs
with pytest.raises(TypeError):
parser.add_argument(
- '-b', '--blah', action='store_true', docs='Blah blah blah')
+ "-b", "--blah", action="store_true", docs="Blah blah blah"
+ )
with pytest.raises(TypeError):
- parser.add_argument_group('fa', description='fa la la', docs='fa la la la')
+ parser.add_argument_group("fa", description="fa la la", docs="fa la la la")
with pytest.raises(TypeError):
- parser.add_mutually_exclusive_group('fee', description='fi', docs='fo fum')
+ parser.add_mutually_exclusive_group("fee", description="fi", docs="fo fum")
# forcibly monkey-patch argparse to allow docs kwargs
reload(arghparse)
- default = 'baz baz'
- docs = 'blah blah'
+ default = "baz baz"
+ docs = "blah blah"
for enable_docs, expected_txt in ((False, default), (True, docs)):
arghparse._generate_docs = enable_docs
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(description=default, docs=docs)
- subparser = subparsers.add_parser('foo', description=default, docs=docs)
+ subparser = subparsers.add_parser("foo", description=default, docs=docs)
action = parser.add_argument(
- '-b', '--blah', action='store_true', help=default, docs=docs)
- arg_group = parser.add_argument_group('fa', description=default, docs=docs)
+ "-b", "--blah", action="store_true", help=default, docs=docs
+ )
+ arg_group = parser.add_argument_group("fa", description=default, docs=docs)
mut_arg_group = parser.add_mutually_exclusive_group()
mut_action = mut_arg_group.add_argument(
- '-f', '--fee', action='store_true', help=default, docs=docs)
+ "-f", "--fee", action="store_true", help=default, docs=docs
+ )
- assert getattr(parser._subparsers, 'description', None) == expected_txt
- assert getattr(subparser, 'description', None) == expected_txt
- assert getattr(action, 'help', None) == expected_txt
- assert getattr(arg_group, 'description', None) == expected_txt
- assert getattr(mut_action, 'help', None) == expected_txt
+ assert getattr(parser._subparsers, "description", None) == expected_txt
+ assert getattr(subparser, "description", None) == expected_txt
+ assert getattr(action, "help", None) == expected_txt
+ assert getattr(arg_group, "description", None) == expected_txt
+ assert getattr(mut_action, "help", None) == expected_txt
# list/tuple-based docs
arghparse._generate_docs = True
- docs = 'foo bar'
+ docs = "foo bar"
parser = argparse.ArgumentParser()
list_action = parser.add_argument(
- '-b', '--blah', action='store_true', help=default, docs=list(docs.split()))
+ "-b", "--blah", action="store_true", help=default, docs=list(docs.split())
+ )
tuple_action = parser.add_argument(
- '-c', '--cat', action='store_true', help=default, docs=tuple(docs.split()))
- assert getattr(list_action, 'help', None) == 'foo\nbar'
- assert getattr(tuple_action, 'help', None) == 'foo\nbar'
+ "-c", "--cat", action="store_true", help=default, docs=tuple(docs.split())
+ )
+ assert getattr(list_action, "help", None) == "foo\nbar"
+ assert getattr(tuple_action, "help", None) == "foo\nbar"
class TestOptionalsParser:
@@ -68,7 +72,9 @@ class TestOptionalsParser:
# TODO: move this to a generic argparse fixture
@pytest.fixture(autouse=True)
def __setup_optionals_parser(self):
- self.optionals_parser = argparse_helpers.mangle_parser(arghparse.OptionalsParser())
+ self.optionals_parser = argparse_helpers.mangle_parser(
+ arghparse.OptionalsParser()
+ )
def test_no_args(self):
args, unknown = self.optionals_parser.parse_known_optionals([])
@@ -76,14 +82,14 @@ class TestOptionalsParser:
assert unknown == []
def test_only_positionals(self):
- self.optionals_parser.add_argument('args')
+ self.optionals_parser.add_argument("args")
args, unknown = self.optionals_parser.parse_known_optionals([])
- assert vars(args) == {'args': None}
+ assert vars(args) == {"args": None}
assert unknown == []
def test_optionals(self):
- self.optionals_parser.add_argument('--opt1')
- self.optionals_parser.add_argument('args')
+ self.optionals_parser.add_argument("--opt1")
+ self.optionals_parser.add_argument("args")
parse = self.optionals_parser.parse_known_optionals
# no args
@@ -92,37 +98,37 @@ class TestOptionalsParser:
assert unknown == []
# only known optional
- args, unknown = parse(['--opt1', 'yes'])
- assert args.opt1 == 'yes'
+ args, unknown = parse(["--opt1", "yes"])
+ assert args.opt1 == "yes"
assert unknown == []
# unknown optional
- args, unknown = parse(['--foo'])
+ args, unknown = parse(["--foo"])
assert args.opt1 is None
- assert unknown == ['--foo']
+ assert unknown == ["--foo"]
# unknown optional and positional
- args, unknown = parse(['--foo', 'arg'])
+ args, unknown = parse(["--foo", "arg"])
assert args.opt1 is None
- assert unknown == ['--foo', 'arg']
+ assert unknown == ["--foo", "arg"]
# known optional with unknown optional
- args, unknown = parse(['--opt1', 'yes', '--foo'])
- assert args.opt1 == 'yes'
- assert unknown == ['--foo']
+ args, unknown = parse(["--opt1", "yes", "--foo"])
+ assert args.opt1 == "yes"
+ assert unknown == ["--foo"]
# different order
- args, unknown = parse(['--foo', '--opt1', 'yes'])
- assert args.opt1 == 'yes'
- assert unknown == ['--foo']
+ args, unknown = parse(["--foo", "--opt1", "yes"])
+ assert args.opt1 == "yes"
+ assert unknown == ["--foo"]
# known optional with unknown positional
- args, unknown = parse(['--opt1', 'yes', 'arg'])
- assert args.opt1 == 'yes'
- assert unknown == ['arg']
+ args, unknown = parse(["--opt1", "yes", "arg"])
+ assert args.opt1 == "yes"
+ assert unknown == ["arg"]
# known optionals parsing stops at the first positional arg
- args, unknown = parse(['arg', '--opt1', 'yes'])
+ args, unknown = parse(["arg", "--opt1", "yes"])
assert args.opt1 is None
- assert unknown == ['arg', '--opt1', 'yes']
+ assert unknown == ["arg", "--opt1", "yes"]
class TestCsvActionsParser:
@@ -134,20 +140,19 @@ class TestCsvActionsParser:
def test_bad_action(self):
with pytest.raises(ValueError) as excinfo:
- self.csv_parser.add_argument('--arg1', action='unknown')
+ self.csv_parser.add_argument("--arg1", action="unknown")
assert 'unknown action "unknown"' == str(excinfo.value)
def test_csv_actions(self):
- self.csv_parser.add_argument('--arg1', action='csv')
- self.csv_parser.add_argument('--arg2', action='csv_append')
- self.csv_parser.add_argument('--arg3', action='csv_negations')
- self.csv_parser.add_argument('--arg4', action='csv_negations_append')
- self.csv_parser.add_argument('--arg5', action='csv_elements')
- self.csv_parser.add_argument('--arg6', action='csv_elements_append')
+ self.csv_parser.add_argument("--arg1", action="csv")
+ self.csv_parser.add_argument("--arg2", action="csv_append")
+ self.csv_parser.add_argument("--arg3", action="csv_negations")
+ self.csv_parser.add_argument("--arg4", action="csv_negations_append")
+ self.csv_parser.add_argument("--arg5", action="csv_elements")
+ self.csv_parser.add_argument("--arg6", action="csv_elements_append")
class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser):
-
def test_debug(self):
# debug passed
parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser(debug=True))
@@ -161,8 +166,10 @@ class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser):
assert namespace.debug is False
# debug passed in sys.argv -- early debug attr on the parser instance is set
- with mock.patch('sys.argv', ['script', '--debug']):
- parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser(debug=True))
+ with mock.patch("sys.argv", ["script", "--debug"]):
+ parser = argparse_helpers.mangle_parser(
+ arghparse.ArgumentParser(debug=True)
+ )
assert parser.debug is True
def test_debug_disabled(self):
@@ -176,34 +183,36 @@ class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser):
# parser attribute still exists
assert parser.debug is False
# but namespace attribute doesn't
- assert not hasattr(namespace, 'debug')
+ assert not hasattr(namespace, "debug")
def test_verbosity(self):
values = (
([], 0),
- (['-q'], -1),
- (['--quiet'], -1),
- (['-v'], 1),
- (['--verbose'], 1),
- (['-q', '-v'], 0),
- (['--quiet', '--verbose'], 0),
- (['-q', '-q'], -2),
- (['-v', '-v'], 2),
+ (["-q"], -1),
+ (["--quiet"], -1),
+ (["-v"], 1),
+ (["--verbose"], 1),
+ (["-q", "-v"], 0),
+ (["--quiet", "--verbose"], 0),
+ (["-q", "-q"], -2),
+ (["-v", "-v"], 2),
)
for args, val in values:
- with mock.patch('sys.argv', ['script'] + args):
+ with mock.patch("sys.argv", ["script"] + args):
parser = argparse_helpers.mangle_parser(
- arghparse.ArgumentParser(quiet=True, verbose=True))
+ arghparse.ArgumentParser(quiet=True, verbose=True)
+ )
namespace = parser.parse_args(args)
- assert parser.verbosity == val, '{} failed'.format(args)
- assert namespace.verbosity == val, '{} failed'.format(args)
+ assert parser.verbosity == val, "{} failed".format(args)
+ assert namespace.verbosity == val, "{} failed".format(args)
def test_verbosity_disabled(self):
parser = argparse_helpers.mangle_parser(
- arghparse.ArgumentParser(quiet=False, verbose=False))
+ arghparse.ArgumentParser(quiet=False, verbose=False)
+ )
# ensure the options aren't there if disabled
- for args in ('-q', '--quiet', '-v', '--verbose'):
+ for args in ("-q", "--quiet", "-v", "--verbose"):
with pytest.raises(argparse_helpers.Error):
namespace = parser.parse_args([args])
@@ -211,17 +220,15 @@ class TestArgumentParser(TestCsvActionsParser, TestOptionalsParser):
# parser attribute still exists
assert parser.verbosity == 0
# but namespace attribute doesn't
- assert not hasattr(namespace, 'verbosity')
+ assert not hasattr(namespace, "verbosity")
class BaseArgparseOptions:
-
def setup_method(self, method):
self.parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser())
class TestStoreBoolAction(BaseArgparseOptions):
-
def setup_method(self, method):
super().setup_method(method)
self.parser.add_argument("--testing", action=arghparse.StoreBool, default=None)
@@ -229,13 +236,13 @@ class TestStoreBoolAction(BaseArgparseOptions):
def test_bool_disabled(self):
for raw_val in ("n", "no", "false"):
for allowed in (raw_val.upper(), raw_val.lower()):
- namespace = self.parser.parse_args(['--testing=' + allowed])
+ namespace = self.parser.parse_args(["--testing=" + allowed])
assert namespace.testing is False
def test_bool_enabled(self):
for raw_val in ("y", "yes", "true"):
for allowed in (raw_val.upper(), raw_val.lower()):
- namespace = self.parser.parse_args(['--testing=' + allowed])
+ namespace = self.parser.parse_args(["--testing=" + allowed])
assert namespace.testing is True
def test_bool_invalid(self):
@@ -244,249 +251,244 @@ class TestStoreBoolAction(BaseArgparseOptions):
class ParseStdinTest(BaseArgparseOptions):
-
def setup_method(self, method):
super().setup_method(method)
- self.parser.add_argument(
- "testing", nargs='+', action=arghparse.ParseStdin)
+ self.parser.add_argument("testing", nargs="+", action=arghparse.ParseStdin)
def test_none_invalid(self):
with pytest.raises(argparse_helpers.Error):
self.parser.parse_args([])
def test_non_stdin(self):
- namespace = self.parser.parse_args(['foo'])
- assert namespace.testing == ['foo']
+ namespace = self.parser.parse_args(["foo"])
+ assert namespace.testing == ["foo"]
def test_non_stdin_multiple(self):
- namespace = self.parser.parse_args(['foo', 'bar'])
- assert namespace.testing == ['foo', 'bar']
+ namespace = self.parser.parse_args(["foo", "bar"])
+ assert namespace.testing == ["foo", "bar"]
def test_stdin(self):
# stdin is an interactive tty
- with mock.patch('sys.stdin.isatty', return_value=True):
+ with mock.patch("sys.stdin.isatty", return_value=True):
with pytest.raises(argparse_helpers.Error) as excinfo:
- namespace = self.parser.parse_args(['-'])
- assert 'only valid when piping data in' in str(excinfo.value)
+ namespace = self.parser.parse_args(["-"])
+ assert "only valid when piping data in" in str(excinfo.value)
# fake piping data in
for readlines, expected in (
- ([], []),
- ([' '], []),
- (['\n'], []),
- (['\n', '\n'], []),
- (['foo'], ['foo']),
- (['foo '], ['foo']),
- (['foo\n'], ['foo']),
- (['foo', 'bar', 'baz'], ['foo', 'bar', 'baz']),
- (['\nfoo\n', ' bar ', '\nbaz'], ['\nfoo', ' bar', '\nbaz']),
+ ([], []),
+ ([" "], []),
+ (["\n"], []),
+ (["\n", "\n"], []),
+ (["foo"], ["foo"]),
+ (["foo "], ["foo"]),
+ (["foo\n"], ["foo"]),
+ (["foo", "bar", "baz"], ["foo", "bar", "baz"]),
+ (["\nfoo\n", " bar ", "\nbaz"], ["\nfoo", " bar", "\nbaz"]),
):
- with mock.patch('sys.stdin') as stdin, \
- mock.patch("builtins.open", mock.mock_open()) as mock_file:
+ with mock.patch("sys.stdin") as stdin, mock.patch(
+ "builtins.open", mock.mock_open()
+ ) as mock_file:
stdin.readlines.return_value = readlines
stdin.isatty.return_value = False
- namespace = self.parser.parse_args(['-'])
+ namespace = self.parser.parse_args(["-"])
mock_file.assert_called_once_with("/dev/tty")
assert namespace.testing == expected
class TestCommaSeparatedValuesAction(BaseArgparseOptions):
-
def setup_method(self, method):
super().setup_method(method)
self.test_values = (
- ('', []),
- (',', []),
- (',,', []),
- ('a', ['a']),
- ('a,b,-c', ['a', 'b', '-c']),
+ ("", []),
+ (",", []),
+ (",,", []),
+ ("a", ["a"]),
+ ("a,b,-c", ["a", "b", "-c"]),
)
- self.action = 'csv'
+ self.action = "csv"
self.single_expected = lambda x: x
self.multi_expected = lambda x: x
def test_parse_args(self):
- self.parser.add_argument('--testing', action=self.action)
+ self.parser.add_argument("--testing", action=self.action)
for raw_val, expected in self.test_values:
- namespace = self.parser.parse_args(['--testing=' + raw_val])
+ namespace = self.parser.parse_args(["--testing=" + raw_val])
assert namespace.testing == self.single_expected(expected)
def test_parse_multi_args(self):
- self.parser.add_argument('--testing', action=self.action)
+ self.parser.add_argument("--testing", action=self.action)
for raw_val, expected in self.test_values:
- namespace = self.parser.parse_args([
- '--testing=' + raw_val, '--testing=' + raw_val,
- ])
+ namespace = self.parser.parse_args(
+ [
+ "--testing=" + raw_val,
+ "--testing=" + raw_val,
+ ]
+ )
assert namespace.testing == self.multi_expected(expected)
class TestCommaSeparatedValuesAppendAction(TestCommaSeparatedValuesAction):
-
def setup_method(self, method):
super().setup_method(method)
- self.action = 'csv_append'
+ self.action = "csv_append"
self.multi_expected = lambda x: x + x
class TestCommaSeparatedNegationsAction(TestCommaSeparatedValuesAction):
-
def setup_method(self, method):
super().setup_method(method)
self.test_values = (
- ('', ([], [])),
- (',', ([], [])),
- (',,', ([], [])),
- ('a', ([], ['a'])),
- ('-a', (['a'], [])),
- ('a,-b,-c,d', (['b', 'c'], ['a', 'd'])),
+ ("", ([], [])),
+ (",", ([], [])),
+ (",,", ([], [])),
+ ("a", ([], ["a"])),
+ ("-a", (["a"], [])),
+ ("a,-b,-c,d", (["b", "c"], ["a", "d"])),
)
- self.bad_args = ('-',)
- self.action = 'csv_negations'
+ self.bad_args = ("-",)
+ self.action = "csv_negations"
def test_parse_bad_args(self):
- self.parser.add_argument('--testing', action=self.action)
+ self.parser.add_argument("--testing", action=self.action)
for arg in self.bad_args:
with pytest.raises(argparse.ArgumentTypeError) as excinfo:
- namespace = self.parser.parse_args(['--testing=' + arg])
- assert 'without a token' in str(excinfo.value)
+ namespace = self.parser.parse_args(["--testing=" + arg])
+ assert "without a token" in str(excinfo.value)
class TestCommaSeparatedNegationsAppendAction(TestCommaSeparatedNegationsAction):
-
def setup_method(self, method):
super().setup_method(method)
- self.action = 'csv_negations_append'
+ self.action = "csv_negations_append"
self.multi_expected = lambda x: tuple(x + y for x, y in zip(x, x))
class TestCommaSeparatedElementsAction(TestCommaSeparatedNegationsAction):
-
def setup_method(self, method):
super().setup_method(method)
self.test_values = (
- ('', ([], [], [])),
- (',', ([], [], [])),
- (',,', ([], [], [])),
- ('-a', (['a'], [], [])),
- ('a', ([], ['a'], [])),
- ('+a', ([], [], ['a'])),
- ('a,-b,-c,d', (['b', 'c'], ['a', 'd'], [])),
- ('a,-b,+c,-d,+e,f', (['b', 'd'], ['a', 'f'], ['c', 'e'])),
+ ("", ([], [], [])),
+ (",", ([], [], [])),
+ (",,", ([], [], [])),
+ ("-a", (["a"], [], [])),
+ ("a", ([], ["a"], [])),
+ ("+a", ([], [], ["a"])),
+ ("a,-b,-c,d", (["b", "c"], ["a", "d"], [])),
+ ("a,-b,+c,-d,+e,f", (["b", "d"], ["a", "f"], ["c", "e"])),
)
- self.bad_values = ('-', '+')
- self.action = 'csv_elements'
+ self.bad_values = ("-", "+")
+ self.action = "csv_elements"
class TestCommaSeparatedElementsAppendAction(TestCommaSeparatedElementsAction):
-
def setup_method(self, method):
super().setup_method(method)
- self.action = 'csv_elements_append'
+ self.action = "csv_elements_append"
self.multi_expected = lambda x: tuple(x + y for x, y in zip(x, x))
class TestExistentPathType(BaseArgparseOptions):
-
def setup_method(self, method):
super().setup_method(method)
- self.parser.add_argument('--path', type=arghparse.existent_path)
+ self.parser.add_argument("--path", type=arghparse.existent_path)
def test_nonexistent(self):
# nonexistent path arg raises an error
with pytest.raises(argparse_helpers.Error):
- self.parser.parse_args(['--path=/path/to/nowhere'])
+ self.parser.parse_args(["--path=/path/to/nowhere"])
def test_os_errors(self, tmpdir):
# random OS/FS issues raise errors
- with mock.patch('os.path.realpath') as realpath:
- realpath.side_effect = OSError(19, 'Random OS error')
+ with mock.patch("os.path.realpath") as realpath:
+ realpath.side_effect = OSError(19, "Random OS error")
with pytest.raises(argparse_helpers.Error):
- self.parser.parse_args(['--path=%s' % tmpdir])
+ self.parser.parse_args(["--path=%s" % tmpdir])
def test_regular_usage(self, tmpdir):
- namespace = self.parser.parse_args(['--path=%s' % tmpdir])
+ namespace = self.parser.parse_args(["--path=%s" % tmpdir])
assert namespace.path == str(tmpdir)
class TestExistentDirType(BaseArgparseOptions):
-
def setup_method(self, method):
super().setup_method(method)
- self.parser.add_argument('--path', type=arghparse.existent_dir)
+ self.parser.add_argument("--path", type=arghparse.existent_dir)
def test_nonexistent(self):
# nonexistent path arg raises an error
with pytest.raises(argparse_helpers.Error):
- self.parser.parse_args(['--path=/path/to/nowhere'])
+ self.parser.parse_args(["--path=/path/to/nowhere"])
def test_os_errors(self, tmp_path):
# random OS/FS issues raise errors
- with mock.patch('os.path.realpath') as realpath:
- realpath.side_effect = OSError(19, 'Random OS error')
+ with mock.patch("os.path.realpath") as realpath:
+ realpath.side_effect = OSError(19, "Random OS error")
with pytest.raises(argparse_helpers.Error):
- self.parser.parse_args([f'--path={tmp_path}'])
+ self.parser.parse_args([f"--path={tmp_path}"])
def test_file_path(self, tmp_path):
- f = tmp_path / 'file'
+ f = tmp_path / "file"
f.touch()
with pytest.raises(argparse_helpers.Error):
- self.parser.parse_args([f'--path={f}'])
+ self.parser.parse_args([f"--path={f}"])
def test_regular_usage(self, tmp_path):
- namespace = self.parser.parse_args([f'--path={tmp_path}'])
+ namespace = self.parser.parse_args([f"--path={tmp_path}"])
assert namespace.path == str(tmp_path)
class TestNamespace:
-
def setup_method(self, method):
self.parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser())
def test_pop(self):
- self.parser.set_defaults(test='test')
+ self.parser.set_defaults(test="test")
namespace = self.parser.parse_args([])
- assert namespace.pop('test') == 'test'
+ assert namespace.pop("test") == "test"
# re-popping raises an exception since the attr has been removed
with pytest.raises(AttributeError):
- namespace.pop('test')
+ namespace.pop("test")
# popping a nonexistent attr with a fallback returns the fallback
- assert namespace.pop('nonexistent', 'foo') == 'foo'
+ assert namespace.pop("nonexistent", "foo") == "foo"
def test_collapse_delayed(self):
def _delayed_val(namespace, attr, val):
setattr(namespace, attr, val)
- self.parser.set_defaults(delayed=arghparse.DelayedValue(partial(_delayed_val, val=42)))
+
+ self.parser.set_defaults(
+ delayed=arghparse.DelayedValue(partial(_delayed_val, val=42))
+ )
namespace = self.parser.parse_args([])
assert namespace.delayed == 42
def test_bool(self):
namespace = arghparse.Namespace()
assert not namespace
- namespace.arg = 'foo'
+ namespace.arg = "foo"
assert namespace
class TestManHelpAction:
-
def test_help(self, capsys):
parser = argparse_helpers.mangle_parser(arghparse.ArgumentParser())
- with mock.patch('subprocess.Popen') as popen:
+ with mock.patch("subprocess.Popen") as popen:
# --help long option tries man page first before falling back to help output
with pytest.raises(argparse_helpers.Exit):
- namespace = parser.parse_args(['--help'])
+ namespace = parser.parse_args(["--help"])
popen.assert_called_once()
- assert popen.call_args[0][0][0] == 'man'
+ assert popen.call_args[0][0][0] == "man"
captured = capsys.readouterr()
- assert captured.out.strip().startswith('usage: ')
+ assert captured.out.strip().startswith("usage: ")
popen.reset_mock()
# -h short option just displays the regular help output
with pytest.raises(argparse_helpers.Exit):
- namespace = parser.parse_args(['-h'])
+ namespace = parser.parse_args(["-h"])
popen.assert_not_called()
captured = capsys.readouterr()
- assert captured.out.strip().startswith('usage: ')
+ assert captured.out.strip().startswith("usage: ")
popen.reset_mock()
diff --git a/tests/cli/test_input.py b/tests/cli/test_input.py
index 8efb1f59..2f15f256 100644
--- a/tests/cli/test_input.py
+++ b/tests/cli/test_input.py
@@ -9,12 +9,11 @@ from snakeoil.test.argparse_helpers import FakeStreamFormatter
@pytest.fixture
def mocked_input():
- with mock.patch('builtins.input') as mocked_input:
+ with mock.patch("builtins.input") as mocked_input:
yield mocked_input
class TestUserQuery:
-
@pytest.fixture(autouse=True)
def __setup(self):
self.out = FakeStreamFormatter()
@@ -22,98 +21,104 @@ class TestUserQuery:
self.query = partial(userquery, out=self.out, err=self.err)
def test_default_answer(self, mocked_input):
- mocked_input.return_value = ''
- assert self.query('foo') == True
+ mocked_input.return_value = ""
+ assert self.query("foo") == True
def test_tuple_prompt(self, mocked_input):
- mocked_input.return_value = ''
- prompt = 'perhaps a tuple'
+ mocked_input.return_value = ""
+ prompt = "perhaps a tuple"
assert self.query(tuple(prompt.split())) == True
- output = ''.join(prompt.split())
- assert self.out.get_text_stream().strip().split('\n')[0][:len(output)] == output
+ output = "".join(prompt.split())
+ assert (
+ self.out.get_text_stream().strip().split("\n")[0][: len(output)] == output
+ )
def test_no_default_answer(self, mocked_input):
responses = {
- 'a': ('z', 'Yes'),
- 'b': ('y', 'No'),
+ "a": ("z", "Yes"),
+ "b": ("y", "No"),
}
# no default answer returns None for empty input
- mocked_input.return_value = ''
- assert self.query('foo', responses=responses) == None
- mocked_input.return_value = 'a'
- assert self.query('foo', responses=responses) == 'z'
- mocked_input.return_value = 'b'
- assert self.query('foo', responses=responses) == 'y'
+ mocked_input.return_value = ""
+ assert self.query("foo", responses=responses) == None
+ mocked_input.return_value = "a"
+ assert self.query("foo", responses=responses) == "z"
+ mocked_input.return_value = "b"
+ assert self.query("foo", responses=responses) == "y"
def test_ambiguous_input(self, mocked_input):
responses = {
- 'a': ('z', 'Yes'),
- 'A': ('y', 'No'),
+ "a": ("z", "Yes"),
+ "A": ("y", "No"),
}
- mocked_input.return_value = 'a'
+ mocked_input.return_value = "a"
with pytest.raises(NoChoice):
- self.query('foo', responses=responses)
- error_output = self.err.get_text_stream().strip().split('\n')[1]
- expected = 'Response %r is ambiguous (%s)' % (
- mocked_input.return_value, ', '.join(sorted(responses.keys())))
+ self.query("foo", responses=responses)
+ error_output = self.err.get_text_stream().strip().split("\n")[1]
+ expected = "Response %r is ambiguous (%s)" % (
+ mocked_input.return_value,
+ ", ".join(sorted(responses.keys())),
+ )
assert error_output == expected
def test_default_correct_input(self, mocked_input):
- for input, output in (('no', False),
- ('No', False),
- ('yes', True),
- ('Yes', True)):
+ for input, output in (
+ ("no", False),
+ ("No", False),
+ ("yes", True),
+ ("Yes", True),
+ ):
mocked_input.return_value = input
- assert self.query('foo') == output
+ assert self.query("foo") == output
def test_default_answer_no_matches(self, mocked_input):
- mocked_input.return_value = ''
+ mocked_input.return_value = ""
with pytest.raises(ValueError):
- self.query('foo', default_answer='foo')
+ self.query("foo", default_answer="foo")
assert self.out.stream == []
def test_custom_default_answer(self, mocked_input):
- mocked_input.return_value = ''
- assert self.query('foo', default_answer=False) == False
+ mocked_input.return_value = ""
+ assert self.query("foo", default_answer=False) == False
def test_eof_nochoice(self, mocked_input):
# user hits ctrl-d
mocked_input.side_effect = EOFError
with pytest.raises(NoChoice):
- self.query('foo')
- output = self.out.get_text_stream().strip().split('\n')[1]
- expected = 'Not answerable: EOF on STDIN'
+ self.query("foo")
+ output = self.out.get_text_stream().strip().split("\n")[1]
+ expected = "Not answerable: EOF on STDIN"
assert output == expected
def test_stdin_closed_nochoice(self, mocked_input):
- mocked_input.side_effect = IOError(errno.EBADF, '')
+ mocked_input.side_effect = IOError(errno.EBADF, "")
with pytest.raises(NoChoice):
- self.query('foo')
- output = self.out.get_text_stream().strip().split('\n')[1]
- expected = 'Not answerable: STDIN is either closed, or not readable'
+ self.query("foo")
+ output = self.out.get_text_stream().strip().split("\n")[1]
+ expected = "Not answerable: STDIN is either closed, or not readable"
assert output == expected
def test_unhandled_ioerror(self, mocked_input):
- mocked_input.side_effect = IOError(errno.ENODEV, '')
+ mocked_input.side_effect = IOError(errno.ENODEV, "")
with pytest.raises(IOError):
- self.query('foo')
+ self.query("foo")
def test_bad_choice_limit(self, mocked_input):
# user hits enters a bad choice 3 times in a row
- mocked_input.return_value = 'bad'
+ mocked_input.return_value = "bad"
with pytest.raises(NoChoice):
- self.query('foo')
+ self.query("foo")
assert mocked_input.call_count == 3
- output = self.err.get_text_stream().strip().split('\n')[1]
+ output = self.err.get_text_stream().strip().split("\n")[1]
expected = "Sorry, response %r not understood." % (mocked_input.return_value,)
assert output == expected
def test_custom_choice_limit(self, mocked_input):
# user hits enters a bad choice 5 times in a row
- mocked_input.return_value = 'haha'
+ mocked_input.return_value = "haha"
with pytest.raises(NoChoice):
- self.query('foo', limit=5)
+ self.query("foo", limit=5)
assert mocked_input.call_count == 5
- output = self.err.get_text_stream().strip().split('\n')[1]
+ output = self.err.get_text_stream().strip().split("\n")[1]
expected = "Sorry, response %r not understood." % (mocked_input.return_value,)
assert output == expected
diff --git a/tests/compression/__init__.py b/tests/compression/__init__.py
index 3b70dcba..0bf26d04 100644
--- a/tests/compression/__init__.py
+++ b/tests/compression/__init__.py
@@ -4,78 +4,100 @@ import pytest
from snakeoil import compression
from snakeoil.process import CommandNotFound, find_binary
+
def hide_binary(*binaries: str):
def mock_find_binary(name):
if name in binaries:
raise CommandNotFound(name)
return find_binary(name)
- return patch('snakeoil.process.find_binary', side_effect=mock_find_binary)
+ return patch("snakeoil.process.find_binary", side_effect=mock_find_binary)
class Base:
- module: str = ''
- decompressed_test_data: bytes = b''
- compressed_test_data: bytes = b''
+ module: str = ""
+ decompressed_test_data: bytes = b""
+ compressed_test_data: bytes = b""
def decompress(self, data: bytes) -> bytes:
- raise NotImplementedError(self, 'decompress')
+ raise NotImplementedError(self, "decompress")
- @pytest.mark.parametrize('parallelize', (True, False))
- @pytest.mark.parametrize('level', (1, 9))
+ @pytest.mark.parametrize("parallelize", (True, False))
+ @pytest.mark.parametrize("level", (1, 9))
def test_compress_data(self, level, parallelize):
- compressed = compression.compress_data(self.module, self.decompressed_test_data, level=level, parallelize=parallelize)
+ compressed = compression.compress_data(
+ self.module,
+ self.decompressed_test_data,
+ level=level,
+ parallelize=parallelize,
+ )
assert compressed
assert self.decompress(compressed) == self.decompressed_test_data
- @pytest.mark.parametrize('parallelize', (True, False))
+ @pytest.mark.parametrize("parallelize", (True, False))
def test_decompress_data(self, parallelize):
- assert self.decompressed_test_data == compression.decompress_data(self.module, self.compressed_test_data, parallelize=parallelize)
+ assert self.decompressed_test_data == compression.decompress_data(
+ self.module, self.compressed_test_data, parallelize=parallelize
+ )
- @pytest.mark.parametrize('parallelize', (True, False))
- @pytest.mark.parametrize('level', (1, 9))
+ @pytest.mark.parametrize("parallelize", (True, False))
+ @pytest.mark.parametrize("level", (1, 9))
def test_compress_handle(self, tmp_path, level, parallelize):
- path = tmp_path / f'test.{self.module}'
+ path = tmp_path / f"test.{self.module}"
- stream = compression.compress_handle(self.module, str(path), level=level, parallelize=parallelize)
+ stream = compression.compress_handle(
+ self.module, str(path), level=level, parallelize=parallelize
+ )
stream.write(self.decompressed_test_data)
stream.close()
assert self.decompress(path.read_bytes()) == self.decompressed_test_data
with path.open("wb") as file:
- stream = compression.compress_handle(self.module, file, level=level, parallelize=parallelize)
+ stream = compression.compress_handle(
+ self.module, file, level=level, parallelize=parallelize
+ )
stream.write(self.decompressed_test_data)
stream.close()
assert self.decompress(path.read_bytes()) == self.decompressed_test_data
with path.open("wb") as file:
- stream = compression.compress_handle(self.module, file.fileno(), level=level, parallelize=parallelize)
+ stream = compression.compress_handle(
+ self.module, file.fileno(), level=level, parallelize=parallelize
+ )
stream.write(self.decompressed_test_data)
stream.close()
assert self.decompress(path.read_bytes()) == self.decompressed_test_data
with pytest.raises(TypeError):
- compression.compress_handle(self.module, b'', level=level, parallelize=parallelize)
+ compression.compress_handle(
+ self.module, b"", level=level, parallelize=parallelize
+ )
- @pytest.mark.parametrize('parallelize', (True, False))
+ @pytest.mark.parametrize("parallelize", (True, False))
def test_decompress_handle(self, tmp_path, parallelize):
- path = tmp_path / f'test.{self.module}'
+ path = tmp_path / f"test.{self.module}"
path.write_bytes(self.compressed_test_data)
- stream = compression.decompress_handle(self.module, str(path), parallelize=parallelize)
+ stream = compression.decompress_handle(
+ self.module, str(path), parallelize=parallelize
+ )
assert stream.read() == self.decompressed_test_data
stream.close()
with path.open("rb") as file:
- stream = compression.decompress_handle(self.module, file, parallelize=parallelize)
+ stream = compression.decompress_handle(
+ self.module, file, parallelize=parallelize
+ )
assert stream.read() == self.decompressed_test_data
stream.close()
with path.open("rb") as file:
- stream = compression.decompress_handle(self.module, file.fileno(), parallelize=parallelize)
+ stream = compression.decompress_handle(
+ self.module, file.fileno(), parallelize=parallelize
+ )
assert stream.read() == self.decompressed_test_data
stream.close()
with pytest.raises(TypeError):
- compression.decompress_handle(self.module, b'', parallelize=parallelize)
+ compression.decompress_handle(self.module, b"", parallelize=parallelize)
diff --git a/tests/compression/test_bzip2.py b/tests/compression/test_bzip2.py
index f3093d09..9fdffd9a 100644
--- a/tests/compression/test_bzip2.py
+++ b/tests/compression/test_bzip2.py
@@ -10,28 +10,29 @@ from . import Base, hide_binary
def test_no_native():
- with hide_imports('bz2'):
+ with hide_imports("bz2"):
importlib.reload(_bzip2)
assert not _bzip2.native
def test_missing_bzip2_binary():
- with hide_binary('bzip2'):
- with pytest.raises(CommandNotFound, match='bzip2'):
+ with hide_binary("bzip2"):
+ with pytest.raises(CommandNotFound, match="bzip2"):
importlib.reload(_bzip2)
def test_missing_lbzip2_binary():
- with hide_binary('lbzip2'):
+ with hide_binary("lbzip2"):
importlib.reload(_bzip2)
assert not _bzip2.parallelizable
+
class Bzip2Base(Base):
- module = 'bzip2'
- decompressed_test_data = b'Some text here\n'
+ module = "bzip2"
+ decompressed_test_data = b"Some text here\n"
compressed_test_data = (
- b'BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02'
+ b"BZh91AY&SY\x1bM\x00\x02\x00\x00\x01\xd3\x80\x00\x10@\x00\x08\x00\x02"
b'B\x94@ \x00"\r\x03\xd4\x0c \t!\x1b\xb7\x80u/\x17rE8P\x90\x1bM\x00\x02'
)
@@ -40,37 +41,36 @@ class Bzip2Base(Base):
class TestStdlib(Bzip2Base):
-
- @pytest.fixture(autouse=True, scope='class')
+ @pytest.fixture(autouse=True, scope="class")
def _setup(self):
try:
- find_binary('bzip2')
+ find_binary("bzip2")
except CommandNotFound:
- pytest.skip('bzip2 binary not found')
- with hide_binary('lbzip2'):
+ pytest.skip("bzip2 binary not found")
+ with hide_binary("lbzip2"):
importlib.reload(_bzip2)
yield
class TestBzip2(Bzip2Base):
-
- @pytest.fixture(autouse=True, scope='class')
+ @pytest.fixture(autouse=True, scope="class")
def _setup(self):
- with hide_binary('lbzip2'):
+ with hide_binary("lbzip2"):
importlib.reload(_bzip2)
yield
class TestLbzip2(Bzip2Base):
-
- @pytest.fixture(autouse=True, scope='class')
+ @pytest.fixture(autouse=True, scope="class")
def _setup(self):
try:
- find_binary('lbzip2')
+ find_binary("lbzip2")
except CommandNotFound:
- pytest.skip('lbzip2 binary not found')
+ pytest.skip("lbzip2 binary not found")
importlib.reload(_bzip2)
def test_bad_level(self):
with pytest.raises(ValueError, match='unknown option "-0"'):
- _bzip2.compress_data(self.decompressed_test_data, level=90, parallelize=True)
+ _bzip2.compress_data(
+ self.decompressed_test_data, level=90, parallelize=True
+ )
diff --git a/tests/compression/test_init.py b/tests/compression/test_init.py
index f3a40270..f1fe5bda 100644
--- a/tests/compression/test_init.py
+++ b/tests/compression/test_init.py
@@ -11,78 +11,77 @@ from . import hide_binary
@pytest.mark.skipif(sys.platform == "darwin", reason="darwin fails with bzip2")
class TestArComp:
-
- @pytest.fixture(scope='class')
+ @pytest.fixture(scope="class")
def tar_file(self, tmp_path_factory):
data = tmp_path_factory.mktemp("data")
- (data / 'file1').write_text('Hello world')
- (data / 'file2').write_text('Larry the Cow')
- path = data / 'test 1.tar'
- subprocess.run(['tar', 'cf', str(path), 'file1', 'file2'], cwd=data, check=True)
- (data / 'file1').unlink()
- (data / 'file2').unlink()
+ (data / "file1").write_text("Hello world")
+ (data / "file2").write_text("Larry the Cow")
+ path = data / "test 1.tar"
+ subprocess.run(["tar", "cf", str(path), "file1", "file2"], cwd=data, check=True)
+ (data / "file1").unlink()
+ (data / "file2").unlink()
return str(path)
- @pytest.fixture(scope='class')
+ @pytest.fixture(scope="class")
def tar_bz2_file(self, tar_file):
- subprocess.run(['bzip2', '-z', '-k', tar_file], check=True)
+ subprocess.run(["bzip2", "-z", "-k", tar_file], check=True)
return tar_file + ".bz2"
- @pytest.fixture(scope='class')
+ @pytest.fixture(scope="class")
def tbz2_file(self, tar_bz2_file):
- new_path = tar_bz2_file.replace('.tar.bz2', '.tbz2')
+ new_path = tar_bz2_file.replace(".tar.bz2", ".tbz2")
shutil.copyfile(tar_bz2_file, new_path)
return new_path
- @pytest.fixture(scope='class')
+ @pytest.fixture(scope="class")
def lzma_file(self, tmp_path_factory):
- data = (tmp_path_factory.mktemp("data") / 'test 2.lzma')
- with data.open('wb') as f:
- subprocess.run(['lzma'], check=True, input=b'Hello world', stdout=f)
+ data = tmp_path_factory.mktemp("data") / "test 2.lzma"
+ with data.open("wb") as f:
+ subprocess.run(["lzma"], check=True, input=b"Hello world", stdout=f)
return str(data)
def test_unknown_extenstion(self, tmp_path):
- file = tmp_path / 'test.file'
- with pytest.raises(ArCompError, match='unknown compression file extension'):
- ArComp(file, ext='.foo')
+ file = tmp_path / "test.file"
+ with pytest.raises(ArCompError, match="unknown compression file extension"):
+ ArComp(file, ext=".foo")
def test_missing_tar(self, tmp_path, tar_file):
- with hide_binary('tar'), chdir(tmp_path):
- with pytest.raises(ArCompError, match='required binary not found'):
- ArComp(tar_file, ext='.tar').unpack(dest=tmp_path)
+ with hide_binary("tar"), chdir(tmp_path):
+ with pytest.raises(ArCompError, match="required binary not found"):
+ ArComp(tar_file, ext=".tar").unpack(dest=tmp_path)
def test_tar(self, tmp_path, tar_file):
with chdir(tmp_path):
- ArComp(tar_file, ext='.tar').unpack(dest=tmp_path)
- assert (tmp_path / 'file1').read_text() == 'Hello world'
- assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+ ArComp(tar_file, ext=".tar").unpack(dest=tmp_path)
+ assert (tmp_path / "file1").read_text() == "Hello world"
+ assert (tmp_path / "file2").read_text() == "Larry the Cow"
def test_tar_bz2(self, tmp_path, tar_bz2_file):
with chdir(tmp_path):
- ArComp(tar_bz2_file, ext='.tar.bz2').unpack(dest=tmp_path)
- assert (tmp_path / 'file1').read_text() == 'Hello world'
- assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+ ArComp(tar_bz2_file, ext=".tar.bz2").unpack(dest=tmp_path)
+ assert (tmp_path / "file1").read_text() == "Hello world"
+ assert (tmp_path / "file2").read_text() == "Larry the Cow"
def test_tbz2(self, tmp_path, tbz2_file):
with chdir(tmp_path):
- ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
- assert (tmp_path / 'file1').read_text() == 'Hello world'
- assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+ ArComp(tbz2_file, ext=".tbz2").unpack(dest=tmp_path)
+ assert (tmp_path / "file1").read_text() == "Hello world"
+ assert (tmp_path / "file2").read_text() == "Larry the Cow"
def test_fallback_tbz2(self, tmp_path, tbz2_file):
with hide_binary(*next(zip(*_TarBZ2.compress_binary[:-1]))):
with chdir(tmp_path):
- ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
- assert (tmp_path / 'file1').read_text() == 'Hello world'
- assert (tmp_path / 'file2').read_text() == 'Larry the Cow'
+ ArComp(tbz2_file, ext=".tbz2").unpack(dest=tmp_path)
+ assert (tmp_path / "file1").read_text() == "Hello world"
+ assert (tmp_path / "file2").read_text() == "Larry the Cow"
def test_no_fallback_tbz2(self, tmp_path, tbz2_file):
with hide_binary(*next(zip(*_TarBZ2.compress_binary))), chdir(tmp_path):
- with pytest.raises(ArCompError, match='no compression binary'):
- ArComp(tbz2_file, ext='.tbz2').unpack(dest=tmp_path)
+ with pytest.raises(ArCompError, match="no compression binary"):
+ ArComp(tbz2_file, ext=".tbz2").unpack(dest=tmp_path)
def test_lzma(self, tmp_path, lzma_file):
- dest = tmp_path / 'file'
+ dest = tmp_path / "file"
with chdir(tmp_path):
- ArComp(lzma_file, ext='.lzma').unpack(dest=dest)
- assert (dest).read_bytes() == b'Hello world'
+ ArComp(lzma_file, ext=".lzma").unpack(dest=dest)
+ assert (dest).read_bytes() == b"Hello world"
diff --git a/tests/compression/test_xz.py b/tests/compression/test_xz.py
index f8417b30..0af7c645 100644
--- a/tests/compression/test_xz.py
+++ b/tests/compression/test_xz.py
@@ -10,26 +10,26 @@ from . import Base, hide_binary
def test_no_native():
- with hide_imports('lzma'):
+ with hide_imports("lzma"):
importlib.reload(_xz)
assert not _xz.native
def test_missing_xz_binary():
- with hide_binary('xz'):
- with pytest.raises(CommandNotFound, match='xz'):
+ with hide_binary("xz"):
+ with pytest.raises(CommandNotFound, match="xz"):
importlib.reload(_xz)
class XzBase(Base):
- module = 'xz'
- decompressed_test_data = b'Some text here\n' * 2
+ module = "xz"
+ decompressed_test_data = b"Some text here\n" * 2
compressed_test_data = (
- b'\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x04\xc0\x1e\x1e!\x01\x16\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00j\xf6\x947\xe0\x00\x1d\x00\x16]\x00)\x9b\xc9\xa6g'
- b'Bw\x8c\xb3\x9eA\x9a\xbeT\xc9\xfa\xe3\x19\x8f(\x00\x00\x00\x00\x00\x96N'
- b'\xa8\x8ed\xa2WH\x00\x01:\x1e1V \xff\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ'
+ b"\xfd7zXZ\x00\x00\x04\xe6\xd6\xb4F\x04\xc0\x1e\x1e!\x01\x16\x00\x00\x00"
+ b"\x00\x00\x00\x00\x00\x00j\xf6\x947\xe0\x00\x1d\x00\x16]\x00)\x9b\xc9\xa6g"
+ b"Bw\x8c\xb3\x9eA\x9a\xbeT\xc9\xfa\xe3\x19\x8f(\x00\x00\x00\x00\x00\x96N"
+ b"\xa8\x8ed\xa2WH\x00\x01:\x1e1V \xff\x1f\xb6\xf3}\x01\x00\x00\x00\x00\x04YZ"
)
def decompress(self, data: bytes) -> bytes:
@@ -37,20 +37,18 @@ class XzBase(Base):
class TestStdlib(XzBase):
-
- @pytest.fixture(autouse=True, scope='class')
+ @pytest.fixture(autouse=True, scope="class")
def _setup(self):
try:
- find_binary('xz')
+ find_binary("xz")
except CommandNotFound:
- pytest.skip('xz binary not found')
+ pytest.skip("xz binary not found")
importlib.reload(_xz)
class TestXz(XzBase):
-
- @pytest.fixture(autouse=True, scope='class')
+ @pytest.fixture(autouse=True, scope="class")
def _setup(self):
- with hide_imports('lzma'):
+ with hide_imports("lzma"):
importlib.reload(_xz)
yield
diff --git a/tests/test_bash.py b/tests/test_bash.py
index ec9df537..3d2157b8 100644
--- a/tests/test_bash.py
+++ b/tests/test_bash.py
@@ -1,128 +1,138 @@
from io import StringIO
import pytest
-from snakeoil.bash import (BashParseError, iter_read_bash, read_bash,
- read_bash_dict, read_dict)
+from snakeoil.bash import (
+ BashParseError,
+ iter_read_bash,
+ read_bash,
+ read_bash_dict,
+ read_dict,
+)
class TestBashCommentStripping:
-
def test_iter_read_bash(self):
- output = iter_read_bash(StringIO(
- '\n'
- '# hi I am a comment\n'
- 'I am not \n'
- ' asdf # inline comment\n'))
- assert list(output) == ['I am not', 'asdf']
+ output = iter_read_bash(
+ StringIO(
+ "\n" "# hi I am a comment\n" "I am not \n" " asdf # inline comment\n"
+ )
+ )
+ assert list(output) == ["I am not", "asdf"]
- output = iter_read_bash(StringIO(
- 'inline # comment '), allow_inline_comments=False)
- assert list(output) == ['inline # comment']
+ output = iter_read_bash(
+ StringIO("inline # comment "), allow_inline_comments=False
+ )
+ assert list(output) == ["inline # comment"]
def test_iter_read_bash_line_cont(self):
- output = iter_read_bash(StringIO(
- '\n'
- '# hi I am a comment\\\n'
- 'I am not \\\n'
- 'a comment \n'
- ' asdf # inline comment\\\n'),
- allow_line_cont=True)
- assert list(output) == ['I am not a comment', 'asdf']
+ output = iter_read_bash(
+ StringIO(
+ "\n"
+ "# hi I am a comment\\\n"
+ "I am not \\\n"
+ "a comment \n"
+ " asdf # inline comment\\\n"
+ ),
+ allow_line_cont=True,
+ )
+ assert list(output) == ["I am not a comment", "asdf"]
# continuation into inline comment
- output = iter_read_bash(StringIO(
- '\n'
- '# hi I am a comment\n'
- 'I am \\\n'
- 'not a \\\n'
- 'comment # inline comment\n'),
- allow_line_cont=True)
- assert list(output) == ['I am not a comment']
+ output = iter_read_bash(
+ StringIO(
+ "\n"
+ "# hi I am a comment\n"
+ "I am \\\n"
+ "not a \\\n"
+ "comment # inline comment\n"
+ ),
+ allow_line_cont=True,
+ )
+ assert list(output) == ["I am not a comment"]
# ends with continuation
- output = iter_read_bash(StringIO(
- '\n'
- '# hi I am a comment\n'
- 'I am \\\n'
- '\\\n'
- 'not a \\\n'
- 'comment\\\n'
- '\\\n'),
- allow_line_cont=True)
- assert list(output) == ['I am not a comment']
+ output = iter_read_bash(
+ StringIO(
+ "\n"
+ "# hi I am a comment\n"
+ "I am \\\n"
+ "\\\n"
+ "not a \\\n"
+ "comment\\\n"
+ "\\\n"
+ ),
+ allow_line_cont=True,
+ )
+ assert list(output) == ["I am not a comment"]
# embedded comment prefix via continued lines
- output = iter_read_bash(StringIO(
- '\\\n'
- '# comment\\\n'
- ' not a comment\n'
- '\\\n'
- ' # inner comment\n'
- 'also not\\\n'
- '#\\\n'
- 'a comment\n'),
- allow_line_cont=True)
- assert list(output) == ['not a comment', 'also not#a comment']
+ output = iter_read_bash(
+ StringIO(
+ "\\\n"
+ "# comment\\\n"
+ " not a comment\n"
+ "\\\n"
+ " # inner comment\n"
+ "also not\\\n"
+ "#\\\n"
+ "a comment\n"
+ ),
+ allow_line_cont=True,
+ )
+ assert list(output) == ["not a comment", "also not#a comment"]
# Line continuations have to end with \<newline> without any backslash
# before the pattern.
- output = iter_read_bash(StringIO(
- 'I am \\ \n'
- 'not a comment'),
- allow_line_cont=True)
- assert list(output) == ['I am \\', 'not a comment']
- output = iter_read_bash(StringIO(
- '\\\n'
- 'I am \\\\\n'
- 'not a comment'),
- allow_line_cont=True)
- assert list(output) == ['I am \\\\', 'not a comment']
+ output = iter_read_bash(
+ StringIO("I am \\ \n" "not a comment"), allow_line_cont=True
+ )
+ assert list(output) == ["I am \\", "not a comment"]
+ output = iter_read_bash(
+ StringIO("\\\n" "I am \\\\\n" "not a comment"), allow_line_cont=True
+ )
+ assert list(output) == ["I am \\\\", "not a comment"]
def test_read_bash(self):
- output = read_bash(StringIO(
- '\n'
- '# hi I am a comment\n'
- 'I am not\n'))
- assert output == ['I am not']
+ output = read_bash(StringIO("\n" "# hi I am a comment\n" "I am not\n"))
+ assert output == ["I am not"]
class TestReadDictConfig:
-
def test_read_dict(self):
- bash_dict = read_dict(StringIO(
- '\n'
- '# hi I am a comment\n'
- 'foo1=bar\n'
- 'foo2="bar"\n'
- 'foo3=\'bar"\n'))
+ bash_dict = read_dict(
+ StringIO(
+ "\n" "# hi I am a comment\n" "foo1=bar\n" 'foo2="bar"\n' "foo3='bar\"\n"
+ )
+ )
assert bash_dict == {
- 'foo1': 'bar',
- 'foo2': 'bar',
- 'foo3': '\'bar"',
- }
- assert read_dict(['foo=bar'], source_isiter=True) == {'foo': 'bar'}
+ "foo1": "bar",
+ "foo2": "bar",
+ "foo3": "'bar\"",
+ }
+ assert read_dict(["foo=bar"], source_isiter=True) == {"foo": "bar"}
with pytest.raises(BashParseError):
- read_dict(['invalid'], source_isiter=True)
+ read_dict(["invalid"], source_isiter=True)
- bash_dict = read_dict(StringIO("foo bar\nfoo2 bar\nfoo3\tbar\n"), splitter=None)
- assert bash_dict == dict.fromkeys(('foo', 'foo2', 'foo3'), 'bar')
- bash_dict = read_dict(['foo = blah', 'foo2= blah ', 'foo3=blah'], strip=True)
- assert bash_dict == dict.fromkeys(('foo', 'foo2', 'foo3'), 'blah')
+ bash_dict = read_dict(
+ StringIO("foo bar\nfoo2 bar\nfoo3\tbar\n"), splitter=None
+ )
+ assert bash_dict == dict.fromkeys(("foo", "foo2", "foo3"), "bar")
+ bash_dict = read_dict(["foo = blah", "foo2= blah ", "foo3=blah"], strip=True)
+ assert bash_dict == dict.fromkeys(("foo", "foo2", "foo3"), "blah")
class TestReadBashDict:
-
@pytest.fixture(autouse=True)
def _setup(self, tmp_path):
self.valid_file = tmp_path / "valid"
self.valid_file.write_text(
- '# hi I am a comment\n'
- 'foo1=bar\n'
+ "# hi I am a comment\n"
+ "foo1=bar\n"
"foo2='bar'\n"
'foo3="bar"\n'
- 'foo4=-/:j4\n'
- 'foo5=\n'
+ "foo4=-/:j4\n"
+ "foo5=\n"
'export foo6="bar"\n'
)
self.sourcing_file = tmp_path / "sourcing"
@@ -131,18 +141,13 @@ class TestReadBashDict:
self.sourcing_file2.write_text(f'source "{self.valid_file}"\n')
self.advanced_file = tmp_path / "advanced"
self.advanced_file.write_text(
- 'one1=1\n'
- 'one_=$one1\n'
- 'two1=2\n'
- 'two_=${two1}\n'
+ "one1=1\n" "one_=$one1\n" "two1=2\n" "two_=${two1}\n"
)
self.env_file = tmp_path / "env"
- self.env_file.write_text('imported=${external}\n')
+ self.env_file.write_text("imported=${external}\n")
self.escaped_file = tmp_path / "escaped"
self.escaped_file.write_text(
- 'end=bye\n'
- 'quoteddollar="\\${dollar}"\n'
- 'quotedexpansion="\\${${end}}"\n'
+ "end=bye\n" 'quoteddollar="\\${dollar}"\n' 'quotedexpansion="\\${${end}}"\n'
)
self.unclosed_file = tmp_path / "unclosed"
self.unclosed_file.write_text('foo="bar')
@@ -151,19 +156,19 @@ class TestReadBashDict:
try:
return read_bash_dict(handle, *args, **kwds)
finally:
- if hasattr(handle, 'close'):
+ if hasattr(handle, "close"):
handle.close()
def test_read_bash_dict(self):
# TODO this is not even close to complete
bash_dict = self.invoke_and_close(str(self.valid_file))
d = {
- 'foo1': 'bar',
- 'foo2': 'bar',
- 'foo3': 'bar',
- 'foo4': '-/:j4',
- 'foo5': '',
- 'foo6': 'bar',
+ "foo1": "bar",
+ "foo2": "bar",
+ "foo3": "bar",
+ "foo4": "-/:j4",
+ "foo5": "",
+ "foo6": "bar",
}
assert bash_dict == d
@@ -171,59 +176,81 @@ class TestReadBashDict:
self.invoke_and_close(StringIO("a=b\ny='"))
def test_var_read(self):
- assert self.invoke_and_close(StringIO("x=y@a\n")) == {'x': 'y@a'}
- assert self.invoke_and_close(StringIO("x=y~a\n")) == {'x': 'y~a'}
- assert self.invoke_and_close(StringIO("x=y^a\n")) == {'x': 'y^a'}
- assert self.invoke_and_close(StringIO('x="\nasdf\nfdsa"')) == {'x': '\nasdf\nfdsa'}
+ assert self.invoke_and_close(StringIO("x=y@a\n")) == {"x": "y@a"}
+ assert self.invoke_and_close(StringIO("x=y~a\n")) == {"x": "y~a"}
+ assert self.invoke_and_close(StringIO("x=y^a\n")) == {"x": "y^a"}
+ assert self.invoke_and_close(StringIO('x="\nasdf\nfdsa"')) == {
+ "x": "\nasdf\nfdsa"
+ }
def test_empty_assign(self):
self.valid_file.write_text("foo=\ndar=blah\n")
- assert self.invoke_and_close(str(self.valid_file)) == {'foo': '', 'dar': 'blah'}
+ assert self.invoke_and_close(str(self.valid_file)) == {"foo": "", "dar": "blah"}
self.valid_file.write_text("foo=\ndar=\n")
- assert self.invoke_and_close(str(self.valid_file)) == {'foo': '', 'dar': ''}
+ assert self.invoke_and_close(str(self.valid_file)) == {"foo": "", "dar": ""}
self.valid_file.write_text("foo=blah\ndar=\n")
- assert self.invoke_and_close(str(self.valid_file)) == {'foo': 'blah', 'dar': ''}
+ assert self.invoke_and_close(str(self.valid_file)) == {"foo": "blah", "dar": ""}
def test_quoting(self):
- assert self.invoke_and_close(StringIO("x='y \\\na'")) == {'x': 'y \\\na'}
- assert self.invoke_and_close(StringIO("x='y'a\n")) == {'x': "ya"}
- assert self.invoke_and_close(StringIO('x="y \\\nasdf"')) == {'x': 'y asdf'}
+ assert self.invoke_and_close(StringIO("x='y \\\na'")) == {"x": "y \\\na"}
+ assert self.invoke_and_close(StringIO("x='y'a\n")) == {"x": "ya"}
+ assert self.invoke_and_close(StringIO('x="y \\\nasdf"')) == {"x": "y asdf"}
def test_eof_without_newline(self):
- assert self.invoke_and_close(StringIO("x=y")) == {'x': 'y'}
- assert self.invoke_and_close(StringIO("x='y'a")) == {'x': 'ya'}
+ assert self.invoke_and_close(StringIO("x=y")) == {"x": "y"}
+ assert self.invoke_and_close(StringIO("x='y'a")) == {"x": "ya"}
def test_sourcing(self):
- output = self.invoke_and_close(str(self.sourcing_file), sourcing_command='source')
- expected = {'foo1': 'bar', 'foo2': 'bar', 'foo3': 'bar', 'foo4': '-/:j4', 'foo5': '', 'foo6': 'bar'}
+ output = self.invoke_and_close(
+ str(self.sourcing_file), sourcing_command="source"
+ )
+ expected = {
+ "foo1": "bar",
+ "foo2": "bar",
+ "foo3": "bar",
+ "foo4": "-/:j4",
+ "foo5": "",
+ "foo6": "bar",
+ }
assert output == expected
- output = self.invoke_and_close(str(self.sourcing_file2), sourcing_command='source')
- expected = {'foo1': 'bar', 'foo2': 'bar', 'foo3': 'bar', 'foo4': '-/:j4', 'foo5': '', 'foo6': 'bar'}
+ output = self.invoke_and_close(
+ str(self.sourcing_file2), sourcing_command="source"
+ )
+ expected = {
+ "foo1": "bar",
+ "foo2": "bar",
+ "foo3": "bar",
+ "foo4": "-/:j4",
+ "foo5": "",
+ "foo6": "bar",
+ }
assert output == expected
def test_read_advanced(self):
output = self.invoke_and_close(str(self.advanced_file))
expected = {
- 'one1': '1',
- 'one_': '1',
- 'two1': '2',
- 'two_': '2',
+ "one1": "1",
+ "one_": "1",
+ "two1": "2",
+ "two_": "2",
}
assert output == expected
def test_env(self):
- assert self.invoke_and_close(str(self.env_file)) == {'imported': ''}
- env = {'external': 'imported foo'}
+ assert self.invoke_and_close(str(self.env_file)) == {"imported": ""}
+ env = {"external": "imported foo"}
env_backup = env.copy()
- assert self.invoke_and_close(str(self.env_file), env) == {'imported': 'imported foo'}
+ assert self.invoke_and_close(str(self.env_file), env) == {
+ "imported": "imported foo"
+ }
assert env_backup == env
def test_escaping(self):
output = self.invoke_and_close(str(self.escaped_file))
expected = {
- 'end': 'bye',
- 'quoteddollar': '${dollar}',
- 'quotedexpansion': '${bye}',
+ "end": "bye",
+ "quoteddollar": "${dollar}",
+ "quotedexpansion": "${bye}",
}
assert output == expected
diff --git a/tests/test_caching.py b/tests/test_caching.py
index eaa5014c..06615d38 100644
--- a/tests/test_caching.py
+++ b/tests/test_caching.py
@@ -5,17 +5,20 @@ from snakeoil.caching import WeakInstMeta
class weak_slotted(metaclass=WeakInstMeta):
__inst_caching__ = True
- __slots__ = ('one',)
+ __slots__ = ("one",)
class weak_inst(metaclass=WeakInstMeta):
__inst_caching__ = True
counter = 0
+
def __new__(cls, *args, **kwargs):
cls.counter += 1
return object.__new__(cls)
+
def __init__(self, *args, **kwargs):
pass
+
@classmethod
def reset(cls):
cls.counter = 0
@@ -34,7 +37,6 @@ class reenabled_weak_inst(automatic_disabled_weak_inst):
class TestWeakInstMeta:
-
def test_reuse(self, kls=weak_inst):
kls.reset()
o = kls()
@@ -99,8 +101,8 @@ class TestWeakInstMeta:
# (RaisingHashFor...).
# UserWarning is ignored and everything other warning is an error.
- @pytest.mark.filterwarnings('ignore::UserWarning')
- @pytest.mark.filterwarnings('error')
+ @pytest.mark.filterwarnings("ignore::UserWarning")
+ @pytest.mark.filterwarnings("error")
def test_uncachable(self):
weak_inst.reset()
@@ -108,21 +110,24 @@ class TestWeakInstMeta:
class RaisingHashForTestUncachable:
def __init__(self, error):
self.error = error
+
def __hash__(self):
raise self.error
assert weak_inst([]) is not weak_inst([])
assert weak_inst.counter == 2
for x in (TypeError, NotImplementedError):
- assert weak_inst(RaisingHashForTestUncachable(x)) is not \
- weak_inst(RaisingHashForTestUncachable(x))
+ assert weak_inst(RaisingHashForTestUncachable(x)) is not weak_inst(
+ RaisingHashForTestUncachable(x)
+ )
- @pytest.mark.filterwarnings('error::UserWarning')
+ @pytest.mark.filterwarnings("error::UserWarning")
def test_uncachable_warning_msg(self):
# This name is *important*, see above.
class RaisingHashForTestUncachableWarnings:
def __init__(self, error):
self.error = error
+
def __hash__(self):
raise self.error
@@ -134,6 +139,7 @@ class TestWeakInstMeta:
class BrokenHash:
def __hash__(self):
return 1
+
assert weak_inst(BrokenHash()) is not weak_inst(BrokenHash())
def test_weak_slot(self):
@@ -148,7 +154,7 @@ class TestWeakInstMeta:
# The actual test is that the class definition works.
class ExistingWeakrefSlot:
__inst_caching__ = True
- __slots__ = ('one', '__weakref__')
+ __slots__ = ("one", "__weakref__")
assert ExistingWeakrefSlot()
diff --git a/tests/test_chksum.py b/tests/test_chksum.py
index 016e3f73..b4c1ab24 100644
--- a/tests/test_chksum.py
+++ b/tests/test_chksum.py
@@ -3,15 +3,16 @@ from snakeoil import chksum
class Test_funcs:
-
def setup_method(self, method):
chksum.__inited__ = False
chksum.chksum_types.clear()
self._saved_init = chksum.init
self._inited_count = 0
+
def f():
self._inited_count += 1
chksum.__inited__ = True
+
chksum.init = f
# ensure we aren't mangling chksum state for other tests.
@@ -41,4 +42,3 @@ class Test_funcs:
assert chksum.get_handler("x") == 1
assert chksum.get_handler("y") == 2
assert self._inited_count == 1
-
diff --git a/tests/test_chksum_defaults.py b/tests/test_chksum_defaults.py
index 7f867d8a..a22d339c 100644
--- a/tests/test_chksum_defaults.py
+++ b/tests/test_chksum_defaults.py
@@ -14,14 +14,15 @@ def require_chf(func):
def subfunc(self):
if self.chf is None:
pytest.skip(
- 'no handler for %s, do you need to install PyCrypto or mhash?'
- % (self.chf_type,))
+ "no handler for %s, do you need to install PyCrypto or mhash?"
+ % (self.chf_type,)
+ )
func(self)
+
return subfunc
class base:
-
def get_chf(self):
try:
self.chf = chksum.get_handler(self.chf_type)
@@ -53,14 +54,17 @@ class base:
@require_chf
def test_data_source_check(self):
assert self.chf(local_source(self.fn)) == self.expected_long
- assert self.chf(data_source(fileutils.readfile_ascii(self.fn))) == self.expected_long
+ assert (
+ self.chf(data_source(fileutils.readfile_ascii(self.fn)))
+ == self.expected_long
+ )
-class ChksumTest(base):
+class ChksumTest(base):
@require_chf
def test_str2long(self):
assert self.chf.str2long(self.expected_str) == self.expected_long
- if self.chf_type == 'size':
+ if self.chf_type == "size":
return
for x in extra_chksums.get(self.chf_type, ()):
assert self.chf.str2long(x) == int(x, 16)
@@ -68,11 +72,12 @@ class ChksumTest(base):
@require_chf
def test_long2str(self):
assert self.chf.long2str(self.expected_long) == self.expected_str
- if self.chf_type == 'size':
+ if self.chf_type == "size":
return
for x in extra_chksums.get(self.chf_type, ()):
assert self.chf.long2str(int(x == 16)), x
+
checksums = {
"rmd160": "b83ad488d624e7911f886420ab230f78f6368b9f",
"sha1": "63cd8cce8a1773dffb400ee184be3ec7d89791f5",
@@ -87,22 +92,22 @@ checksums = {
checksums.update((k, (int(v, 16), v)) for k, v in checksums.items())
checksums["size"] = (int(len(data) * multi), str(int(len(data) * multi)))
-extra_chksums = {
- "md5":
- ["2dfd84279314a178d0fa842af3a40e25577e1bc"]
-}
+extra_chksums = {"md5": ["2dfd84279314a178d0fa842af3a40e25577e1bc"]}
for k, v in checksums.items():
- extra_chksums.setdefault(k, []).extend((''.rjust(len(v[1]), '0'), '01'.rjust(len(v[1]), '0')))
+ extra_chksums.setdefault(k, []).extend(
+ ("".rjust(len(v[1]), "0"), "01".rjust(len(v[1]), "0"))
+ )
# trick: create subclasses for each checksum with a useful class name.
for chf_type, expected in checksums.items():
expectedsum = expected[0]
expectedstr = expected[1]
- globals()['TestChksum' + chf_type.capitalize()] = type(
- 'TestChksum' + chf_type.capitalize(),
+ globals()["TestChksum" + chf_type.capitalize()] = type(
+ "TestChksum" + chf_type.capitalize(),
(ChksumTest,),
- dict(chf_type=chf_type, expected_long=expectedsum, expected_str=expectedstr))
+ dict(chf_type=chf_type, expected_long=expectedsum, expected_str=expectedstr),
+ )
# pylint: disable=undefined-loop-variable
del chf_type, expected
@@ -110,7 +115,7 @@ del chf_type, expected
class TestGetChksums(base):
- chfs = [k for k in sorted(checksums) if k in ('md5', 'sha1')]
+ chfs = [k for k in sorted(checksums) if k in ("md5", "sha1")]
expected_long = [checksums[k][0] for k in chfs]
def get_chf(self):
diff --git a/tests/test_constraints.py b/tests/test_constraints.py
index 5e938a9e..15d360c2 100644
--- a/tests/test_constraints.py
+++ b/tests/test_constraints.py
@@ -2,61 +2,75 @@ import pytest
from snakeoil.constraints import Problem
+
def any_of(**kwargs):
return any(kwargs.values())
+
def all_of(**kwargs):
return all(kwargs.values())
+
def test_readd_variables():
p = Problem()
- p.add_variable((True, False), 'x', 'y')
+ p.add_variable((True, False), "x", "y")
with pytest.raises(AssertionError, match="variable 'y' was already added"):
- p.add_variable((True, False), 'y', 'z')
+ p.add_variable((True, False), "y", "z")
+
def test_constraint_unknown_variable():
p = Problem()
- p.add_variable((True, False), 'x', 'y')
+ p.add_variable((True, False), "x", "y")
with pytest.raises(AssertionError, match="unknown variable 'z'"):
- p.add_constraint(any_of, ('y', 'z'))
+ p.add_constraint(any_of, ("y", "z"))
+
def test_empty_problem():
p = Problem()
- assert tuple(p) == ({}, )
+ assert tuple(p) == ({},)
+
def test_empty_constraints():
p = Problem()
- p.add_variable((True, False), 'x', 'y')
- p.add_variable((True, ), 'z')
+ p.add_variable((True, False), "x", "y")
+ p.add_variable((True,), "z")
assert len(tuple(p)) == 4
+
def test_domain_prefer_later():
p = Problem()
- p.add_variable((False, True), 'x', 'y')
- p.add_constraint(any_of, ('x', 'y'))
- assert next(iter(p)) == {'x': True, 'y': True}
+ p.add_variable((False, True), "x", "y")
+ p.add_constraint(any_of, ("x", "y"))
+ assert next(iter(p)) == {"x": True, "y": True}
+
def test_constraint_single_variable():
p = Problem()
- p.add_variable((True, False), 'x', 'y')
- p.add_constraint(lambda x: x, ('x', ))
- p.add_constraint(lambda y: not y, ('y', ))
- assert tuple(p) == ({'x': True, 'y': False}, )
+ p.add_variable((True, False), "x", "y")
+ p.add_constraint(lambda x: x, ("x",))
+ p.add_constraint(lambda y: not y, ("y",))
+ assert tuple(p) == ({"x": True, "y": False},)
+
def test_no_solution():
p = Problem()
- p.add_variable((True, ), 'x')
- p.add_variable((True, False), 'y', 'z')
- p.add_constraint(lambda x, y: not x or y, ('x', 'y'))
- p.add_constraint(lambda y, z: not y or not z, ('y', 'z'))
- p.add_constraint(lambda x, z: not x or z, ('x', 'z'))
+ p.add_variable((True,), "x")
+ p.add_variable((True, False), "y", "z")
+ p.add_constraint(lambda x, y: not x or y, ("x", "y"))
+ p.add_constraint(lambda y, z: not y or not z, ("y", "z"))
+ p.add_constraint(lambda x, z: not x or z, ("x", "z"))
assert not tuple(p)
+
def test_forward_check():
p = Problem()
- p.add_variable(range(2, 10), 'x', 'y', 'z')
- p.add_constraint(lambda x, y: (x + y) % 2 == 0, ('x', 'y'))
- p.add_constraint(lambda x, y, z: (x * y * z) % 2 != 0, ('x', 'y', 'z'))
- p.add_constraint(lambda y, z: y < z, ('y', 'z'))
- p.add_constraint(lambda z, x: x ** 2 <= z, ('x', 'z'))
- assert tuple(p) == ({'x': 3, 'y': 7, 'z': 9}, {'x': 3, 'y': 5, 'z': 9}, {'x': 3, 'y': 3, 'z': 9})
+ p.add_variable(range(2, 10), "x", "y", "z")
+ p.add_constraint(lambda x, y: (x + y) % 2 == 0, ("x", "y"))
+ p.add_constraint(lambda x, y, z: (x * y * z) % 2 != 0, ("x", "y", "z"))
+ p.add_constraint(lambda y, z: y < z, ("y", "z"))
+ p.add_constraint(lambda z, x: x**2 <= z, ("x", "z"))
+ assert tuple(p) == (
+ {"x": 3, "y": 7, "z": 9},
+ {"x": 3, "y": 5, "z": 9},
+ {"x": 3, "y": 3, "z": 9},
+ )
diff --git a/tests/test_containers.py b/tests/test_containers.py
index f6940c90..9df32588 100644
--- a/tests/test_containers.py
+++ b/tests/test_containers.py
@@ -5,7 +5,6 @@ from snakeoil import containers
class TestInvertedContains:
-
def setup_method(self, method):
self.set = containers.InvertedContains(range(12))
@@ -17,7 +16,7 @@ class TestInvertedContains:
class BasicSet(containers.SetMixin):
- __slots__ = ('_data',)
+ __slots__ = ("_data",)
def __init__(self, data):
self._data = set(data)
@@ -28,7 +27,7 @@ class BasicSet(containers.SetMixin):
def __contains__(self, other):
return other in self._data
- #def __str__(self):
+ # def __str__(self):
# return 'BasicSet([%s])' % ', '.join((str(x) for x in self._data))
def __eq__(self, other):
@@ -43,7 +42,6 @@ class BasicSet(containers.SetMixin):
class TestSetMethods:
-
def test_and(self):
c = BasicSet(range(100))
s = set(range(25, 75))
@@ -80,8 +78,8 @@ class TestSetMethods:
assert c - s == r1
assert s - c == r2
-class TestLimitedChangeSet:
+class TestLimitedChangeSet:
def setup_method(self, method):
self.set = containers.LimitedChangeSet(range(12))
@@ -89,17 +87,18 @@ class TestLimitedChangeSet:
def f(val):
assert isinstance(val, int)
return val
+
self.set = containers.LimitedChangeSet(range(12), key_validator=f)
self.set.add(13)
self.set.add(14)
self.set.remove(11)
assert 5 in self.set
with pytest.raises(AssertionError):
- self.set.add('2')
+ self.set.add("2")
with pytest.raises(AssertionError):
- self.set.remove('2')
+ self.set.remove("2")
with pytest.raises(AssertionError):
- self.set.__contains__('2')
+ self.set.__contains__("2")
def test_basic(self, changes=0):
# this should be a no-op
@@ -188,7 +187,7 @@ class TestLimitedChangeSet:
assert sorted(list(self.set)) == list(range(-1, 13))
def test_str(self):
- assert str(containers.LimitedChangeSet([7])) == 'LimitedChangeSet([7])'
+ assert str(containers.LimitedChangeSet([7])) == "LimitedChangeSet([7])"
def test__eq__(self):
c = containers.LimitedChangeSet(range(99))
@@ -199,7 +198,6 @@ class TestLimitedChangeSet:
class TestLimitedChangeSetWithBlacklist:
-
def setup_method(self, method):
self.set = containers.LimitedChangeSet(range(12), [3, 13])
@@ -222,7 +220,6 @@ class TestLimitedChangeSetWithBlacklist:
class TestProtectedSet:
-
def setup_method(self, method):
self.set = containers.ProtectedSet(set(range(12)))
diff --git a/tests/test_contexts.py b/tests/test_contexts.py
index 219d5ee8..be212a23 100644
--- a/tests/test_contexts.py
+++ b/tests/test_contexts.py
@@ -44,9 +44,10 @@ def test_syspath(tmpdir):
assert mangled_syspath == tuple(sys.path)
-@pytest.mark.skip(reason='this currently breaks on github ci, https://github.com/pkgcore/snakeoil/issues/68')
+@pytest.mark.skip(
+ reason="this currently breaks on github ci, https://github.com/pkgcore/snakeoil/issues/68"
+)
class TestSplitExec:
-
def test_context_process(self):
# code inside the with statement is run in a separate process
pid = os.getpid()
@@ -77,9 +78,9 @@ class TestSplitExec:
b = 3
# changes to locals aren't propagated back
assert a == 1
- assert 'b' not in locals()
+ assert "b" not in locals()
# but they're accessible via the 'locals' attr
- expected = {'a': 2, 'b': 3}
+ expected = {"a": 2, "b": 3}
for k, v in expected.items():
assert c.locals[k] == v
@@ -87,20 +88,21 @@ class TestSplitExec:
with SplitExec() as c:
func = lambda x: x
from sys import implementation
+
a = 4
- assert c.locals == {'a': 4}
+ assert c.locals == {"a": 4}
def test_context_exceptions(self):
# exceptions in the child process are sent back to the parent and re-raised
with pytest.raises(IOError) as e:
with SplitExec() as c:
- raise IOError(errno.EBUSY, 'random error')
+ raise IOError(errno.EBUSY, "random error")
assert e.value.errno == errno.EBUSY
def test_child_setup_raises_exception(self):
class ChildSetupException(SplitExec):
def _child_setup(self):
- raise IOError(errno.EBUSY, 'random error')
+ raise IOError(errno.EBUSY, "random error")
with pytest.raises(IOError) as e:
with ChildSetupException() as c:
@@ -108,26 +110,33 @@ class TestSplitExec:
assert e.value.errno == errno.EBUSY
-@pytest.mark.skipif(not sys.platform.startswith('linux'), reason='supported on Linux only')
-@pytest.mark.xfail(platform.python_implementation() == "PyPy", reason='Fails on PyPy')
+@pytest.mark.skipif(
+ not sys.platform.startswith("linux"), reason="supported on Linux only"
+)
+@pytest.mark.xfail(platform.python_implementation() == "PyPy", reason="Fails on PyPy")
class TestNamespace:
-
- @pytest.mark.skipif(not os.path.exists('/proc/self/ns/user'),
- reason='user namespace support required')
+ @pytest.mark.skipif(
+ not os.path.exists("/proc/self/ns/user"),
+ reason="user namespace support required",
+ )
def test_user_namespace(self):
try:
with Namespace(user=True) as ns:
assert os.getuid() == 0
except PermissionError:
- pytest.skip('No permission to use user namespace')
-
- @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/user') and os.path.exists('/proc/self/ns/uts')),
- reason='user and uts namespace support required')
+ pytest.skip("No permission to use user namespace")
+
+ @pytest.mark.skipif(
+ not (
+ os.path.exists("/proc/self/ns/user") and os.path.exists("/proc/self/ns/uts")
+ ),
+ reason="user and uts namespace support required",
+ )
def test_uts_namespace(self):
try:
- with Namespace(user=True, uts=True, hostname='host') as ns:
- ns_hostname, _, ns_domainname = socket.getfqdn().partition('.')
- assert ns_hostname == 'host'
- assert ns_domainname == ''
+ with Namespace(user=True, uts=True, hostname="host") as ns:
+ ns_hostname, _, ns_domainname = socket.getfqdn().partition(".")
+ assert ns_hostname == "host"
+ assert ns_domainname == ""
except PermissionError:
- pytest.skip('No permission to use user and uts namespace')
+ pytest.skip("No permission to use user and uts namespace")
diff --git a/tests/test_currying.py b/tests/test_currying.py
index d14f0fcc..7f8618fc 100644
--- a/tests/test_currying.py
+++ b/tests/test_currying.py
@@ -5,8 +5,10 @@ from snakeoil import currying
def passthrough(*args, **kwargs):
return args, kwargs
+
# docstring is part of the test
+
def documented():
"""original docstring"""
@@ -18,36 +20,37 @@ class TestPreCurry:
def test_pre_curry(self):
noop = self.pre_curry(passthrough)
assert noop() == ((), {})
- assert noop('foo', 'bar') == (('foo', 'bar'), {})
- assert noop(foo='bar') == ((), {'foo': 'bar'})
- assert noop('foo', bar='baz') == (('foo',), {'bar': 'baz'})
+ assert noop("foo", "bar") == (("foo", "bar"), {})
+ assert noop(foo="bar") == ((), {"foo": "bar"})
+ assert noop("foo", bar="baz") == (("foo",), {"bar": "baz"})
one_arg = self.pre_curry(passthrough, 42)
assert one_arg() == ((42,), {})
- assert one_arg('foo', 'bar') == ((42, 'foo', 'bar'), {})
- assert one_arg(foo='bar') == ((42,), {'foo': 'bar'})
- assert one_arg('foo', bar='baz') == ((42, 'foo'), {'bar': 'baz'})
+ assert one_arg("foo", "bar") == ((42, "foo", "bar"), {})
+ assert one_arg(foo="bar") == ((42,), {"foo": "bar"})
+ assert one_arg("foo", bar="baz") == ((42, "foo"), {"bar": "baz"})
keyword_arg = self.pre_curry(passthrough, foo=42)
- assert keyword_arg() == ((), {'foo': 42})
- assert keyword_arg('foo', 'bar') == (('foo', 'bar'), {'foo': 42})
- assert keyword_arg(foo='bar') == ((), {'foo': 'bar'})
- assert keyword_arg('foo', bar='baz') == (('foo',), {'bar': 'baz', 'foo': 42})
+ assert keyword_arg() == ((), {"foo": 42})
+ assert keyword_arg("foo", "bar") == (("foo", "bar"), {"foo": 42})
+ assert keyword_arg(foo="bar") == ((), {"foo": "bar"})
+ assert keyword_arg("foo", bar="baz") == (("foo",), {"bar": "baz", "foo": 42})
both = self.pre_curry(passthrough, 42, foo=42)
- assert both() == ((42,), {'foo': 42})
- assert both('foo', 'bar') == ((42, 'foo', 'bar'), {'foo': 42})
- assert both(foo='bar') == ((42,), {'foo': 'bar'})
- assert both('foo', bar='baz') == ((42, 'foo'), {'bar': 'baz', 'foo': 42})
+ assert both() == ((42,), {"foo": 42})
+ assert both("foo", "bar") == ((42, "foo", "bar"), {"foo": 42})
+ assert both(foo="bar") == ((42,), {"foo": "bar"})
+ assert both("foo", bar="baz") == ((42, "foo"), {"bar": "baz", "foo": 42})
def test_curry_original(self):
assert self.pre_curry(passthrough).func is passthrough
def test_instancemethod(self):
class Test:
- method = self.pre_curry(passthrough, 'test')
+ method = self.pre_curry(passthrough, "test")
+
test = Test()
- assert (('test', test), {}) == test.method()
+ assert (("test", test), {}) == test.method()
class Test_pretty_docs:
@@ -56,58 +59,63 @@ class Test_pretty_docs:
def test_module_magic(self):
for target in self.currying_targets:
- assert currying.pretty_docs(target(passthrough)).__module__ is \
- passthrough.__module__
+ assert (
+ currying.pretty_docs(target(passthrough)).__module__
+ is passthrough.__module__
+ )
# test is kinda useless if they are identical without pretty_docs
- assert getattr(target(passthrough), '__module__', None) is not \
- passthrough.__module__
+ assert (
+ getattr(target(passthrough), "__module__", None)
+ is not passthrough.__module__
+ )
def test_pretty_docs(self):
for target in self.currying_targets:
for func in (passthrough, documented):
- assert currying.pretty_docs(target(func), 'new doc').__doc__ == 'new doc'
+ assert (
+ currying.pretty_docs(target(func), "new doc").__doc__ == "new doc"
+ )
assert currying.pretty_docs(target(func)).__doc__ is func.__doc__
class TestPostCurry:
-
def test_post_curry(self):
noop = currying.post_curry(passthrough)
assert noop() == ((), {})
- assert noop('foo', 'bar') == (('foo', 'bar'), {})
- assert noop(foo='bar') == ((), {'foo': 'bar'})
- assert noop('foo', bar='baz') == (('foo',), {'bar': 'baz'})
+ assert noop("foo", "bar") == (("foo", "bar"), {})
+ assert noop(foo="bar") == ((), {"foo": "bar"})
+ assert noop("foo", bar="baz") == (("foo",), {"bar": "baz"})
one_arg = currying.post_curry(passthrough, 42)
assert one_arg() == ((42,), {})
- assert one_arg('foo', 'bar') == (('foo', 'bar', 42), {})
- assert one_arg(foo='bar') == ((42,), {'foo': 'bar'})
- assert one_arg('foo', bar='baz') == (('foo', 42), {'bar': 'baz'})
+ assert one_arg("foo", "bar") == (("foo", "bar", 42), {})
+ assert one_arg(foo="bar") == ((42,), {"foo": "bar"})
+ assert one_arg("foo", bar="baz") == (("foo", 42), {"bar": "baz"})
keyword_arg = currying.post_curry(passthrough, foo=42)
- assert keyword_arg() == ((), {'foo': 42})
- assert keyword_arg('foo', 'bar') == (('foo', 'bar'), {'foo': 42})
- assert keyword_arg(foo='bar') == ((), {'foo': 42})
- assert keyword_arg('foo', bar='baz') == (('foo',), {'bar': 'baz', 'foo': 42})
+ assert keyword_arg() == ((), {"foo": 42})
+ assert keyword_arg("foo", "bar") == (("foo", "bar"), {"foo": 42})
+ assert keyword_arg(foo="bar") == ((), {"foo": 42})
+ assert keyword_arg("foo", bar="baz") == (("foo",), {"bar": "baz", "foo": 42})
both = currying.post_curry(passthrough, 42, foo=42)
- assert both() == ((42,), {'foo': 42})
- assert both('foo', 'bar') == (('foo', 'bar', 42), {'foo': 42})
- assert both(foo='bar') == ((42,), {'foo': 42})
- assert both('foo', bar='baz') == (('foo', 42), {'bar': 'baz', 'foo': 42})
+ assert both() == ((42,), {"foo": 42})
+ assert both("foo", "bar") == (("foo", "bar", 42), {"foo": 42})
+ assert both(foo="bar") == ((42,), {"foo": 42})
+ assert both("foo", bar="baz") == (("foo", 42), {"bar": "baz", "foo": 42})
def test_curry_original(self):
assert currying.post_curry(passthrough).func is passthrough
def test_instancemethod(self):
class Test:
- method = currying.post_curry(passthrough, 'test')
+ method = currying.post_curry(passthrough, "test")
+
test = Test()
- assert ((test, 'test'), {}) == test.method()
+ assert ((test, "test"), {}) == test.method()
class Test_wrap_exception:
-
def test_wrap_exception_complex(self):
inner, outer = [], []
@@ -118,33 +126,33 @@ class Test_wrap_exception:
assert isinstance(exception, inner_exception)
assert functor is throwing_func
assert fargs == (False,)
- assert fkwds == {'monkey': 'bone'}
+ assert fkwds == {"monkey": "bone"}
outer.append(True)
raise wrapping_exception()
def throwing_func(*args, **kwds):
assert args == (False,)
- assert kwds == {'monkey': 'bone'}
+ assert kwds == {"monkey": "bone"}
inner.append(True)
raise inner_exception()
func = currying.wrap_exception_complex(f, IndexError)(throwing_func)
# basic behaviour
- pytest.raises(IndexError, func, False, monkey='bone')
+ pytest.raises(IndexError, func, False, monkey="bone")
assert len(inner) == 1
assert len(outer) == 1
# ensure pass thru if it's an allowed exception
inner_exception = IndexError
- pytest.raises(IndexError, func, False, monkey='bone')
+ pytest.raises(IndexError, func, False, monkey="bone")
assert len(inner) == 2
assert len(outer) == 1
# finally, ensure it doesn't intercept, and passes thru for
# exceptions it shouldn't handle
inner_exception = MemoryError
- pytest.raises(MemoryError, func, False, monkey='bone')
+ pytest.raises(MemoryError, func, False, monkey="bone")
assert len(inner) == 3
assert len(outer) == 1
@@ -159,9 +167,10 @@ class Test_wrap_exception:
self.args = args
self.kwds = kwds
- func = currying.wrap_exception(my_exception, 1, 3, 2, monkey='bone',
- ignores=ValueError)(throwing_func)
- assert func.__name__ == 'throwing_func'
+ func = currying.wrap_exception(
+ my_exception, 1, 3, 2, monkey="bone", ignores=ValueError
+ )(throwing_func)
+ assert func.__name__ == "throwing_func"
pytest.raises(ValueError, func)
throw_kls = IndexError
pytest.raises(my_exception, func)
@@ -170,17 +179,23 @@ class Test_wrap_exception:
raise AssertionError("shouldn't have been able to reach here")
except my_exception as e:
assert e.args == (1, 3, 2)
- assert e.kwds == {'monkey': 'bone'}
+ assert e.kwds == {"monkey": "bone"}
# finally, verify that the exception can be pased in.
func = currying.wrap_exception(
- my_exception, 1, 3, 2, monkey='bone',
- ignores=ValueError, pass_error="the_exception")(throwing_func)
- assert func.__name__ == 'throwing_func'
+ my_exception,
+ 1,
+ 3,
+ 2,
+ monkey="bone",
+ ignores=ValueError,
+ pass_error="the_exception",
+ )(throwing_func)
+ assert func.__name__ == "throwing_func"
pytest.raises(my_exception, func)
try:
func()
raise AssertionError("shouldn't have been able to reach here")
except my_exception as e:
assert e.args == (1, 3, 2)
- assert e.kwds == {'monkey': 'bone', 'the_exception': e.__cause__}
+ assert e.kwds == {"monkey": "bone", "the_exception": e.__cause__}
diff --git a/tests/test_data_source.py b/tests/test_data_source.py
index ddd3eee6..1ede9aa0 100644
--- a/tests/test_data_source.py
+++ b/tests/test_data_source.py
@@ -53,15 +53,15 @@ class TestDataSource:
assert reader_data == writer_data
def _mk_data(self, size=(100000)):
- return ''.join(str(x % 10) for x in range(size))
+ return "".join(str(x % 10) for x in range(size))
def test_transfer_to_data_source(self):
data = self._mk_data()
reader = self.get_obj(data=data)
if self.supports_mutable:
- writer = self.get_obj(data='', mutable=True)
+ writer = self.get_obj(data="", mutable=True)
else:
- writer = data_source.data_source('', mutable=True)
+ writer = data_source.data_source("", mutable=True)
reader.transfer_to_data_source(writer)
self.assertContents(reader, writer)
@@ -70,9 +70,11 @@ class TestDataSource:
data = self._mk_data()
reader = self.get_obj(data=data)
if isinstance(reader, data_source.bz2_source):
- writer = data_source.bz2_source(tmp_path / 'transfer_to_path', mutable=True)
+ writer = data_source.bz2_source(tmp_path / "transfer_to_path", mutable=True)
else:
- writer = data_source.local_source(tmp_path / 'transfer_to_path', mutable=True)
+ writer = data_source.local_source(
+ tmp_path / "transfer_to_path", mutable=True
+ )
reader.transfer_to_path(writer.path)
@@ -82,9 +84,9 @@ class TestDataSource:
data = self._mk_data()
reader = self.get_obj(data=data)
if self.supports_mutable:
- writer = self.get_obj(data='', mutable=True)
+ writer = self.get_obj(data="", mutable=True)
else:
- writer = data_source.data_source('', mutable=True)
+ writer = data_source.data_source("", mutable=True)
with reader.bytes_fileobj() as reader_f, writer.bytes_fileobj(True) as writer_f:
data_source.transfer_between_files(reader_f, writer_f)
@@ -93,15 +95,14 @@ class TestDataSource:
class TestLocalSource(TestDataSource):
-
def get_obj(self, data="foonani", mutable=False, test_creation=False):
self.fp = self.dir / "localsource.test"
if not test_creation:
mode = None
if isinstance(data, bytes):
- mode = 'wb'
+ mode = "wb"
elif mode is None:
- mode = 'w'
+ mode = "w"
with open(self.fp, mode) as f:
f.write(data)
return data_source.local_source(self.fp, mutable=mutable)
@@ -118,21 +119,20 @@ class TestLocalSource(TestDataSource):
obj = self.get_obj(test_creation=True, mutable=True)
# this will blow up if tries to ascii decode it.
with obj.bytes_fileobj(True) as f:
- assert f.read() == b''
+ assert f.read() == b""
f.write(data)
with obj.bytes_fileobj() as f:
assert f.read() == data
class TestBz2Source(TestDataSource):
-
def get_obj(self, data="foonani", mutable=False, test_creation=False):
self.fp = self.dir / "bz2source.test.bz2"
if not test_creation:
if isinstance(data, str):
data = data.encode()
- with open(self.fp, 'wb') as f:
- f.write(compression.compress_data('bzip2', data))
+ with open(self.fp, "wb") as f:
+ f.write(compression.compress_data("bzip2", data))
return data_source.bz2_source(self.fp, mutable=mutable)
def test_bytes_fileobj(self):
@@ -150,8 +150,7 @@ class Test_invokable_data_source(TestDataSource):
def get_obj(self, data="foonani", mutable=False):
if isinstance(data, str):
data = data.encode("utf8")
- return data_source.invokable_data_source(
- partial(self._get_data, data))
+ return data_source.invokable_data_source(partial(self._get_data, data))
@staticmethod
def _get_data(data, is_text=False):
@@ -168,10 +167,10 @@ class Test_invokable_data_source_wrapper_text(Test_invokable_data_source):
def get_obj(self, mutable=False, data="foonani"):
return data_source.invokable_data_source.wrap_function(
- partial(self._get_data, data),
- self.text_mode)
+ partial(self._get_data, data), self.text_mode
+ )
- def _get_data(self, data='foonani'):
+ def _get_data(self, data="foonani"):
if isinstance(data, str):
if not self.text_mode:
return data.encode("utf8")
diff --git a/tests/test_decorators.py b/tests/test_decorators.py
index ce00ac4b..92c0fb8c 100644
--- a/tests/test_decorators.py
+++ b/tests/test_decorators.py
@@ -8,7 +8,6 @@ from snakeoil.decorators import coroutine, namespace, splitexec
class TestSplitExecDecorator:
-
def setup_method(self, method):
self.pid = os.getpid()
@@ -18,11 +17,14 @@ class TestSplitExecDecorator:
assert self.pid != os.getpid()
-@pytest.mark.skipif(not sys.platform.startswith('linux'), reason='supported on Linux only')
+@pytest.mark.skipif(
+ not sys.platform.startswith("linux"), reason="supported on Linux only"
+)
class TestNamespaceDecorator:
-
- @pytest.mark.skipif(not os.path.exists('/proc/self/ns/user'),
- reason='user namespace support required')
+ @pytest.mark.skipif(
+ not os.path.exists("/proc/self/ns/user"),
+ reason="user namespace support required",
+ )
def test_user_namespace(self):
@namespace(user=True)
def do_test():
@@ -31,31 +33,34 @@ class TestNamespaceDecorator:
try:
do_test()
except PermissionError:
- pytest.skip('No permission to use user namespace')
-
- @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/user') and os.path.exists('/proc/self/ns/uts')),
- reason='user and uts namespace support required')
+ pytest.skip("No permission to use user namespace")
+
+ @pytest.mark.skipif(
+ not (
+ os.path.exists("/proc/self/ns/user") and os.path.exists("/proc/self/ns/uts")
+ ),
+ reason="user and uts namespace support required",
+ )
def test_uts_namespace(self):
- @namespace(user=True, uts=True, hostname='host')
+ @namespace(user=True, uts=True, hostname="host")
def do_test():
- ns_hostname, _, ns_domainname = socket.getfqdn().partition('.')
- assert ns_hostname == 'host'
- assert ns_domainname == ''
+ ns_hostname, _, ns_domainname = socket.getfqdn().partition(".")
+ assert ns_hostname == "host"
+ assert ns_domainname == ""
try:
do_test()
except PermissionError:
- pytest.skip('No permission to use user and uts namespace')
+ pytest.skip("No permission to use user and uts namespace")
class TestCoroutineDecorator:
-
def test_coroutine(self):
@coroutine
def count():
i = 0
while True:
- val = (yield i)
+ val = yield i
i = val if val is not None else i + 1
cr = count()
diff --git a/tests/test_demandload.py b/tests/test_demandload.py
index ec2661c9..843cf801 100644
--- a/tests/test_demandload.py
+++ b/tests/test_demandload.py
@@ -9,6 +9,7 @@ from snakeoil import demandload
# setup is what the test expects.
# it also explicitly resets the state on the way out.
+
def reset_globals(functor):
def f(*args, **kwds):
orig_demandload = demandload.demandload
@@ -22,60 +23,61 @@ def reset_globals(functor):
demandload.demand_compile_regexp = orig_demand_compile
demandload._protection_enabled = orig_protection
demandload._noisy_protection = orig_noisy
+
return f
class TestParser:
-
@reset_globals
def test_parse(self):
for input, output in [
- ('foo', [('foo', 'foo')]),
- ('foo:bar', [('foo.bar', 'bar')]),
- ('foo:bar,baz@spork', [('foo.bar', 'bar'), ('foo.baz', 'spork')]),
- ('foo@bar', [('foo', 'bar')]),
- ('foo_bar', [('foo_bar', 'foo_bar')]),
- ]:
+ ("foo", [("foo", "foo")]),
+ ("foo:bar", [("foo.bar", "bar")]),
+ ("foo:bar,baz@spork", [("foo.bar", "bar"), ("foo.baz", "spork")]),
+ ("foo@bar", [("foo", "bar")]),
+ ("foo_bar", [("foo_bar", "foo_bar")]),
+ ]:
assert output == list(demandload.parse_imports([input]))
- pytest.raises(ValueError, list, demandload.parse_imports(['a.b']))
- pytest.raises(ValueError, list, demandload.parse_imports(['a:,']))
- pytest.raises(ValueError, list, demandload.parse_imports(['a:b,x@']))
- pytest.raises(ValueError, list, demandload.parse_imports(['b-x']))
- pytest.raises(ValueError, list, demandload.parse_imports([' b_x']))
+ pytest.raises(ValueError, list, demandload.parse_imports(["a.b"]))
+ pytest.raises(ValueError, list, demandload.parse_imports(["a:,"]))
+ pytest.raises(ValueError, list, demandload.parse_imports(["a:b,x@"]))
+ pytest.raises(ValueError, list, demandload.parse_imports(["b-x"]))
+ pytest.raises(ValueError, list, demandload.parse_imports([" b_x"]))
class TestPlaceholder:
-
@reset_globals
def test_getattr(self):
scope = {}
- placeholder = demandload.Placeholder(scope, 'foo', list)
- assert scope == object.__getattribute__(placeholder, '_scope')
+ placeholder = demandload.Placeholder(scope, "foo", list)
+ assert scope == object.__getattribute__(placeholder, "_scope")
assert placeholder.__doc__ == [].__doc__
- assert scope['foo'] == []
+ assert scope["foo"] == []
demandload._protection_enabled = lambda: True
with pytest.raises(ValueError):
- getattr(placeholder, '__doc__')
+ getattr(placeholder, "__doc__")
@reset_globals
def test__str__(self):
scope = {}
- placeholder = demandload.Placeholder(scope, 'foo', list)
- assert scope == object.__getattribute__(placeholder, '_scope')
+ placeholder = demandload.Placeholder(scope, "foo", list)
+ assert scope == object.__getattribute__(placeholder, "_scope")
assert str(placeholder) == str([])
- assert scope['foo'] == []
+ assert scope["foo"] == []
@reset_globals
def test_call(self):
def passthrough(*args, **kwargs):
return args, kwargs
+
def get_func():
return passthrough
+
scope = {}
- placeholder = demandload.Placeholder(scope, 'foo', get_func)
- assert scope == object.__getattribute__(placeholder, '_scope')
- assert (('arg',), {'kwarg': 42}) == placeholder('arg', kwarg=42)
- assert passthrough is scope['foo']
+ placeholder = demandload.Placeholder(scope, "foo", get_func)
+ assert scope == object.__getattribute__(placeholder, "_scope")
+ assert (("arg",), {"kwarg": 42}) == placeholder("arg", kwarg=42)
+ assert passthrough is scope["foo"]
@reset_globals
def test_setattr(self):
@@ -83,45 +85,43 @@ class TestPlaceholder:
pass
scope = {}
- placeholder = demandload.Placeholder(scope, 'foo', Struct)
+ placeholder = demandload.Placeholder(scope, "foo", Struct)
placeholder.val = 7
demandload._protection_enabled = lambda: True
with pytest.raises(ValueError):
- getattr(placeholder, 'val')
- assert 7 == scope['foo'].val
+ getattr(placeholder, "val")
+ assert 7 == scope["foo"].val
class TestImport:
-
@reset_globals
def test_demandload(self):
scope = {}
- demandload.demandload('snakeoil:demandload', scope=scope)
- assert demandload is not scope['demandload']
- assert demandload.demandload is scope['demandload'].demandload
- assert demandload is scope['demandload']
+ demandload.demandload("snakeoil:demandload", scope=scope)
+ assert demandload is not scope["demandload"]
+ assert demandload.demandload is scope["demandload"].demandload
+ assert demandload is scope["demandload"]
@reset_globals
def test_disabled_demandload(self):
scope = {}
- demandload.disabled_demandload('snakeoil:demandload', scope=scope)
- assert demandload is scope['demandload']
+ demandload.disabled_demandload("snakeoil:demandload", scope=scope)
+ assert demandload is scope["demandload"]
class TestDemandCompileRegexp:
-
@reset_globals
def test_demand_compile_regexp(self):
scope = {}
- demandload.demand_compile_regexp('foo', 'frob', scope=scope)
- assert list(scope.keys()) == ['foo']
- assert 'frob' == scope['foo'].pattern
- assert 'frob' == scope['foo'].pattern
+ demandload.demand_compile_regexp("foo", "frob", scope=scope)
+ assert list(scope.keys()) == ["foo"]
+ assert "frob" == scope["foo"].pattern
+ assert "frob" == scope["foo"].pattern
# verify it's delayed via a bad regex.
- demandload.demand_compile_regexp('foo', 'f(', scope=scope)
- assert list(scope.keys()) == ['foo']
+ demandload.demand_compile_regexp("foo", "f(", scope=scope)
+ assert list(scope.keys()) == ["foo"]
# should blow up on accessing an attribute.
- obj = scope['foo']
+ obj = scope["foo"]
with pytest.raises(sre_constants.error):
- getattr(obj, 'pattern')
+ getattr(obj, "pattern")
diff --git a/tests/test_demandload_usage.py b/tests/test_demandload_usage.py
index 79ccfe03..ddde0563 100644
--- a/tests/test_demandload_usage.py
+++ b/tests/test_demandload_usage.py
@@ -4,7 +4,7 @@ from snakeoil.test import mixins
class TestDemandLoadTargets(mixins.PythonNamespaceWalker):
- target_namespace = 'snakeoil'
+ target_namespace = "snakeoil"
ignore_all_import_failures = False
@pytest.fixture(autouse=True)
@@ -16,8 +16,8 @@ class TestDemandLoadTargets(mixins.PythonNamespaceWalker):
def test_demandload_targets(self):
for x in self.walk_namespace(
- self.target_namespace,
- ignore_failed_imports=self.ignore_all_import_failures):
+ self.target_namespace, ignore_failed_imports=self.ignore_all_import_failures
+ ):
self.check_space(x)
def check_space(self, mod):
diff --git a/tests/test_dependant_methods.py b/tests/test_dependant_methods.py
index ffd6d363..186e175a 100644
--- a/tests/test_dependant_methods.py
+++ b/tests/test_dependant_methods.py
@@ -8,7 +8,6 @@ def func(self, seq, data, val=True):
class TestDependantMethods:
-
@staticmethod
def generate_instance(methods, dependencies):
class Class(metaclass=dm.ForcedDepends):
@@ -25,13 +24,15 @@ class TestDependantMethods:
results = []
o = self.generate_instance(
{str(x): currying.post_curry(func, results, x) for x in range(10)},
- {str(x): str(x - 1) for x in range(1, 10)})
+ {str(x): str(x - 1) for x in range(1, 10)},
+ )
getattr(o, "9")()
assert results == list(range(10))
results = []
o = self.generate_instance(
{str(x): currying.post_curry(func, results, x, False) for x in range(10)},
- {str(x): str(x - 1) for x in range(1, 10)})
+ {str(x): str(x - 1) for x in range(1, 10)},
+ )
getattr(o, "9")()
assert results == [0]
getattr(o, "9")()
@@ -41,7 +42,8 @@ class TestDependantMethods:
results = []
o = self.generate_instance(
{str(x): currying.post_curry(func, results, x) for x in range(10)},
- {str(x): str(x - 1) for x in range(1, 10)})
+ {str(x): str(x - 1) for x in range(1, 10)},
+ )
getattr(o, "1")()
assert results == [0, 1]
getattr(o, "2")()
@@ -71,14 +73,15 @@ class TestDependantMethods:
results = []
o = self.generate_instance(
{str(x): currying.post_curry(func, results, x) for x in range(10)},
- {str(x): str(x - 1) for x in range(1, 10)})
- getattr(o, '2')(ignore_deps=True)
+ {str(x): str(x - 1) for x in range(1, 10)},
+ )
+ getattr(o, "2")(ignore_deps=True)
assert [2] == results
def test_no_deps(self):
results = []
o = self.generate_instance(
- {str(x): currying.post_curry(func, results, x) for x in range(10)},
- {})
- getattr(o, '2')()
+ {str(x): currying.post_curry(func, results, x) for x in range(10)}, {}
+ )
+ getattr(o, "2")()
assert [2] == results
diff --git a/tests/test_fileutils.py b/tests/test_fileutils.py
index a4555f89..356eb74d 100644
--- a/tests/test_fileutils.py
+++ b/tests/test_fileutils.py
@@ -13,7 +13,6 @@ from snakeoil.test import random_str
class TestTouch:
-
@pytest.fixture
def random_path(self, tmp_path):
return tmp_path / random_str(10)
@@ -124,19 +123,19 @@ class TestAtomicWriteFile:
def cpy_setup_class(scope, func_name):
- if getattr(fileutils, 'native_%s' % func_name) \
- is getattr(fileutils, func_name):
- scope['skip'] = 'extensions disabled'
+ if getattr(fileutils, "native_%s" % func_name) is getattr(fileutils, func_name):
+ scope["skip"] = "extensions disabled"
else:
- scope['func'] = staticmethod(getattr(fileutils, func_name))
+ scope["func"] = staticmethod(getattr(fileutils, func_name))
+
class Test_readfile:
func = staticmethod(fileutils.readfile)
- test_cases = ['asdf\nfdasswer\1923', '', '987234']
+ test_cases = ["asdf\nfdasswer\1923", "", "987234"]
- default_encoding = 'ascii'
- none_on_missing_ret_data = 'dar'
+ default_encoding = "ascii"
+ none_on_missing_ret_data = "dar"
@staticmethod
def convert_data(data, encoding):
@@ -147,7 +146,7 @@ class Test_readfile:
return data
def test_it(self, tmp_path):
- fp = tmp_path / 'testfile'
+ fp = tmp_path / "testfile"
for expected in self.test_cases:
raised = None
encoding = self.default_encoding
@@ -168,16 +167,16 @@ class Test_readfile:
assert self.func(path) == expected
def test_none_on_missing(self, tmp_path):
- fp = tmp_path / 'nonexistent'
+ fp = tmp_path / "nonexistent"
with pytest.raises(FileNotFoundError):
self.func(fp)
assert self.func(fp, True) is None
- fp.write_bytes(self.convert_data('dar', 'ascii'))
+ fp.write_bytes(self.convert_data("dar", "ascii"))
assert self.func(fp, True) == self.none_on_missing_ret_data
# ensure it handles paths that go through files-
# still should be suppress
- assert self.func(fp / 'extra', True) is None
+ assert self.func(fp / "extra", True) is None
class Test_readfile_ascii(Test_readfile):
@@ -186,85 +185,86 @@ class Test_readfile_ascii(Test_readfile):
class Test_readfile_utf8(Test_readfile):
func = staticmethod(fileutils.readfile_utf8)
- default_encoding = 'utf8'
+ default_encoding = "utf8"
class Test_readfile_bytes(Test_readfile):
func = staticmethod(fileutils.readfile_bytes)
default_encoding = None
- test_cases = list(map(
- currying.post_curry(Test_readfile.convert_data, 'ascii'),
- Test_readfile.test_cases))
- test_cases.append('\ua000fa'.encode("utf8"))
+ test_cases = list(
+ map(
+ currying.post_curry(Test_readfile.convert_data, "ascii"),
+ Test_readfile.test_cases,
+ )
+ )
+ test_cases.append("\ua000fa".encode("utf8"))
none_on_missing_ret_data = Test_readfile.convert_data(
- Test_readfile.none_on_missing_ret_data, 'ascii')
+ Test_readfile.none_on_missing_ret_data, "ascii"
+ )
class readlines_mixin:
-
def assertFunc(self, path, expected):
expected = tuple(expected.split())
- if expected == ('',):
+ if expected == ("",):
expected = ()
- if 'utf8' not in self.encoding_mode:
+ if "utf8" not in self.encoding_mode:
assert tuple(self.func(path)) == expected
return
assert tuple(self.func(path)) == expected
def test_none_on_missing(self, tmp_path):
- fp = tmp_path / 'nonexistent'
+ fp = tmp_path / "nonexistent"
with pytest.raises(FileNotFoundError):
self.func(fp)
assert not tuple(self.func(fp, False, True))
- fp.write_bytes(self.convert_data('dar', 'ascii'))
+ fp.write_bytes(self.convert_data("dar", "ascii"))
assert tuple(self.func(fp, True)) == (self.none_on_missing_ret_data,)
- assert not tuple(self.func(fp / 'missing', False, True))
+ assert not tuple(self.func(fp / "missing", False, True))
def test_strip_whitespace(self, tmp_path):
- fp = tmp_path / 'data'
+ fp = tmp_path / "data"
- fp.write_bytes(self.convert_data(' dar1 \ndar2 \n dar3\n',
- 'ascii'))
+ fp.write_bytes(self.convert_data(" dar1 \ndar2 \n dar3\n", "ascii"))
results = tuple(self.func(fp, True))
- expected = ('dar1', 'dar2', 'dar3')
- if self.encoding_mode == 'bytes':
+ expected = ("dar1", "dar2", "dar3")
+ if self.encoding_mode == "bytes":
expected = tuple(x.encode("ascii") for x in expected)
assert results == expected
# this time without the trailing newline...
- fp.write_bytes(self.convert_data(' dar1 \ndar2 \n dar3',
- 'ascii'))
+ fp.write_bytes(self.convert_data(" dar1 \ndar2 \n dar3", "ascii"))
results = tuple(self.func(fp, True))
assert results == expected
# test a couple of edgecases; underly c extension has gotten these
# wrong before.
- fp.write_bytes(self.convert_data('0', 'ascii'))
+ fp.write_bytes(self.convert_data("0", "ascii"))
results = tuple(self.func(fp, True))
- expected = ('0',)
- if self.encoding_mode == 'bytes':
+ expected = ("0",)
+ if self.encoding_mode == "bytes":
expected = tuple(x.encode("ascii") for x in expected)
assert results == expected
- fp.write_bytes(self.convert_data('0\n', 'ascii'))
+ fp.write_bytes(self.convert_data("0\n", "ascii"))
results = tuple(self.func(fp, True))
- expected = ('0',)
- if self.encoding_mode == 'bytes':
+ expected = ("0",)
+ if self.encoding_mode == "bytes":
expected = tuple(x.encode("ascii") for x in expected)
assert results == expected
- fp.write_bytes(self.convert_data('0 ', 'ascii'))
+ fp.write_bytes(self.convert_data("0 ", "ascii"))
results = tuple(self.func(fp, True))
- expected = ('0',)
- if self.encoding_mode == 'bytes':
+ expected = ("0",)
+ if self.encoding_mode == "bytes":
expected = tuple(x.encode("ascii") for x in expected)
assert results == expected
def mk_readlines_test(scope, mode):
- func_name = 'readlines_%s' % mode
- base = globals()['Test_readfile_%s' % mode]
+ func_name = "readlines_%s" % mode
+ base = globals()["Test_readfile_%s" % mode]
class kls(readlines_mixin, base):
func = staticmethod(getattr(fileutils, func_name))
@@ -273,14 +273,15 @@ def mk_readlines_test(scope, mode):
kls.__name__ = "Test_%s" % func_name
scope["Test_%s" % func_name] = kls
+
for case in ("ascii", "bytes", "utf8"):
- name = 'readlines_%s' % case
+ name = "readlines_%s" % case
mk_readlines_test(locals(), case)
class TestBrokenStats:
- test_cases = ['/proc/crypto', '/sys/devices/system/cpu/present']
+ test_cases = ["/proc/crypto", "/sys/devices/system/cpu/present"]
def test_readfile(self):
for path in self.test_cases:
@@ -292,7 +293,7 @@ class TestBrokenStats:
def _check_path(self, path, func, split_it=False):
try:
- with open(path, 'r') as handle:
+ with open(path, "r") as handle:
data = handle.read()
except EnvironmentError as e:
if e.errno not in (errno.ENOENT, errno.EPERM):
@@ -302,7 +303,7 @@ class TestBrokenStats:
func_data = func(path)
if split_it:
func_data = list(func_data)
- data = [x for x in data.split('\n') if x]
+ data = [x for x in data.split("\n") if x]
func_data = [x for x in func_data if x]
assert func_data == data
@@ -313,13 +314,13 @@ class Test_mmap_or_open_for_read:
func = staticmethod(fileutils.mmap_or_open_for_read)
def test_zero_length(self, tmp_path):
- (path := tmp_path / "target").write_text('')
+ (path := tmp_path / "target").write_text("")
m, f = self.func(path)
assert m is None
- assert f.read() == b''
+ assert f.read() == b""
f.close()
- def test_mmap(self, tmp_path, data=b'foonani'):
+ def test_mmap(self, tmp_path, data=b"foonani"):
(path := tmp_path / "target").write_bytes(data)
m, f = self.func(path)
assert len(m) == len(data)
@@ -329,14 +330,14 @@ class Test_mmap_or_open_for_read:
class Test_mmap_and_close:
-
def test_it(self, tmp_path):
- (path := tmp_path / "target").write_bytes(data := b'asdfasdf')
+ (path := tmp_path / "target").write_bytes(data := b"asdfasdf")
fd, m = None, None
try:
fd = os.open(path, os.O_RDONLY)
m = _fileutils.mmap_and_close(
- fd, len(data), mmap.MAP_PRIVATE, mmap.PROT_READ)
+ fd, len(data), mmap.MAP_PRIVATE, mmap.PROT_READ
+ )
# and ensure it closed the fd...
with pytest.raises(EnvironmentError):
os.read(fd, 1)
diff --git a/tests/test_formatters.py b/tests/test_formatters.py
index 266ef1e0..549f2adc 100644
--- a/tests/test_formatters.py
+++ b/tests/test_formatters.py
@@ -18,16 +18,16 @@ class TestPlainTextFormatter:
def test_basics(self):
# As many sporks as fit in 20 chars.
- sporks = ' '.join(3 * ('spork',))
+ sporks = " ".join(3 * ("spork",))
for inputs, output in [
- (('\N{SNOWMAN}',), '?'),
- ((7 * 'spork ',), '%s\n%s\n%s' % (sporks, sporks, 'spork ')),
- (7 * ('spork ',), '%s \n%s \n%s' % (sporks, sporks, 'spork ')),
- ((30 * 'a'), 20 * 'a' + '\n' + 10 * 'a'),
- (30 * ('a',), 20 * 'a' + '\n' + 10 * 'a'),
- ]:
+ (("\N{SNOWMAN}",), "?"),
+ ((7 * "spork ",), "%s\n%s\n%s" % (sporks, sporks, "spork ")),
+ (7 * ("spork ",), "%s \n%s \n%s" % (sporks, sporks, "spork ")),
+ ((30 * "a"), 20 * "a" + "\n" + 10 * "a"),
+ (30 * ("a",), 20 * "a" + "\n" + 10 * "a"),
+ ]:
stream = BytesIO()
- formatter = self.kls(stream, encoding='ascii')
+ formatter = self.kls(stream, encoding="ascii")
formatter.width = 20
formatter.write(autoline=False, wrap=True, *inputs)
assert output.encode() == stream.getvalue()
@@ -35,69 +35,70 @@ class TestPlainTextFormatter:
def test_first_prefix(self):
# As many sporks as fit in 20 chars.
for inputs, output in [
- (('\N{SNOWMAN}',), 'foon:?'),
- ((7 * 'spork ',),
- 'foon:spork spork\n'
- 'spork spork spork\n'
- 'spork spork '),
- (7 * ('spork ',),
- 'foon:spork spork \n'
- 'spork spork spork \n'
- 'spork spork '),
- ((30 * 'a'), 'foon:' + 15 * 'a' + '\n' + 15 * 'a'),
- (30 * ('a',), 'foon:' + 15 * 'a' + '\n' + 15 * 'a'),
- ]:
+ (("\N{SNOWMAN}",), "foon:?"),
+ (
+ (7 * "spork ",),
+ "foon:spork spork\n" "spork spork spork\n" "spork spork ",
+ ),
+ (
+ 7 * ("spork ",),
+ "foon:spork spork \n" "spork spork spork \n" "spork spork ",
+ ),
+ ((30 * "a"), "foon:" + 15 * "a" + "\n" + 15 * "a"),
+ (30 * ("a",), "foon:" + 15 * "a" + "\n" + 15 * "a"),
+ ]:
stream = BytesIO()
- formatter = self.kls(stream, encoding='ascii')
+ formatter = self.kls(stream, encoding="ascii")
formatter.width = 20
- formatter.write(autoline=False, wrap=True, first_prefix='foon:', *inputs)
+ formatter.write(autoline=False, wrap=True, first_prefix="foon:", *inputs)
assert output.encode() == stream.getvalue()
def test_later_prefix(self):
for inputs, output in [
- (('\N{SNOWMAN}',), '?'),
- ((7 * 'spork ',),
- 'spork spork spork\n'
- 'foon:spork spork\n'
- 'foon:spork spork '),
- (7 * ('spork ',),
- 'spork spork spork \n'
- 'foon:spork spork \n'
- 'foon:spork spork '),
- ((30 * 'a'), 20 * 'a' + '\n' + 'foon:' + 10 * 'a'),
- (30 * ('a',), 20 * 'a' + '\n' + 'foon:' + 10 * 'a'),
- ]:
+ (("\N{SNOWMAN}",), "?"),
+ (
+ (7 * "spork ",),
+ "spork spork spork\n" "foon:spork spork\n" "foon:spork spork ",
+ ),
+ (
+ 7 * ("spork ",),
+ "spork spork spork \n" "foon:spork spork \n" "foon:spork spork ",
+ ),
+ ((30 * "a"), 20 * "a" + "\n" + "foon:" + 10 * "a"),
+ (30 * ("a",), 20 * "a" + "\n" + "foon:" + 10 * "a"),
+ ]:
stream = BytesIO()
- formatter = self.kls(stream, encoding='ascii')
+ formatter = self.kls(stream, encoding="ascii")
formatter.width = 20
- formatter.later_prefix = ['foon:']
+ formatter.later_prefix = ["foon:"]
formatter.write(wrap=True, autoline=False, *inputs)
assert output.encode() == stream.getvalue()
def test_complex(self):
stream = BytesIO()
- formatter = self.kls(stream, encoding='ascii')
+ formatter = self.kls(stream, encoding="ascii")
formatter.width = 9
- formatter.first_prefix = ['foo', None, ' d']
- formatter.later_prefix = ['dorkey']
+ formatter.first_prefix = ["foo", None, " d"]
+ formatter.later_prefix = ["dorkey"]
formatter.write("dar bl", wrap=True, autoline=False)
assert "foo ddar\ndorkeybl".encode() == stream.getvalue()
- formatter.write(" "*formatter.width, wrap=True, autoline=True)
+ formatter.write(" " * formatter.width, wrap=True, autoline=True)
formatter.stream = stream = BytesIO()
formatter.write("dar", " b", wrap=True, autoline=False)
assert "foo ddar\ndorkeyb".encode() == stream.getvalue()
- output = \
-""" rdepends: >=dev-lang/python-2.3 >=sys-apps/sed-4.0.5
+ output = """ rdepends: >=dev-lang/python-2.3 >=sys-apps/sed-4.0.5
dev-python/python-fchksum
"""
stream = BytesIO()
- formatter = self.kls(stream, encoding='ascii', width=80)
+ formatter = self.kls(stream, encoding="ascii", width=80)
formatter.wrap = True
assert formatter.autoline
assert formatter.width == 80
- formatter.later_prefix = [' ']
- formatter.write(" rdepends: >=dev-lang/python-2.3 "
- ">=sys-apps/sed-4.0.5 dev-python/python-fchksum")
+ formatter.later_prefix = [" "]
+ formatter.write(
+ " rdepends: >=dev-lang/python-2.3 "
+ ">=sys-apps/sed-4.0.5 dev-python/python-fchksum"
+ )
assert len(formatter.first_prefix) == 0
assert len(formatter.later_prefix) == 1
assert output.encode() == stream.getvalue()
@@ -105,148 +106,176 @@ class TestPlainTextFormatter:
formatter.stream = stream = BytesIO()
# push it right up to the limit.
formatter.width = 82
- formatter.write(" rdepends: >=dev-lang/python-2.3 "
- ">=sys-apps/sed-4.0.5 dev-python/python-fchksum")
+ formatter.write(
+ " rdepends: >=dev-lang/python-2.3 "
+ ">=sys-apps/sed-4.0.5 dev-python/python-fchksum"
+ )
assert output.encode() == stream.getvalue()
formatter.first_prefix = []
- formatter.later_prefix = [' ']
+ formatter.later_prefix = [" "]
formatter.width = 28
formatter.autoline = False
formatter.wrap = True
formatter.stream = stream = BytesIO()
input = (" description: ", "The Portage")
formatter.write(*input)
- output = ''.join(input).rsplit(" ", 1)
- output[1] = ' %s' % output[1]
- assert '\n'.join(output).encode() == stream.getvalue()
-
+ output = "".join(input).rsplit(" ", 1)
+ output[1] = " %s" % output[1]
+ assert "\n".join(output).encode() == stream.getvalue()
def test_wrap_autoline(self):
for inputs, output in [
- ((3 * ('spork',)), 'spork\nspork\nspork\n'),
- (3 * (('spork',),), 'spork\nspork\nspork\n'),
- (((3 * 'spork',),),
- '\n'
- 'foonsporks\n'
- 'foonporksp\n'
- 'foonork\n'),
- ((('fo',), (2 * 'spork',),), 'fo\nsporkspork\n'),
- ((('fo',), (3 * 'spork',),),
- 'fo\n'
- '\n'
- 'foonsporks\n'
- 'foonporksp\n'
- 'foonork\n'),
- ]:
+ ((3 * ("spork",)), "spork\nspork\nspork\n"),
+ (3 * (("spork",),), "spork\nspork\nspork\n"),
+ (((3 * "spork",),), "\n" "foonsporks\n" "foonporksp\n" "foonork\n"),
+ (
+ (
+ ("fo",),
+ (2 * "spork",),
+ ),
+ "fo\nsporkspork\n",
+ ),
+ (
+ (
+ ("fo",),
+ (3 * "spork",),
+ ),
+ "fo\n" "\n" "foonsporks\n" "foonporksp\n" "foonork\n",
+ ),
+ ]:
stream = BytesIO()
- formatter = self.kls(stream, encoding='ascii')
+ formatter = self.kls(stream, encoding="ascii")
formatter.width = 10
for input in inputs:
- formatter.write(wrap=True, later_prefix='foon', *input)
+ formatter.write(wrap=True, later_prefix="foon", *input)
assert output.encode() == stream.getvalue()
class TerminfoFormatterTest:
-
def _test_stream(self, stream, formatter, inputs, output):
stream.seek(0)
stream.truncate()
formatter.write(*inputs)
stream.seek(0)
result = stream.read()
- output = ''.join(output)
- assert output.encode() == result, \
- "given(%r), expected(%r), got(%r)" % (inputs, output, result)
+ output = "".join(output)
+ assert output.encode() == result, "given(%r), expected(%r), got(%r)" % (
+ inputs,
+ output,
+ result,
+ )
@issue7567
def test_terminfo(self):
- esc = '\x1b['
+ esc = "\x1b["
stream = TemporaryFile()
- f = formatters.TerminfoFormatter(stream, 'ansi', True, 'ascii')
+ f = formatters.TerminfoFormatter(stream, "ansi", True, "ascii")
f.autoline = False
for inputs, output in (
- ((f.bold, 'bold'), (esc, '1m', 'bold', esc, '0;10m')),
- ((f.underline, 'underline'),
- (esc, '4m', 'underline', esc, '0;10m')),
- ((f.fg('red'), 'red'), (esc, '31m', 'red', esc, '39;49m')),
- ((f.fg('red'), 'red', f.bold, 'boldred', f.fg(), 'bold',
- f.reset, 'done'),
- (esc, '31m', 'red', esc, '1m', 'boldred', esc, '39;49m', 'bold',
- esc, '0;10m', 'done')),
- ((42,), ('42',)),
- (('\N{SNOWMAN}',), ('?',))
- ):
+ ((f.bold, "bold"), (esc, "1m", "bold", esc, "0;10m")),
+ ((f.underline, "underline"), (esc, "4m", "underline", esc, "0;10m")),
+ ((f.fg("red"), "red"), (esc, "31m", "red", esc, "39;49m")),
+ (
+ (
+ f.fg("red"),
+ "red",
+ f.bold,
+ "boldred",
+ f.fg(),
+ "bold",
+ f.reset,
+ "done",
+ ),
+ (
+ esc,
+ "31m",
+ "red",
+ esc,
+ "1m",
+ "boldred",
+ esc,
+ "39;49m",
+ "bold",
+ esc,
+ "0;10m",
+ "done",
+ ),
+ ),
+ ((42,), ("42",)),
+ (("\N{SNOWMAN}",), ("?",)),
+ ):
self._test_stream(stream, f, inputs, output)
f.autoline = True
- self._test_stream(
- stream, f, ('lala',), ('lala', '\n'))
+ self._test_stream(stream, f, ("lala",), ("lala", "\n"))
def test_unsupported_term(self):
stream = TemporaryFile()
with pytest.raises(formatters.TerminfoUnsupported):
- formatters.TerminfoFormatter(stream, term='dumb')
+ formatters.TerminfoFormatter(stream, term="dumb")
@issue7567
def test_title(self):
stream = TemporaryFile()
try:
- f = formatters.TerminfoFormatter(stream, 'xterm+sl', True, 'ascii')
+ f = formatters.TerminfoFormatter(stream, "xterm+sl", True, "ascii")
except curses.error:
pytest.skip("xterm+sl not in terminfo db")
- f.title('TITLE')
+ f.title("TITLE")
stream.seek(0)
- assert b'\x1b]0;TITLE\x07' == stream.read()
+ assert b"\x1b]0;TITLE\x07" == stream.read()
def _with_term(term, func, *args, **kwargs):
- orig_term = os.environ.get('TERM')
+ orig_term = os.environ.get("TERM")
try:
- os.environ['TERM'] = term
+ os.environ["TERM"] = term
return func(*args, **kwargs)
finally:
if orig_term is None:
- del os.environ['TERM']
+ del os.environ["TERM"]
else:
- os.environ['TERM'] = orig_term
+ os.environ["TERM"] = orig_term
+
# XXX ripped from pkgcore's test_commandline
-def _get_pty_pair(encoding='ascii'):
+def _get_pty_pair(encoding="ascii"):
master_fd, slave_fd = pty.openpty()
- master = os.fdopen(master_fd, 'rb', 0)
- out = os.fdopen(slave_fd, 'wb', 0)
+ master = os.fdopen(master_fd, "rb", 0)
+ out = os.fdopen(slave_fd, "wb", 0)
return master, out
-@pytest.mark.skip(reason='this currently breaks on github ci due to the issue7567 workaround')
+@pytest.mark.skip(
+ reason="this currently breaks on github ci due to the issue7567 workaround"
+)
class TestGetFormatter:
-
@issue7567
def test_dumb_terminal(self):
master, _out = _get_pty_pair()
- formatter = _with_term('dumb', formatters.get_formatter, master)
+ formatter = _with_term("dumb", formatters.get_formatter, master)
assert isinstance(formatter, formatters.PlainTextFormatter)
@issue7567
def test_vt100_terminal(self):
master, _out = _get_pty_pair()
- formatter = _with_term('vt100', formatters.get_formatter, master)
+ formatter = _with_term("vt100", formatters.get_formatter, master)
assert isinstance(formatter, formatters.PlainTextFormatter)
@issue7567
def test_smart_terminal(self):
master, _out = _get_pty_pair()
- formatter = _with_term('xterm', formatters.get_formatter, master)
+ formatter = _with_term("xterm", formatters.get_formatter, master)
assert isinstance(formatter, formatters.TerminfoFormatter)
@issue7567
def test_not_a_tty(self):
stream = TemporaryFile()
- formatter = _with_term('xterm', formatters.get_formatter, stream)
+ formatter = _with_term("xterm", formatters.get_formatter, stream)
assert isinstance(formatter, formatters.PlainTextFormatter)
@issue7567
def test_no_fd(self):
stream = BytesIO()
- formatter = _with_term('xterm', formatters.get_formatter, stream)
+ formatter = _with_term("xterm", formatters.get_formatter, stream)
assert isinstance(formatter, formatters.PlainTextFormatter)
diff --git a/tests/test_iterables.py b/tests/test_iterables.py
index 3345c5e8..d0d57683 100644
--- a/tests/test_iterables.py
+++ b/tests/test_iterables.py
@@ -1,12 +1,10 @@
import operator
import pytest
-from snakeoil.iterables import (caching_iter, expandable_chain, iter_sort,
- partition)
+from snakeoil.iterables import caching_iter, expandable_chain, iter_sort, partition
class TestPartition:
-
def test_empty(self):
a, b = partition(())
assert list(a) == []
@@ -23,19 +21,18 @@ class TestPartition:
class TestExpandableChain:
-
def test_normal_function(self):
i = [iter(range(100)) for x in range(3)]
e = expandable_chain()
e.extend(i)
- assert list(e) == list(range(100))*3
+ assert list(e) == list(range(100)) * 3
for x in i + [e]:
pytest.raises(StopIteration, x.__next__)
def test_extend(self):
e = expandable_chain()
e.extend(range(100) for i in (1, 2))
- assert list(e) == list(range(100))*2
+ assert list(e) == list(range(100)) * 2
with pytest.raises(StopIteration):
e.extend([[]])
@@ -62,7 +59,6 @@ class TestExpandableChain:
class TestCachingIter:
-
def test_iter_consumption(self):
i = iter(range(100))
c = caching_iter(i)
@@ -147,6 +143,7 @@ class Test_iter_sort:
def test_ordering(self):
def f(l):
return sorted(l, key=operator.itemgetter(0))
+
result = list(iter_sort(f, *[iter(range(x, x + 10)) for x in (30, 20, 0, 10)]))
expected = list(range(40))
assert result == expected
diff --git a/tests/test_klass.py b/tests/test_klass.py
index 773925d2..25728fa5 100644
--- a/tests/test_klass.py
+++ b/tests/test_klass.py
@@ -14,7 +14,8 @@ class Test_GetAttrProxy:
class foo1:
def __init__(self, obj):
self.obj = obj
- __getattr__ = self.kls('obj')
+
+ __getattr__ = self.kls("obj")
class foo2:
pass
@@ -27,18 +28,18 @@ class Test_GetAttrProxy:
o2.foon = "dar"
assert o.foon == "dar"
o.foon = "foo"
- assert o.foon == 'foo'
+ assert o.foon == "foo"
def test_attrlist(self):
def make_class(attr_list=None):
class foo(metaclass=self.kls):
if attr_list is not None:
- locals()['__attr_comparison__'] = attr_list
+ locals()["__attr_comparison__"] = attr_list
with pytest.raises(TypeError):
make_class()
with pytest.raises(TypeError):
- make_class(['foon'])
+ make_class(["foon"])
with pytest.raises(TypeError):
make_class([None])
@@ -47,38 +48,39 @@ class Test_GetAttrProxy:
bar = "baz"
class Test:
- method = self.kls('test')
+ method = self.kls("test")
test = foo()
test = Test()
- assert test.method('bar') == foo.bar
+ assert test.method("bar") == foo.bar
class TestDirProxy:
-
@staticmethod
def noninternal_attrs(obj):
- return sorted(x for x in dir(obj) if not re.match(r'__\w+__', x))
+ return sorted(x for x in dir(obj) if not re.match(r"__\w+__", x))
def test_combined(self):
class foo1:
def __init__(self, obj):
self.obj = obj
- __dir__ = klass.DirProxy('obj')
+
+ __dir__ = klass.DirProxy("obj")
class foo2:
def __init__(self):
- self.attr = 'foo'
+ self.attr = "foo"
o2 = foo2()
o = foo1(o2)
- assert self.noninternal_attrs(o) == ['attr', 'obj']
+ assert self.noninternal_attrs(o) == ["attr", "obj"]
def test_empty(self):
class foo1:
def __init__(self, obj):
self.obj = obj
- __dir__ = klass.DirProxy('obj')
+
+ __dir__ = klass.DirProxy("obj")
class foo2:
pass
@@ -86,23 +88,26 @@ class TestDirProxy:
o2 = foo2()
o = foo1(o2)
assert self.noninternal_attrs(o2) == []
- assert self.noninternal_attrs(o) == ['obj']
+ assert self.noninternal_attrs(o) == ["obj"]
def test_slots(self):
class foo1:
- __slots__ = ('obj',)
+ __slots__ = ("obj",)
+
def __init__(self, obj):
self.obj = obj
- __dir__ = klass.DirProxy('obj')
+
+ __dir__ = klass.DirProxy("obj")
class foo2:
- __slots__ = ('attr',)
+ __slots__ = ("attr",)
+
def __init__(self):
- self.attr = 'foo'
+ self.attr = "foo"
o2 = foo2()
o = foo1(o2)
- assert self.noninternal_attrs(o) == ['attr', 'obj']
+ assert self.noninternal_attrs(o) == ["attr", "obj"]
class Test_contains:
@@ -111,6 +116,7 @@ class Test_contains:
def test_it(self):
class c(dict):
__contains__ = self.func
+
d = c({"1": 2})
assert "1" in d
assert 1 not in d
@@ -122,6 +128,7 @@ class Test_get:
def test_it(self):
class c(dict):
get = self.func
+
d = c({"1": 2})
assert d.get("1") == 2
assert d.get("1", 3) == 2
@@ -142,11 +149,13 @@ class Test_chained_getter:
assert id(self.kls("fa2341fa")) == l[0]
def test_eq(self):
- assert self.kls("asdf", disable_inst_caching=True) == \
- self.kls("asdf", disable_inst_caching=True)
+ assert self.kls("asdf", disable_inst_caching=True) == self.kls(
+ "asdf", disable_inst_caching=True
+ )
- assert self.kls("asdf2", disable_inst_caching=True) != \
- self.kls("asdf", disable_inst_caching=True)
+ assert self.kls("asdf2", disable_inst_caching=True) != self.kls(
+ "asdf", disable_inst_caching=True
+ )
def test_it(self):
class maze:
@@ -159,13 +168,13 @@ class Test_chained_getter:
d = {}
m = maze(d)
f = self.kls
- assert f('foon')(m) == m
+ assert f("foon")(m) == m
d["foon"] = 1
- assert f('foon')(m) == 1
- assert f('dar.foon')(m) == 1
- assert f('.'.join(['blah']*10))(m) == m
+ assert f("foon")(m) == 1
+ assert f("dar.foon")(m) == 1
+ assert f(".".join(["blah"] * 10))(m) == m
with pytest.raises(AttributeError):
- f('foon.dar')(m)
+ f("foon.dar")(m)
class Test_jit_attr:
@@ -184,23 +193,28 @@ class Test_jit_attr:
def jit_attr_ext_method(self):
return partial(klass.jit_attr_ext_method, kls=self.kls)
- def mk_inst(self, attrname='_attr', method_lookup=False,
- use_cls_setattr=False, func=None,
- singleton=klass._uncached_singleton):
+ def mk_inst(
+ self,
+ attrname="_attr",
+ method_lookup=False,
+ use_cls_setattr=False,
+ func=None,
+ singleton=klass._uncached_singleton,
+ ):
f = func
if not func:
+
def f(self):
self._invokes.append(self)
return 54321
class cls:
-
def __init__(self):
sf = partial(object.__setattr__, self)
- sf('_sets', [])
- sf('_reflects', [])
- sf('_invokes', [])
+ sf("_sets", [])
+ sf("_reflects", [])
+ sf("_invokes", [])
attr = self.kls(f, attrname, singleton, use_cls_setattr)
@@ -219,13 +233,22 @@ class Test_jit_attr:
sets = [instance] * sets
reflects = [instance] * reflects
invokes = [instance] * invokes
- msg = ("checking %s: got(%r), expected(%r); state was sets=%r, "
- "reflects=%r, invokes=%r" % (
- "%s", "%s", "%s", instance._sets, instance._reflects,
- instance._invokes))
+ msg = (
+ "checking %s: got(%r), expected(%r); state was sets=%r, "
+ "reflects=%r, invokes=%r"
+ % ("%s", "%s", "%s", instance._sets, instance._reflects, instance._invokes)
+ )
assert instance._sets == sets, msg % ("sets", instance._sets, sets)
- assert instance._reflects == reflects, msg % ("reflects", instance._reflects, reflects)
- assert instance._invokes == invokes, msg % ("invokes", instance._invokes, invokes)
+ assert instance._reflects == reflects, msg % (
+ "reflects",
+ instance._reflects,
+ reflects,
+ )
+ assert instance._invokes == invokes, msg % (
+ "invokes",
+ instance._invokes,
+ invokes,
+ )
def test_implementation(self):
obj = self.mk_inst()
@@ -298,7 +321,7 @@ class Test_jit_attr:
object.__setattr__(self, attr, value)
o = cls()
- assert not hasattr(o, 'invoked')
+ assert not hasattr(o, "invoked")
assert o.my_attr == now
assert o._blah2 == now
assert o.invoked
@@ -315,34 +338,34 @@ class Test_jit_attr:
return now2
def __setattr__(self, attr, value):
- if not getattr(self, '_setattr_allowed', False):
+ if not getattr(self, "_setattr_allowed", False):
raise TypeError("setattr isn't allowed for %s" % attr)
object.__setattr__(self, attr, value)
- base.attr = self.jit_attr_ext_method('f1', '_attr')
+ base.attr = self.jit_attr_ext_method("f1", "_attr")
o = base()
assert o.attr == now
assert o._attr == now
assert o.attr == now
- base.attr = self.jit_attr_ext_method('f1', '_attr', use_cls_setattr=True)
+ base.attr = self.jit_attr_ext_method("f1", "_attr", use_cls_setattr=True)
o = base()
with pytest.raises(TypeError):
- getattr(o, 'attr')
+ getattr(o, "attr")
base._setattr_allowed = True
assert o.attr == now
- base.attr = self.jit_attr_ext_method('f2', '_attr2')
+ base.attr = self.jit_attr_ext_method("f2", "_attr2")
o = base()
assert o.attr == now2
assert o._attr2 == now2
# finally, check that it's doing lookups rather then storing the func.
- base.attr = self.jit_attr_ext_method('func', '_attr2')
+ base.attr = self.jit_attr_ext_method("func", "_attr2")
o = base()
# no func...
with pytest.raises(AttributeError):
- getattr(o, 'attr')
+ getattr(o, "attr")
base.func = base.f1
assert o.attr == now
assert o._attr2 == now
@@ -354,7 +377,13 @@ class Test_jit_attr:
def test_check_singleton_is_compare(self):
def throw_assert(*args, **kwds):
- raise AssertionError("I shouldn't be invoked: %s, %s" % (args, kwds,))
+ raise AssertionError(
+ "I shouldn't be invoked: %s, %s"
+ % (
+ args,
+ kwds,
+ )
+ )
class puker:
__eq__ = throw_assert
@@ -369,11 +398,13 @@ class Test_jit_attr:
def test_cached_property(self):
l = []
+
class foo:
@klass.cached_property
def blah(self, l=l, i=iter(range(5))):
l.append(None)
return next(i)
+
f = foo()
assert f.blah == 0
assert len(l) == 1
@@ -413,15 +444,15 @@ class Test_aliased_attr:
o = cls()
with pytest.raises(AttributeError):
- getattr(o, 'attr')
+ getattr(o, "attr")
o.dar = "foon"
with pytest.raises(AttributeError):
- getattr(o, 'attr')
+ getattr(o, "attr")
o.dar = o
o.blah = "monkey"
- assert o.attr == 'monkey'
+ assert o.attr == "monkey"
# verify it'll cross properties...
class blah:
@@ -431,6 +462,7 @@ class Test_aliased_attr:
@property
def foon(self):
return blah()
+
alias = self.func("foon.target")
o = cls()
@@ -442,12 +474,15 @@ class Test_cached_hash:
def test_it(self):
now = int(time())
+
class cls:
invoked = []
+
@self.func
def __hash__(self):
self.invoked.append(self)
return now
+
o = cls()
assert hash(o) == now
assert o.invoked == [o]
@@ -462,7 +497,7 @@ class Test_reflective_hash:
def test_it(self):
class cls:
- __hash__ = self.func('_hash')
+ __hash__ = self.func("_hash")
obj = cls()
with pytest.raises(AttributeError):
@@ -477,7 +512,8 @@ class Test_reflective_hash:
hash(obj)
class cls2:
- __hash__ = self.func('_dar')
+ __hash__ = self.func("_dar")
+
obj = cls2()
with pytest.raises(AttributeError):
hash(obj)
@@ -486,7 +522,6 @@ class Test_reflective_hash:
class TestImmutableInstance:
-
def test_metaclass(self):
self.common_test(lambda x: x, metaclass=klass.immutable_instance)
@@ -506,7 +541,7 @@ class TestImmutableInstance:
with pytest.raises(AttributeError):
delattr(o, "dar")
- object.__setattr__(o, 'dar', 'foon')
+ object.__setattr__(o, "dar", "foon")
with pytest.raises(AttributeError):
delattr(o, "dar")
@@ -541,7 +576,6 @@ class TestAliasMethod:
class TestPatch:
-
def setup_method(self, method):
# cache original methods
self._math_ceil = math.ceil
@@ -556,7 +590,7 @@ class TestPatch:
n = 0.1
assert math.ceil(n) == 1
- @klass.patch('math.ceil')
+ @klass.patch("math.ceil")
def ceil(orig_ceil, n):
return math.floor(n)
@@ -567,8 +601,8 @@ class TestPatch:
assert math.ceil(n) == 2
assert math.floor(n) == 1
- @klass.patch('math.ceil')
- @klass.patch('math.floor')
+ @klass.patch("math.ceil")
+ @klass.patch("math.floor")
def zero(orig_func, n):
return 0
diff --git a/tests/test_mappings.py b/tests/test_mappings.py
index b1aef254..1ffe7801 100644
--- a/tests/test_mappings.py
+++ b/tests/test_mappings.py
@@ -10,7 +10,6 @@ def a_dozen():
class BasicDict(mappings.DictMixin):
-
def __init__(self, i=None, **kwargs):
self._d = {}
mappings.DictMixin.__init__(self, i, **kwargs)
@@ -20,7 +19,6 @@ class BasicDict(mappings.DictMixin):
class MutableDict(BasicDict):
-
def __setitem__(self, key, val):
self._d[key] = val
@@ -36,7 +34,6 @@ class ImmutableDict(BasicDict):
class TestDictMixin:
-
def test_immutability(self):
d = ImmutableDict()
pytest.raises(AttributeError, d.__setitem__, "spork", "foon")
@@ -59,12 +56,12 @@ class TestDictMixin:
pytest.raises(KeyError, d.pop, "spork")
assert d.pop("spork", "bat") == "bat"
assert d.pop("foo") == "bar"
- assert d.popitem(), ("baz" == "cat")
+ assert d.popitem(), "baz" == "cat"
pytest.raises(KeyError, d.popitem)
assert d.pop("nonexistent", None) == None
def test_init(self):
- d = MutableDict((('foo', 'bar'), ('spork', 'foon')), baz="cat")
+ d = MutableDict((("foo", "bar"), ("spork", "foon")), baz="cat")
assert d["foo"] == "bar"
assert d["baz"] == "cat"
d.clear()
@@ -73,19 +70,20 @@ class TestDictMixin:
def test_bool(self):
d = MutableDict()
assert not d
- d['x'] = 1
+ d["x"] = 1
assert d
- del d['x']
+ del d["x"]
assert not d
class RememberingNegateMixin:
-
def setup_method(self, method):
self.negate_calls = []
+
def negate(i):
self.negate_calls.append(i)
return -i
+
self.negate = negate
def teardown_method(self, method):
@@ -94,7 +92,6 @@ class RememberingNegateMixin:
class LazyValDictTestMixin:
-
def test_invalid_operations(self):
pytest.raises(AttributeError, operator.setitem, self.dict, 7, 7)
pytest.raises(AttributeError, operator.delitem, self.dict, 7)
@@ -118,6 +115,7 @@ class LazyValDictTestMixin:
# missing key
def get():
return self.dict[42]
+
pytest.raises(KeyError, get)
def test_caching(self):
@@ -129,7 +127,6 @@ class LazyValDictTestMixin:
class TestLazyValDictWithList(LazyValDictTestMixin, RememberingNegateMixin):
-
def setup_method(self, method):
super().setup_method(method)
self.dict = mappings.LazyValDict(list(range(12)), self.negate)
@@ -148,14 +145,12 @@ class TestLazyValDictWithList(LazyValDictTestMixin, RememberingNegateMixin):
class TestLazyValDictWithFunc(LazyValDictTestMixin, RememberingNegateMixin):
-
def setup_method(self, method):
super().setup_method(method)
self.dict = mappings.LazyValDict(a_dozen, self.negate)
class TestLazyValDict:
-
def test_invalid_init_args(self):
pytest.raises(TypeError, mappings.LazyValDict, [1], 42)
pytest.raises(TypeError, mappings.LazyValDict, 42, a_dozen)
@@ -164,36 +159,43 @@ class TestLazyValDict:
# TODO check for valid values for dict.new, since that seems to be
# part of the interface?
class TestProtectedDict:
-
def setup_method(self, method):
self.orig = {1: -1, 2: -2}
self.dict = mappings.ProtectedDict(self.orig)
def test_basic_operations(self):
assert self.dict[1] == -1
+
def get(i):
return self.dict[i]
+
pytest.raises(KeyError, get, 3)
assert sorted(self.dict.keys()) == [1, 2]
assert -1 not in self.dict
assert 2 in self.dict
+
def remove(i):
del self.dict[i]
+
pytest.raises(KeyError, remove, 50)
def test_basic_mutating(self):
# add something
self.dict[7] = -7
+
def check_after_adding():
assert self.dict[7] == -7
assert 7 in self.dict
assert sorted(self.dict.keys()) == [1, 2, 7]
+
check_after_adding()
# remove it again
del self.dict[7]
assert 7 not in self.dict
+
def get(i):
return self.dict[i]
+
pytest.raises(KeyError, get, 7)
assert sorted(self.dict.keys()) == [1, 2]
# add it back
@@ -214,7 +216,6 @@ class TestProtectedDict:
class TestImmutableDict:
-
def test_init_iterator(self):
d = mappings.ImmutableDict((x, x) for x in range(3))
assert dict(d) == {0: 0, 1: 1, 2: 2}
@@ -239,7 +240,7 @@ class TestImmutableDict:
def test_init_dictmixin(self):
d = MutableDict(baz="cat")
e = mappings.ImmutableDict(d)
- assert dict(d) == {'baz': 'cat'}
+ assert dict(d) == {"baz": "cat"}
def test_init_bad_data(self):
for data in (range(10), list(range(10)), [([], 1)]):
@@ -288,7 +289,6 @@ class TestImmutableDict:
class TestOrderedFrozenSet:
-
def test_magic_methods(self):
s = mappings.OrderedFrozenSet(range(9))
for x in range(9):
@@ -299,7 +299,7 @@ class TestOrderedFrozenSet:
for i in range(9):
assert s[i] == i
assert list(s[1:]) == list(range(1, 9))
- with pytest.raises(IndexError, match='index out of range'):
+ with pytest.raises(IndexError, match="index out of range"):
s[9]
assert s == set(range(9))
@@ -308,12 +308,12 @@ class TestOrderedFrozenSet:
assert hash(s)
def test_ordering(self):
- s = mappings.OrderedFrozenSet('set')
- assert 'set' == ''.join(s)
- assert 'tes' == ''.join(reversed(s))
- s = mappings.OrderedFrozenSet('setordered')
- assert 'setord' == ''.join(s)
- assert 'drotes' == ''.join(reversed(s))
+ s = mappings.OrderedFrozenSet("set")
+ assert "set" == "".join(s)
+ assert "tes" == "".join(reversed(s))
+ s = mappings.OrderedFrozenSet("setordered")
+ assert "setord" == "".join(s)
+ assert "drotes" == "".join(reversed(s))
def test_immmutability(self):
s = mappings.OrderedFrozenSet(range(9))
@@ -355,41 +355,40 @@ class TestOrderedFrozenSet:
class TestOrderedSet(TestOrderedFrozenSet):
-
def test_hash(self):
with pytest.raises(TypeError):
- assert hash(mappings.OrderedSet('set'))
+ assert hash(mappings.OrderedSet("set"))
def test_add(self):
s = mappings.OrderedSet()
- s.add('a')
- assert 'a' in s
+ s.add("a")
+ assert "a" in s
s.add(1)
assert 1 in s
- assert list(s) == ['a', 1]
+ assert list(s) == ["a", 1]
def test_discard(self):
s = mappings.OrderedSet()
- s.discard('a')
- s.add('a')
+ s.discard("a")
+ s.add("a")
assert s
- s.discard('a')
+ s.discard("a")
assert not s
def test_remove(self):
s = mappings.OrderedSet()
with pytest.raises(KeyError):
- s.remove('a')
- s.add('a')
- assert 'a' in s
- s.remove('a')
- assert 'a' not in s
+ s.remove("a")
+ s.add("a")
+ assert "a" in s
+ s.remove("a")
+ assert "a" not in s
def test_clear(self):
s = mappings.OrderedSet()
s.clear()
assert len(s) == 0
- s.add('a')
+ s.add("a")
assert len(s) == 1
s.clear()
assert len(s) == 0
@@ -425,8 +424,9 @@ class TestStackedDict:
assert x in std
def test_len(self):
- assert sum(map(len, (self.orig_dict, self.new_dict))) == \
- len(mappings.StackedDict(self.orig_dict, self.new_dict))
+ assert sum(map(len, (self.orig_dict, self.new_dict))) == len(
+ mappings.StackedDict(self.orig_dict, self.new_dict)
+ )
def test_setattr(self):
pytest.raises(TypeError, mappings.StackedDict().__setitem__, (1, 2))
@@ -447,24 +447,28 @@ class TestStackedDict:
assert len(s) == 0
def test_keys(self):
- assert sorted(mappings.StackedDict(self.orig_dict, self.new_dict)) == \
- sorted(list(self.orig_dict.keys()) + list(self.new_dict.keys()))
+ assert sorted(mappings.StackedDict(self.orig_dict, self.new_dict)) == sorted(
+ list(self.orig_dict.keys()) + list(self.new_dict.keys())
+ )
class TestIndeterminantDict:
-
def test_disabled_methods(self):
d = mappings.IndeterminantDict(lambda *a: None)
for x in (
- "clear",
- ("update", {}),
- ("setdefault", 1),
- "__iter__", "__len__", "__hash__",
- ("__delitem__", 1),
- ("__setitem__", 2),
- ("popitem", 2),
- "keys", "items", "values",
- ):
+ "clear",
+ ("update", {}),
+ ("setdefault", 1),
+ "__iter__",
+ "__len__",
+ "__hash__",
+ ("__delitem__", 1),
+ ("__setitem__", 2),
+ ("popitem", 2),
+ "keys",
+ "items",
+ "values",
+ ):
if isinstance(x, tuple):
pytest.raises(TypeError, getattr(d, x[0]), x[1])
else:
@@ -472,7 +476,8 @@ class TestIndeterminantDict:
def test_starter_dict(self):
d = mappings.IndeterminantDict(
- lambda key: False, starter_dict={}.fromkeys(range(100), True))
+ lambda key: False, starter_dict={}.fromkeys(range(100), True)
+ )
for x in range(100):
assert d[x] == True
for x in range(100, 110):
@@ -481,21 +486,24 @@ class TestIndeterminantDict:
def test_behaviour(self):
val = []
d = mappings.IndeterminantDict(
- lambda key: val.append(key), {}.fromkeys(range(10), True))
+ lambda key: val.append(key), {}.fromkeys(range(10), True)
+ )
assert d[0] == True
assert d[11] == None
assert val == [11]
+
def func(*a):
raise KeyError
+
with pytest.raises(KeyError):
mappings.IndeterminantDict(func).__getitem__(1)
-
def test_get(self):
def func(key):
if key == 2:
raise KeyError
return True
+
d = mappings.IndeterminantDict(func, {1: 1})
assert d.get(1, 1) == 1
assert d.get(1, 2) == 1
@@ -505,41 +513,42 @@ class TestIndeterminantDict:
class TestFoldingDict:
-
def test_preserve(self):
dct = mappings.PreservingFoldingDict(
- str.lower, list({'Foo': 'bar', 'fnz': 'donkey'}.items()))
- assert dct['fnz'] == 'donkey'
- assert dct['foo'] == 'bar'
- assert sorted(['bar' == 'donkey']), sorted(dct.values())
+ str.lower, list({"Foo": "bar", "fnz": "donkey"}.items())
+ )
+ assert dct["fnz"] == "donkey"
+ assert dct["foo"] == "bar"
+ assert sorted(["bar" == "donkey"]), sorted(dct.values())
assert dct.copy() == dct
- assert dct['foo'] == dct.get('Foo')
- assert 'foo' in dct
- keys = ['Foo', 'fnz']
+ assert dct["foo"] == dct.get("Foo")
+ assert "foo" in dct
+ keys = ["Foo", "fnz"]
keysList = list(dct)
for key in keys:
assert key in list(dct.keys())
assert key in keysList
assert (key, dct[key]) in list(dct.items())
assert len(keys) == len(dct)
- assert dct.pop('foo') == 'bar'
- assert 'foo' not in dct
- del dct['fnz']
- assert 'fnz' not in dct
- dct['Foo'] = 'bar'
+ assert dct.pop("foo") == "bar"
+ assert "foo" not in dct
+ del dct["fnz"]
+ assert "fnz" not in dct
+ dct["Foo"] = "bar"
dct.refold(lambda _: _)
- assert 'foo' not in dct
- assert 'Foo' in dct
- assert list(dct.items()) == [('Foo', 'bar')]
+ assert "foo" not in dct
+ assert "Foo" in dct
+ assert list(dct.items()) == [("Foo", "bar")]
dct.clear()
assert {} == dict(dct)
def test_no_preserve(self):
dct = mappings.NonPreservingFoldingDict(
- str.lower, list({'Foo': 'bar', 'fnz': 'monkey'}.items()))
- assert sorted(['bar', 'monkey']) == sorted(dct.values())
+ str.lower, list({"Foo": "bar", "fnz": "monkey"}.items())
+ )
+ assert sorted(["bar", "monkey"]) == sorted(dct.values())
assert dct.copy() == dct
- keys = ['foo', 'fnz']
+ keys = ["foo", "fnz"]
keysList = [key for key in dct]
for key in keys:
assert key in list(dct.keys())
@@ -547,8 +556,8 @@ class TestFoldingDict:
assert key in keysList
assert (key, dct[key]) in list(dct.items())
assert len(keys) == len(dct)
- assert dct.pop('foo') == 'bar'
- del dct['fnz']
+ assert dct.pop("foo") == "bar"
+ del dct["fnz"]
assert list(dct.keys()) == []
dct.clear()
assert {} == dict(dct)
@@ -580,20 +589,20 @@ class Test_attr_to_item_mapping:
if kls is None:
kls = self.kls
o = kls(f=2, g=3)
- assert ['f', 'g'] == sorted(o)
- self.assertBoth(o, 'g', 3)
+ assert ["f", "g"] == sorted(o)
+ self.assertBoth(o, "g", 3)
o.g = 4
- self.assertBoth(o, 'g', 4)
+ self.assertBoth(o, "g", 4)
del o.g
with pytest.raises(KeyError):
- operator.__getitem__(o, 'g')
+ operator.__getitem__(o, "g")
with pytest.raises(AttributeError):
- getattr(o, 'g')
- del o['f']
+ getattr(o, "g")
+ del o["f"]
with pytest.raises(KeyError):
- operator.__getitem__(o, 'f')
+ operator.__getitem__(o, "f")
with pytest.raises(AttributeError):
- getattr(o, 'f')
+ getattr(o, "f")
def test_inject(self):
class foon(dict):
@@ -611,30 +620,31 @@ class Test_ProxiedAttrs:
def __init__(self, **kwargs):
for attr, val in kwargs.items():
setattr(self, attr, val)
+
obj = foo()
d = self.kls(obj)
with pytest.raises(KeyError):
- operator.__getitem__(d, 'x')
+ operator.__getitem__(d, "x")
with pytest.raises(KeyError):
- operator.__delitem__(d, 'x')
- assert 'x' not in d
- d['x'] = 1
- assert d['x'] == 1
- assert 'x' in d
- assert ['x'] == list(x for x in d if not x.startswith("__"))
- del d['x']
- assert 'x' not in d
+ operator.__delitem__(d, "x")
+ assert "x" not in d
+ d["x"] = 1
+ assert d["x"] == 1
+ assert "x" in d
+ assert ["x"] == list(x for x in d if not x.startswith("__"))
+ del d["x"]
+ assert "x" not in d
with pytest.raises(KeyError):
- operator.__delitem__(d, 'x')
+ operator.__delitem__(d, "x")
with pytest.raises(KeyError):
- operator.__getitem__(d, 'x')
+ operator.__getitem__(d, "x")
# Finally, verify that immutable attribute errors are handled correctly.
d = self.kls(object())
with pytest.raises(KeyError):
- operator.__setitem__(d, 'x', 1)
+ operator.__setitem__(d, "x", 1)
with pytest.raises(KeyError):
- operator.__delitem__(d, 'x')
+ operator.__delitem__(d, "x")
class TestSlottedDict:
@@ -642,9 +652,9 @@ class TestSlottedDict:
kls = staticmethod(mappings.make_SlottedDict_kls)
def test_exceptions(self):
- d = self.kls(['spork'])()
+ d = self.kls(["spork"])()
for op in (operator.getitem, operator.delitem):
with pytest.raises(KeyError):
- op(d, 'spork')
+ op(d, "spork")
with pytest.raises(KeyError):
- op(d, 'foon')
+ op(d, "foon")
diff --git a/tests/test_modules.py b/tests/test_modules.py
index f4174979..da20f4f8 100644
--- a/tests/test_modules.py
+++ b/tests/test_modules.py
@@ -5,19 +5,18 @@ from snakeoil import modules
class TestModules:
-
@pytest.fixture(autouse=True)
def _setup(self, tmp_path):
# set up some test modules for our use
- packdir = tmp_path / 'mod_testpack'
+ packdir = tmp_path / "mod_testpack"
packdir.mkdir()
# create an empty file
- (packdir / '__init__.py').touch()
+ (packdir / "__init__.py").touch()
for directory in (tmp_path, packdir):
for i in range(3):
- (directory / f'mod_test{i}.py').write_text('def foo(): pass\n')
- (directory / 'mod_horked.py').write_text('1/0\n')
+ (directory / f"mod_test{i}.py").write_text("def foo(): pass\n")
+ (directory / "mod_horked.py").write_text("1/0\n")
# append them to path
sys.path.insert(0, str(tmp_path))
@@ -27,89 +26,93 @@ class TestModules:
sys.path.pop(0)
# make sure we don't keep the sys.modules entries around
for i in range(3):
- sys.modules.pop('mod_test%s' % i, None)
- sys.modules.pop('mod_testpack.mod_test%s' % i, None)
- sys.modules.pop('mod_testpack', None)
- sys.modules.pop('mod_horked', None)
- sys.modules.pop('mod_testpack.mod_horked', None)
+ sys.modules.pop("mod_test%s" % i, None)
+ sys.modules.pop("mod_testpack.mod_test%s" % i, None)
+ sys.modules.pop("mod_testpack", None)
+ sys.modules.pop("mod_horked", None)
+ sys.modules.pop("mod_testpack.mod_horked", None)
def test_load_module(self):
# import an already-imported module
- assert modules.load_module('snakeoil.modules') is modules
+ assert modules.load_module("snakeoil.modules") is modules
# and a system one, just for kicks
- assert modules.load_module('sys') is sys
+ assert modules.load_module("sys") is sys
# non-existing module from an existing package
with pytest.raises(modules.FailedImport):
- modules.load_module('snakeoil.__not_there')
+ modules.load_module("snakeoil.__not_there")
# (hopefully :) non-existing top-level module/package
with pytest.raises(modules.FailedImport):
- modules.load_module('__not_there')
+ modules.load_module("__not_there")
# "Unable to import"
# pylint: disable=F0401
# unimported toplevel module
- modtest1 = modules.load_module('mod_test1')
+ modtest1 = modules.load_module("mod_test1")
import mod_test1
+
assert mod_test1 is modtest1
# unimported in-package module
- packtest2 = modules.load_module('mod_testpack.mod_test2')
+ packtest2 = modules.load_module("mod_testpack.mod_test2")
from mod_testpack import mod_test2
+
assert mod_test2 is packtest2
def test_load_attribute(self):
# already imported
- assert modules.load_attribute('sys.path') is sys.path
+ assert modules.load_attribute("sys.path") is sys.path
# unimported
- myfoo = modules.load_attribute('mod_testpack.mod_test2.foo')
+ myfoo = modules.load_attribute("mod_testpack.mod_test2.foo")
# "Unable to import"
# pylint: disable=F0401
from mod_testpack.mod_test2 import foo
+
assert foo is myfoo
# nonexisting attribute
with pytest.raises(modules.FailedImport):
- modules.load_attribute('snakeoil.froznicator')
+ modules.load_attribute("snakeoil.froznicator")
# nonexisting top-level
with pytest.raises(modules.FailedImport):
- modules.load_attribute('spork_does_not_exist.foo')
+ modules.load_attribute("spork_does_not_exist.foo")
# not an attr
with pytest.raises(modules.FailedImport):
- modules.load_attribute('sys')
+ modules.load_attribute("sys")
# not imported yet
with pytest.raises(modules.FailedImport):
- modules.load_attribute('mod_testpack.mod_test3')
+ modules.load_attribute("mod_testpack.mod_test3")
def test_load_any(self):
# import an already-imported module
- assert modules.load_any('snakeoil.modules') is modules
+ assert modules.load_any("snakeoil.modules") is modules
# attribute of an already imported module
- assert modules.load_any('sys.path') is sys.path
+ assert modules.load_any("sys.path") is sys.path
# already imported toplevel.
- assert sys is modules.load_any('sys')
+ assert sys is modules.load_any("sys")
# unimported
- myfoo = modules.load_any('mod_testpack.mod_test2.foo')
+ myfoo = modules.load_any("mod_testpack.mod_test2.foo")
# "Unable to import"
# pylint: disable=F0401
from mod_testpack.mod_test2 import foo
+
assert foo is myfoo
# nonexisting attribute
with pytest.raises(modules.FailedImport):
- modules.load_any('snakeoil.froznicator')
+ modules.load_any("snakeoil.froznicator")
# nonexisting top-level
with pytest.raises(modules.FailedImport):
- modules.load_any('spork_does_not_exist.foo')
+ modules.load_any("spork_does_not_exist.foo")
with pytest.raises(modules.FailedImport):
- modules.load_any('spork_does_not_exist')
+ modules.load_any("spork_does_not_exist")
# not imported yet
with pytest.raises(modules.FailedImport):
- modules.load_any('mod_testpack.mod_test3')
+ modules.load_any("mod_testpack.mod_test3")
def test_broken_module(self):
for func in [modules.load_module, modules.load_any]:
with pytest.raises(modules.FailedImport):
- func('mod_testpack.mod_horked')
- assert 'mod_testpack.mod_horked' not in sys.modules
+ func("mod_testpack.mod_horked")
+ assert "mod_testpack.mod_horked" not in sys.modules
diff --git a/tests/test_obj.py b/tests/test_obj.py
index 83f9f776..78083d8b 100644
--- a/tests/test_obj.py
+++ b/tests/test_obj.py
@@ -7,7 +7,6 @@ make_DIkls = obj.DelayedInstantiation_kls
class TestDelayedInstantiation:
-
def test_simple(self):
t = tuple([1, 2, 3])
o = make_DI(tuple, lambda: t)
@@ -19,19 +18,34 @@ class TestDelayedInstantiation:
assert t >= o
def test_descriptor_awareness(self):
- def assertKls(cls, ignores=(),
- default_ignores=("__new__", "__init__", "__init_subclass__",
- "__getattribute__", "__class__",
- "__getnewargs__", "__getstate__",
- "__doc__", "__class_getitem__")):
- required = set(x for x in dir(cls)
- if x.startswith("__") and x.endswith("__"))
+ def assertKls(
+ cls,
+ ignores=(),
+ default_ignores=(
+ "__new__",
+ "__init__",
+ "__init_subclass__",
+ "__getattribute__",
+ "__class__",
+ "__getnewargs__",
+ "__getstate__",
+ "__doc__",
+ "__class_getitem__",
+ ),
+ ):
+ required = set(
+ x for x in dir(cls) if x.startswith("__") and x.endswith("__")
+ )
missing = required.difference(obj.kls_descriptors)
missing.difference_update(obj.base_kls_descriptors)
missing.difference_update(default_ignores)
missing.difference_update(ignores)
- assert not missing, ("object %r potentially has unsupported special "
- "attributes: %s" % (cls, ', '.join(missing)))
+ assert (
+ not missing
+ ), "object %r potentially has unsupported special " "attributes: %s" % (
+ cls,
+ ", ".join(missing),
+ )
assertKls(object)
assertKls(1)
@@ -43,25 +57,38 @@ class TestDelayedInstantiation:
def test_BaseDelayedObject(self):
# assert that all methods/descriptors of object
# are covered via the base.
- o = set(dir(object)).difference(f"__{x}__" for x in (
- "class", "getattribute", "new", "init", "init_subclass", "getstate", "doc"))
+ o = set(dir(object)).difference(
+ f"__{x}__"
+ for x in (
+ "class",
+ "getattribute",
+ "new",
+ "init",
+ "init_subclass",
+ "getstate",
+ "doc",
+ )
+ )
diff = o.difference(obj.base_kls_descriptors)
- assert not diff, ("base delayed instantiation class should cover all of object, but "
- "%r was spotted" % (",".join(sorted(diff)),))
+ assert not diff, (
+ "base delayed instantiation class should cover all of object, but "
+ "%r was spotted" % (",".join(sorted(diff)),)
+ )
assert obj.DelayedInstantiation_kls(int, "1") + 2 == 3
-
def test_klass_choice_optimization(self):
"""ensure that BaseDelayedObject is used whenever possible"""
# note object is an odd one- it actually has a __doc__, thus
# it must always be a custom
o = make_DI(object, object)
- assert object.__getattribute__(o, '__class__') is not obj.BaseDelayedObject
+ assert object.__getattribute__(o, "__class__") is not obj.BaseDelayedObject
+
class foon:
pass
+
o = make_DI(foon, foon)
- cls = object.__getattribute__(o, '__class__')
+ cls = object.__getattribute__(o, "__class__")
assert cls is obj.BaseDelayedObject
# now ensure we always get the same kls back for derivatives
@@ -70,39 +97,43 @@ class TestDelayedInstantiation:
return True
o = make_DI(foon, foon)
- cls = object.__getattribute__(o, '__class__')
+ cls = object.__getattribute__(o, "__class__")
assert cls is not obj.BaseDelayedObject
o = make_DI(foon, foon)
- cls2 = object.__getattribute__(o, '__class__')
+ cls2 = object.__getattribute__(o, "__class__")
assert cls is cls2
def test__class__(self):
l = []
+
def f():
l.append(False)
return True
+
o = make_DI(bool, f)
assert isinstance(o, bool)
assert not l, "accessing __class__ shouldn't trigger instantiation"
def test__doc__(self):
l = []
+
def f():
l.append(True)
return foon()
+
class foon:
__doc__ = "monkey"
o = make_DI(foon, f)
- assert o.__doc__ == 'monkey'
+ assert o.__doc__ == "monkey"
assert not l, (
"in accessing __doc__, the instance was generated- "
"this is a class level attribute, thus shouldn't "
- "trigger instantiation")
+ "trigger instantiation"
+ )
class TestPopattr:
-
class Object:
pass
@@ -113,21 +144,21 @@ class TestPopattr:
def test_no_attrs(self):
# object without any attrs
with pytest.raises(AttributeError):
- obj.popattr(object(), 'nonexistent')
+ obj.popattr(object(), "nonexistent")
def test_nonexistent_attr(self):
# object with attr trying to get nonexistent attr
with pytest.raises(AttributeError):
- obj.popattr(self.o, 'nonexistent')
+ obj.popattr(self.o, "nonexistent")
def test_fallback(self):
# object with attr trying to get nonexistent attr using fallback
- value = obj.popattr(self.o, 'nonexistent', 2)
+ value = obj.popattr(self.o, "nonexistent", 2)
assert value == 2
def test_removed_attr(self):
- value = obj.popattr(self.o, 'test')
+ value = obj.popattr(self.o, "test")
assert value == 1
# verify that attr was removed from the object
with pytest.raises(AttributeError):
- obj.popattr(self.o, 'test')
+ obj.popattr(self.o, "test")
diff --git a/tests/test_osutils.py b/tests/test_osutils.py
index 18092823..264d670d 100644
--- a/tests/test_osutils.py
+++ b/tests/test_osutils.py
@@ -16,44 +16,45 @@ from snakeoil.osutils.mount import MNT_DETACH, MS_BIND, mount, umount
class ReaddirCommon:
-
@pytest.fixture
def subdir(self, tmp_path):
- subdir = tmp_path / 'dir'
+ subdir = tmp_path / "dir"
subdir.mkdir()
- (tmp_path / 'file').touch()
- os.mkfifo((tmp_path / 'fifo'))
+ (tmp_path / "file").touch()
+ os.mkfifo((tmp_path / "fifo"))
return subdir
def _test_missing(self, tmp_path, funcs):
for func in funcs:
- pytest.raises(OSError, func, tmp_path / 'spork')
+ pytest.raises(OSError, func, tmp_path / "spork")
class TestNativeListDir(ReaddirCommon):
-
def test_listdir(self, tmp_path, subdir):
- assert set(native_readdir.listdir(tmp_path)) == {'dir', 'fifo', 'file'}
+ assert set(native_readdir.listdir(tmp_path)) == {"dir", "fifo", "file"}
assert native_readdir.listdir(subdir) == []
def test_listdir_dirs(self, tmp_path, subdir):
- assert native_readdir.listdir_dirs(tmp_path) == ['dir']
+ assert native_readdir.listdir_dirs(tmp_path) == ["dir"]
assert native_readdir.listdir_dirs(subdir) == []
def test_listdir_files(self, tmp_path, subdir):
- assert native_readdir.listdir_files(tmp_path) == ['file']
+ assert native_readdir.listdir_files(tmp_path) == ["file"]
assert native_readdir.listdir_dirs(subdir) == []
def test_missing(self, tmp_path, subdir):
- return self._test_missing(tmp_path, (
- native_readdir.listdir,
- native_readdir.listdir_dirs,
- native_readdir.listdir_files,
- ))
+ return self._test_missing(
+ tmp_path,
+ (
+ native_readdir.listdir,
+ native_readdir.listdir_dirs,
+ native_readdir.listdir_files,
+ ),
+ )
def test_dangling_sym(self, tmp_path, subdir):
(tmp_path / "monkeys").symlink_to("foon")
- assert native_readdir.listdir_files(tmp_path) == ['file']
+ assert native_readdir.listdir_files(tmp_path) == ["file"]
class TestNativeReaddir(ReaddirCommon):
@@ -78,36 +79,37 @@ class TestNativeReaddir(ReaddirCommon):
class TestEnsureDirs:
-
def check_dir(self, path, uid, gid, mode):
assert path.is_dir()
st = os.stat(path)
- assert stat.S_IMODE(st.st_mode) == mode, \
- '0%o != 0%o' % (stat.S_IMODE(st.st_mode), mode)
+ assert stat.S_IMODE(st.st_mode) == mode, "0%o != 0%o" % (
+ stat.S_IMODE(st.st_mode),
+ mode,
+ )
assert st.st_uid == uid
assert st.st_gid == gid
def test_ensure_dirs(self, tmp_path):
# default settings
- path = tmp_path / 'foo' / 'bar'
+ path = tmp_path / "foo" / "bar"
assert osutils.ensure_dirs(path)
self.check_dir(path, os.geteuid(), os.getegid(), 0o777)
def test_minimal_nonmodifying(self, tmp_path):
- path = tmp_path / 'foo' / 'bar'
+ path = tmp_path / "foo" / "bar"
assert osutils.ensure_dirs(path, mode=0o755)
os.chmod(path, 0o777)
assert osutils.ensure_dirs(path, mode=0o755, minimal=True)
self.check_dir(path, os.geteuid(), os.getegid(), 0o777)
def test_minimal_modifying(self, tmp_path):
- path = tmp_path / 'foo' / 'bar'
+ path = tmp_path / "foo" / "bar"
assert osutils.ensure_dirs(path, mode=0o750)
assert osutils.ensure_dirs(path, mode=0o005, minimal=True)
self.check_dir(path, os.geteuid(), os.getegid(), 0o755)
def test_create_unwritable_subdir(self, tmp_path):
- path = tmp_path / 'restricted' / 'restricted'
+ path = tmp_path / "restricted" / "restricted"
# create the subdirs without 020 first
assert osutils.ensure_dirs(path.parent)
assert osutils.ensure_dirs(path, mode=0o020)
@@ -118,38 +120,39 @@ class TestEnsureDirs:
def test_path_is_a_file(self, tmp_path):
# fail if passed a path to an existing file
- path = tmp_path / 'file'
+ path = tmp_path / "file"
touch(path)
assert path.is_file()
assert not osutils.ensure_dirs(path, mode=0o700)
def test_non_dir_in_path(self, tmp_path):
# fail if one of the parts of the path isn't a dir
- path = tmp_path / 'file' / 'dir'
- (tmp_path / 'file').touch()
+ path = tmp_path / "file" / "dir"
+ (tmp_path / "file").touch()
assert not osutils.ensure_dirs(path, mode=0o700)
def test_mkdir_failing(self, tmp_path):
# fail if os.mkdir fails
- with mock.patch('snakeoil.osutils.os.mkdir') as mkdir:
- mkdir.side_effect = OSError(30, 'Read-only file system')
- path = tmp_path / 'dir'
+ with mock.patch("snakeoil.osutils.os.mkdir") as mkdir:
+ mkdir.side_effect = OSError(30, "Read-only file system")
+ path = tmp_path / "dir"
assert not osutils.ensure_dirs(path, mode=0o700)
# force temp perms
assert not osutils.ensure_dirs(path, mode=0o400)
- mkdir.side_effect = OSError(17, 'File exists')
+ mkdir.side_effect = OSError(17, "File exists")
assert not osutils.ensure_dirs(path, mode=0o700)
def test_chmod_or_chown_failing(self, tmp_path):
# fail if chmod or chown fails
- path = tmp_path / 'dir'
+ path = tmp_path / "dir"
path.mkdir()
path.chmod(0o750)
- with mock.patch('snakeoil.osutils.os.chmod') as chmod, \
- mock.patch('snakeoil.osutils.os.chown') as chown:
- chmod.side_effect = OSError(5, 'Input/output error')
+ with mock.patch("snakeoil.osutils.os.chmod") as chmod, mock.patch(
+ "snakeoil.osutils.os.chown"
+ ) as chown:
+ chmod.side_effect = OSError(5, "Input/output error")
# chmod failure when file exists and trying to reset perms to match
# the specified mode
@@ -163,13 +166,13 @@ class TestEnsureDirs:
# chown failure when resetting perms on parents
chmod.side_effect = None
- chown.side_effect = OSError(5, 'Input/output error')
+ chown.side_effect = OSError(5, "Input/output error")
assert not osutils.ensure_dirs(path, uid=1000, gid=1000, mode=0o400)
def test_reset_sticky_parent_perms(self, tmp_path):
# make sure perms are reset after traversing over sticky parents
- sticky_parent = tmp_path / 'dir'
- path = sticky_parent / 'dir'
+ sticky_parent = tmp_path / "dir"
+ path = sticky_parent / "dir"
sticky_parent.mkdir()
sticky_parent.chmod(0o2755)
pre_sticky_parent = os.stat(sticky_parent)
@@ -178,7 +181,7 @@ class TestEnsureDirs:
assert pre_sticky_parent.st_mode == post_sticky_parent.st_mode
def test_mode(self, tmp_path):
- path = tmp_path / 'mode' / 'mode'
+ path = tmp_path / "mode" / "mode"
assert osutils.ensure_dirs(path, mode=0o700)
self.check_dir(path, os.geteuid(), os.getegid(), 0o700)
# unrestrict it
@@ -188,12 +191,12 @@ class TestEnsureDirs:
def test_gid(self, tmp_path):
# abuse the portage group as secondary group
try:
- portage_gid = grp.getgrnam('portage').gr_gid
+ portage_gid = grp.getgrnam("portage").gr_gid
except KeyError:
- pytest.skip('the portage group does not exist')
+ pytest.skip("the portage group does not exist")
if portage_gid not in os.getgroups():
- pytest.skip('you are not in the portage group')
- path = tmp_path / 'group' / 'group'
+ pytest.skip("you are not in the portage group")
+ path = tmp_path / "group" / "group"
assert osutils.ensure_dirs(path, gid=portage_gid)
self.check_dir(path, os.geteuid(), portage_gid, 0o777)
assert osutils.ensure_dirs(path)
@@ -203,12 +206,11 @@ class TestEnsureDirs:
class TestAbsSymlink:
-
def test_abssymlink(self, tmp_path):
- target = tmp_path / 'target'
- linkname = tmp_path / 'link'
+ target = tmp_path / "target"
+ linkname = tmp_path / "link"
target.mkdir()
- linkname.symlink_to('target')
+ linkname.symlink_to("target")
assert osutils.abssymlink(linkname) == str(target)
@@ -223,28 +225,30 @@ class Test_Native_NormPath:
got = f(src)
assert got == val, f"{src!r}: expected {val!r}, got {got!r}"
- check('/foo/', '/foo')
- check('//foo/', '/foo')
- check('//foo/.', '/foo')
- check('//..', '/')
- check('//..//foo', '/foo')
- check('/foo/..', '/')
- check('..//foo', '../foo')
- check('../foo/../', '..')
- check('../', '..')
- check('../foo/..', '..')
- check('../foo/../dar', '../dar')
- check('.//foo', 'foo')
- check('/foo/../../', '/')
- check('/foo/../../..', '/')
- check('/tmp/foo/../dar/', '/tmp/dar')
- check('/tmp/foo/../dar', '/tmp/dar')
+ check("/foo/", "/foo")
+ check("//foo/", "/foo")
+ check("//foo/.", "/foo")
+ check("//..", "/")
+ check("//..//foo", "/foo")
+ check("/foo/..", "/")
+ check("..//foo", "../foo")
+ check("../foo/../", "..")
+ check("../", "..")
+ check("../foo/..", "..")
+ check("../foo/../dar", "../dar")
+ check(".//foo", "foo")
+ check("/foo/../../", "/")
+ check("/foo/../../..", "/")
+ check("/tmp/foo/../dar/", "/tmp/dar")
+ check("/tmp/foo/../dar", "/tmp/dar")
# explicit unicode and bytes
- check('/tmṕ/föo//../dár', '/tmṕ/dár')
- check(b'/tm\xe1\xb9\x95/f\xc3\xb6o//../d\xc3\xa1r', b'/tm\xe1\xb9\x95/d\xc3\xa1r')
- check('/föó/..', '/')
- check(b'/f\xc3\xb6\xc3\xb3/..', b'/')
+ check("/tmṕ/föo//../dár", "/tmṕ/dár")
+ check(
+ b"/tm\xe1\xb9\x95/f\xc3\xb6o//../d\xc3\xa1r", b"/tm\xe1\xb9\x95/d\xc3\xa1r"
+ )
+ check("/föó/..", "/")
+ check(b"/f\xc3\xb6\xc3\xb3/..", b"/")
@pytest.mark.skipif(os.getuid() != 0, reason="these tests must be ran as root")
@@ -253,7 +257,7 @@ class TestAccess:
func = staticmethod(osutils.fallback_access)
def test_fallback(self, tmp_path):
- fp = tmp_path / 'file'
+ fp = tmp_path / "file"
# create the file
fp.touch()
fp.chmod(0o000)
@@ -270,9 +274,9 @@ class Test_unlink_if_exists:
def test_it(self, tmp_path):
f = self.func
- path = tmp_path / 'target'
+ path = tmp_path / "target"
f(path)
- path.write_text('')
+ path.write_text("")
f(path)
assert not path.exists()
# and once more for good measure...
@@ -280,18 +284,17 @@ class Test_unlink_if_exists:
class TestSupportedSystems:
-
def test_supported_system(self):
- @supported_systems('supported')
+ @supported_systems("supported")
def func():
return True
- with mock.patch('snakeoil.osutils.sys') as _sys:
- _sys.configure_mock(platform='supported')
+ with mock.patch("snakeoil.osutils.sys") as _sys:
+ _sys.configure_mock(platform="supported")
assert func()
def test_unsupported_system(self):
- @supported_systems('unsupported')
+ @supported_systems("unsupported")
def func():
return True
@@ -299,39 +302,39 @@ class TestSupportedSystems:
func()
# make sure we're iterating through the system params correctly
- with mock.patch('snakeoil.osutils.sys') as _sys:
- _sys.configure_mock(platform='u')
+ with mock.patch("snakeoil.osutils.sys") as _sys:
+ _sys.configure_mock(platform="u")
with pytest.raises(NotImplementedError):
func()
def test_multiple_systems(self):
- @supported_systems('darwin', 'linux')
+ @supported_systems("darwin", "linux")
def func():
return True
- with mock.patch('snakeoil.osutils.sys') as _sys:
- _sys.configure_mock(platform='nonexistent')
+ with mock.patch("snakeoil.osutils.sys") as _sys:
+ _sys.configure_mock(platform="nonexistent")
with pytest.raises(NotImplementedError):
func()
- for platform in ('linux2', 'darwin'):
+ for platform in ("linux2", "darwin"):
_sys.configure_mock(platform=platform)
assert func()
-@pytest.mark.skipif(not sys.platform.startswith('linux'),
- reason='supported on Linux only')
+@pytest.mark.skipif(
+ not sys.platform.startswith("linux"), reason="supported on Linux only"
+)
class TestMount:
-
@pytest.fixture
def source(self, tmp_path):
- source = tmp_path / 'source'
+ source = tmp_path / "source"
source.mkdir()
return source
@pytest.fixture
def target(self, tmp_path):
- target = tmp_path / 'target'
+ target = tmp_path / "target"
target.mkdir()
return target
@@ -340,21 +343,25 @@ class TestMount:
# byte strings; if they are unicode strings the arguments get mangled
# leading to errors when the syscall is run. This confirms mount() from
# snakeoil.osutils always converts the arguments into byte strings.
- for source, target, fstype in ((b'source', b'target', b'fstype'),
- ('source', 'target', 'fstype')):
- with mock.patch('snakeoil.osutils.mount.ctypes') as mock_ctypes:
+ for source, target, fstype in (
+ (b"source", b"target", b"fstype"),
+ ("source", "target", "fstype"),
+ ):
+ with mock.patch("snakeoil.osutils.mount.ctypes") as mock_ctypes:
with pytest.raises(OSError):
mount(str(source), str(target), fstype, MS_BIND)
- mount_call = next(x for x in mock_ctypes.mock_calls if x[0] == 'CDLL().mount')
+ mount_call = next(
+ x for x in mock_ctypes.mock_calls if x[0] == "CDLL().mount"
+ )
for arg in mount_call[1][0:3]:
assert isinstance(arg, bytes)
def test_missing_dirs(self):
with pytest.raises(OSError) as cm:
- mount('source', 'target', None, MS_BIND)
+ mount("source", "target", None, MS_BIND)
assert cm.value.errno in (errno.EPERM, errno.ENOENT)
- @pytest.mark.skipif(os.getuid() == 0, reason='this test must be run as non-root')
+ @pytest.mark.skipif(os.getuid() == 0, reason="this test must be run as non-root")
def test_no_perms(self, source, target):
with pytest.raises(OSError) as cm:
mount(str(source), str(target), None, MS_BIND)
@@ -363,11 +370,15 @@ class TestMount:
umount(str(target))
assert cm.value.errno in (errno.EPERM, errno.EINVAL)
- @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/mnt') and os.path.exists('/proc/self/ns/user')),
- reason='user and mount namespace support required')
+ @pytest.mark.skipif(
+ not (
+ os.path.exists("/proc/self/ns/mnt") and os.path.exists("/proc/self/ns/user")
+ ),
+ reason="user and mount namespace support required",
+ )
def test_bind_mount(self, source, target):
- src_file = source / 'file'
- bind_file = target / 'file'
+ src_file = source / "file"
+ bind_file = target / "file"
src_file.touch()
try:
@@ -378,15 +389,19 @@ class TestMount:
umount(str(target))
assert not bind_file.exists()
except PermissionError:
- pytest.skip('No permission to use user and mount namespace')
-
- @pytest.mark.skipif(not (os.path.exists('/proc/self/ns/mnt') and os.path.exists('/proc/self/ns/user')),
- reason='user and mount namespace support required')
+ pytest.skip("No permission to use user and mount namespace")
+
+ @pytest.mark.skipif(
+ not (
+ os.path.exists("/proc/self/ns/mnt") and os.path.exists("/proc/self/ns/user")
+ ),
+ reason="user and mount namespace support required",
+ )
def test_lazy_unmount(self, source, target):
- src_file = source / 'file'
- bind_file = target / 'file'
+ src_file = source / "file"
+ bind_file = target / "file"
src_file.touch()
- src_file.write_text('foo')
+ src_file.write_text("foo")
try:
with Namespace(user=True, mount=True):
@@ -403,14 +418,14 @@ class TestMount:
# confirm the file doesn't exist in the bind mount anymore
assert not bind_file.exists()
# but the file is still accessible to the process
- assert f.read() == 'foo'
+ assert f.read() == "foo"
# trying to reopen causes IOError
with pytest.raises(IOError) as cm:
f = bind_file.open()
assert cm.value.errno == errno.ENOENT
except PermissionError:
- pytest.skip('No permission to use user and mount namespace')
+ pytest.skip("No permission to use user and mount namespace")
class TestSizeofFmt:
diff --git a/tests/test_process.py b/tests/test_process.py
index bb45712e..488b7b03 100644
--- a/tests/test_process.py
+++ b/tests/test_process.py
@@ -30,8 +30,10 @@ class TestFindBinary:
process.find_binary(self.script)
def test_fallback(self):
- fallback = process.find_binary(self.script, fallback=os.path.join('bin', self.script))
- assert fallback == os.path.join('bin', self.script)
+ fallback = process.find_binary(
+ self.script, fallback=os.path.join("bin", self.script)
+ )
+ assert fallback == os.path.join("bin", self.script)
def test_not_executable(self, tmp_path):
fp = tmp_path / self.script
diff --git a/tests/test_process_spawn.py b/tests/test_process_spawn.py
index 8981c6e7..556b34cc 100644
--- a/tests/test_process_spawn.py
+++ b/tests/test_process_spawn.py
@@ -6,11 +6,11 @@ from snakeoil import process
from snakeoil.contexts import chdir
from snakeoil.process import spawn
-BASH_BINARY = process.find_binary("bash", fallback='')
+BASH_BINARY = process.find_binary("bash", fallback="")
-@pytest.mark.skipif(not BASH_BINARY, reason='missing bash binary')
-class TestSpawn:
+@pytest.mark.skipif(not BASH_BINARY, reason="missing bash binary")
+class TestSpawn:
@pytest.fixture(autouse=True)
def _setup(self, tmp_path):
orig_path = os.environ["PATH"]
@@ -37,21 +37,25 @@ class TestSpawn:
def test_get_output(self, tmp_path, dev_null):
filename = "spawn-getoutput.sh"
for r, s, text, args in (
- [0, ["dar\n"], "echo dar\n", {}],
- [0, ["dar"], "echo -n dar", {}],
- [1, ["blah\n", "dar\n"], "echo blah\necho dar\nexit 1", {}],
- [0, [], "echo dar 1>&2", {"fd_pipes": {1: 1, 2: dev_null}}]):
+ [0, ["dar\n"], "echo dar\n", {}],
+ [0, ["dar"], "echo -n dar", {}],
+ [1, ["blah\n", "dar\n"], "echo blah\necho dar\nexit 1", {}],
+ [0, [], "echo dar 1>&2", {"fd_pipes": {1: 1, 2: dev_null}}],
+ ):
fp = self.generate_script(tmp_path, filename, text)
- assert (r, s) == spawn.spawn_get_output(str(fp), spawn_type=spawn.spawn_bash, **args)
+ assert (r, s) == spawn.spawn_get_output(
+ str(fp), spawn_type=spawn.spawn_bash, **args
+ )
os.unlink(fp)
@pytest.mark.skipif(not spawn.is_sandbox_capable(), reason="missing sandbox binary")
def test_sandbox(self, tmp_path):
- fp = self.generate_script(
- tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD")
+ fp = self.generate_script(tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD")
ret = spawn.spawn_get_output(str(fp), spawn_type=spawn.spawn_sandbox)
assert ret[1], "no output; exit code was %s; script location %s" % (ret[0], fp)
- assert "libsandbox.so" in [os.path.basename(x.strip()) for x in ret[1][0].split()]
+ assert "libsandbox.so" in [
+ os.path.basename(x.strip()) for x in ret[1][0].split()
+ ]
os.unlink(fp)
@pytest.mark.skipif(not spawn.is_sandbox_capable(), reason="missing sandbox binary")
@@ -60,15 +64,17 @@ class TestSpawn:
this verifies our fix works.
"""
- fp = self.generate_script(
- tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD")
+ fp = self.generate_script(tmp_path, "spawn-sandbox.sh", "echo $LD_PRELOAD")
dpath = tmp_path / "dar"
dpath.mkdir()
with chdir(dpath):
dpath.rmdir()
- assert "libsandbox.so" in \
- [os.path.basename(x.strip()) for x in spawn.spawn_get_output(
- str(fp), spawn_type=spawn.spawn_sandbox, cwd='/')[1][0].split()]
+ assert "libsandbox.so" in [
+ os.path.basename(x.strip())
+ for x in spawn.spawn_get_output(
+ str(fp), spawn_type=spawn.spawn_sandbox, cwd="/"
+ )[1][0].split()
+ ]
fp.unlink()
def test_process_exit_code(self):
@@ -98,13 +104,12 @@ class TestSpawn:
def test_spawn_bash(self, capfd):
# bash builtin for true without exec'ing true (eg, no path lookup)
- assert 0 == spawn.spawn_bash('echo bash')
+ assert 0 == spawn.spawn_bash("echo bash")
out, _err = capfd.readouterr()
- assert out.strip() == 'bash'
+ assert out.strip() == "bash"
def test_umask(self, tmp_path):
- fp = self.generate_script(
- tmp_path, "spawn-umask.sh", f"#!{BASH_BINARY}\numask")
+ fp = self.generate_script(tmp_path, "spawn-umask.sh", f"#!{BASH_BINARY}\numask")
try:
old_umask = os.umask(0)
if old_umask == 0:
@@ -113,7 +118,8 @@ class TestSpawn:
os.umask(desired)
else:
desired = 0
- assert str(desired).lstrip("0") == \
- spawn.spawn_get_output(str(fp))[1][0].strip().lstrip("0")
+ assert str(desired).lstrip("0") == spawn.spawn_get_output(str(fp))[1][
+ 0
+ ].strip().lstrip("0")
finally:
os.umask(old_umask)
diff --git a/tests/test_sequences.py b/tests/test_sequences.py
index edbaa5a0..0d8c5a62 100644
--- a/tests/test_sequences.py
+++ b/tests/test_sequences.py
@@ -8,13 +8,11 @@ from snakeoil.sequences import split_elements, split_negations
class UnhashableComplex(complex):
-
def __hash__(self):
raise TypeError
class TestStableUnique:
-
def common_check(self, func):
# silly
assert func(()) == []
@@ -23,9 +21,10 @@ class TestStableUnique:
# neither
def test_stable_unique(self, func=sequences.stable_unique):
- assert list(set([1, 2, 3])) == [1, 2, 3], \
- "this test is reliant on the interpreter hasing 1,2,3 into a specific ordering- " \
+ assert list(set([1, 2, 3])) == [1, 2, 3], (
+ "this test is reliant on the interpreter hasing 1,2,3 into a specific ordering- "
"for whatever reason, ordering differs, thus this test can't verify it"
+ )
assert func([3, 2, 1]) == [3, 2, 1]
def test_iter_stable_unique(self):
@@ -43,20 +42,19 @@ class TestStableUnique:
uc = UnhashableComplex
res = sequences.unstable_unique([uc(1, 0), uc(0, 1), uc(1, 0)])
# sortable
- assert sorted(sequences.unstable_unique(
- [[1, 2], [1, 3], [1, 2], [1, 3]])) == [[1, 2], [1, 3]]
+ assert sorted(sequences.unstable_unique([[1, 2], [1, 3], [1, 2], [1, 3]])) == [
+ [1, 2],
+ [1, 3],
+ ]
assert res == [uc(1, 0), uc(0, 1)] or res == [uc(0, 1), uc(1, 0)]
assert sorted(sequences.unstable_unique(self._generator())) == sorted(range(6))
class TestChainedLists:
-
@staticmethod
def gen_cl():
return sequences.ChainedLists(
- list(range(3)),
- list(range(3, 6)),
- list(range(6, 100))
+ list(range(3)), list(range(3, 6)), list(range(6, 100))
)
def test_contains(self):
@@ -72,7 +70,7 @@ class TestChainedLists:
def test_str(self):
l = sequences.ChainedLists(list(range(3)), list(range(3, 5)))
- assert str(l) == '[ [0, 1, 2], [3, 4] ]'
+ assert str(l) == "[ [0, 1, 2], [3, 4] ]"
def test_getitem(self):
cl = self.gen_cl()
@@ -108,15 +106,18 @@ class Test_iflatten_instance:
def test_it(self):
o = OrderedDict((k, None) for k in range(10))
for l, correct, skip in (
- (["asdf", ["asdf", "asdf"], 1, None],
- ["asdf", "asdf", "asdf", 1, None], str),
- ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)),
- ([o, 1, "fds"], list(range(10)) + [1, "fds"], str),
- ("fds", ["fds"], str),
- ("fds", ["f", "d", "s"], int),
- ('', [''], str),
- (1, [1], int),
- ):
+ (
+ ["asdf", ["asdf", "asdf"], 1, None],
+ ["asdf", "asdf", "asdf", 1, None],
+ str,
+ ),
+ ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)),
+ ([o, 1, "fds"], list(range(10)) + [1, "fds"], str),
+ ("fds", ["fds"], str),
+ ("fds", ["f", "d", "s"], int),
+ ("", [""], str),
+ (1, [1], int),
+ ):
iterator = self.func(l, skip)
assert list(iterator) == correct
assert list(iterator) == []
@@ -126,6 +127,7 @@ class Test_iflatten_instance:
# have to iterate.
def fail():
return list(self.func(None))
+
with pytest.raises(TypeError):
fail()
@@ -148,13 +150,16 @@ class Test_iflatten_func:
def test_it(self):
o = OrderedDict((k, None) for k in range(10))
for l, correct, skip in (
- (["asdf", ["asdf", "asdf"], 1, None],
- ["asdf", "asdf", "asdf", 1, None], str),
- ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)),
- ([o, 1, "fds"], list(range(10)) + [1, "fds"], str),
- ("fds", ["fds"], str),
- (1, [1], int),
- ):
+ (
+ ["asdf", ["asdf", "asdf"], 1, None],
+ ["asdf", "asdf", "asdf", 1, None],
+ str,
+ ),
+ ([o, 1, "fds"], [o, 1, "fds"], (str, OrderedDict)),
+ ([o, 1, "fds"], list(range(10)) + [1, "fds"], str),
+ ("fds", ["fds"], str),
+ (1, [1], int),
+ ):
iterator = self.func(l, lambda x: isinstance(x, skip))
assert list(iterator) == correct
assert list(iterator) == []
@@ -164,6 +169,7 @@ class Test_iflatten_func:
# have to iterate.
def fail():
return list(self.func(None, lambda x: False))
+
with pytest.raises(TypeError):
fail()
@@ -189,25 +195,24 @@ class Test_predicate_split:
assert true_l == list(range(0, 100, 2))
def test_key(self):
- false_l, true_l = self.kls(lambda x: x % 2 == 0,
- ([0, x] for x in range(100)),
- key=itemgetter(1))
+ false_l, true_l = self.kls(
+ lambda x: x % 2 == 0, ([0, x] for x in range(100)), key=itemgetter(1)
+ )
assert false_l == [[0, x] for x in range(1, 100, 2)]
assert true_l == [[0, x] for x in range(0, 100, 2)]
class TestSplitNegations:
-
def test_empty(self):
# empty input
- seq = ''
+ seq = ""
assert split_negations(seq) == ((), ())
def test_bad_value(self):
# no-value negation should raise a ValueError
bad_values = (
- '-',
- 'a b c - d f e',
+ "-",
+ "a b c - d f e",
)
for s in bad_values:
@@ -216,7 +221,7 @@ class TestSplitNegations:
def test_negs(self):
# all negs
- seq = ('-' + str(x) for x in range(100))
+ seq = ("-" + str(x) for x in range(100))
assert split_negations(seq) == (tuple(map(str, range(100))), ())
def test_pos(self):
@@ -226,31 +231,33 @@ class TestSplitNegations:
def test_neg_pos(self):
# both
- seq = (('-' + str(x), str(x)) for x in range(100))
+ seq = (("-" + str(x), str(x)) for x in range(100))
seq = chain.from_iterable(seq)
- assert split_negations(seq) == (tuple(map(str, range(100))), tuple(map(str, range(100))))
+ assert split_negations(seq) == (
+ tuple(map(str, range(100))),
+ tuple(map(str, range(100))),
+ )
def test_converter(self):
# converter method
- seq = (('-' + str(x), str(x)) for x in range(100))
+ seq = (("-" + str(x), str(x)) for x in range(100))
seq = chain.from_iterable(seq)
assert split_negations(seq, int) == (tuple(range(100)), tuple(range(100)))
class TestSplitElements:
-
def test_empty(self):
# empty input
- seq = ''
+ seq = ""
assert split_elements(seq) == ((), (), ())
def test_bad_value(self):
# no-value neg/pos should raise ValueErrors
bad_values = (
- '-',
- '+',
- 'a b c - d f e',
- 'a b c + d f e',
+ "-",
+ "+",
+ "a b c - d f e",
+ "a b c + d f e",
)
for s in bad_values:
@@ -259,7 +266,7 @@ class TestSplitElements:
def test_negs(self):
# all negs
- seq = ('-' + str(x) for x in range(100))
+ seq = ("-" + str(x) for x in range(100))
assert split_elements(seq) == (tuple(map(str, range(100))), (), ())
def test_neutral(self):
@@ -269,12 +276,12 @@ class TestSplitElements:
def test_pos(self):
# all pos
- seq = ('+' + str(x) for x in range(100))
+ seq = ("+" + str(x) for x in range(100))
assert split_elements(seq) == ((), (), tuple(map(str, range(100))))
def test_neg_pos(self):
# both negative and positive values
- seq = (('-' + str(x), '+' + str(x)) for x in range(100))
+ seq = (("-" + str(x), "+" + str(x)) for x in range(100))
seq = chain.from_iterable(seq)
assert split_elements(seq) == (
tuple(map(str, range(100))),
@@ -284,7 +291,7 @@ class TestSplitElements:
def test_neg_neu_pos(self):
# all three value types
- seq = (('-' + str(x), str(x), '+' + str(x)) for x in range(100))
+ seq = (("-" + str(x), str(x), "+" + str(x)) for x in range(100))
seq = chain.from_iterable(seq)
assert split_elements(seq) == (
tuple(map(str, range(100))),
@@ -294,7 +301,10 @@ class TestSplitElements:
def test_converter(self):
# converter method
- seq = (('-' + str(x), str(x), '+' + str(x)) for x in range(100))
+ seq = (("-" + str(x), str(x), "+" + str(x)) for x in range(100))
seq = chain.from_iterable(seq)
assert split_elements(seq, int) == (
- tuple(range(100)), tuple(range(100)), tuple(range(100)))
+ tuple(range(100)),
+ tuple(range(100)),
+ tuple(range(100)),
+ )
diff --git a/tests/test_stringio.py b/tests/test_stringio.py
index 4fb7c78b..1e6d1e5d 100644
--- a/tests/test_stringio.py
+++ b/tests/test_stringio.py
@@ -34,6 +34,7 @@ class readonly_mixin:
class Test_text_readonly(readonly_mixin):
kls = stringio.text_readonly
-class Test_bytes_readonly(readonly_mixin ):
+
+class Test_bytes_readonly(readonly_mixin):
kls = stringio.bytes_readonly
- encoding = 'utf8'
+ encoding = "utf8"
diff --git a/tests/test_strings.py b/tests/test_strings.py
index b55c2306..19305708 100644
--- a/tests/test_strings.py
+++ b/tests/test_strings.py
@@ -3,38 +3,36 @@ from snakeoil.strings import doc_dedent, pluralism
class TestPluralism:
-
def test_none(self):
# default
- assert pluralism([]) == 's'
+ assert pluralism([]) == "s"
# different suffix for nonexistence
- assert pluralism([], none='') == ''
+ assert pluralism([], none="") == ""
def test_singular(self):
# default
- assert pluralism([1]) == ''
+ assert pluralism([1]) == ""
# different suffix for singular existence
- assert pluralism([1], singular='o') == 'o'
+ assert pluralism([1], singular="o") == "o"
def test_plural(self):
# default
- assert pluralism([1, 2]) == 's'
+ assert pluralism([1, 2]) == "s"
# different suffix for plural existence
- assert pluralism([1, 2], plural='ies') == 'ies'
+ assert pluralism([1, 2], plural="ies") == "ies"
def test_int(self):
- assert pluralism(0) == 's'
- assert pluralism(1) == ''
- assert pluralism(2) == 's'
+ assert pluralism(0) == "s"
+ assert pluralism(1) == ""
+ assert pluralism(2) == "s"
class TestDocDedent:
-
def test_empty(self):
- s = ''
+ s = ""
assert s == doc_dedent(s)
def test_non_string(self):
@@ -42,20 +40,20 @@ class TestDocDedent:
doc_dedent(None)
def test_line(self):
- s = 'line'
+ s = "line"
assert s == doc_dedent(s)
def test_indented_line(self):
- for indent in ('\t', ' '):
- s = f'{indent}line'
- assert 'line' == doc_dedent(s)
+ for indent in ("\t", " "):
+ s = f"{indent}line"
+ assert "line" == doc_dedent(s)
def test_docstring(self):
s = """Docstring to test.
foo bar
"""
- assert 'Docstring to test.\n\nfoo bar\n' == doc_dedent(s)
+ assert "Docstring to test.\n\nfoo bar\n" == doc_dedent(s)
def test_all_indented(self):
s = """\
@@ -63,4 +61,4 @@ class TestDocDedent:
foo bar
"""
- assert 'Docstring to test.\n\nfoo bar\n' == doc_dedent(s)
+ assert "Docstring to test.\n\nfoo bar\n" == doc_dedent(s)
diff --git a/tests/test_version.py b/tests/test_version.py
index 7dad73e4..09927542 100644
--- a/tests/test_version.py
+++ b/tests/test_version.py
@@ -7,7 +7,6 @@ from snakeoil import __version__, version
class TestVersion:
-
def setup_method(self, method):
# reset the cached version in the module
reload(version)
@@ -17,124 +16,142 @@ class TestVersion:
def test_get_version_unknown(self):
with pytest.raises(ValueError):
- version.get_version('snakeoilfoo', __file__)
+ version.get_version("snakeoilfoo", __file__)
def test_get_version_api(self):
- v = version.get_version('snakeoil', __file__, '9.9.9')
- assert v.startswith('snakeoil 9.9.9')
+ v = version.get_version("snakeoil", __file__, "9.9.9")
+ assert v.startswith("snakeoil 9.9.9")
def test_get_version_git_dev(self):
- with mock.patch('snakeoil.version.import_module') as import_module, \
- mock.patch('snakeoil.version.get_git_version') as get_git_version:
+ with mock.patch("snakeoil.version.import_module") as import_module, mock.patch(
+ "snakeoil.version.get_git_version"
+ ) as get_git_version:
import_module.side_effect = ImportError
verinfo = {
- 'rev': '1ff76b021d208f7df38ac524537b6419404f1c64',
- 'date': 'Mon Sep 25 13:50:24 2017 -0400',
- 'tag': None
+ "rev": "1ff76b021d208f7df38ac524537b6419404f1c64",
+ "date": "Mon Sep 25 13:50:24 2017 -0400",
+ "tag": None,
}
get_git_version.return_value = verinfo
- result = version.get_version('snakeoil', __file__, __version__)
- assert result == f"snakeoil {__version__}-g{verinfo['rev'][:7]} -- {verinfo['date']}"
+ result = version.get_version("snakeoil", __file__, __version__)
+ assert (
+ result
+ == f"snakeoil {__version__}-g{verinfo['rev'][:7]} -- {verinfo['date']}"
+ )
def test_get_version_git_release(self):
verinfo = {
- 'rev': 'ab38751890efa8be96b7f95938d6b868b769bab6',
- 'date': 'Thu Sep 21 15:57:38 2017 -0400',
- 'tag': '2.3.4',
+ "rev": "ab38751890efa8be96b7f95938d6b868b769bab6",
+ "date": "Thu Sep 21 15:57:38 2017 -0400",
+ "tag": "2.3.4",
}
# fake snakeoil._verinfo module object
class Verinfo:
version_info = verinfo
- with mock.patch('snakeoil.version.import_module') as import_module:
+ with mock.patch("snakeoil.version.import_module") as import_module:
import_module.return_value = Verinfo()
- result = version.get_version('snakeoil', __file__, verinfo['tag'])
+ result = version.get_version("snakeoil", __file__, verinfo["tag"])
assert result == f"snakeoil {verinfo['tag']} -- released {verinfo['date']}"
def test_get_version_no_git_version(self):
- with mock.patch('snakeoil.version.import_module') as import_module, \
- mock.patch('snakeoil.version.get_git_version') as get_git_version:
+ with mock.patch("snakeoil.version.import_module") as import_module, mock.patch(
+ "snakeoil.version.get_git_version"
+ ) as get_git_version:
import_module.side_effect = ImportError
get_git_version.return_value = None
- result = version.get_version('snakeoil', 'nonexistent', __version__)
- assert result == f'snakeoil {__version__}'
+ result = version.get_version("snakeoil", "nonexistent", __version__)
+ assert result == f"snakeoil {__version__}"
def test_get_version_caching(self):
# retrieved version info is cached in a module attr
- v = version.get_version('snakeoil', __file__)
- assert v.startswith(f'snakeoil {__version__}')
+ v = version.get_version("snakeoil", __file__)
+ assert v.startswith(f"snakeoil {__version__}")
# re-running get_version returns the cached attr instead of reprocessing
- with mock.patch('snakeoil.version.import_module') as import_module:
- v = version.get_version('snakeoil', __file__)
+ with mock.patch("snakeoil.version.import_module") as import_module:
+ v = version.get_version("snakeoil", __file__)
assert not import_module.called
class TestGitVersion:
-
def test_get_git_version_not_available(self):
- with mock.patch('snakeoil.version._run_git') as run_git:
- run_git.side_effect = EnvironmentError(errno.ENOENT, 'git not found')
- assert version.get_git_version('nonexistent') is None
+ with mock.patch("snakeoil.version._run_git") as run_git:
+ run_git.side_effect = EnvironmentError(errno.ENOENT, "git not found")
+ assert version.get_git_version("nonexistent") is None
def test_get_git_version_error(self):
- with mock.patch('snakeoil.version._run_git') as run_git:
- run_git.return_value = (b'foo', 1)
- assert version.get_git_version('nonexistent') is None
+ with mock.patch("snakeoil.version._run_git") as run_git:
+ run_git.return_value = (b"foo", 1)
+ assert version.get_git_version("nonexistent") is None
def test_get_git_version_non_repo(self, tmpdir):
assert version.get_git_version(str(tmpdir)) is None
def test_get_git_version_exc(self):
with pytest.raises(OSError):
- with mock.patch('snakeoil.version._run_git') as run_git:
- run_git.side_effect = OSError(errno.EIO, 'Input/output error')
- version.get_git_version('nonexistent')
+ with mock.patch("snakeoil.version._run_git") as run_git:
+ run_git.side_effect = OSError(errno.EIO, "Input/output error")
+ version.get_git_version("nonexistent")
def test_get_git_version_good_dev(self):
- with mock.patch('snakeoil.version._run_git') as run_git:
+ with mock.patch("snakeoil.version._run_git") as run_git:
# dev version
run_git.return_value = (
- b'1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400', 0)
- result = version.get_git_version('nonexistent')
+ b"1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400",
+ 0,
+ )
+ result = version.get_git_version("nonexistent")
expected = {
- 'rev': '1ff76b021d208f7df38ac524537b6419404f1c64',
- 'date': 'Mon Sep 25 13:50:24 2017 -0400',
- 'tag': None,
- 'commits': 2,
+ "rev": "1ff76b021d208f7df38ac524537b6419404f1c64",
+ "date": "Mon Sep 25 13:50:24 2017 -0400",
+ "tag": None,
+ "commits": 2,
}
assert result == expected
def test_get_git_version_good_tag(self):
- with mock.patch('snakeoil.version._run_git') as run_git, \
- mock.patch('snakeoil.version._get_git_tag') as get_git_tag:
+ with mock.patch("snakeoil.version._run_git") as run_git, mock.patch(
+ "snakeoil.version._get_git_tag"
+ ) as get_git_tag:
# tagged, release version
run_git.return_value = (
- b'1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400', 0)
- get_git_tag.return_value = '1.1.1'
- result = version.get_git_version('nonexistent')
+ b"1ff76b021d208f7df38ac524537b6419404f1c64\nMon Sep 25 13:50:24 2017 -0400",
+ 0,
+ )
+ get_git_tag.return_value = "1.1.1"
+ result = version.get_git_version("nonexistent")
expected = {
- 'rev': '1ff76b021d208f7df38ac524537b6419404f1c64',
- 'date': 'Mon Sep 25 13:50:24 2017 -0400',
- 'tag': '1.1.1',
- 'commits': 2,
+ "rev": "1ff76b021d208f7df38ac524537b6419404f1c64",
+ "date": "Mon Sep 25 13:50:24 2017 -0400",
+ "tag": "1.1.1",
+ "commits": 2,
}
assert result == expected
def test_get_git_tag_bad_output(self):
- with mock.patch('snakeoil.version._run_git') as run_git:
+ with mock.patch("snakeoil.version._run_git") as run_git:
# unknown git tag rev output
- run_git.return_value = (b'a', 1)
- assert version._get_git_tag('foo', 'bar') is None
- run_git.return_value = (b'a foo/v0.7.2', 0)
- assert version._get_git_tag('foo', 'bar') is None
+ run_git.return_value = (b"a", 1)
+ assert version._get_git_tag("foo", "bar") is None
+ run_git.return_value = (b"a foo/v0.7.2", 0)
+ assert version._get_git_tag("foo", "bar") is None
# expected output formats
- run_git.return_value = (b'ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1^0', 0)
- assert version._get_git_tag('foo', 'bar') == '1.1.1'
- run_git.return_value = (b'ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1', 0)
- assert version._get_git_tag('foo', 'bar') == '1.1.1'
- run_git.return_value = (b'ab38751890efa8be96b7f95938d6b868b769bab6 tags/1.1.1', 0)
- assert version._get_git_tag('foo', 'bar') == '1.1.1'
+ run_git.return_value = (
+ b"ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1^0",
+ 0,
+ )
+ assert version._get_git_tag("foo", "bar") == "1.1.1"
+ run_git.return_value = (
+ b"ab38751890efa8be96b7f95938d6b868b769bab6 tags/v1.1.1",
+ 0,
+ )
+ assert version._get_git_tag("foo", "bar") == "1.1.1"
+ run_git.return_value = (
+ b"ab38751890efa8be96b7f95938d6b868b769bab6 tags/1.1.1",
+ 0,
+ )
+ assert version._get_git_tag("foo", "bar") == "1.1.1"