| #!/usr/bin/env python3 |
| # |
| # Copyright (c) 2025, Arm Limited. All rights reserved. |
| # |
| # SPDX-License-Identifier: BSD-3-Clause |
| # |
| |
| import argparse |
| import os |
| import re |
| import subprocess |
| import sys |
| import logging |
| from pathlib import Path |
| |
| |
| def subprocess_run(cmd, **kwargs): |
| logging.debug("Running command: %r %r", cmd, kwargs) |
| return subprocess.run(cmd, **kwargs) |
| |
| |
| def parse_workarounds(filepath: str): |
| """ |
| Parse the file line by line. For every start marker ('workaround_reset_start' |
| or 'workaround_runtime_start'), we look for its matching end marker |
| ('workaround_reset_end' or 'workaround_runtime_end'). |
| |
| If a start is missing its end, or if we find an end with no corresponding |
| start, set error value to True which is to be returned as a tuple along with |
| the list of dictionaries. |
| |
| Returns: |
| A list of dictionaries. Each dictionary has: |
| - start_line: line number of the workaround start |
| - end_line: line number of the matching workaround end |
| - marker_type: 'reset' or 'runtime' |
| - erratum_number: integer if it's an ERRATUM (from ERRATUM(X)), else None |
| - cve_year: integer if it's a CVE, else None |
| - cve_number: integer if it's a CVE, else None |
| Error value set to True if we fail to match workaround start to an end. |
| """ |
| |
| # Read all lines in memory |
| with open(filepath, "r") as f: |
| lines = f.readlines() |
| |
| # We'll keep a stack of active "starts" that haven't yet found their "end" |
| start_stack = [] |
| results = [] |
| error = False |
| |
| # Regex patterns for capturing ERRATUM and CVE |
| # Example: ERRATUM(123) or CVE-2022-789 |
| erratum_pattern = re.compile(r"ERRATUM\s*\(\s*(\d+)\s*\)", re.IGNORECASE) |
| cve_pattern = re.compile(r"CVE[-_:]?(\d{4})[-_:]?(\d+)", re.IGNORECASE) |
| |
| for i, line in enumerate(lines, start=1): |
| stripped = line.strip() |
| |
| # ---------------------------------------------------------------------- |
| # 1) Check for "start" markers |
| # We look first for 'workaround_reset_start' or 'workaround_runtime_start' |
| # ---------------------------------------------------------------------- |
| if "workaround_reset_start" in stripped: |
| marker_type = "reset" |
| # Attempt to extract ERRATUM or CVE |
| erratum_match = erratum_pattern.search(stripped) |
| cve_match = cve_pattern.search(stripped) |
| |
| if erratum_match: |
| erratum_number = int(erratum_match.group(1)) |
| cve_year, cve_number = None, None |
| elif cve_match: |
| erratum_number = None |
| cve_year = int(cve_match.group(1)) |
| cve_number = int(cve_match.group(2)) |
| else: |
| error |= True |
| logging.error( |
| f"Couldn't find a valid Errata number or CVE year " |
| f"in marker type {marker_type} in line number {i}" |
| ) |
| return results, error |
| |
| # Push onto the stack |
| start_stack.append({ |
| "start_line": i, |
| "marker_type": marker_type, # 'reset' |
| "erratum_number": erratum_number, |
| "cve_year": cve_year, |
| "cve_number": cve_number |
| }) |
| |
| elif "workaround_runtime_start" in stripped: |
| marker_type = "runtime" |
| # Attempt to extract ERRATUM or CVE |
| erratum_match = erratum_pattern.search(stripped) |
| cve_match = cve_pattern.search(stripped) |
| |
| if erratum_match: |
| erratum_number = int(erratum_match.group(1)) |
| cve_year, cve_number = None, None |
| elif cve_match: |
| erratum_number = None |
| cve_year = int(cve_match.group(1)) |
| cve_number = int(cve_match.group(2)) |
| else: |
| error |= True |
| logging.error( |
| f"Couldn't find a valid Errata number or CVE year " |
| f"in marker type {marker_type} in line number {i}" |
| ) |
| return results, error |
| |
| # Push onto the stack |
| start_stack.append({ |
| "start_line": i, |
| "marker_type": marker_type, # 'runtime' |
| "erratum_number": erratum_number, |
| "cve_year": cve_year, |
| "cve_number": cve_number |
| }) |
| |
| # ---------------------------------------------------------------------- |
| # 2) Check for "end" markers |
| # We look for 'workaround_reset_end' or 'workaround_runtime_end' |
| # ---------------------------------------------------------------------- |
| elif "workaround_reset_end" in stripped: |
| # Attempt to pop the most recent start |
| if not start_stack: |
| logging.error( |
| f"[Line {i}] Found 'workaround_reset_end' " |
| f"without matching 'workaround_reset_start'." |
| ) |
| error |= True |
| break |
| |
| # Pop the most recent start |
| last_item = start_stack.pop() |
| |
| # Check the marker type |
| if last_item["marker_type"] != "reset": |
| error = True |
| logging.error( |
| f"[Line {i}] Found 'workaround_reset_end' " |
| f"that does not match " |
| f"the most recent '{last_item['marker_type']}' " |
| f"start at line {last_item['start_line']}." |
| ) |
| error |= True |
| break |
| |
| last_item["end_line"] = i |
| results.append(last_item) |
| |
| elif "workaround_runtime_end" in stripped: |
| # We need a matching "runtime" start |
| if not start_stack: |
| logging.error( |
| f"[Line {i}] Found 'workaround_runtime_end' " |
| f"without matching start." |
| ) |
| error |= True |
| break |
| |
| # Pop the most recent start |
| last_item = start_stack.pop() |
| |
| # Check the marker type |
| if last_item["marker_type"] != "runtime": |
| logging.error( |
| f"[Line {i}] Found 'workaround_runtime_end' " |
| f"that does not match " |
| f"the most recent '{last_item['marker_type']}' " |
| f"start at line {last_item['start_line']}." |
| ) |
| error |= True |
| break |
| |
| last_item["end_line"] = i |
| results.append(last_item) |
| |
| # ---------------------------------------------------------------------- |
| # After processing all lines, if the stack is not empty, it means some |
| # starts have no matching ends |
| # ---------------------------------------------------------------------- |
| if start_stack: |
| first_unmatched = start_stack[0] |
| logging.error( |
| f"'workaround_{first_unmatched[1]}_start' " |
| f"at line {first_unmatched[0]} " |
| f"did not have a matching end marker." |
| ) |
| |
| return results, error |
| |
| |
| def check_ascending_order(data): |
| """ |
| Ensures that: |
| 1) All ERRATUM blocks appear first (in ascending order of their erratum_number), |
| 2) Then all CVE blocks appear (in ascending order of their cve_year and if the |
| year is the same, ascending by cve_number as well). |
| |
| Returns: |
| False, If an ERRATUM appears after a CVE has started, or if the ordering within |
| ERRATUMs or CVEs is incorrect, else returns True. |
| """ |
| |
| # Sort everything by the line number where the workaround starts |
| data_sorted = sorted(data, key=lambda x: x["start_line"]) |
| |
| # We'll gather ERRATUM items first, in the order they appear, |
| # then CVE items. If we ever see an ERRATUM after we've started |
| # collecting CVEs, we'll raise an error. |
| found_cve = False |
| errata_list = [] |
| cve_list = [] |
| |
| for item in data_sorted: |
| # Is this entry an ERRATUM or a CVE? |
| if item["erratum_number"] is not None: # This is an ERRATUM |
| if found_cve: |
| # We already encountered a CVE, so no more ERRATUMs allowed |
| logging.error( |
| f"ERRATUM({item['erratum_number']}) found " |
| f"at line {item['start_line']} " |
| f"after the first CVE has already appeared." |
| ) |
| return False |
| errata_list.append(item) |
| elif item["cve_year"] is not None: # This is a CVE |
| found_cve = True |
| cve_list.append(item) |
| else: |
| # If neither erratum_number nor cve_year is present |
| # return False to fail the check. |
| logging.error( |
| f"ERRATUM or CVE year not found at " |
| f"line {item['start_line']}" |
| ) |
| return False |
| |
| # ------------------------------------------------------------- |
| # 1) Check ascending order of ERRATUM IDs |
| # ------------------------------------------------------------- |
| prev_erratum = 0 |
| for erratum_item in errata_list: |
| eno = erratum_item["erratum_number"] |
| if prev_erratum and eno < prev_erratum: |
| logging.error( |
| f"ERRATUM IDs are not in ascending order! " |
| f"Found ERRATUM({eno}) " |
| f"after ERRATUM({prev_erratum})." |
| ) |
| return False |
| prev_erratum = eno |
| |
| # ------------------------------------------------------------- |
| # 2) Check CVE year (and then CVE number) are ascending |
| # ------------------------------------------------------------- |
| prev_cve_year = 0 |
| prev_cve_number = 0 |
| for cve_item in cve_list: |
| year = cve_item["cve_year"] |
| num = cve_item["cve_number"] |
| |
| if prev_cve_year and year < prev_cve_year: |
| logging.error( |
| f"CVE years are not in ascending order! " |
| f"Found CVE({year},...) " |
| f"after CVE({prev_cve_year},...)." |
| ) |
| return False |
| elif year == prev_cve_year: |
| # Years match, so check if this CVE number < previous CVE number |
| if num < prev_cve_number: |
| logging.error( |
| f"CVE Numbers are not in ascending order! " |
| f"Found CVE({year, num} ,...) " |
| f"after CVE({prev_cve_year, prev_cve_number},...)." |
| ) |
| return False |
| |
| # Update previous references |
| prev_cve_year = year |
| prev_cve_number = num |
| |
| # If we reach here, then the ordering is correct return True. |
| return True |
| |
| |
| def patch_has_cpu_files(base_commit, end_commit): |
| """Get the output of a git diff and analyse each modified file.""" |
| |
| # Get patches of the affected commits with one line of context. |
| gitdiff = subprocess_run( |
| [ |
| "git", |
| "diff", |
| "--name-only", |
| base_commit + ".." + end_commit, |
| "lib/cpus/aarch64/" |
| ], |
| stdout=subprocess.PIPE, |
| ) |
| |
| if gitdiff.returncode != 0: |
| return False |
| |
| cpu_files_modified = gitdiff.stdout.decode("utf-8").splitlines() |
| return cpu_files_modified |
| |
| |
| def list_files_in_directory(dir_path): |
| """ |
| Returns a list of files in the specified directory. |
| Args: |
| dir_path: The path to the directory. |
| |
| Returns: |
| A list of file names in the directory. |
| """ |
| try: |
| files = [ |
| os.path.join(dir_path, f) for f in os.listdir(dir_path) |
| if os.path.isfile(os.path.join(dir_path, f)) |
| ] |
| return files |
| except FileNotFoundError: |
| return f"Directory not found: {dir_path}" |
| except NotADirectoryError: |
| return f"Not a directory: {dir_path}" |
| except Exception as e: |
| return f"An error occurred: {e}" |
| |
| |
| def parse_cmd_line(argv, prog_name): |
| parser = argparse.ArgumentParser( |
| prog=prog_name, |
| formatter_class=argparse.RawTextHelpFormatter, |
| description="Check alphabetical order of #includes", |
| epilog=""" |
| For each source file in the tree, checks that #include's C preprocessor |
| directives are ordered alphabetically (as mandated by the Trusted |
| Firmware coding style). System header includes must come before user |
| header includes. |
| """, |
| ) |
| |
| parser.add_argument( |
| "--tree", |
| "-t", |
| help="Path to the source tree to check (default: %(default)s)", |
| default=os.curdir, |
| ) |
| parser.add_argument( |
| "--patch", |
| "-p", |
| help=""" |
| Patch mode. |
| Instead of checking all files in the source tree, the script will consider |
| only files that are modified by the latest patch(es).""", |
| action="store_true", |
| ) |
| parser.add_argument( |
| "--from-ref", |
| help="Base commit in patch mode (default: %(default)s)", |
| default="master", |
| ) |
| parser.add_argument( |
| "--to-ref", |
| help="Final commit in patch mode (default: %(default)s)", |
| default="HEAD", |
| ) |
| parser.add_argument( |
| "--debug", |
| help="Enable debug logging", |
| action="store_true", |
| ) |
| |
| args = parser.parse_args(argv) |
| return args |
| |
| |
| if __name__ == "__main__": |
| args = parse_cmd_line(sys.argv[1:], sys.argv[0]) |
| |
| if args.debug: |
| logging.basicConfig(level=logging.DEBUG) |
| else: |
| logging.basicConfig(level=logging.INFO) |
| |
| os.chdir(args.tree) |
| |
| if args.patch: |
| logging.info( |
| "Checking CPU files modified between patches " |
| + args.from_ref |
| + " and " |
| + args.to_ref |
| + " ..." |
| ) |
| list_cpu_files = patch_has_cpu_files(args.from_ref, args.to_ref) |
| if not list_cpu_files: |
| logging.info(f"No CPU files Modified") |
| sys.exit(0) |
| else: |
| dir_path = "lib/cpus/aarch64/" |
| logging.info(f"Checking all CPU files in directory `{dir_path}`") |
| list_cpu_files = list_files_in_directory(dir_path) |
| if not list_cpu_files: |
| logging.error(f"`lib/cpus/aarch64/` directory is empty") |
| sys.exit(1) |
| |
| failure = False |
| for file in list_cpu_files: |
| logging.info(f"Checking File {file} .....") |
| # 1. Parse the file for workaround blocks |
| parsed_data, error = parse_workarounds(file) |
| if error: |
| failure |= True |
| |
| if args.debug: |
| for entry in parsed_data: |
| logging.debug(entry) |
| |
| if not parsed_data: |
| logging.info(f"No Workarounds found in {file}.") |
| continue |
| |
| # 2. Check ascending order of Erratum IDs and CVE years |
| if check_ascending_order(parsed_data): |
| # 3. Print out if all is well |
| logging.info( |
| f"Workarounds matched correctly, and Errata " |
| f"IDs and CVE's are in ascending order.") |
| else: |
| logging.error( |
| f"Workarounds didn't match correctly, or Errata " |
| f"IDs and CVE's are not in ascending order.") |
| failure |= True |
| |
| if failure: |
| sys.exit(1) |
| |
| sys.exit(0) |