blob: 92a8074adc1f504e11d2b254e9cb5bd026af047b [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2022 Google LLC. All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
# quick hacky script to check patches if they are candidates for lts. it checks
# only the non-merge commits.
import os
import git
import re
import sys
import csv
import argparse
import json
import subprocess
from io import StringIO
from unidiff import PatchSet
from config import MESSAGE_TOKENS, CPU_PATH_TOKEN, CPU_ERRATA_TOKEN, DOC_PATH_TOKEN, DOC_ERRATA_TOKEN
global_debug = False
def debug_print(*args, **kwargs):
global global_var
if global_debug:
print(*args, **kwargs)
def contains_re(pf, tok):
for hnk in pf:
for ln in hnk:
if ln.is_context:
continue
# here means the line is either added or removed
txt = ln.value.strip()
if tok.search(txt) is not None:
return True
return False
def process_ps(ps):
score = 0
cpu_tok = re.compile(CPU_PATH_TOKEN)
doc_tok = re.compile(DOC_PATH_TOKEN)
for pf in ps:
if pf.is_binary_file or not pf.is_modified_file:
continue
if cpu_tok.search(pf.path) is not None:
debug_print("* change found in cpu path:", pf.path);
cpu_tok = re.compile(CPU_ERRATA_TOKEN)
if contains_re(pf, cpu_tok):
score = score + 1
debug_print(" found", CPU_ERRATA_TOKEN)
if doc_tok.search(pf.path) is not None:
debug_print("* change found in macros doc path:", pf.path);
doc_tok = re.compile(DOC_ERRATA_TOKEN)
if contains_re(pf, doc_tok):
score = score + 1
debug_print(" found", DOC_ERRATA_TOKEN)
return score
def query_gerrit(gerrit_user, ssh_key_path, change_id):
ssh_command = [
"ssh",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "StrictHostKeyChecking=no",
"-o", "PubkeyAcceptedKeyTypes=+ssh-rsa",
"-p", "29418",
"-i", ssh_key_path,
f"{gerrit_user}@review.trustedfirmware.org",
f"gerrit query --format=JSON change:'{change_id}'",
"repo:'TF-A/trusted-firmware-a'"
]
try:
result = subprocess.run(ssh_command, capture_output=True, text=True, check=True)
output = result.stdout.strip().split("\n")
changes = [json.loads(line) for line in output if line.strip()]
# Create a dictionary with branch as key and URL as value
branches_urls = {change["branch"]: change["url"] for change in changes if "branch" in change and "url" in change}
return branches_urls
except subprocess.CalledProcessError as e:
print("Error executing SSH command:", e)
return {}
# REBASE_DEPTH is number of commits from tip of the LTS branch that we need
# to check to find the commit that the current patch set is based on
REBASE_DEPTH = 20
## TODO: for case like 921081049ec3 where we need to refactor first for security
# patch to be applied then we should:
# 1. find the security patch
# 2. from that patch find CVE number if any
# 3. look for all patches that contain that CVE number in commit message
## TODO: similar to errata macros and rst file additions, we have CVE macros and rst file
# additions. so we can use similar logic for that.
## TODO: for security we should look for CVE numbed regex match and if found flag it
def main():
at_least_one_match = False
parser = argparse.ArgumentParser(prog="lts-triage.py", description="check patches for LTS candidacy")
parser.add_argument("--repo", required=True, help="path to tf-a git repo")
parser.add_argument("--csv_path", required=True, help="path including the filename for CSV file")
parser.add_argument("--lts", required=True, help="LTS branch, ex. lts-v2.8")
parser.add_argument("--gerrit_user", required=True, help="The user id to perform the Gerrit query")
parser.add_argument("--ssh_keyfile", required=True, help="The SSH keyfile")
parser.add_argument("--debug", help="print debug logs", action="store_true")
args = parser.parse_args()
lts_branch = args.lts
gerrit_user = args.gerrit_user
ssh_keyfile = args.ssh_keyfile
global global_debug
global_debug = args.debug
csv_columns = ["index", "commit id in the integration branch", "commit summary",
"score", "Gerrit Change-Id", "patch link for the LTS branch",
"patch link for the integration branch"]
csv_data = []
idx = 1
repo = git.Repo(args.repo)
# collect the LTS hashes in a list
lts_change_ids = set() # Set to store Gerrit Change-Ids from the LTS branch
for cmt in repo.iter_commits(lts_branch):
# Extract Gerrit Change-Id from the commit message
change_id_match = re.search(r'Change-Id:\s*(\w+)', cmt.message)
if change_id_match:
lts_change_ids.add(change_id_match.group(1))
if len(lts_change_ids) >= REBASE_DEPTH:
break
for cmt in repo.iter_commits('integration'):
score = 0
# if we find a same Change-Id among the ones we collected from the LTS branch
# then we have seen all the new patches in the integration branch, so we should exit.
change_id_match = re.search(r'Change-Id:\s*(\w+)', cmt.message)
if change_id_match:
change_id = change_id_match.group(1)
if change_id in lts_change_ids:
print("## stopping because found common Gerrit Change-Id between the two branches: ", change_id)
break;
# don't process merge commits
if len(cmt.parents) > 1:
continue
tok = re.compile(MESSAGE_TOKENS, re.IGNORECASE)
if tok.search(cmt.message) is not None:
debug_print("## commit message match")
score = score + 1
diff_text = repo.git.diff(cmt.hexsha + "~1", cmt.hexsha, ignore_blank_lines=True, ignore_space_at_eol=True)
ps = PatchSet(StringIO(diff_text))
debug_print("# score before process_ps:", score)
score = score + process_ps(ps)
debug_print("# score after process_ps:", score)
ln = f"{cmt.summary}: {score}"
print(ln)
if score > 0:
gerrit_links = query_gerrit(gerrit_user, ssh_keyfile, change_id)
# Append data to CSV
csv_data.append({
"index": idx,
"commit id in the integration branch": cmt.hexsha,
"commit summary": cmt.summary,
"score": score,
"Gerrit Change-Id": change_id,
"patch link for the LTS branch": gerrit_links.get(lts_branch, "N/A"),
"patch link for the integration branch": gerrit_links.get("integration", "N/A")
})
idx += 1
at_least_one_match = True
if at_least_one_match == True:
try:
with open(args.csv_path, "w", newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
writer.writeheader()
for data in csv_data:
writer.writerow(data)
except:
print("\n\nERROR: Couldn't open CSV file due to error: ", sys.exc_info()[0])
if __name__ == '__main__':
main()