blob: 63fd71974ae0b187dc28f30f6f8f4b958467a4fb [file] [log] [blame]
Arthur Shed8cd8db2024-03-11 09:54:24 -07001#!/usr/bin/env python3
2#
3# Copyright (c) 2022 Google LLC. All rights reserved.
4#
5# SPDX-License-Identifier: BSD-3-Clause
6
7# quick hacky script to check patches if they are candidates for lts. it checks
8# only the non-merge commits.
9
10import os
11import git
12import re
13import sys
14import csv
15import argparse
16import json
17import subprocess
18from io import StringIO
19from unidiff import PatchSet
20from config import MESSAGE_TOKENS, CPU_PATH_TOKEN, CPU_ERRATA_TOKEN, DOC_PATH_TOKEN, DOC_ERRATA_TOKEN
21
# Debug switch for the whole script: main() sets it from --debug, and
# debug_print() stays silent while it is False.
global_debug = False


def debug_print(*args, **kwargs):
    """Forward the arguments to print() only when global_debug is true.

    Accepts exactly what the built-in print() accepts (positional values plus
    keyword options such as ``sep``, ``end`` or ``file``).
    """
    # Only reading the module-level flag, so no `global` statement is needed.
    if global_debug:
        print(*args, **kwargs)
27
def contains_re(pf, tok):
    """Report whether any non-context line of *pf* matches the regex *tok*.

    *pf* is iterated as a sequence of hunks, each hunk as a sequence of lines.
    Lines flagged ``is_context`` are ignored, so only added or removed lines
    are examined. Each candidate line's ``value`` is stripped of surrounding
    whitespace before ``tok.search`` is applied.

    Returns True on the first match, False when nothing matches.
    """
    changed_text = (
        line.value.strip()
        for hunk in pf
        for line in hunk
        if not line.is_context
    )
    # any() stops at the first hit, just like an early return from a loop.
    return any(tok.search(text) is not None for text in changed_text)
39
def process_ps(ps):
    """Score a patch set by the errata-related changes it contains.

    *ps* is iterated as a sequence of patched files. Binary files and files
    whose ``is_modified_file`` flag is false are skipped. For each remaining
    file the score grows by one when:

    * its path matches CPU_PATH_TOKEN and one of its added/removed lines
      matches CPU_ERRATA_TOKEN, and/or
    * its path matches DOC_PATH_TOKEN and one of its added/removed lines
      matches DOC_ERRATA_TOKEN.

    Returns the accumulated integer score (0 when nothing matched).
    """
    score = 0

    # Path filters and content filters live under separate names and are
    # compiled once, so every file in the patch set is checked against the
    # same path patterns regardless of what matched earlier in the loop.
    cpu_path_re = re.compile(CPU_PATH_TOKEN)
    cpu_errata_re = re.compile(CPU_ERRATA_TOKEN)
    doc_path_re = re.compile(DOC_PATH_TOKEN)
    doc_errata_re = re.compile(DOC_ERRATA_TOKEN)

    for pf in ps:
        if pf.is_binary_file or not pf.is_modified_file:
            continue

        if cpu_path_re.search(pf.path) is not None:
            debug_print("* change found in cpu path:", pf.path)
            if contains_re(pf, cpu_errata_re):
                score += 1
                debug_print(" found", CPU_ERRATA_TOKEN)

        if doc_path_re.search(pf.path) is not None:
            debug_print("* change found in macros doc path:", pf.path)
            if contains_re(pf, doc_errata_re):
                score += 1
                debug_print(" found", DOC_ERRATA_TOKEN)

    return score
64
def query_gerrit(gerrit_user, ssh_key_path, change_id):
    """Look up a Gerrit Change-Id on review.trustedfirmware.org over SSH.

    Runs ``gerrit query --format=JSON`` restricted to the
    ``TF-A/trusted-firmware-a`` repo and parses the output as one JSON object
    per line.

    Args:
        gerrit_user: account name used for the SSH login.
        ssh_key_path: path of the private key passed to ``ssh -i``.
        change_id: Gerrit Change-Id to search for.

    Returns:
        A dict mapping branch name to review URL for every returned record
        that carries both a "branch" and a "url" field. The dict is empty when
        the SSH command cannot be run or fails; records that are not valid
        JSON are reported and skipped.
    """
    # NOTE(review): host key verification is switched off here
    # (StrictHostKeyChecking=no, known hosts sent to /dev/null). Left as is on
    # the assumption that this is deliberate for unattended runs - confirm.
    ssh_command = [
        "ssh",
        "-o", "UserKnownHostsFile=/dev/null",
        "-o", "StrictHostKeyChecking=no",
        "-o", "PubkeyAcceptedKeyTypes=+ssh-rsa",
        "-p", "29418",
        "-i", ssh_key_path,
        f"{gerrit_user}@review.trustedfirmware.org",
        f"gerrit query --format=JSON change:'{change_id}'",
        "repo:'TF-A/trusted-firmware-a'"
    ]

    try:
        result = subprocess.run(ssh_command, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable ssh binary; the lookup is
        # best effort, so report and carry on without links.
        print("Error executing SSH command:", e)
        return {}

    branches_urls = {}
    for line in result.stdout.strip().split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            change = json.loads(line)
        except json.JSONDecodeError as e:
            # One unparsable line should not discard the usable records.
            print("Error parsing Gerrit response:", e)
            continue
        # Records without both fields are not usable as branch -> URL entries.
        if isinstance(change, dict) and "branch" in change and "url" in change:
            branches_urls[change["branch"]] = change["url"]

    return branches_urls
89
# REBASE_DEPTH bounds how far back from the tip of the LTS branch we look to
# find the commit that the current patch set is based on. main() stops walking
# the LTS branch once it has collected this many distinct Gerrit Change-Ids.
REBASE_DEPTH = 20
93
94
95## TODO: for case like 921081049ec3 where we need to refactor first for security
96# patch to be applied then we should:
97# 1. find the security patch
98# 2. from that patch find CVE number if any
99# 3. look for all patches that contain that CVE number in commit message
100
101## TODO: similar to errata macros and rst file additions, we have CVE macros and rst file
102# additions. so we can use similar logic for that.
103
## TODO: for security we should look for a CVE number regex match and flag it if found
def main():
    """Score new integration-branch commits and export LTS candidates to CSV.

    Steps:
      1. Collect Gerrit Change-Ids from the tip of the LTS branch (--lts),
         stopping once REBASE_DEPTH distinct ids have been gathered.
      2. Walk the ``integration`` branch until a commit whose Change-Id is in
         that set is reached. Merge commits are skipped.
      3. Score every other commit: +1 when its message matches
         MESSAGE_TOKENS (case-insensitive), plus whatever process_ps()
         returns for its diff against the first parent.
      4. For commits with a positive score, look up the review links with
         query_gerrit() and record a CSV row.
      5. Write the rows to --csv_path when there is at least one.
    """
    global global_debug

    parser = argparse.ArgumentParser(prog="lts-triage.py", description="check patches for LTS candidacy")
    parser.add_argument("--repo", required=True, help="path to tf-a git repo")
    parser.add_argument("--csv_path", required=True, help="path including the filename for CSV file")
    parser.add_argument("--lts", required=True, help="LTS branch, ex. lts-v2.8")
    parser.add_argument("--gerrit_user", required=True, help="The user id to perform the Gerrit query")
    parser.add_argument("--ssh_keyfile", required=True, help="The SSH keyfile")
    parser.add_argument("--debug", help="print debug logs", action="store_true")

    args = parser.parse_args()
    lts_branch = args.lts
    gerrit_user = args.gerrit_user
    ssh_keyfile = args.ssh_keyfile
    global_debug = args.debug

    csv_columns = ["index", "commit id in the integration branch", "commit summary",
                   "score", "Gerrit Change-Id", "patch link for the LTS branch",
                   "patch link for the integration branch"]
    csv_data = []
    idx = 1

    repo = git.Repo(args.repo)

    # Loop-invariant patterns, compiled once.
    change_id_re = re.compile(r'Change-Id:\s*(\w+)')
    message_re = re.compile(MESSAGE_TOKENS, re.IGNORECASE)

    # Gerrit Change-Ids seen near the tip of the LTS branch.
    lts_change_ids = set()

    for cmt in repo.iter_commits(lts_branch):
        change_id_match = change_id_re.search(cmt.message)
        if change_id_match:
            lts_change_ids.add(change_id_match.group(1))

        if len(lts_change_ids) >= REBASE_DEPTH:
            break

    for cmt in repo.iter_commits('integration'):
        score = 0

        # Reset for every commit so that a commit without a Change-Id trailer
        # never picks up the id of the commit visited before it.
        change_id = None

        # if we find a same Change-Id among the ones we collected from the LTS branch
        # then we have seen all the new patches in the integration branch, so we should exit.
        change_id_match = change_id_re.search(cmt.message)
        if change_id_match:
            change_id = change_id_match.group(1)
            if change_id in lts_change_ids:
                print("## stopping because found common Gerrit Change-Id between the two branches: ", change_id)
                break

        # don't process merge commits
        if len(cmt.parents) > 1:
            continue

        if message_re.search(cmt.message) is not None:
            debug_print("## commit message match")
            score = score + 1

        diff_text = repo.git.diff(cmt.hexsha + "~1", cmt.hexsha, ignore_blank_lines=True, ignore_space_at_eol=True)
        ps = PatchSet(StringIO(diff_text))
        debug_print("# score before process_ps:", score)
        score = score + process_ps(ps)
        debug_print("# score after process_ps:", score)

        print(f"{cmt.summary}: {score}")

        if score > 0:
            # Without a Change-Id there is nothing to query Gerrit for.
            if change_id is not None:
                gerrit_links = query_gerrit(gerrit_user, ssh_keyfile, change_id)
            else:
                gerrit_links = {}
            # Append data to CSV
            csv_data.append({
                "index": idx,
                "commit id in the integration branch": cmt.hexsha,
                "commit summary": cmt.summary,
                "score": score,
                "Gerrit Change-Id": change_id if change_id is not None else "N/A",
                "patch link for the LTS branch": gerrit_links.get(lts_branch, "N/A"),
                "patch link for the integration branch": gerrit_links.get("integration", "N/A")
            })
            idx += 1

    # Only create the CSV file when at least one commit scored.
    if csv_data:
        try:
            with open(args.csv_path, "w", newline='') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
                writer.writeheader()
                writer.writerows(csv_data)
        except (OSError, csv.Error) as err:
            print("\n\nERROR: Couldn't open CSV file due to error: ", err)


if __name__ == '__main__':
    main()