diff options
author | Tim Harder <radhermit@gmail.com> | 2021-03-05 02:37:32 -0700 |
---|---|---|
committer | Tim Harder <radhermit@gmail.com> | 2021-03-05 02:37:32 -0700 |
commit | 44a8d7f8404ba67c91f742678e894ecbda4d5d2a (patch) | |
tree | 47fbdbdc455cc403e9ec945dd85296478a494781 /tests | |
parent | tests: add initial Mangler tests (diff) | |
download | pkgdev-44a8d7f8404ba67c91f742678e894ecbda4d5d2a.tar.gz pkgdev-44a8d7f8404ba67c91f742678e894ecbda4d5d2a.tar.bz2 pkgdev-44a8d7f8404ba67c91f742678e894ecbda4d5d2a.zip |
tests: add SIGINT test for Mangler
Based off the same one used with pkgcheck's parallel pipeline iterator
that uses the same design.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_mangle.py | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/tests/test_mangle.py b/tests/test_mangle.py index 39dde3f..975d888 100644 --- a/tests/test_mangle.py +++ b/tests/test_mangle.py @@ -1,3 +1,6 @@ +import os +import multiprocessing +import signal from unittest.mock import patch from pkgdev.mangle import Mangler @@ -44,3 +47,43 @@ class TestMangler: with patch('pkgdev.mangle.Mangler._mangle_eof', _mangle_func): with pytest.raises(UserException, match='Exception: func failed'): list(Mangler(repo, [path])) + + def test_sigint_handling(self, repo): + """Verify SIGINT is properly handled by the parallelized pipeline.""" + path = pjoin(repo.location, 'file') + with open(path, 'w') as f: + f.write('# comment\n') + + def run(queue): + """Mangler run in a separate process that gets interrupted.""" + import sys + import time + from functools import partial + from unittest.mock import patch + + from pkgdev.mangle import Mangler + + def sleep(): + """Notify testing process then sleep.""" + queue.put('ready') + time.sleep(100) + + with patch('pkgdev.mangle.Mangler.__iter__') as fake_iter: + fake_iter.side_effect = partial(sleep) + try: + iter(Mangler(repo, [path])) + except KeyboardInterrupt: + queue.put(None) + sys.exit(0) + queue.put(None) + sys.exit(1) + + mp_ctx = multiprocessing.get_context('fork') + queue = mp_ctx.SimpleQueue() + p = mp_ctx.Process(target=run, args=(queue,)) + p.start() + # wait for pipeline object to be fully initialized then send SIGINT + for _ in iter(queue.get, None): + os.kill(p.pid, signal.SIGINT) + p.join() + assert p.exitcode == 0 |