aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Zac Medico <zmedico@gentoo.org> 2024-08-11 00:50:49 -0700
committer: Zac Medico <zmedico@gentoo.org> 2024-08-11 00:50:49 -0700
commit: cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a (patch)
tree: 57cc0afedde61e9855d8b683b1bd7abfd52062eb
parent: run_exitfuncs: Support loop close via hook (diff)
download: portage-cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a.tar.gz
portage-cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a.tar.bz2
portage-cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a.zip
Support coroutine exitfuncs for non-main loops
Since an API consumer can cause loops to be instantiated for non-main threads, support coroutine exitfuncs for each loop. The included Socks5ServerAtExitThreadedTestCase calls get_socks5_proxy from a non-main thread, and demonstrates that coroutine exitfuncs for the resulting non-main loop will reliably stop the socks5 proxy via atexit hook. The _thread_weakrefs_atexit function will now make a temporary adjustment to _thread_weakrefs.loops so that a loop is associated with the current thread when it is closing. Also, the _get_running_loop function will now store weak references to all _AsyncioEventLoop instances it creates, since each has a _coroutine_exithandlers attribute that can be modified by atexit_register calls. Bug: https://bugs.gentoo.org/937740 Signed-off-by: Zac Medico <zmedico@gentoo.org>
-rw-r--r-- lib/portage/process.py 11
-rw-r--r-- lib/portage/tests/util/test_socks5.py 38
-rw-r--r-- lib/portage/util/_eventloop/asyncio_event_loop.py 15
-rw-r--r-- lib/portage/util/futures/_asyncio/__init__.py 41
4 files changed, 76 insertions, 29 deletions
diff --git a/lib/portage/process.py b/lib/portage/process.py
index 23e2507b5..38adebda6 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -194,7 +194,6 @@ def spawn_fakeroot(mycommand, fakeroot_state=None, opt_name=None, **keywords):
_exithandlers = []
-_coroutine_exithandlers = []
def atexit_register(func, *args, **kargs):
@@ -205,7 +204,9 @@ def atexit_register(func, *args, **kargs):
# The internal asyncio wrapper module would trigger a circular import
# if used here.
if _asyncio.iscoroutinefunction(func):
- _coroutine_exithandlers.append((func, args, kargs))
+ # Add this coroutine function to the exit handlers for the loop
+ # which is associated with the current thread.
+ global_event_loop()._coroutine_exithandlers.append((func, args, kargs))
else:
_exithandlers.append((func, args, kargs))
@@ -238,13 +239,17 @@ async def run_coroutine_exitfuncs():
"""
This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction
to check which functions to run. It is called by the AsyncioEventLoop
- _close_main method just before the loop is closed.
+ _close method just before the loop is closed.
If the loop is explicitly closed before exit, then that will cause
run_coroutine_exitfuncs to run before run_exitfuncs. Otherwise, a
run_exitfuncs hook will close it, causing run_coroutine_exitfuncs to be
called via run_exitfuncs.
"""
+ # The _thread_weakrefs_atexit function makes an adjustment to ensure
+ # that global_event_loop() returns the correct loop when it is closing,
+ # regardless of which thread the loop was initially associated with.
+ _coroutine_exithandlers = global_event_loop()._coroutine_exithandlers
tasks = []
while _coroutine_exithandlers:
func, targs, kargs = _coroutine_exithandlers.pop()
diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index 35f919d97..078e3b1a2 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -194,17 +194,17 @@ class Socks5ServerTestCase(TestCase):
asyncio.run(self._test_socks5_proxy())
async def _test_socks5_proxy(self):
- loop = asyncio.get_running_loop()
+ loop = global_event_loop()
host = "127.0.0.1"
content = b"Hello World!"
path = "/index.html"
proxy = None
tempdir = tempfile.mkdtemp()
- previous_exithandlers = portage.process._coroutine_exithandlers
+ previous_exithandlers = loop._coroutine_exithandlers
try:
- portage.process._coroutine_exithandlers = []
+ loop._coroutine_exithandlers = []
with AsyncHTTPServer(host, {path: content}, loop) as server:
settings = {
"PORTAGE_TMPDIR": tempdir,
@@ -227,11 +227,11 @@ class Socks5ServerTestCase(TestCase):
finally:
try:
# Also run_coroutine_exitfuncs to test atexit hook cleanup.
- self.assertNotEqual(portage.process._coroutine_exithandlers, [])
+ self.assertNotEqual(loop._coroutine_exithandlers, [])
await portage.process.run_coroutine_exitfuncs()
- self.assertEqual(portage.process._coroutine_exithandlers, [])
+ self.assertEqual(loop._coroutine_exithandlers, [])
finally:
- portage.process._coroutine_exithandlers = previous_exithandlers
+ loop._coroutine_exithandlers = previous_exithandlers
shutil.rmtree(tempdir)
@@ -284,6 +284,8 @@ class Socks5ServerAtExitTestCase(TestCase):
so this test uses python -c to ensure that atexit hooks will work.
"""
+ _threaded = False
+
def testSocks5ServerAtExit(self):
tempdir = tempfile.mkdtemp()
try:
@@ -295,24 +297,36 @@ class Socks5ServerAtExitTestCase(TestCase):
"-c",
"""
import sys
+import threading
from portage.const import PORTAGE_BIN_PATH
from portage.util import socks5
from portage.util._eventloop.global_event_loop import global_event_loop
tempdir = sys.argv[0]
-loop = global_event_loop()
+threaded = bool(sys.argv[1])
settings = {
"PORTAGE_TMPDIR": tempdir,
"PORTAGE_BIN_PATH": PORTAGE_BIN_PATH,
}
-socks5.get_socks5_proxy(settings)
-loop.run_until_complete(socks5.proxy.ready())
-print(socks5.proxy._proc.pid, flush=True)
+def main():
+ loop = global_event_loop()
+ socks5.get_socks5_proxy(settings)
+ loop.run_until_complete(socks5.proxy.ready())
+ print(socks5.proxy._proc.pid, flush=True)
+
+if __name__ == "__main__":
+ if threaded:
+ t = threading.Thread(target=main)
+ t.start()
+ t.join()
+ else:
+ main()
""",
tempdir,
+ str(self._threaded),
],
env=env,
)
@@ -323,3 +337,7 @@ print(socks5.proxy._proc.pid, flush=True)
os.kill(pid, 0)
finally:
shutil.rmtree(tempdir)
+
+
+class Socks5ServerAtExitThreadedTestCase(Socks5ServerAtExitTestCase):
+ _threaded = True
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index 821cc7f10..c69e5c2f0 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -3,7 +3,6 @@
import os
import signal
-import threading
import asyncio as _real_asyncio
from asyncio.events import AbstractEventLoop as _AbstractEventLoop
@@ -54,8 +53,7 @@ class AsyncioEventLoop(_AbstractEventLoop):
self._child_watcher = None
# Used to drop recursive calls to _close.
self._closing = False
- # Initialized in _run_until_complete.
- self._is_main = None
+ self._coroutine_exithandlers = []
if portage._internal_caller:
loop.set_exception_handler(self._internal_caller_exception_handler)
@@ -68,15 +66,11 @@ class AsyncioEventLoop(_AbstractEventLoop):
"""
if not (self._closing or self.is_closed()):
self._closing = True
- if self._is_main:
- self.run_until_complete(self._close_main())
+ if self._coroutine_exithandlers:
+ self.run_until_complete(portage.process.run_coroutine_exitfuncs())
self._loop.close()
self._closing = False
- async def _close_main(self):
- await portage.process.run_coroutine_exitfuncs()
- portage.process.run_exitfuncs()
-
@staticmethod
def _internal_caller_exception_handler(loop, context):
"""
@@ -157,9 +151,6 @@ class AsyncioEventLoop(_AbstractEventLoop):
In order to avoid potential interference with API consumers, this
implementation is only used when portage._internal_caller is True.
"""
- if self._is_main is None:
- self._is_main = threading.current_thread() is threading.main_thread()
-
if not portage._internal_caller:
return self._loop.run_until_complete(future)
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index 8942bcb67..8805e3575 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -339,15 +339,30 @@ def _get_running_loop():
elif _loop is None:
return loop if loop.is_running() else None
- # If _loop it not None here it means it was probably a temporary
- # loop created by asyncio.run, so we don't try to cache it, and
- # just return a temporary wrapper.
- return None if _loop is None else _AsyncioEventLoop(loop=_loop)
+ if _loop is None:
+ return None
+
+ # If _loop is not None here it means it was probably a temporary
+ # loop created by asyncio.run. Still keep a weak reference in case
+ # we need to lookup this _AsyncioEventLoop instance later to add
+ # _coroutine_exithandlers in the atexit_register function.
+ if _thread_weakrefs.pid != portage.getpid():
+ _thread_weakrefs.pid = portage.getpid()
+ _thread_weakrefs.mainloop = None
+ _thread_weakrefs.loops = weakref.WeakValueDictionary()
+
+ loop = _thread_weakrefs.loops[threading.get_ident()] = _AsyncioEventLoop(
+ loop=_loop
+ )
+
+ return loop
def _thread_weakrefs_atexit():
while True:
loop = None
+ thread_key = None
+ restore_loop = None
with _thread_weakrefs.lock:
if _thread_weakrefs.pid != portage.getpid():
return
@@ -356,11 +371,29 @@ def _thread_weakrefs_atexit():
thread_key, loop = _thread_weakrefs.loops.popitem()
except KeyError:
return
+ else:
+ # Temporarily associate it as the loop for the current thread so
+ # that it can be looked up during run_coroutine_exitfuncs calls.
+ # Also create a reference to a different loop if one is associated
+ # with this thread so we can restore it later.
+ try:
+ restore_loop = _thread_weakrefs.loops[threading.get_ident()]
+ except KeyError:
+ pass
+ _thread_weakrefs.loops[threading.get_ident()] = loop
# Release the lock while closing the loop, since it may call
# run_coroutine_exitfuncs internally.
if loop is not None:
loop.close()
+ with _thread_weakrefs.lock:
+ try:
+ if _thread_weakrefs.loops[threading.get_ident()] is loop:
+ del _thread_weakrefs.loops[threading.get_ident()]
+ except KeyError:
+ pass
+ if restore_loop is not None:
+ _thread_weakrefs.loops[threading.get_ident()] = restore_loop
_thread_weakrefs = types.SimpleNamespace(