aboutsummaryrefslogtreecommitdiff
blob: 3fa446370a4bc4c6e8088016eb3eae62b6270220 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
#!/usr/bin/python
#
# Copyright(c) 2004-2010, Gentoo Foundation
#
# Licensed under the GNU General Public License, v2

"""Provides common methods on a package query."""

__all__ = ("Query",)

# =======
# Imports
# =======

import fnmatch
import re
from functools import partial
from string import ascii_letters, digits

import portage

from gentoolkit import CONFIG
from gentoolkit import errors
from gentoolkit import helpers
from gentoolkit import pprinter as pp
from gentoolkit.atom import Atom
from gentoolkit.cpv import CPV
from gentoolkit.package import Package
from gentoolkit.sets import get_set_atoms, SETPREFIX

# =======
# Classes
# =======


class Query(CPV):
    """Provides common methods on a package query."""

    def __init__(self, query, is_regex=False):
        """Create query object.

        @type is_regex: bool
        @param is_regex: query is a regular expression
        """

        # We need at least one of these chars for a valid query
        needed_chars = ascii_letters + digits + "*"
        if not set(query).intersection(needed_chars):
            raise errors.GentoolkitInvalidPackage(query)

        # Separate repository
        repository = None
        if query.count(":") == 2:
            query, repository = query.rsplit(":", 1)
        self.query = query.rstrip(":")  # Don't leave dangling colon
        self.repo_filter = repository
        self.is_regex = is_regex
        self.query_type = self._get_query_type()

        # Name the rest of the chunks, if possible
        if self.query_type != "set":
            try:
                atom = Atom(self.query)
                self.__dict__.update(atom.__dict__)
            except errors.GentoolkitInvalidAtom:
                CPV.__init__(self, self.query)
                self.operator = ""
                self.atom = self.cpv

    def __repr__(self):
        rx = ""
        if self.is_regex:
            rx = " regex"
        repo = ""
        if self.repo_filter:
            repo = " in %s" % self.repo_filter
        return f"<{self.__class__.__name__}{rx} {self.query!r}{repo}>"

    def __str__(self):
        return self.query

    def print_summary(self):
        """Print a summary of the query."""

        if self.query_type == "set":
            cat_str = ""
            pkg_str = pp.emph(self.query)
        else:
            try:
                cat, pkg = self.category, self.name + self.fullversion
            except errors.GentoolkitInvalidCPV:
                cat = ""
                pkg = self.atom
            if cat and not self.is_regex:
                cat_str = "in %s " % pp.emph(cat.lstrip("><=~!"))
            else:
                cat_str = ""

            if self.is_regex:
                pkg_str = pp.emph(self.query)
            else:
                pkg_str = pp.emph(pkg)

        repo = ""
        if self.repo_filter is not None:
            repo = " %s" % pp.section(self.repo_filter)

        pp.uprint(f" * Searching{repo} for {pkg_str} {cat_str}...")

    def smart_find(
        self,
        in_installed=True,
        in_porttree=True,
        in_overlay=True,
        include_masked=True,
        show_progress=True,
        no_matches_fatal=True,
        **kwargs,
    ):
        """A high-level wrapper around gentoolkit package-finder functions.

        @type in_installed: bool
        @param in_installed: search for query in VARDB
        @type in_porttree: bool
        @param in_porttree: search for query in PORTDB
        @type in_overlay: bool
        @param in_overlay: search for query in overlays
        @type show_progress: bool
        @param show_progress: output search progress
        @type no_matches_fatal: bool
        @param no_matches_fatal: raise errors.GentoolkitNoMatches
        @rtype: list
        @return: Package objects matching query
        """

        if in_installed:
            if in_porttree or in_overlay:
                simple_package_finder = partial(
                    self.find, include_masked=include_masked
                )
                complex_package_finder = helpers.get_cpvs
            else:
                simple_package_finder = self.find_installed
                complex_package_finder = helpers.get_installed_cpvs
        elif in_porttree or in_overlay:
            simple_package_finder = partial(
                self.find, include_masked=include_masked, in_installed=False
            )
            complex_package_finder = helpers.get_uninstalled_cpvs
        else:
            raise errors.GentoolkitFatalError(
                "Not searching in installed, Portage tree, or overlay. "
                "Nothing to do."
            )

        if self.query_type == "set":
            self.package_finder = simple_package_finder
            matches = self._do_set_lookup(show_progress=show_progress)
        elif self.query_type == "simple":
            self.package_finder = simple_package_finder
            matches = self._do_simple_lookup(
                in_installed=in_installed, show_progress=show_progress
            )
        else:
            self.package_finder = complex_package_finder
            matches = self._do_complex_lookup(show_progress=show_progress)

        if self.repo_filter is not None:
            matches = self._filter_by_repository(matches)

        if no_matches_fatal and not matches:
            ii = in_installed and not (in_porttree or in_overlay)
            raise errors.GentoolkitNoMatches(self.query, in_installed=ii)
        return matches

    def find(self, in_installed=True, include_masked=True):
        """Returns a list of Package objects that matched the query.

        @rtype: list
        @return: matching Package objects
        """

        if not self.query:
            return []

        try:
            if include_masked:
                matches = portage.db[portage.root]["porttree"].dbapi.xmatch(
                    "match-all", self.query
                )
            else:
                matches = portage.db[portage.root]["porttree"].dbapi.match(self.query)
            if in_installed:
                matches.extend(
                    portage.db[portage.root]["vartree"].dbapi.match(self.query)
                )
        except portage.exception.InvalidAtom as err:
            message = "query.py: find(), query={}, InvalidAtom={}".format(
                self.query,
                str(err),
            )
            raise errors.GentoolkitInvalidAtom(message)

        return [Package(x) for x in set(matches)]

    def find_installed(self):
        """Return a list of Package objects that matched the search key."""

        try:
            matches = portage.db[portage.root]["vartree"].dbapi.match(self.query)
        # catch the ambiguous package Exception
        except portage.exception.AmbiguousPackageName as err:
            matches = []
            for pkgkey in err.args[0]:
                matches.extend(portage.db[portage.root]["vartree"].dbapi.match(pkgkey))
        except portage.exception.InvalidAtom as err:
            raise errors.GentoolkitInvalidAtom(err)

        return [Package(x) for x in set(matches)]

    def find_best(self, include_keyworded=True, include_masked=True):
        """Returns the "best" version available.

        Order of preference:
                highest available stable =>
                highest available keyworded =>
                highest available masked

        @rtype: Package object or None
        @return: best of up to three options
        @raise errors.GentoolkitInvalidAtom: if query is not valid input
        """

        best = keyworded = masked = None
        try:
            best = portage.db[portage.root]["porttree"].dbapi.xmatch(
                "bestmatch-visible", self.query
            )
        except portage.exception.InvalidAtom as err:
            message = (
                "query.py: find_best(), bestmatch-visible, "
                + f"query={self.query}, InvalidAtom={str(err)}"
            )
            raise errors.GentoolkitInvalidAtom(message)
        # xmatch can return an empty string, so checking for None is not enough
        if not best:
            if not (include_keyworded or include_masked):
                return None
            try:
                matches = portage.db[portage.root]["porttree"].dbapi.xmatch(
                    "match-all", self.query
                )
            except portage.exception.InvalidAtom as err:
                message = (
                    "query.py: find_best(), match-all, query=%s, InvalidAtom=%s"
                    % (self.query, str(err))
                )
                raise errors.GentoolkitInvalidAtom(message)
            masked = portage.best(matches)
            keywordable = []
            for m in matches:
                status = portage.getmaskingstatus(m)
                if "package.mask" not in status or "profile" not in status:
                    keywordable.append(m)
                if matches:
                    keyworded = portage.best(keywordable)
        else:
            return Package(best)
        if include_keyworded and keyworded:
            return Package(keyworded)
        if include_masked and masked:
            return Package(masked)
        return None

    def uses_globbing(self):
        """Check the query to see if it is using globbing.

        @rtype: bool
        @return: True if query uses globbing, else False
        """

        if set("!*?[]").intersection(self.query):
            # Is query an atom such as '=sys-apps/portage-2.2*'?
            if self.query[0] != "=":
                return True

        return False

    def is_ranged(self):
        """Return True if the query appears to be ranged, else False."""

        q = self.query
        return q.startswith(("~", "<", ">")) or q.endswith("*")

    def _do_simple_lookup(self, in_installed=True, show_progress=True):
        """Find matches for a query which is an atom or cpv."""

        result = []

        if show_progress and CONFIG["verbose"]:
            self.print_summary()

        result = self.package_finder()
        if not in_installed:
            result = [x for x in result if not x.is_installed()]

        return result

    def _do_complex_lookup(self, show_progress=True):
        """Find matches for a query which is a regex or includes globbing."""

        result = []

        if show_progress and not CONFIG["piping"]:
            self.print_summary()

        try:
            cat = CPV(self.query).category
        except errors.GentoolkitInvalidCPV:
            cat = ""

        pre_filter = []
        # The "get_" functions can pre-filter against the whole package key,
        # but since we allow globbing now, we run into issues like:
        # >>> portage.dep.dep_getkey("sys-apps/portage-*")
        # 'sys-apps/portage-'
        # So the only way to guarantee we don't overrun the key is to
        # prefilter by cat only.
        if cat:
            if self.is_regex:
                cat_re = cat
            else:
                cat_re = fnmatch.translate(cat)
            predicate = lambda x: re.match(cat_re, x.split("/", 1)[0])
            pre_filter = self.package_finder(predicate=predicate)

        # Post-filter
        if self.is_regex:
            try:
                re.compile(self.query)
            except re.error:
                raise errors.GentoolkitInvalidRegex(self.query)
            predicate = lambda x: re.search(self.query, x)
        else:
            if cat:
                query_re = fnmatch.translate(self.query)
            else:
                query_re = fnmatch.translate("*/%s" % self.query)
            predicate = lambda x: re.search(query_re, x)
        if pre_filter:
            result = [x for x in pre_filter if predicate(x)]
        else:
            result = self.package_finder(predicate=predicate)

        return [Package(x) for x in result]

    def _do_set_lookup(self, show_progress=True):
        """Find matches for a query that is a package set."""

        if show_progress and not CONFIG["piping"]:
            self.print_summary()

        setname = self.query[len(SETPREFIX) :]
        result = []
        try:
            atoms = get_set_atoms(setname)
        except errors.GentoolkitSetNotFound:
            return result

        q = self.query
        for atom in atoms:
            self.query = str(atom)
            result.extend(self._do_simple_lookup(show_progress=False))
        self.query = q

        return result

    def _filter_by_repository(self, matches):
        """Filter out packages which do not belong to self.repo_filter."""

        result = []
        for match in matches:
            repo_name = match.repo_name()
            if repo_name == self.repo_filter:
                result.append(match)
            elif not repo_name and self.repo_filter in ("unknown", "null"):
                result.append(match)

        return result

    def _get_query_type(self):
        """Determine of what type the query is."""

        if self.query.startswith(SETPREFIX):
            return "set"
        elif self.is_regex or self.uses_globbing():
            return "complex"
        return "simple"