blob: a920ca2085d9508a09682623f5e1adc8a19f2e9a [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019-2020 Arm Limited. All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#
#
# This script is used to walk a job tree, primarily to identify sub-jobs
# triggered by a top-level job.
#
# The script works by scraping console output of jobs, starting from the
# top-level one, sniffing for patterns indicative of sub-jobs, and following the
# trail.
import argparse
import contextlib
import re
import sys
import urllib.request
# Sub-job patters. All of them capture job name (j) and build number (b).
_SUBJOB_PATTERNS = (
# Usualy seen on freestyle jobs
re.compile(r"(?P<j>[-a-z_]+) #(?P<b>[0-9]+) completed. Result was (?P<s>[A-Z]+)",
re.IGNORECASE),
# Usualy seen on multi-phase jobs
re.compile(r"Finished Build : #(?P<b>[0-9]+) of Job : (?P<j>[-a-z_]+) with status : (?P<s>[A-Z]+)",
re.IGNORECASE)
)
# Generator that yields lines on a job console as strings
def _console_lines(console_url):
with urllib.request.urlopen(console_url) as console_fd:
for line in filter(None, console_fd):
# Console might have special characters. Yield an empty line in that case.
try:
yield line.decode().rstrip("\n")
except UnicodeDecodeError as e:
# In case of decode error, return up until the character that
# caused the error
yield line[:e.start].decode().rstrip("\n")
# Class representing Jenkins job
class JobInstance:
def __init__(self, url, status=None):
self.sub_jobs = []
self.url = url
self.name = None
self.build_number = None
self.config = None
self.status = status
self.depth = 0
# Representation for debugging
def __repr__(self):
return "{}#{}".format(self.name, self.build_number)
# Scrape job's console to identify sub jobs, and recurseively parse them.
def parse(self, *, depth=0):
url_fields = self.url.rstrip("/").split("/")
# Identify job name and number from the URL
try:
stem_url_list = url_fields[:-3]
self.name, self.build_number = url_fields[-2:]
if self.build_number not in ("lastBuild", "lastSuccessfulBuild"):
int(self.build_number)
except:
raise Exception(self.url + " is not a valid Jenkins build URL.")
self.depth = depth
# Scrape the job's console
console_url = "/".join(url_fields + ["consoleText"])
try:
for line in _console_lines(console_url):
# A job that prints CONFIGURATION is where we'd find the build
# artefacts
fields = line.split()
if len(fields) == 2 and fields[0] == "CONFIGURATION:":
self.config = fields[1]
return
# Look for sub job pattern, and recurse into the sub-job
child_matches = filter(None, map(lambda p: p.match(line),
_SUBJOB_PATTERNS))
for match in child_matches:
child = JobInstance("/".join(stem_url_list +
["job", match.group("j"), match.group("b")]),
match.group("s"))
child.parse(depth=depth+1)
self.sub_jobs.append(child)
except urllib.error.HTTPError:
print(console_url + " is not accessible.", file=sys.stderr)
# Generator that yields individual jobs in the hierarchy
def walk(self, *, sort=False):
if not self.sub_jobs:
yield self
else:
descendants = self.sub_jobs
if sort:
descendants = sorted(self.sub_jobs, key=lambda j: j.build_number)
for child in descendants:
yield from child.walk(sort=sort)
# Print one job
def print(self):
config_str = "[" + self.config + "]" if self.config else ""
status = self.status if self.status else ""
print("{}{} #{} {} {}".format(" " * 2 * self.depth, self.name,
self.build_number, status, config_str))
# Print the whole hierarchy
def print_tree(self, *, sort=False):
self.print()
if not self.sub_jobs:
return
descendants = self.sub_jobs
if sort:
descendants = sorted(self.sub_jobs, key=lambda j: j.build_number)
for child in descendants:
child.print_tree(sort=sort)
@contextlib.contextmanager
def open_artefact(self, path, *, text=False):
# Wrapper class that offer string reads from a byte descriptor
class TextStream:
def __init__(self, byte_fd):
self.byte_fd = byte_fd
def read(self, sz=None):
return self.byte_fd.read(sz).decode("utf-8")
art_url = "/".join([self.url, "artifact", path])
with urllib.request.urlopen(art_url) as fd:
yield TextStream(fd) if text else fd
# When invoked from command line, print the whole tree
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("build_url",
help="URL to specific build number to walk")
parser.add_argument("--unique-tf-configs", default=False,
action="store_const", const=True, help="Print unique TF configs")
opts = parser.parse_args()
top = JobInstance(opts.build_url)
top.parse()
if opts.unique_tf_configs:
unique_configs = set()
# Extract the base TF config name from the job's config, which contains
# group, TFTF configs etc.
for job in filter(lambda j: j.config, top.walk()):
unique_configs.add(job.config.split("/")[1].split(":")[0].split(",")[0])
for config in sorted(unique_configs):
print(config)
else:
top.print_tree()