diff options
author | Zac Medico <zmedico@gentoo.org> | 2024-08-11 00:50:49 -0700 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2024-08-11 00:50:49 -0700 |
commit | cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a (patch) | |
tree | 57cc0afedde61e9855d8b683b1bd7abfd52062eb | |
parent | run_exitfuncs: Support loop close via hook (diff) | |
download | portage-cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a.tar.gz portage-cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a.tar.bz2 portage-cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a.zip |
Support coroutine exitfuncs for non-main loops
Since an API consumer can cause loops to be instantiated
for non-main threads, support coroutine exitfuncs for each
loop. The included Socks5ServerAtExitThreadedTestCase calls
get_socks5_proxy from a non-main thread, and demonstrates
that coroutine exitfuncs for the resulting non-main loop
will reliably stop the socks5 proxy via atexit hook.
The _thread_weakrefs_atexit function will now make a
temporary adjustment to _thread_weakrefs.loops so that a
loop is associated with the current thread when it is
closing. Also, the _get_running_loop function will now
store weak references to all _AsyncioEventLoop instances
it creates, since each has a _coroutine_exithandlers
attribute that can be modified by atexit_register calls.
Bug: https://bugs.gentoo.org/937740
Signed-off-by: Zac Medico <zmedico@gentoo.org>
-rw-r--r-- | lib/portage/process.py | 11 | ||||
-rw-r--r-- | lib/portage/tests/util/test_socks5.py | 38 | ||||
-rw-r--r-- | lib/portage/util/_eventloop/asyncio_event_loop.py | 15 | ||||
-rw-r--r-- | lib/portage/util/futures/_asyncio/__init__.py | 41 |
4 files changed, 76 insertions, 29 deletions
diff --git a/lib/portage/process.py b/lib/portage/process.py index 23e2507b5..38adebda6 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -194,7 +194,6 @@ def spawn_fakeroot(mycommand, fakeroot_state=None, opt_name=None, **keywords): _exithandlers = [] -_coroutine_exithandlers = [] def atexit_register(func, *args, **kargs): @@ -205,7 +204,9 @@ def atexit_register(func, *args, **kargs): # The internal asyncio wrapper module would trigger a circular import # if used here. if _asyncio.iscoroutinefunction(func): - _coroutine_exithandlers.append((func, args, kargs)) + # Add this coroutine function to the exit handlers for the loop + # which is associated with the current thread. + global_event_loop()._coroutine_exithandlers.append((func, args, kargs)) else: _exithandlers.append((func, args, kargs)) @@ -238,13 +239,17 @@ async def run_coroutine_exitfuncs(): """ This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction to check which functions to run. It is called by the AsyncioEventLoop - _close_main method just before the loop is closed. + _close method just before the loop is closed. If the loop is explicitly closed before exit, then that will cause run_coroutine_exitfuncs to run before run_exitfuncs. Otherwise, a run_exitfuncs hook will close it, causing run_coroutine_exitfuncs to be called via run_exitfuncs. """ + # The _thread_weakrefs_atexit function makes an adjustment to ensure + # that global_event_loop() returns the correct loop when it is closing, + # regardless of which thread the loop was initially associated with. + _coroutine_exithandlers = global_event_loop()._coroutine_exithandlers tasks = [] while _coroutine_exithandlers: func, targs, kargs = _coroutine_exithandlers.pop() diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py index 35f919d97..078e3b1a2 100644 --- a/lib/portage/tests/util/test_socks5.py +++ b/lib/portage/tests/util/test_socks5.py @@ -194,17 +194,17 @@ class Socks5ServerTestCase(TestCase): asyncio.run(self._test_socks5_proxy()) async def _test_socks5_proxy(self): - loop = asyncio.get_running_loop() + loop = global_event_loop() host = "127.0.0.1" content = b"Hello World!" path = "/index.html" proxy = None tempdir = tempfile.mkdtemp() - previous_exithandlers = portage.process._coroutine_exithandlers + previous_exithandlers = loop._coroutine_exithandlers try: - portage.process._coroutine_exithandlers = [] + loop._coroutine_exithandlers = [] with AsyncHTTPServer(host, {path: content}, loop) as server: settings = { "PORTAGE_TMPDIR": tempdir, @@ -227,11 +227,11 @@ class Socks5ServerTestCase(TestCase): finally: try: # Also run_coroutine_exitfuncs to test atexit hook cleanup. - self.assertNotEqual(portage.process._coroutine_exithandlers, []) + self.assertNotEqual(loop._coroutine_exithandlers, []) await portage.process.run_coroutine_exitfuncs() - self.assertEqual(portage.process._coroutine_exithandlers, []) + self.assertEqual(loop._coroutine_exithandlers, []) finally: - portage.process._coroutine_exithandlers = previous_exithandlers + loop._coroutine_exithandlers = previous_exithandlers shutil.rmtree(tempdir) @@ -284,6 +284,8 @@ class Socks5ServerAtExitTestCase(TestCase): so this test uses python -c to ensure that atexit hooks will work. """ + _threaded = False + def testSocks5ServerAtExit(self): tempdir = tempfile.mkdtemp() try: @@ -295,24 +297,36 @@ class Socks5ServerAtExitTestCase(TestCase): "-c", """ import sys +import threading from portage.const import PORTAGE_BIN_PATH from portage.util import socks5 from portage.util._eventloop.global_event_loop import global_event_loop tempdir = sys.argv[0] -loop = global_event_loop() +threaded = bool(sys.argv[1]) settings = { "PORTAGE_TMPDIR": tempdir, "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH, } -socks5.get_socks5_proxy(settings) -loop.run_until_complete(socks5.proxy.ready()) -print(socks5.proxy._proc.pid, flush=True) +def main(): + loop = global_event_loop() + socks5.get_socks5_proxy(settings) + loop.run_until_complete(socks5.proxy.ready()) + print(socks5.proxy._proc.pid, flush=True) + +if __name__ == "__main__": + if threaded: + t = threading.Thread(target=main) + t.start() + t.join() + else: + main() """, tempdir, + str(self._threaded), ], env=env, ) @@ -323,3 +337,7 @@ print(socks5.proxy._proc.pid, flush=True) os.kill(pid, 0) finally: shutil.rmtree(tempdir) + + +class Socks5ServerAtExitThreadedTestCase(Socks5ServerAtExitTestCase): + _threaded = True diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py index 821cc7f10..c69e5c2f0 100644 --- a/lib/portage/util/_eventloop/asyncio_event_loop.py +++ b/lib/portage/util/_eventloop/asyncio_event_loop.py @@ -3,7 +3,6 @@ import os import signal -import threading import asyncio as _real_asyncio from asyncio.events import AbstractEventLoop as _AbstractEventLoop @@ -54,8 +53,7 @@ class AsyncioEventLoop(_AbstractEventLoop): self._child_watcher = None # Used to drop recursive calls to _close. self._closing = False - # Initialized in _run_until_complete. - self._is_main = None + self._coroutine_exithandlers = [] if portage._internal_caller: loop.set_exception_handler(self._internal_caller_exception_handler) @@ -68,15 +66,11 @@ class AsyncioEventLoop(_AbstractEventLoop): """ if not (self._closing or self.is_closed()): self._closing = True - if self._is_main: - self.run_until_complete(self._close_main()) + if self._coroutine_exithandlers: + self.run_until_complete(portage.process.run_coroutine_exitfuncs()) self._loop.close() self._closing = False - async def _close_main(self): - await portage.process.run_coroutine_exitfuncs() - portage.process.run_exitfuncs() - @staticmethod def _internal_caller_exception_handler(loop, context): """ @@ -157,9 +151,6 @@ class AsyncioEventLoop(_AbstractEventLoop): In order to avoid potential interference with API consumers, this implementation is only used when portage._internal_caller is True. """ - if self._is_main is None: - self._is_main = threading.current_thread() is threading.main_thread() - if not portage._internal_caller: return self._loop.run_until_complete(future) diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index 8942bcb67..8805e3575 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -339,15 +339,30 @@ def _get_running_loop(): elif _loop is None: return loop if loop.is_running() else None - # If _loop it not None here it means it was probably a temporary - # loop created by asyncio.run, so we don't try to cache it, and - # just return a temporary wrapper. - return None if _loop is None else _AsyncioEventLoop(loop=_loop) + if _loop is None: + return None + + # If _loop it not None here it means it was probably a temporary + # loop created by asyncio.run. Still keep a weak reference in case + # we need to lookup this _AsyncioEventLoop instance later to add + # _coroutine_exithandlers in the atexit_register function. + if _thread_weakrefs.pid != portage.getpid(): + _thread_weakrefs.pid = portage.getpid() + _thread_weakrefs.mainloop = None + _thread_weakrefs.loops = weakref.WeakValueDictionary() + + loop = _thread_weakrefs.loops[threading.get_ident()] = _AsyncioEventLoop( + loop=_loop + ) + + return loop def _thread_weakrefs_atexit(): while True: loop = None + thread_key = None + restore_loop = None with _thread_weakrefs.lock: if _thread_weakrefs.pid != portage.getpid(): return @@ -356,11 +371,29 @@ def _thread_weakrefs_atexit(): thread_key, loop = _thread_weakrefs.loops.popitem() except KeyError: return + else: + # Temporarily associate it as the loop for the current thread so + # that it can be looked up during run_coroutine_exitfuncs calls. + # Also create a reference to a different loop if one is associated + # with this thread so we can restore it later. + try: + restore_loop = _thread_weakrefs.loops[threading.get_ident()] + except KeyError: + pass + _thread_weakrefs.loops[threading.get_ident()] = loop # Release the lock while closing the loop, since it may call # run_coroutine_exitfuncs interally. if loop is not None: loop.close() + with _thread_weakrefs.lock: + try: + if _thread_weakrefs.loops[threading.get_ident()] is loop: + del _thread_weakrefs.loops[threading.get_ident()] + except KeyError: + pass + if restore_loop is not None: + _thread_weakrefs.loops[threading.get_ident()] = restore_loop _thread_weakrefs = types.SimpleNamespace( |