"""Outcome file analysis code.

This module is the bulk of the code of tests/scripts/analyze_outcomes.py
in each consuming branch. The consuming script is expected to derive
the classes with branch-specific customizations such as ignore lists.
"""

# Copyright The Mbed TLS Contributors
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later

import argparse
import sys
import traceback
import re
import subprocess
import os
import typing

import collect_test_cases


# `ComponentOutcomes` is a named tuple which is defined as:
# ComponentOutcomes(
#     successes = {
#         "<suite_case>",
#         ...
#     },
#     failures = {
#         "<suite_case>",
#         ...
#     }
# )
# suite_case = "<suite>;<case>"
ComponentOutcomes = typing.NamedTuple('ComponentOutcomes',
                                      [('successes', typing.Set[str]),
                                       ('failures', typing.Set[str])])

# `Outcomes` is a representation of the outcomes file,
# which is defined as:
# Outcomes = {
#     "<component>": ComponentOutcomes,
#     ...
# }
Outcomes = typing.Dict[str, ComponentOutcomes]
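# For illustration, a parsed collection with made-up component and test case
# names looks like:
#   {
#       "component_test_foo": ComponentOutcomes(
#           successes={"test_suite_bar;Successful case"},
#           failures={"test_suite_bar;Failing case"},
#       ),
#   }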


class Results:
    """Process analysis results."""

    def __init__(self):
        self.error_count = 0
        self.warning_count = 0

    def new_section(self, fmt, *args, **kwargs):
        self._print_line('\n*** ' + fmt + ' ***\n', *args, **kwargs)

    def info(self, fmt, *args, **kwargs):
        self._print_line('Info: ' + fmt, *args, **kwargs)

    def error(self, fmt, *args, **kwargs):
        self.error_count += 1
        self._print_line('Error: ' + fmt, *args, **kwargs)

    def warning(self, fmt, *args, **kwargs):
        self.warning_count += 1
        self._print_line('Warning: ' + fmt, *args, **kwargs)

    @staticmethod
    def _print_line(fmt, *args, **kwargs):
        sys.stderr.write((fmt + '\n').format(*args, **kwargs))
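    # Usage sketch: the fmt argument uses str.format placeholders, filled in
    # from the remaining positional/keyword arguments, e.g. (made-up message):
    #   results = Results()
    #   results.error('Test case not executed: {}', 'test_suite_foo;bar')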

def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str, \
                                   outcome_file: str) -> None:
    """Run the tests specified in ref_component and driver_component.

    The results are stored in outcome_file and are used for the subsequent
    coverage analysis.
    """
    results.new_section("Test {} and {}", ref_component, driver_component)

    shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \
                    " " + ref_component + " " + driver_component
    results.info("Running: {}", shell_command)
    ret_val = subprocess.run(shell_command.split(), check=False).returncode

    if ret_val != 0:
        results.error("failed to run reference/driver components")

IgnoreEntry = typing.Union[str, typing.Pattern]

def name_matches_pattern(name: str, str_or_re: IgnoreEntry) -> bool:
    """Check if name matches a pattern that may be a string or regex.
    - If the pattern is a string, name must be equal to match.
    - If the pattern is a regex, name must fully match.
    """
    # The CI's python is too old for re.Pattern
    #if isinstance(str_or_re, re.Pattern):
    if not isinstance(str_or_re, str):
        return str_or_re.fullmatch(name) is not None
    else:
        return str_or_re == name
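# For illustration (made-up names): a string pattern must be equal to the name,
# whereas a regex must match the whole name:
#   name_matches_pattern('ECDSA sign', 'ECDSA sign')            # True
#   name_matches_pattern('ECDSA sign', re.compile('ECDSA.*'))   # True
#   name_matches_pattern('ECDSA signature', 'ECDSA sign')       # False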

def read_outcome_file(outcome_file: str) -> Outcomes:
    """Parse an outcome file and return an outcome collection.
    """
    outcomes = {}
    with open(outcome_file, 'r', encoding='utf-8') as input_file:
        for line in input_file:
            (_platform, component, suite, case, result, _cause) = line.split(';')
            # Note that `component` is not unique. If a test case passes on Linux
            # and fails on FreeBSD, it'll end up in both the successes set and
            # the failures set.
            suite_case = ';'.join([suite, case])
            if component not in outcomes:
                outcomes[component] = ComponentOutcomes(set(), set())
            if result == 'PASS':
                outcomes[component].successes.add(suite_case)
            elif result == 'FAIL':
                outcomes[component].failures.add(suite_case)

    return outcomes
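# For reference, each line of the outcome file carries six semicolon-separated
# fields; results other than PASS or FAIL are ignored. A (made-up) example line:
#   Linux-x86_64;component_test_foo;test_suite_bar;Successful case;PASS;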


class Task:
    """Base class for outcome analysis tasks."""

    # Override the following in child classes.
    # Map test suite names (with the test_suite_ prefix) to a list of ignored
    # test cases. Each element in the list can be either a string or a regex;
    # see the `name_matches_pattern` function.
    IGNORED_TESTS = {} #type: typing.Dict[str, typing.List[IgnoreEntry]]
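    # A derived class might populate this along the following lines (suite and
    # case names are made up, mixing an exact string and a regex entry):
    #   IGNORED_TESTS = {
    #       'test_suite_foo': [
    #           'Case that is never executed',
    #           re.compile(r'Depends on feature X: .*'),
    #       ],
    #   }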

    def __init__(self, options) -> None:
        """Pass command line options to the tasks.

        Each task decides which command line options it cares about.
        """
        pass

    def section_name(self) -> str:
        """The section name to use in results."""
        raise NotImplementedError

    def ignored_tests(self, test_suite: str) -> typing.Iterator[IgnoreEntry]:
        """Generate the ignore list for the specified test suite."""
        if test_suite in self.IGNORED_TESTS:
            yield from self.IGNORED_TESTS[test_suite]
        pos = test_suite.find('.')
        if pos != -1:
            base_test_suite = test_suite[:pos]
            if base_test_suite in self.IGNORED_TESTS:
                yield from self.IGNORED_TESTS[base_test_suite]
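    # For example (hypothetical name): for test_suite 'test_suite_foo.generated',
    # entries registered under both 'test_suite_foo.generated' and
    # 'test_suite_foo' are yielded.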

    def is_test_case_ignored(self, test_suite: str, test_string: str) -> bool:
        """Check if the specified test case is ignored."""
        for str_or_re in self.ignored_tests(test_suite):
            if name_matches_pattern(test_string, str_or_re):
                return True
        return False

    def run(self, results: Results, outcomes: Outcomes):
        """Run the analysis on the specified outcomes.

        Signal errors via the results object.
        """
        raise NotImplementedError


class CoverageTask(Task):
    """Analyze test coverage."""

    # Test cases whose suite and description are matched by an entry in
    # IGNORED_TESTS are expected never to be executed.
    # All other test cases are expected to be executed at least once.

    def __init__(self, options) -> None:
        super().__init__(options)
        self.full_coverage = options.full_coverage #type: bool

    @staticmethod
    def section_name() -> str:
        return "Analyze coverage"

    def run(self, results: Results, outcomes: Outcomes) -> None:
        """Check that all available test cases are executed at least once."""
        # Make sure that the generated data files are present (and up-to-date).
        # This allows analyze_outcomes.py to run correctly on a fresh Git
        # checkout.
        cp = subprocess.run(['make', 'generated_files'],
                            cwd='tests',
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            check=False)
        if cp.returncode != 0:
            sys.stderr.write(cp.stdout.decode('utf-8'))
            results.error("Failed \"make generated_files\" in tests. "
                          "Coverage analysis may be incorrect.")
        available = collect_test_cases.collect_available_test_cases()
        for suite_case in available:
            hit = any(suite_case in comp_outcomes.successes or
                      suite_case in comp_outcomes.failures
                      for comp_outcomes in outcomes.values())
            (test_suite, test_description) = suite_case.split(';')
            ignored = self.is_test_case_ignored(test_suite, test_description)

            if not hit and not ignored:
                if self.full_coverage:
                    results.error('Test case not executed: {}', suite_case)
                else:
                    results.warning('Test case not executed: {}', suite_case)
            elif hit and ignored:
                # If a test case is no longer always skipped, we should remove
                # it from the ignore list.
                if self.full_coverage:
                    results.error('Test case was executed but marked as ignored for coverage: {}',
                                  suite_case)
                else:
                    results.warning('Test case was executed but marked as ignored for coverage: {}',
                                    suite_case)


class DriverVSReference(Task):
    """Compare outcomes from testing with and without a driver.

    There are two options for running analyze_driver_vs_reference_xxx locally:
    1. Run the tests first, then the analysis:
       - tests/scripts/all.sh --outcome-file "$PWD/out.csv" <component_ref> <component_driver>
       - tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
    2. Let this script run both automatically:
       - tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
    """

    # Override the following in child classes.
    # Configuration name (all.sh component) used as the reference.
    REFERENCE = ''
    # Configuration name (all.sh component) used as the driver.
    DRIVER = ''
    # Ignored test suites (without the test_suite_ prefix).
    IGNORED_SUITES = [] #type: typing.List[str]

    def __init__(self, options) -> None:
        super().__init__(options)
        self.ignored_suites = frozenset('test_suite_' + x
                                        for x in self.IGNORED_SUITES)

    def section_name(self) -> str:
        return f"Analyze driver {self.DRIVER} vs reference {self.REFERENCE}"

    def run(self, results: Results, outcomes: Outcomes) -> None:
        """Check that all tests passing in the reference component are also
        passing in the corresponding driver component.
        Skip:
        - full test suites provided in the ignored_suites list;
        - specific test cases inside a test suite, for which the corresponding
          description string is provided.
        """
        ref_outcomes = outcomes.get("component_" + self.REFERENCE)
        driver_outcomes = outcomes.get("component_" + self.DRIVER)

        if ref_outcomes is None or driver_outcomes is None:
            results.error("required components are missing: bad outcome file?")
            return

        if not ref_outcomes.successes:
            results.error("no passing test in reference component: bad outcome file?")
            return

        for suite_case in ref_outcomes.successes:
            # suite_case is like "test_suite_foo.bar;Description of test case"
            (full_test_suite, test_string) = suite_case.split(';')
            test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name

            # Immediately skip fully-ignored test suites
            if test_suite in self.ignored_suites or \
               full_test_suite in self.ignored_suites:
                continue

            # For test cases that are individually ignored, just remember the
            # status: don't issue an error if they're skipped or failing with
            # the driver, but do issue an error if they pass (that means the
            # ignore-list entry is useless and should be removed).
            ignored = self.is_test_case_ignored(full_test_suite, test_string)

            if not ignored and suite_case not in driver_outcomes.successes:
                results.error("SKIP/FAIL -> PASS: {}", suite_case)
            if ignored and suite_case in driver_outcomes.successes:
                results.error("uselessly ignored: {}", suite_case)
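
# A consuming branch would typically derive this class along these lines
# (component and suite names below are made up for illustration):
#   class DriverVSReferenceFoo(DriverVSReference):
#       REFERENCE = 'test_config_reference_foo'
#       DRIVER = 'test_config_accel_foo'
#       IGNORED_SUITES = ['foo']
#       IGNORED_TESTS = {
#           'test_suite_bar': [re.compile(r'.*driver only.*')],
#       }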


def main(known_tasks: typing.Dict[str, typing.Type[Task]]) -> None:
    main_results = Results()

    try:
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
                            help='Outcome file to analyze')
        parser.add_argument('specified_tasks', default='all', nargs='?',
                            help='Analysis to be done. By default, run all tasks. '
                                 'With one or more TASK, run only those. '
                                 'TASK can be the name of a single task or '
                                 'a comma/space-separated list of tasks.')
        parser.add_argument('--list', action='store_true',
                            help='List all available tasks and exit.')
        parser.add_argument('--require-full-coverage', action='store_true',
                            dest='full_coverage', help="Require all available "
                            "test cases to be executed and issue an error "
                            "otherwise. This flag is ignored if 'task' is "
                            "neither 'all' nor 'analyze_coverage'.")
        options = parser.parse_args()

        if options.list:
            for task_name in known_tasks:
                print(task_name)
            sys.exit(0)

        if options.specified_tasks == 'all':
            tasks_list = list(known_tasks.keys())
        else:
            tasks_list = re.split(r'[, ]+', options.specified_tasks)
            for task_name in tasks_list:
                if task_name not in known_tasks:
                    sys.stderr.write('invalid task: {}\n'.format(task_name))
                    sys.exit(2)

        # If the outcome file exists, parse it once and share the result
        # among tasks to improve performance.
        # Otherwise, it will be generated by execute_reference_driver_tests.
        if not os.path.exists(options.outcomes):
            if len(tasks_list) > 1:
                sys.stderr.write("multiple tasks found, please provide a valid outcomes file.\n")
                sys.exit(2)

            task_name = tasks_list[0]
            task_class = known_tasks[task_name]
            if not issubclass(task_class, DriverVSReference):
                sys.stderr.write("please provide a valid outcomes file for {}.\n".format(task_name))
                sys.exit(2)
            # mypy isn't smart enough to know that REFERENCE and DRIVER
            # are *class* attributes of all classes derived from
            # DriverVSReference. (It would be smart enough if we had an
            # instance of task_class, but we can't construct an instance
            # until we have the outcome data, so at this point we only
            # have the class.) So we use indirection to access the class
            # attributes.
            execute_reference_driver_tests(main_results,
                                           getattr(task_class, 'REFERENCE'),
                                           getattr(task_class, 'DRIVER'),
                                           options.outcomes)

        outcomes = read_outcome_file(options.outcomes)

        for task_name in tasks_list:
            task_constructor = known_tasks[task_name]
            task_instance = task_constructor(options)
            main_results.new_section(task_instance.section_name())
            task_instance.run(main_results, outcomes)

        main_results.info("Overall results: {} warnings and {} errors",
                          main_results.warning_count, main_results.error_count)

        sys.exit(0 if (main_results.error_count == 0) else 1)

    except Exception: # pylint: disable=broad-except
        # Print the backtrace and exit explicitly with our chosen status.
        traceback.print_exc()
        sys.exit(120)
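

# A consuming script (tests/scripts/analyze_outcomes.py in each branch) is
# expected to define its task classes and call main(), roughly like this
# (the task and class names below are made up for illustration):
#   KNOWN_TASKS = {
#       'analyze_coverage': CoverageTask,
#       'analyze_driver_vs_reference_foo': DriverVSReferenceFoo,
#   }
#   if __name__ == '__main__':
#       main(KNOWN_TASKS)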