1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
|
# Copyright 2020-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import functools
import shlex
import subprocess
from _emerge.AsynchronousTask import AsynchronousTask
import portage
from portage import os
from portage.proxy.objectproxy import ObjectProxy
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from portage.util.futures import asyncio
class _file_close_wrapper(ObjectProxy):
"""
Prevent fd inheritance via fork, ensuring that we can observe
EOF on the read end of the pipe (bug 919072).
"""
__slots__ = ("_file",)
def __init__(self, file):
ObjectProxy.__init__(self)
object.__setattr__(self, "_file", file)
portage.locks._open_fds[file.fileno()] = self
def _get_target(self):
return object.__getattribute__(self, "_file")
def __getattribute__(self, attr):
if attr == "close":
return object.__getattribute__(self, attr)
return getattr(object.__getattribute__(self, "_file"), attr)
def close(self):
file = object.__getattribute__(self, "_file")
if not file.closed:
# This must only be called if the file is open,
# which ensures that file.fileno() does not
# collide with an open lock file descriptor.
del portage.locks._open_fds[file.fileno()]
file.close()
class BuildLogger(AsynchronousTask):
"""
Write to a log file, with compression support provided by PipeLogger.
If the log_filter_file parameter is specified, then it is interpreted
as a command to execute which filters log output (see the
PORTAGE_LOG_FILTER_FILE_CMD variable in make.conf(5)). The stdin property
provides access to a writable binary file stream (refers to a pipe)
that log content should be written to (usually redirected from
subprocess stdout and stderr streams).
"""
__slots__ = (
"env",
"log_path",
"log_filter_file",
"_main_task",
"_main_task_cancel",
"_stdin",
)
@property
def stdin(self):
return self._stdin
def _start(self):
filter_proc = None
log_input = None
if self.log_path is not None:
log_filter_file = self.log_filter_file
if log_filter_file is not None:
split_value = shlex.split(log_filter_file)
log_filter_file = split_value if split_value else None
if log_filter_file:
filter_input, stdin = os.pipe()
log_input, filter_output = os.pipe()
try:
filter_proc = PopenProcess(
proc=subprocess.Popen(
log_filter_file,
env=self.env,
stdin=filter_input,
stdout=filter_output,
stderr=filter_output,
),
scheduler=self.scheduler,
)
filter_proc.start()
except OSError:
# Maybe the command is missing or broken somehow...
os.close(filter_input)
os.close(stdin)
os.close(log_input)
os.close(filter_output)
else:
self._stdin = _file_close_wrapper(os.fdopen(stdin, "wb", 0))
os.close(filter_input)
os.close(filter_output)
if self._stdin is None:
# Since log_filter_file is unspecified or refers to a file
# that is missing or broken somehow, create a pipe that
# logs directly to pipe_logger.
log_input, stdin = os.pipe()
self._stdin = _file_close_wrapper(os.fdopen(stdin, "wb", 0))
# Set background=True so that pipe_logger does not log to stdout.
pipe_logger = PipeLogger(
background=True,
scheduler=self.scheduler,
input_fd=log_input,
log_file_path=self.log_path,
)
pipe_logger.start()
self._main_task_cancel = functools.partial(
self._main_cancel, filter_proc, pipe_logger
)
self._main_task = asyncio.ensure_future(
self._main(filter_proc, pipe_logger), loop=self.scheduler
)
self._main_task.add_done_callback(self._main_exit)
async def _main(self, filter_proc, pipe_logger):
try:
if pipe_logger.poll() is None:
await pipe_logger.async_wait()
if filter_proc is not None and filter_proc.poll() is None:
await filter_proc.async_wait()
except asyncio.CancelledError:
self._main_cancel(filter_proc, pipe_logger)
raise
def _main_cancel(self, filter_proc, pipe_logger):
if pipe_logger.poll() is None:
pipe_logger.cancel()
if filter_proc is not None and filter_proc.poll() is None:
filter_proc.cancel()
def _cancel(self):
if self._main_task is not None:
if not self._main_task.done():
if self._main_task_cancel is not None:
self._main_task_cancel()
self._main_task_cancel = None
self._main_task.cancel()
if self._stdin is not None and not self._stdin.closed:
self._stdin.close()
def _main_exit(self, main_task):
self._main_task = None
self._main_task_cancel = None
try:
main_task.result()
except asyncio.CancelledError:
self.cancel()
self._was_cancelled()
self.returncode = self.returncode or 0
self._async_wait()
|