1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
|
import os
import shlex
import shutil
import subprocess
import tempfile
import textwrap
from collections import defaultdict
from functools import partial
from io import StringIO
from operator import attrgetter
from unittest.mock import patch
import pytest
from pkgcheck import __title__ as project
from pkgcheck import base
from pkgcheck import checks as checks_mod
from pkgcheck import const, objects, reporters, scan
from pkgcheck.scripts import run
from pkgcore import const as pkgcore_const
from pkgcore.ebuild import atom, restricts
from pkgcore.restrictions import packages
from snakeoil.contexts import chdir, os_environ
from snakeoil.fileutils import touch
from snakeoil.formatters import PlainTextFormatter
from snakeoil.osutils import pjoin
from ..misc import Profile
class TestPkgcheckScanParseArgs:
def test_skipped_checks(self, tool):
options, _ = tool.parse_args(['scan'])
assert options.enabled_checks
# some checks should always be skipped by default
assert set(options.enabled_checks) != set(objects.CHECKS.values())
def test_enabled_check(self, tool):
options, _ = tool.parse_args(['scan', '-c', 'PkgDirCheck'])
assert options.enabled_checks == {checks_mod.pkgdir.PkgDirCheck}
def test_disabled_check(self, tool):
options, _ = tool.parse_args(['scan'])
assert checks_mod.pkgdir.PkgDirCheck in options.enabled_checks
options, _ = tool.parse_args(['scan', '-c=-PkgDirCheck'])
assert options.enabled_checks
assert checks_mod.pkgdir.PkgDirCheck not in options.enabled_checks
def test_targets(self, tool):
options, _ = tool.parse_args(['scan', 'dev-util/foo'])
assert list(options.restrictions) == [(base.package_scope, atom.atom('dev-util/foo'))]
def test_stdin_targets(self, tool):
with patch('sys.stdin', StringIO('dev-util/foo')):
options, _ = tool.parse_args(['scan', '-'])
assert list(options.restrictions) == [(base.package_scope, atom.atom('dev-util/foo'))]
def test_invalid_targets(self, tool, capsys):
with pytest.raises(SystemExit) as excinfo:
options, _ = tool.parse_args(['scan', 'dev-util/f$o'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
err = err.strip()
assert err == "pkgcheck scan: error: invalid package atom: 'dev-util/f$o'"
def test_unknown_path_target(self, tool, capsys):
with pytest.raises(SystemExit) as excinfo:
tool.parse_args(['scan', '/foo/bar'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
err = err.strip().split('\n')
assert err[-1].startswith(
"pkgcheck scan: error: 'standalone' repo doesn't contain: '/foo/bar'")
def test_target_repo_id(self, tool):
options, _ = tool.parse_args(['scan', 'standalone'])
assert options.target_repo.repo_id == 'standalone'
assert list(options.restrictions) == [(base.repo_scope, packages.AlwaysTrue)]
def test_target_dir_path(self, repo, tool):
options, _ = tool.parse_args(['scan', repo.location])
assert options.target_repo.repo_id == 'fake'
assert list(options.restrictions) == [(base.repo_scope, packages.AlwaysTrue)]
def test_target_dir_path_in_repo(self, repo, tool):
path = pjoin(repo.location, 'profiles')
options, _ = tool.parse_args(['scan', path])
assert options.target_repo.repo_id == 'fake'
assert list(options.restrictions) == [(base.profiles_scope, packages.AlwaysTrue)]
def test_target_dir_path_in_configured_repo(self, tool):
options, _ = tool.parse_args(['scan', 'standalone'])
path = pjoin(options.target_repo.location, 'profiles')
options, _ = tool.parse_args(['scan', path])
assert options.target_repo.repo_id == 'standalone'
assert list(options.restrictions) == [(base.profiles_scope, packages.AlwaysTrue)]
def test_target_non_repo_path(self, tool, capsys, tmp_path):
with pytest.raises(SystemExit) as excinfo:
tool.parse_args(['scan', str(tmp_path)])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
assert not out
assert err.startswith(
f"pkgcheck scan: error: 'standalone' repo doesn't contain: '{str(tmp_path)}'")
def test_target_invalid_repo(self, tool, capsys, make_repo):
repo = make_repo(masters=['unknown'])
with pytest.raises(SystemExit) as excinfo:
tool.parse_args(['scan', repo.location])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
assert not out
err = err.strip()
assert err.startswith('pkgcheck scan: error: repo init failed')
assert err.endswith("has missing masters: 'unknown'")
def test_target_file_path(self, repo, tool):
os.makedirs(pjoin(repo.location, 'dev-util', 'foo'))
ebuild_path = pjoin(repo.location, 'dev-util', 'foo', 'foo-0.ebuild')
touch(ebuild_path)
options, _ = tool.parse_args(['scan', ebuild_path])
restrictions = [
restricts.CategoryDep('dev-util'),
restricts.PackageDep('foo'),
restricts.VersionMatch('=', '0'),
]
assert list(options.restrictions) == [(base.version_scope, packages.AndRestriction(*restrictions))]
assert options.target_repo.repo_id == 'fake'
def test_target_package_dir_cwd(self, repo, tool):
os.makedirs(pjoin(repo.location, 'dev-util', 'foo'))
with chdir(pjoin(repo.location, 'dev-util', 'foo')):
options, _ = tool.parse_args(['scan'])
assert options.target_repo.repo_id == 'fake'
restrictions = [
restricts.CategoryDep('dev-util'),
restricts.PackageDep('foo'),
]
assert list(options.restrictions) == [(base.package_scope, packages.AndRestriction(*restrictions))]
def test_target_repo_dir_cwd(self, repo, tool):
with chdir(repo.location):
options, _ = tool.parse_args(['scan'])
assert options.target_repo.repo_id == 'fake'
assert list(options.restrictions) == [(base.repo_scope, packages.AlwaysTrue)]
def test_unknown_repo(self, tmp_path, capsys, tool):
for opt in ('-r', '--repo'):
with pytest.raises(SystemExit) as excinfo:
with chdir(str(tmp_path)):
options, _ = tool.parse_args(['scan', opt, 'foo'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
assert not out
assert err.startswith(
"pkgcheck scan: error: argument -r/--repo: couldn't find repo 'foo'"
)
def test_invalid_repo(self, tmp_path, capsys, tool):
(tmp_path / 'foo').touch()
for opt in ('-r', '--repo'):
with pytest.raises(SystemExit) as excinfo:
with chdir(str(tmp_path)):
options, _ = tool.parse_args(['scan', opt, 'foo'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
assert not out
assert err.startswith(
"pkgcheck scan: error: argument -r/--repo: repo init failed:")
def test_valid_repo(self, tool):
for opt in ('-r', '--repo'):
options, _ = tool.parse_args(['scan', opt, 'standalone'])
assert options.target_repo.repo_id == 'standalone'
assert list(options.restrictions) == [(base.repo_scope, packages.AlwaysTrue)]
def test_unknown_reporter(self, capsys, tool):
for opt in ('-R', '--reporter'):
with pytest.raises(SystemExit) as excinfo:
options, _ = tool.parse_args(['scan', opt, 'foo'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
assert not out
assert err.startswith("pkgcheck scan: error: no reporter matches 'foo'")
def test_format_reporter(self, capsys, tool):
# missing --format
with pytest.raises(SystemExit) as excinfo:
tool.parse_args(['scan', '-R', 'FormatReporter'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
err = err.strip().split('\n')
assert err[-1].endswith(
"missing or empty --format option required by FormatReporter")
# missing -R FormatReporter
with pytest.raises(SystemExit) as excinfo:
tool.parse_args(['scan', '--format', 'foo'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
err = err.strip().split('\n')
assert err[-1].endswith(
"--format option is only valid when using FormatReporter")
# properly set
options, _ = tool.parse_args(
['scan', '-R', 'FormatReporter', '--format', 'foo'])
def test_cwd(self, capsys, tool):
# regularly working
options, _ = tool.parse_args(['scan'])
assert options.cwd == os.getcwd()
# pretend the CWD was removed out from under us
with patch('os.getcwd') as getcwd:
getcwd.side_effect = FileNotFoundError('CWD is gone')
options, _ = tool.parse_args(['scan'])
assert options.cwd == const.DATA_PATH
def test_eclass_target(self, fakerepo, tool):
(eclass_dir := fakerepo / 'eclass').mkdir()
(eclass_path := eclass_dir / 'foo.eclass').touch()
options, _ = tool.parse_args(['scan', str(eclass_path)])
assert list(options.restrictions) == [(base.eclass_scope, 'foo')]
def test_profiles_target(self, fakerepo, tool):
profiles_path = str(fakerepo / 'profiles')
options, _ = tool.parse_args(['scan', profiles_path])
assert list(options.restrictions) == [(base.profiles_scope, packages.AlwaysTrue)]
def test_profiles_path_target_file(self, fakerepo, tool):
(pkg_mask_path := fakerepo / 'profiles/package.mask').touch()
options, _ = tool.parse_args(['scan', str(pkg_mask_path)])
assert list(options.restrictions) == [(base.profile_node_scope, str(pkg_mask_path))]
def test_profiles_path_target_dir(self, fakerepo, tool):
(profile_dir := fakerepo / 'profiles/default').mkdir(parents=True)
(pkg_mask_path := profile_dir / 'package.mask').touch()
(pkg_use_path := profile_dir / 'package.use').touch()
options, _ = tool.parse_args(['scan', str(profile_dir)])
assert list(options.restrictions) == [(base.profile_node_scope, {str(pkg_mask_path), str(pkg_use_path)})]
def test_no_default_repo(self, tool, capsys):
stubconfig = pjoin(pkgcore_const.DATA_PATH, 'stubconfig')
with pytest.raises(SystemExit) as excinfo:
tool.parse_args(['--config', stubconfig, 'scan'])
assert excinfo.value.code == 2
out, err = capsys.readouterr()
assert not out
assert err.strip() == "pkgcheck scan: error: no default repo found"
@pytest.mark.parametrize(('makeopts', 'expected_jobs'), (
('', 4),
('-j1', 1),
('--jobs=6 -l 1', 6),
('--load 1', 4),
))
def test_makeopts_parsing(self, parser, makeopts, expected_jobs):
with patch('os.cpu_count', return_value=4), \
os_environ(MAKEOPTS=makeopts):
options = parser.parse_args(['scan'])
assert options.jobs == expected_jobs
assert options.tasks == 5 * expected_jobs
class TestPkgcheckScanParseConfigArgs:
@pytest.fixture(autouse=True)
def _setup(self, parser, tmp_path, repo):
self.parser = parser
self.repo = repo
self.args = ['scan', '-r', repo.location]
self.system_config = str(tmp_path / "system-config")
self.user_config = str(tmp_path / "user-config")
self.config = str(tmp_path / "custom-config")
def test_config_precedence(self):
configs = [self.system_config, self.user_config]
with patch('pkgcheck.cli.ConfigFileParser.default_configs', configs):
with open(self.system_config, 'w') as f:
f.write(textwrap.dedent("""\
[DEFAULT]
jobs=1000
"""))
options = self.parser.parse_args(self.args)
assert options.jobs == 1000
# user config overrides system config
with open(self.user_config, 'w') as f:
f.write(textwrap.dedent("""\
[DEFAULT]
jobs=1001
"""))
options = self.parser.parse_args(self.args)
assert options.jobs == 1001
# repo config overrides user config
with open(pjoin(self.repo.location, 'metadata', 'pkgcheck.conf'), 'w') as f:
f.write(textwrap.dedent("""\
[DEFAULT]
jobs=1002
"""))
options = self.parser.parse_args(self.args)
assert options.jobs == 1002
# custom config overrides user config
with open(self.config, 'w') as f:
f.write(textwrap.dedent("""\
[DEFAULT]
jobs=1003
"""))
config_args = self.args + ['--config', self.config]
options = self.parser.parse_args(config_args)
assert options.jobs == 1003
# repo defaults override general defaults
with open(self.config, 'a') as f:
f.write(textwrap.dedent(f"""\
[{self.repo.repo_id}]
jobs=1004
"""))
options = self.parser.parse_args(config_args)
assert options.jobs == 1004
# command line options override all config settings
options = self.parser.parse_args(config_args + ['--jobs', '9999'])
assert options.jobs == 9999
class TestPkgcheckScan:
script = staticmethod(partial(run, project))
repos_data = pytest.REPO_ROOT / 'testdata/data/repos'
repos_dir = pytest.REPO_ROOT / 'testdata/repos'
repos = tuple(sorted(x.name for x in repos_data.iterdir() if x.name != 'network'))
_all_results = [
(cls, result)
for name, cls in sorted(objects.CHECKS.items())
if not issubclass(cls, checks_mod.NetworkCheck)
for result in sorted(cls.known_results, key=attrgetter('__name__'))
]
@pytest.fixture(autouse=True)
def _setup(self, testconfig, tmp_path):
self.cache_dir = str(tmp_path)
base_args = ['--config', testconfig]
self.scan = partial(scan, base_args=base_args)
# args for running `pkgcheck scan` via API call
self.scan_args = ['--config', 'no', '--cache-dir', self.cache_dir]
# args for running pkgcheck like a script
self.args = [project] + base_args + ['scan'] + self.scan_args
def test_empty_repo(self, capsys, repo):
with patch('sys.argv', self.args + [repo.location]):
with pytest.raises(SystemExit) as excinfo:
self.script()
assert excinfo.value.code == 0
out, err = capsys.readouterr()
assert out == err == ''
def test_no_matching_checks_scope(self, tool):
options, _ = tool.parse_args(['scan', 'standalone'])
path = pjoin(options.target_repo.location, 'profiles')
error = 'no matching checks available for profiles scope'
with pytest.raises(base.PkgcheckUserException, match=error):
self.scan(self.scan_args + ['-c', 'PkgDirCheck', path])
def test_stdin_targets_with_no_args(self):
with patch('sys.stdin', StringIO()):
with pytest.raises(base.PkgcheckUserException, match='no targets'):
self.scan(self.scan_args + ['-'])
def test_exit_status(self, repo):
# create good ebuild and another with an invalid EAPI
repo.create_ebuild('newcat/pkg-0')
repo.create_ebuild('newcat/pkg-1', eapi='-1')
# exit status isn't enabled by default
args = ['-r', repo.location]
with patch('sys.argv', self.args + args):
with pytest.raises(SystemExit) as excinfo:
self.script()
assert excinfo.value.code == 0
# all error level results are flagged by default when enabled
with patch('sys.argv', self.args + args + ['--exit']):
with pytest.raises(SystemExit) as excinfo:
self.script()
assert excinfo.value.code == 1
# selective error results will only flag those specified
with patch('sys.argv', self.args + args + ['--exit', 'InvalidSlot']):
with pytest.raises(SystemExit) as excinfo:
self.script()
assert excinfo.value.code == 0
with patch('sys.argv', self.args + args + ['--exit', 'InvalidEapi']):
with pytest.raises(SystemExit) as excinfo:
self.script()
assert excinfo.value.code == 1
def test_filter_latest(self, make_repo):
repo = make_repo(arches=['amd64'])
# create stub profile to suppress ArchesWithoutProfiles result
repo.create_profiles([Profile('stub', 'amd64')])
# create ebuild with unknown keywords
repo.create_ebuild('cat/pkg-0', keywords=['unknown'], homepage='https://example.com')
# and a good ebuild for the latest version
repo.create_ebuild('cat/pkg-1', keywords=['amd64'], homepage='https://example.com')
# results for old pkgs will be shown by default
args = ['-r', repo.location]
with patch('sys.argv', self.args + args):
results = list(self.scan(self.scan_args + args))
assert len(results) == 1
# but are ignored when running using the 'latest' filter
for opt in ('-f', '--filter'):
for arg in ('latest', 'latest:KeywordsCheck', 'latest:UnknownKeywords'):
assert not list(self.scan(self.scan_args + args + [opt, arg]))
def test_scan_restrictions(self, repo):
# create two ebuilds with bad EAPIs
repo.create_ebuild('cat/pkg-0', eapi='-1')
repo.create_ebuild('cat/pkg-1', eapi='-1')
# matching version restriction returns a single result
results = list(self.scan(self.scan_args + ['-r', repo.location, '=cat/pkg-0']))
assert [x.version for x in results] == ['0']
# unmatching version restriction returns no results
results = list(self.scan(self.scan_args + ['-r', repo.location, '=cat/pkg-2']))
assert not results
# matching package restriction returns two sorted results
results = list(self.scan(self.scan_args + ['-r', repo.location, 'cat/pkg']))
assert [x.version for x in results] == ['0', '1']
# unmatching package restriction returns no results
results = list(self.scan(self.scan_args + ['-r', repo.location, 'cat/unknown']))
assert not results
def test_explict_skip_check(self):
"""SkipCheck exceptions are raised when triggered for explicitly enabled checks."""
error = 'network checks not enabled'
with pytest.raises(base.PkgcheckException, match=error):
self.scan(self.scan_args + ['-C', 'net'])
def test_cache_disabled_skip_check(self):
"""SkipCheck exceptions are raised when enabled checks require disabled cache types."""
args = ['--cache=-git', '-c', 'StableRequestCheck']
error = 'StableRequestCheck: git cache support required'
with pytest.raises(base.PkgcheckException, match=error):
self.scan(self.scan_args + args)
@pytest.mark.parametrize('module', (
pytest.param('pkgcheck.pipeline.UnversionedSource', id='producer'),
pytest.param('pkgcheck.runners.SyncCheckRunner.run', id='consumer'),
))
def test_pipeline_exceptions(self, module):
"""Test checkrunner pipeline against unhandled exceptions."""
with patch(module) as faked:
faked.side_effect = Exception('pipeline failed')
with pytest.raises(base.PkgcheckException, match='Exception: pipeline failed'):
list(self.scan(self.scan_args))
# nested mapping of repos to checks/keywords they cover
_checks = defaultdict(lambda: defaultdict(set))
@pytest.mark.parametrize('repo', repos)
def test_scan_repo_data(self, repo):
"""Make sure the test data is up to date check/result naming wise."""
for check in (self.repos_data / repo).iterdir():
assert check.name in objects.CHECKS
for keyword in check.iterdir():
assert keyword.name in objects.KEYWORDS
self._checks[repo][check.name].add(keyword.name)
@staticmethod
def _script(fix, repo_path):
try:
subprocess.run([fix], cwd=repo_path, capture_output=True, check=True, text=True)
except subprocess.CalledProcessError as exc:
error = exc.stderr if exc.stderr else exc.stdout
pytest.fail(error)
# mapping of repos to scanned results
_results = {}
_verbose_results = {}
@pytest.mark.parametrize('repo', repos)
def test_scan_repo(self, repo, tmp_path, verbosity=0):
"""Scan a target repo, saving results for verification."""
repo_dir = self.repos_dir / repo
# run all existing triggers
triggers = [
pjoin(root, 'trigger.sh')
for root, _dirs, files in os.walk(self.repos_data / repo)
if 'trigger.sh' in files
]
if triggers:
triggered_repo = tmp_path / f'triggered-{repo}'
shutil.copytree(repo_dir, triggered_repo)
for trigger in triggers:
self._script(trigger, triggered_repo)
repo_dir = triggered_repo
if repo not in self._checks:
self.test_scan_repo_data(repo)
args = (['-v'] * verbosity) + ['-r', str(repo_dir), '-c', ','.join(self._checks[repo])]
# add any defined extra repo args
try:
args.extend(shlex.split((repo_dir / 'metadata/pkgcheck-args').read_text()))
except FileNotFoundError:
pass
results = []
for result in self.scan(self.scan_args + args):
# ignore results generated from stubs
stubs = (getattr(result, x, '') for x in ('category', 'package'))
if any(x.startswith('stub') for x in stubs):
continue
results.append(result)
if verbosity:
self._verbose_results[repo] = set(results)
assert len(results) == len(self._verbose_results[repo])
else:
self._results[repo] = set(results)
assert len(results) == len(self._results[repo])
@pytest.mark.parametrize('repo', repos)
def test_scan_repo_verbose(self, repo, tmp_path):
"""Scan a target repo in verbose mode, saving results for verification."""
return self.test_scan_repo(repo, tmp_path, verbosity=1)
def _get_results(self, path):
"""Return the set of result objects from a given json stream file."""
try:
with (self.repos_data / path).open() as f:
return set(reporters.JsonStream.from_iter(f))
except FileNotFoundError:
return set()
def _render_results(self, results, **kwargs):
"""Render a given set of result objects into their related string form."""
with tempfile.TemporaryFile() as f:
with reporters.FancyReporter(out=PlainTextFormatter(f), **kwargs) as reporter:
for result in sorted(results):
reporter.report(result)
f.seek(0)
output = f.read().decode()
return output
@pytest.mark.parametrize('repo', repos)
def test_scan_verify(self, repo, tmp_path):
"""Run pkgcheck against test pkgs in bundled repo, verifying result output."""
results = set()
verbose_results = set()
if repo not in self._results:
self.test_scan_repo(repo, tmp_path, verbosity=0)
if repo not in self._verbose_results:
self.test_scan_repo(repo, tmp_path, verbosity=0)
for check, keywords in self._checks[repo].items():
for keyword in keywords:
# verify the expected results were seen during the repo scans
expected_results = self._get_results(f'{repo}/{check}/{keyword}/expected.json')
assert expected_results, 'regular results must always exist'
assert self._render_results(expected_results), 'failed rendering results'
results.update(expected_results)
# when expected verbose results exist use them, otherwise fallback to using the regular ones
expected_verbose_results = self._get_results(f'{repo}/{check}/{keyword}/expected-verbose.json')
if expected_verbose_results:
assert self._render_results(expected_verbose_results), 'failed rendering verbose results'
verbose_results.update(expected_verbose_results)
else:
verbose_results.update(expected_results)
if results != self._results[repo]:
missing = self._render_results(results - self._results[repo])
unknown = self._render_results(self._results[repo] - results)
error = ['unmatched repo scan results:']
if missing:
error.append(f'{repo} repo missing expected results:\n{missing}')
if unknown:
error.append(f'{repo} repo unknown results:\n{unknown}')
pytest.fail('\n'.join(error))
if verbose_results != self._verbose_results[repo]:
missing = self._render_results(verbose_results - self._verbose_results[repo])
unknown = self._render_results(self._verbose_results[repo] - verbose_results)
error = ['unmatched verbose repo scan results:']
if missing:
error.append(f'{repo} repo missing expected results:\n{missing}')
if unknown:
error.append(f'{repo} repo unknown results:\n{unknown}')
pytest.fail('\n'.join(error))
@staticmethod
def _patch(fix, repo_path):
with fix.open() as fix_file:
try:
subprocess.run(
['patch', '-p1'], cwd=repo_path, stdin=fix_file,
capture_output=True, check=True, text=True)
except subprocess.CalledProcessError as exc:
error = exc.stderr if exc.stderr else exc.stdout
pytest.fail(error)
@pytest.mark.parametrize('check, result', _all_results)
def test_fix(self, check, result, tmp_path):
"""Apply fixes to pkgs, verifying the related results are fixed."""
check_name = check.__name__
keyword = result.__name__
tested = False
for repo in self.repos:
keyword_dir = self.repos_data / repo / check_name / keyword
if (fix := keyword_dir / 'fix.patch').exists():
func = self._patch
elif (fix := keyword_dir / 'fix.sh').exists():
func = self._script
else:
continue
# apply a fix if one exists and make sure the related result doesn't appear
repo_dir = self.repos_dir / repo
fixed_repo = tmp_path / f'fixed-{repo}'
shutil.copytree(repo_dir, fixed_repo)
func(fix, fixed_repo)
args = ['-r', str(fixed_repo), '-c', check_name, '-k', keyword]
# add any defined extra repo args
try:
with open(f'{repo_dir}/metadata/pkgcheck-args') as f:
args.extend(shlex.split(f.read()))
except FileNotFoundError:
pass
results = list(self.scan(self.scan_args + args))
if results:
error = ['unexpected repo scan results:']
error.append(self._render_results(results))
pytest.fail('\n'.join(error))
shutil.rmtree(fixed_repo)
tested = True
if not tested:
pytest.skip('fix not available')
|