#!/usr/bin/env python3
""" report_parser.py:
    Report parser parses OpenCI JSON reports and conveys the information in
    one or more standard formats (to be implemented).
After all information is captured it validates the success/failure status
and can change the script exit code for integration with standard CI
executors.
"""
from __future__ import print_function
__copyright__ = """
/*
* Copyright (c) 2018-2020, Arm Limited. All rights reserved.
*
* SPDX-License-Identifier: BSD-3-Clause
*
*/
"""
__author__ = "tf-m@lists.trustedfirmware.org"
__project__ = "Trusted Firmware-M Open CI"
__version__ = "1.4.0"
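
# A minimal usage sketch (hypothetical file names; the flags are defined in
# get_cmd_args() at the bottom of this file):
#
#   # Collect Jenkins/Gerrit environment metadata into a JSON file
#   python3 report_parser.py -c -j -f jenkins_env.json
#
#   # Evaluate a report and set the exit code if "status" is not "Success"
#   python3 report_parser.py -r build_report.json -s "status = Success" -a -e
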
import os
import re
import sys
import json
import argparse
import xmltodict
from pprint import pprint
try:
from tfm_ci_pylib.utils import load_json, get_local_git_info, \
save_json, list_subdirs, get_remote_git_info, \
convert_git_ref_path
except ImportError:
dir_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(dir_path, "../"))
from tfm_ci_pylib.utils import load_json, get_local_git_info, \
save_json, list_subdirs, get_remote_git_info, \
convert_git_ref_path
def xml_read(file):
    """ Read the contents of an xml file and convert it to a Python object """
data = None
try:
with open(file, "r") as F:
data = xmltodict.parse(F.read())
except Exception as E:
print("Error", E)
return data
def split_keys(joint_arg, sep="="):
    """ Split a string of two keys joined by a separator and return them as
    a list with whitespace removed """
keys = joint_arg.split(sep)
# Remove whitespace
keys = map(str.strip, list(keys))
    # If a key is the literal true/false (any case), convert it to a bool
    # without resorting to eval()
    keys = list(map(lambda x:
                    x.lower() == "true" if x.lower() in ["true", "false"]
                    else x,
                    keys))
return keys
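
# Illustrative behaviour of split_keys(), assuming the default separator and
# the default "-s/--set-success-field" value from get_cmd_args():
#
#   split_keys("status = Success")  ->  ["status", "Success"]
#   split_keys("success = True")    ->  ["success", True]   # bool conversion
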
def dependencies_mdt_collect(path_list,
out_f=None,
known_content_types=["mbedcrypto",
"cmsis",
"checkpatch",
"fpga",
"fastmodel"],
expected_paths=["mbedcrypto",
"cmsis",
"checkpatch"]):
""" Collect dependencies checkout metadata. It creates a json report which
can be optionally exported to a file """
cpaths = {k: v for k, v in [n.split("=") for n in path_list]}
cwd = os.path.abspath(os.getcwd())
# Test that all the required paths are present
intsec_set = set(expected_paths).intersection(set(cpaths.keys()))
if len(intsec_set) != len(set(expected_paths)):
_missing = set(expected_paths).difference(intsec_set)
        err_msg = "Error: missing core paths.\nMissing: %s\nPresent: %s" % (
            ",".join(_missing), ",".join(cpaths.keys())
        )
print(err_msg)
raise Exception(err_msg)
# Create a dataset for the entries of known data format
known_data = {n: {} for n in
set(known_content_types).intersection(set(cpaths.keys()))}
# Create a dataset for unexpected data entries of unknown format
extra_data = {n: {}
for n in set(cpaths.keys()).difference(set(known_data))}
for d in list_subdirs(cpaths["mbedcrypto"]):
print("mbed-crypto dir: ", d)
        # If the checkout directory name contains a git reference, convert
        # it to the short format
d = convert_git_ref_path(d)
git_info = get_local_git_info(d)
tag = os.path.split(git_info["dir"])[-1].split("-")[-1]
        # Absolute paths will not work in Jenkins since it changes the
        # workspace directory between stages; convert to a relative path
git_info["dir"] = os.path.relpath(git_info["dir"], cwd)
known_data["mbedcrypto"][tag] = git_info
for d in list_subdirs(cpaths["cmsis"]):
        print("CMSIS subdir: ", d)
d = convert_git_ref_path(d)
git_info = get_local_git_info(d)
tag = os.path.split(git_info["dir"])[-1]
        # Absolute paths will not work in Jenkins since it changes the
        # workspace directory between stages; convert to a relative path
git_info["dir"] = os.path.relpath(git_info["dir"], cwd)
known_data["cmsis"][tag] = git_info
for d in list_subdirs(cpaths["checkpatch"]):
print("Checkpatch subdir:", d)
with open(os.path.join(d, "version.info"), "r") as F:
url = F.readline().strip()
git_info = get_remote_git_info(url)
d = convert_git_ref_path(d)
git_info['dir'] = d
tag = os.path.split(git_info["dir"])[-1].split("_")[-1]
        # Absolute paths will not work in Jenkins since it changes the
        # workspace directory between stages; convert to a relative path
git_info["dir"] = os.path.relpath(git_info["dir"], cwd)
known_data["checkpatch"][tag] = git_info
if "fastmodel" in cpaths:
for d in list_subdirs(cpaths["fastmodel"]):
print("Fastmodel subdir:", d)
            json_info = load_json(os.path.join(d, "version.info"))
            # Absolute paths will not work in Jenkins since it changes the
            # workspace directory between stages; convert to a relative path
            json_info["dir"] = os.path.relpath(d, cwd)
            tag = json_info["version"]
            known_data["fastmodel"][tag] = json_info
if "fpga" in cpaths:
for d in os.listdir(cpaths["fpga"]):
print("FPGA imagefile:", d)
if ".tar.gz" in d:
name = d.split(".tar.gz")[0]
platform, subsys, ver = name.split("_")
known_data["fpga"][name] = {"platform": platform,
"subsys": subsys,
"version": ver,
"recovery": os.path.join(
cpaths["fpga"],
d)}
    # Attempt to detect what the unexpected paths contain
for e_path in extra_data.keys():
for d in list_subdirs(cpaths[e_path]):
print("%s subdir: %s" % (e_path, d))
# If it contains a version.info
if os.path.isfile(os.path.join(d, "version.info")):
json_info = load_json(os.path.join(d, "version.info"))
json_info["dir"] = os.path.relpath(d, cwd)
tag = json_info["version"]
                # Absolute paths will not work in Jenkins since it changes
                # the workspace directory between stages; convert to rel-path
extra_data[e_path][tag] = json_info
# If it contains git information
elif os.path.exists(os.path.join(d, ".git")):
d = convert_git_ref_path(d)
git_info = get_local_git_info(d)
tag = os.path.split(git_info["dir"])[-1].split("-")[-1]
                # Absolute paths will not work in Jenkins since it changes
                # the workspace directory between stages; convert to rel-path
git_info["dir"] = os.path.relpath(git_info["dir"], cwd)
extra_data[e_path][tag] = git_info
            # Do not break the flow if detection fails
            else:
                print("Error determining contents of directory: %s/%s for "
                      "indexing purposes" % (e_path, d))
                # Fall back to the directory name as the indexing tag
                tag = os.path.split(d)[-1]
                extra_data[e_path][tag] = {"info": "N.A"}
# Add the extra paths to the expected ones
for k, v in extra_data.items():
known_data[k] = v
if out_f:
print("Exporting metadata to", out_f)
save_json(out_f, known_data)
else:
pprint(known_data)
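
# Sketch of a dependencies-checkout collection run, assuming hypothetical
# local checkout directories; the keys must cover the expected_paths above:
#
#   python3 report_parser.py -c -d \
#       -p mbedcrypto=checkout/mbedcrypto cmsis=checkout/cmsis \
#          checkpatch=checkout/checkpatch \
#       -f dependencies_metadata.json
#
# The exported JSON maps each content type to {tag: git/version info} entries.
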
def cppcheck_mdt_collect(file_list, out_f=None):
""" XML parse multiple cppcheck output files and create a json report """
xml_files = list(map(os.path.abspath, file_list))
dict_data = []
version = None
for xf in xml_files:
data = xml_read(xf)
version = data["results"]["cppcheck"]["@version"]
        # If nothing is found, the errors dictionary will be a NoneType object
if data["results"]["errors"] is not None:
# Use json to flatten ordered dict
str_data = json.dumps(data["results"]["errors"]["error"])
            # Remove the @ prefix that xmltodict adds to XML attributes
            str_data = str_data.replace("@", '')
            # Convert back to a dict (xml to json will have added an array)
_dt = json.loads(str_data)
if isinstance(_dt, list):
dict_data += _dt
            # If only one error is found it will be given as a single item
elif isinstance(_dt, dict):
dict_data += [_dt]
else:
                print("Ignoring cppcheck entry %s of type %s" % (_dt, type(_dt)))
out_data = {"_metadata_": {"cppcheck-version": version},
"report": {}}
for E in dict_data:
sever = E.pop("severity")
        # Group entries by severity
try:
out_data["report"][sever].append(E)
except KeyError:
out_data["report"][sever] = [E]
_errors = 0
for msg_sever, msg_sever_entries in out_data["report"].items():
out_data["_metadata_"][msg_sever] = str(len(msg_sever_entries))
if msg_sever == "error":
_errors = len(msg_sever_entries)
out_data["_metadata_"]["success"] = True if not int(_errors) else False
if out_f:
save_json(out_f, out_data)
else:
pprint(out_data)
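
# cppcheck_mdt_collect() expects cppcheck XML (--xml-version=2) output,
# roughly of the form shown below (hypothetical content):
#
#   <results>
#     <cppcheck version="1.90"/>
#     <errors>
#       <error id="nullPointer" severity="error" msg="..." verbose="...">
#         <location file="foo.c" line="42"/>
#       </error>
#     </errors>
#   </results>
#
#   python3 report_parser.py -c -x cppcheck_1.xml cppcheck_2.xml -f cppcheck.json
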
def checkpatch_mdt_collect(file_name, out_f=None):
    """ Regex parse a checkpatch output file and create a report """
out_data = {"_metadata_": {"errors": 0,
"warnings": 0,
"lines": 0,
"success": True},
"report": {}
}
with open(file_name, "r") as F:
cpatch_data = F.read().strip()
# checkpatch will not report anything when no issues are found
if len(cpatch_data):
stat_rex = re.compile(r'^total: (\d+) errors, '
r'(\d+) warnings, (\d+) lines',
re.MULTILINE)
line_rex = re.compile(r'([\S]+:)\s([\S]+:)\s([\S ]+)\n', re.MULTILINE)
ewl = stat_rex.search(cpatch_data)
try:
_errors, _warnings, _lines = ewl.groups()
except Exception as E:
print("Exception parsing checkpatch file.", E)
            # If there is text but not in a known format, return -1 and fail
            # the job
_errors = _warnings = _lines = "-1"
        checkpatch_entries = line_rex.findall(cpatch_data)
        for en in checkpatch_entries:
_file, _line, _ = en[0].split(":")
try:
_type, _subtype, _ = en[1].split(":")
except Exception as e:
print("WARNING: Ignoring Malformed checkpatch line: %s" %
"".join(en))
continue
_msg = en[2]
out_data["_metadata_"] = {"errors": _errors,
"warnings": _warnings,
"lines": _lines,
"success": True if not int(_errors)
else False}
E = {"id": _subtype,
"verbose": _subtype,
"msg": _msg,
"location": {"file": _file, "line": _line}
}
try:
out_data["report"][_type.lower()].append(E)
except KeyError:
out_data["report"][_type.lower()] = [E]
if out_f:
save_json(out_f, out_data)
else:
pprint(out_data)
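
# The regexes above appear to target checkpatch output in its terse,
# show-types form, e.g. (hypothetical content):
#
#   drivers/foo.c:101: WARNING:LONG_LINE: line over 80 characters
#   drivers/foo.c:230: ERROR:TRAILING_WHITESPACE: trailing whitespace
#   total: 1 errors, 1 warnings, 250 lines checked
#
#   python3 report_parser.py -c -z checkpatch_output.txt -f checkpatch.json
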
def jenkins_mdt_collect(out_f):
    """ Collect Jenkins environment information and store it as
    key:value pairs """
    # Jenkins environment parameters that are always available
jenkins_env_keys = ["BUILD_ID",
"BUILD_URL",
"JOB_BASE_NAME",
"GERRIT_URL",
"GERRIT_PROJECT"]
# The following Gerrit parameters only exist when
# a job is triggered by a web hook
gerrit_trigger_keys = ["GERRIT_CHANGE_NUMBER",
"GERRIT_CHANGE_SUBJECT",
"GERRIT_CHANGE_ID",
"GERRIT_PATCHSET_REVISION",
"GERRIT_PATCHSET_NUMBER",
"GERRIT_REFSPEC",
"GERRIT_CHANGE_URL",
"GERRIT_BRANCH",
"GERRIT_CHANGE_OWNER_EMAIL",
"GERRIT_PATCHSET_UPLOADER_EMAIL"]
    # Find as many of the variables as possible in the environment
el = set(os.environ).intersection(set(jenkins_env_keys +
gerrit_trigger_keys))
# Format it in key:value pairs
out_data = {n: os.environ[n] for n in el}
if out_f:
save_json(out_f, out_data)
else:
pprint(out_data)
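
# Example of the captured key:value data (hypothetical values):
#
#   {
#       "BUILD_ID": "42",
#       "BUILD_URL": "https://ci.example.org/job/tf-m-build/42/",
#       "JOB_BASE_NAME": "tf-m-build",
#       "GERRIT_REFSPEC": "refs/changes/01/1234/2"
#   }
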
def metadata_collect(user_args):
""" Logic for information collection during different stages of
the build """
if user_args.dependencies_checkout and user_args.content_paths:
dependencies_mdt_collect(user_args.content_paths,
user_args.out_f)
elif user_args.git_info:
git_info = get_local_git_info(os.path.abspath(user_args.git_info))
if user_args.out_f:
save_json(user_args.out_f, git_info)
else:
pprint(git_info)
elif user_args.cppcheck_files:
cppcheck_mdt_collect(user_args.cppcheck_files, user_args.out_f)
elif user_args.checkpatch_file:
checkpatch_mdt_collect(user_args.checkpatch_file, user_args.out_f)
elif user_args.jenkins_info:
jenkins_mdt_collect(user_args.out_f)
else:
print("Invalid Metadata collection arguments")
print(user_args)
sys.exit(1)
def collate_report(key_file_list, output_f=None, stdout=True):
""" Join different types of json formatted reports into one """
out_data = {"_metadata_": {}, "report": {}}
for kf in key_file_list:
try:
key, fl = kf.split("=")
data = load_json(fl)
            # If data is a standard report (_metadata_ + report), parse it
if ("_metadata_" in data.keys() and "report" in data.keys()):
out_data["_metadata_"][key] = data["_metadata_"]
out_data["report"][key] = data["report"]
            # Otherwise treat it as a raw information dataset
else:
try:
out_data["info"][key] = data
except KeyError as E:
out_data["info"] = {key: data}
except Exception as E:
print("Exception parsing argument", kf, E)
continue
    if output_f:
        save_json(output_f, out_data)
elif stdout:
pprint(out_data)
return out_data
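
# Collation sketch, assuming hypothetical per-stage report files produced by
# the collectors above:
#
#   python3 report_parser.py -l build=build_report.json \
#       static=cppcheck_report.json -f full_report.json
#
# Standard reports contribute to out_data["_metadata_"][key] and
# out_data["report"][key]; any other JSON input is stored verbatim under
# out_data["info"][key].
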
def filter_report(key_value_list, input_f, output_f):
    """ Generate a subset of the data contained in input_f by selecting
    only the values defined in key_value_list """
try:
rep_data = load_json(input_f)
except Exception as E:
print("Exception parsing ", input_f, E)
sys.exit(1)
out_data = {}
for kf in key_value_list:
try:
tag, value = kf.split("=")
# if multiple selection
            if "," in value:
out_data[tag] = {}
for v in value.split(","):
data = rep_data[tag][v]
out_data[tag][v] = data
else:
data = rep_data[tag][value]
out_data[tag] = {value: data}
except Exception as E:
print("Could not extract data-set for k: %s v: %s" % (tag, value))
print(E)
continue
    if output_f:
        save_json(output_f, out_data)
else:
pprint(out_data)
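
# Filtering sketch: keep only selected entries of an existing report
# (hypothetical key/value names; multiple values may be comma separated):
#
#   python3 report_parser.py -t report=checkpatch,cppcheck \
#       -r full_report.json -f filtered_report.json
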
def parse_report(user_args):
""" Parse a report and attempt to determine if it is overall successful or
not. It will set the script's exit code accordingly """
# Parse Mode
in_rep = load_json(user_args.report)
report_eval = None
# Extract the required condition for evaluation to pass
pass_key, pass_val = split_keys(user_args.set_pass)
print("Evaluation will succeed if \"%s\" is \"%s\"" % (pass_key,
pass_val))
try:
report_eval = in_rep["_metadata_"][pass_key] == pass_val
        print("Evaluating the '%s' field found in _metadata_." % pass_key)
except Exception as E:
pass
if report_eval is None:
if isinstance(in_rep, dict):
            # The metadata did not settle the evaluation, so evaluate the
            # individual report entries instead
in_rep = in_rep["report"]
ev_list = in_rep.values()
elif isinstance(in_rep, list):
ev_list = in_rep
else:
print("Invalid data type: %s" % type(in_rep))
return
if user_args.onepass:
try:
report_eval = in_rep[user_args.onepass][pass_key] == pass_val
except Exception as e:
report_eval = False
        # If every single field needs to be successful, invert the check and
        # look for the entries that are not
elif user_args.allpass:
try:
                # The evaluation passes only when no entry fails the condition
                report_eval = not list(filter(lambda x: x[pass_key] != pass_val,
                                              ev_list))
except Exception as e:
print(e)
report_eval = False
else:
            print("Evaluation condition not set. Please use -a or -o. Launch "
                  "help (-h) for more information")
print("Evaluation %s" % ("passed" if report_eval else "failed"))
if user_args.eif:
print("Setting script exit status")
sys.exit(0 if report_eval else 1)
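
# Single-entry evaluation sketch (hypothetical report and entry names): pass
# when the "build" entry alone satisfies the success condition:
#
#   python3 report_parser.py -r full_report.json -s "status = Success" \
#       -o build -e
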
def main(user_args):
""" Main logic """
# Metadata Collect Mode
if user_args.collect:
metadata_collect(user_args)
return
elif user_args.filter_report:
filter_report(user_args.filter_report,
user_args.report,
user_args.out_f)
elif user_args.collate_report:
collate_report(user_args.collate_report, user_args.out_f)
else:
parse_report(user_args)
def get_cmd_args():
""" Parse command line arguments """
# Parse command line arguments to override config
parser = argparse.ArgumentParser(description="TFM Report Parser.")
parser.add_argument("-e", "--error_if_failed",
dest="eif",
action="store_true",
                        help="If set, the evaluation result will determine "
                        "the script exit code")
parser.add_argument("-s", "--set-success-field",
dest="set_pass",
default="status = Success",
action="store",
                        help="Set the key which the script will use to "
                        "assert success/failure")
parser.add_argument("-a", "--all-fields-must-pass",
dest="allpass",
action="store_true",
                        help="When set and a list is provided, all entries "
                        "must be successful for evaluation to pass")
parser.add_argument("-o", "--one-field-must-pass",
dest="onepass",
action="store",
help="Only the user defined field must pass")
parser.add_argument("-r", "--report",
dest="report",
action="store",
help="JSON file containing input report")
parser.add_argument("-c", "--collect",
dest="collect",
action="store_true",
                        help="When set, the parser will attempt to collect "
                        "information and produce a report")
parser.add_argument("-d", "--dependencies-checkout",
dest="dependencies_checkout",
action="store_true",
help="Collect information from a dependencies "
"checkout job")
parser.add_argument("-f", "--output-file",
dest="out_f",
action="store",
help="Output file to store captured information")
parser.add_argument('-p', '--content-paths',
dest="content_paths",
nargs='*',
                        help=("Pass a space separated list of paths in the "
                              "following format: -p mbedcrypto=/your/path "
                              "cmsis=/another/path. Used in conjunction "
                              "with -d"))
parser.add_argument("-g", "--git-info",
dest="git_info",
action="store",
                        help="Extract git information from the given path. "
                        "Requires the --collect directive. Optional "
                        "parameter --output-file")
parser.add_argument("-x", "--cpp-check-xml",
dest="cppcheck_files",
nargs='*',
action="store",
                        help="Extract cppcheck static analysis information "
                        "from output files, provided as a space separated "
                        "list. Requires the --collect directive. Optional "
                        "parameter --output-file")
parser.add_argument("-z", "--checkpatch-parse-f",
dest="checkpatch_file",
action="store",
                        help="Extract checkpatch static analysis information "
                        "from an output file. Requires the --collect "
                        "directive. Optional parameter --output-file")
parser.add_argument("-j", "--jenkins-info",
dest="jenkins_info",
action="store_true",
                        help="Extract Jenkins and Gerrit trigger environment "
                        "information. Requires the --collect directive. "
                        "Optional parameter --output-file")
parser.add_argument("-l", "--collate-report",
dest="collate_report",
action="store",
nargs='*',
                        help="Pass a space separated list of key-value pairs "
                        "in the following format: -l report_key_0=report_file_0 "
                        "report_key_1=report_file_1. Collate will generate a "
                        "joint dataset and print it to stdout. Optional "
                        "parameter --output-file")
parser.add_argument("-t", "--filter-report",
dest="filter_report",
action="store",
nargs='*',
                        help="Requires the --report parameter for the input "
                        "file. Pass a space separated list of key-value pairs "
                        "in the following format: -t report_key_0=value_0 "
                        "report_key_1=value_1. Filter will remove all entries "
                        "of the original report except the ones matching the "
                        "key:value pairs defined, and print the result to "
                        "stdout. Optional parameter --output-file")
return parser.parse_args()
if __name__ == "__main__":
main(get_cmd_args())