#!/usr/bin/env python3
#
# Copyright (c) 2025, Arm Limited. All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#
| 7 | |
| 8 | import argparse |
| 9 | import os |
| 10 | import re |
| 11 | import subprocess |
| 12 | import sys |
| 13 | import logging |
| 14 | from pathlib import Path |
| 15 | |
| 16 | |
def subprocess_run(cmd, **kwargs):
    """Log a command at debug level, then execute it with subprocess.run.

    Args:
        cmd: Command handed unchanged to subprocess.run.
        **kwargs: Keyword arguments forwarded unchanged to subprocess.run.

    Returns:
        Whatever subprocess.run returns for this call.
    """
    # Lazy %-style arguments: the message is only formatted when debug
    # logging is enabled.
    logging.debug("Running command: %r %r", cmd, kwargs)
    completed = subprocess.run(cmd, **kwargs)
    return completed
| 20 | |
| 21 | |
def _extract_workaround_id(text, erratum_pattern, cve_pattern):
    """Return (erratum_number, cve_year, cve_number) found in text, or None."""
    erratum_match = erratum_pattern.search(text)
    if erratum_match:
        # An ERRATUM(...) on the line takes precedence over any CVE on it.
        return int(erratum_match.group(1)), None, None

    cve_match = cve_pattern.search(text)
    if cve_match:
        return None, int(cve_match.group(1)), int(cve_match.group(2))

    return None


def _pop_matching_start(start_stack, marker_type, line_no, no_start_detail):
    """Pop the innermost open start for an end marker.

    Returns the popped start entry, or None (after logging the reason) when
    there is no open start or the innermost one is of a different type.
    """
    end_marker = f"workaround_{marker_type}_end"

    if not start_stack:
        logging.error(
            f"[Line {line_no}] Found '{end_marker}' {no_start_detail}"
        )
        return None

    last_item = start_stack.pop()
    if last_item["marker_type"] != marker_type:
        logging.error(
            f"[Line {line_no}] Found '{end_marker}' "
            f"that does not match "
            f"the most recent '{last_item['marker_type']}' "
            f"start at line {last_item['start_line']}."
        )
        return None

    return last_item


def parse_workarounds(filepath: str):
    """
    Parse the file line by line. For every start marker ('workaround_reset_start'
    or 'workaround_runtime_start'), look for its matching end marker
    ('workaround_reset_end' or 'workaround_runtime_end').

    Parsing stops at the first problem found:
      - a start marker line with neither an ERRATUM(X) nor a CVE identifier,
      - an end marker with no open start,
      - an end marker whose type differs from the most recent open start.
    If the whole file is scanned and a start marker is still open, that is
    reported as an error as well.

    Args:
        filepath: path of the file to scan.

    Returns:
        A tuple (results, error).

        results is a list of dictionaries, one per start/end pair that was
        matched before any error, in the order their end markers were seen.
        Each dictionary has:
          - start_line: line number of the workaround start
          - end_line: line number of the matching workaround end
          - marker_type: 'reset' or 'runtime'
          - erratum_number: integer if it's an ERRATUM (from ERRATUM(X)), else None
          - cve_year: integer if it's a CVE, else None
          - cve_number: integer if it's a CVE, else None

        error is True if any of the problems listed above was found,
        otherwise False.
    """

    # Read all lines in memory
    with open(filepath, "r") as f:
        lines = f.readlines()

    # Stack of active "starts" that haven't yet found their "end"
    start_stack = []
    results = []
    error = False

    # Regex patterns for capturing ERRATUM and CVE
    # Example: ERRATUM(123) or CVE-2022-789
    erratum_pattern = re.compile(r"ERRATUM\s*\(\s*(\d+)\s*\)", re.IGNORECASE)
    cve_pattern = re.compile(r"CVE[-_:]?(\d{4})[-_:]?(\d+)", re.IGNORECASE)

    for i, line in enumerate(lines, start=1):
        stripped = line.strip()

        # ------------------------------------------------------------------
        # 1) Check for "start" markers
        # ------------------------------------------------------------------
        start_type = None
        if "workaround_reset_start" in stripped:
            start_type = "reset"
        elif "workaround_runtime_start" in stripped:
            start_type = "runtime"

        if start_type is not None:
            ids = _extract_workaround_id(stripped, erratum_pattern, cve_pattern)
            if ids is None:
                logging.error(
                    f"Couldn't find a valid Errata number or CVE year "
                    f"in marker type {start_type} in line number {i}"
                )
                return results, True

            erratum_number, cve_year, cve_number = ids
            start_stack.append({
                "start_line": i,
                "marker_type": start_type,
                "erratum_number": erratum_number,
                "cve_year": cve_year,
                "cve_number": cve_number,
            })
            continue

        # ------------------------------------------------------------------
        # 2) Check for "end" markers
        # ------------------------------------------------------------------
        if "workaround_reset_end" in stripped:
            end_type = "reset"
            no_start_detail = "without matching 'workaround_reset_start'."
        elif "workaround_runtime_end" in stripped:
            end_type = "runtime"
            no_start_detail = "without matching start."
        else:
            # Not a marker line.
            continue

        last_item = _pop_matching_start(start_stack, end_type, i, no_start_detail)
        if last_item is None:
            error = True
            break

        last_item["end_line"] = i
        results.append(last_item)

    # ----------------------------------------------------------------------
    # After processing all lines, a non-empty stack means some starts have no
    # matching ends. Skip this when the scan was aborted early: the remaining
    # entries may simply not have reached their end markers yet.
    # ----------------------------------------------------------------------
    if start_stack and not error:
        first_unmatched = start_stack[0]
        logging.error(
            f"'workaround_{first_unmatched['marker_type']}_start' "
            f"at line {first_unmatched['start_line']} "
            f"did not have a matching end marker."
        )
        error = True

    return results, error
| 196 | |
| 197 | |
def check_ascending_order(data):
    """
    Ensures that:
      1) All ERRATUM blocks appear first (in ascending order of their erratum_number),
      2) Then all CVE blocks appear (in ascending order of their cve_year and if the
         year is the same, ascending by cve_number as well).

    Equal neighbouring values are accepted; only a strictly smaller value
    after a larger one is reported.

    Args:
        data: iterable of dictionaries with the keys 'start_line',
            'erratum_number', 'cve_year' and 'cve_number'.

    Returns:
        False if an ERRATUM appears after a CVE has started, if an entry has
        neither an erratum number nor a CVE year, or if the ordering within
        ERRATUMs or CVEs is incorrect; otherwise True.
    """

    # Sort everything by the line number where the workaround starts
    data_sorted = sorted(data, key=lambda x: x["start_line"])

    # Gather ERRATUM items first, in the order they appear, then CVE items.
    # An ERRATUM seen after the first CVE fails the check immediately.
    found_cve = False
    errata_list = []
    cve_list = []

    for item in data_sorted:
        if item["erratum_number"] is not None:  # This is an ERRATUM
            if found_cve:
                logging.error(
                    f"ERRATUM({item['erratum_number']}) found "
                    f"at line {item['start_line']} "
                    f"after the first CVE has already appeared."
                )
                return False
            errata_list.append(item)
        elif item["cve_year"] is not None:  # This is a CVE
            found_cve = True
            cve_list.append(item)
        else:
            # Neither erratum_number nor cve_year is present.
            logging.error(
                f"ERRATUM or CVE year not found at "
                f"line {item['start_line']}"
            )
            return False

    # -------------------------------------------------------------
    # 1) Check ascending order of ERRATUM IDs
    # -------------------------------------------------------------
    prev_erratum = None  # None means "no previous erratum yet"
    for erratum_item in errata_list:
        eno = erratum_item["erratum_number"]
        if prev_erratum is not None and eno < prev_erratum:
            logging.error(
                f"ERRATUM IDs are not in ascending order! "
                f"Found ERRATUM({eno}) "
                f"after ERRATUM({prev_erratum})."
            )
            return False
        prev_erratum = eno

    # -------------------------------------------------------------
    # 2) Check CVE year (and then CVE number) are ascending
    # -------------------------------------------------------------
    prev_cve = None  # (year, number) of the previous CVE, if any
    for cve_item in cve_list:
        year = cve_item["cve_year"]
        num = cve_item["cve_number"]

        if prev_cve is not None:
            prev_cve_year, prev_cve_number = prev_cve

            if year < prev_cve_year:
                logging.error(
                    f"CVE years are not in ascending order! "
                    f"Found CVE({year},...) "
                    f"after CVE({prev_cve_year},...)."
                )
                return False

            # Years match, so the CVE number decides the order.
            if year == prev_cve_year and num < prev_cve_number:
                logging.error(
                    f"CVE Numbers are not in ascending order! "
                    f"Found CVE({year}, {num}) "
                    f"after CVE({prev_cve_year}, {prev_cve_number})."
                )
                return False

        prev_cve = (year, num)

    # If we reach here, the ordering is correct.
    return True
| 291 | |
| 292 | |
def patch_has_cpu_files(base_commit, end_commit):
    """List the CPU files changed between two commits.

    Runs `git diff --name-only` restricted to `lib/cpus/aarch64/` for the
    range `base_commit..end_commit`.

    Args:
        base_commit: ref used as the start of the range.
        end_commit: ref used as the end of the range.

    Returns:
        A list with one path per line printed by git (empty when nothing
        under `lib/cpus/aarch64/` changed), or False if git exits with a
        non-zero status.
    """

    # Only the names of the changed files are needed, not the patches.
    gitdiff = subprocess_run(
        [
            "git",
            "diff",
            "--name-only",
            # Lower-case 'd' excludes deleted paths: they no longer exist in
            # the tree, so there is nothing to open and check.
            "--diff-filter=d",
            base_commit + ".." + end_commit,
            "lib/cpus/aarch64/",
        ],
        stdout=subprocess.PIPE,
    )

    if gitdiff.returncode != 0:
        return False

    cpu_files_modified = gitdiff.stdout.decode("utf-8").splitlines()
    return cpu_files_modified
| 313 | |
| 314 | |
def list_files_in_directory(dir_path):
    """
    Returns a list of files in the specified directory.

    Only regular files directly inside the directory are returned;
    sub-directories are skipped and not descended into.

    Args:
        dir_path: The path to the directory.

    Returns:
        A sorted list of paths, each one `dir_path` joined with a file name.
        If the directory cannot be listed (missing, not a directory, or any
        other OS error), the problem is logged and an empty list is returned,
        so callers can always iterate the result as a list of paths.
    """
    try:
        # Sorted so the result does not depend on the order the OS happens
        # to return the entries in.
        names = sorted(os.listdir(dir_path))
    except OSError as e:
        logging.error(f"Unable to list directory `{dir_path}`: {e}")
        return []

    paths = (os.path.join(dir_path, name) for name in names)
    return [path for path in paths if os.path.isfile(path)]
| 336 | |
| 337 | |
def parse_cmd_line(argv, prog_name):
    """Parse the command-line arguments of this script.

    Args:
        argv: list of argument strings, without the program name.
        prog_name: program name shown in the usage/help text.

    Returns:
        The argparse namespace with the attributes `tree`, `patch`,
        `from_ref`, `to_ref` and `debug`.
    """
    parser = argparse.ArgumentParser(
        prog=prog_name,
        formatter_class=argparse.RawTextHelpFormatter,
        description="Check ordering of errata and CVE workarounds in CPU files",
        epilog="""
For each CPU file under lib/cpus/aarch64/, checks that every
workaround_reset_start / workaround_runtime_start marker has a matching
end marker, that all ERRATUM workarounds come before CVE workarounds,
and that errata IDs and CVE year/number are in ascending order.
""",
    )

    parser.add_argument(
        "--tree",
        "-t",
        help="Path to the source tree to check (default: %(default)s)",
        default=os.curdir,
    )
    parser.add_argument(
        "--patch",
        "-p",
        help="""
Patch mode.
Instead of checking all files in the source tree, the script will consider
only files that are modified by the latest patch(es).""",
        action="store_true",
    )
    parser.add_argument(
        "--from-ref",
        help="Base commit in patch mode (default: %(default)s)",
        default="master",
    )
    parser.add_argument(
        "--to-ref",
        help="Final commit in patch mode (default: %(default)s)",
        default="HEAD",
    )
    parser.add_argument(
        "--debug",
        help="Enable debug logging",
        action="store_true",
    )

    args = parser.parse_args(argv)
    return args
| 384 | |
| 385 | |
# Script entry point: collect the CPU files to check (all of them, or only
# those touched by a patch range), validate each one, and exit with status 1
# if any file failed.
if __name__ == "__main__":
    args = parse_cmd_line(sys.argv[1:], sys.argv[0])

    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

    # All paths below are relative to the root of the tree being checked.
    os.chdir(args.tree)

    if args.patch:
        logging.info(
            "Checking CPU files modified between patches "
            + args.from_ref
            + " and "
            + args.to_ref
            + " ..."
        )
        list_cpu_files = patch_has_cpu_files(args.from_ref, args.to_ref)
        if not list_cpu_files:
            logging.info("No CPU files Modified")
            sys.exit(0)
    else:
        dir_path = "lib/cpus/aarch64/"
        logging.info(f"Checking all CPU files in directory `{dir_path}`")
        list_cpu_files = list_files_in_directory(dir_path)
        if isinstance(list_cpu_files, str):
            # Guard against a failure being reported as a message string
            # instead of a list: iterating it would treat every character
            # as a file path.
            logging.error(list_cpu_files)
            sys.exit(1)
        if not list_cpu_files:
            logging.error("`lib/cpus/aarch64/` directory is empty")
            sys.exit(1)

    failure = False
    for cpu_file in list_cpu_files:
        # A path reported by git may no longer exist in the working tree
        # (for example a file deleted by the patch); there is nothing to check.
        if not os.path.isfile(cpu_file):
            logging.info(f"Skipping {cpu_file}: not present in the working tree.")
            continue

        logging.info(f"Checking File {cpu_file} .....")
        # 1. Parse the file for workaround blocks
        parsed_data, error = parse_workarounds(cpu_file)
        if error:
            failure = True

        if args.debug:
            for entry in parsed_data:
                logging.debug(entry)

        if not parsed_data:
            # Only claim "nothing found" when parsing itself was clean;
            # otherwise the parser has already logged what went wrong.
            if not error:
                logging.info(f"No Workarounds found in {cpu_file}.")
            continue

        # 2. Check ascending order of Erratum IDs and CVE years
        ordered = check_ascending_order(parsed_data)

        # 3. Report success only if both marker matching and ordering passed
        if ordered and not error:
            logging.info(
                "Workarounds matched correctly, and Errata "
                "IDs and CVE's are in ascending order.")
        else:
            logging.error(
                "Workarounds didn't match correctly, or Errata "
                "IDs and CVE's are not in ascending order.")
            failure = True

    if failure:
        sys.exit(1)

    sys.exit(0)