blob: 227d047bac0e47b7557760c387df7b7d5e2a6164 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright (c) 2019-2020 Arm Limited. All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#
# This is a server that accepts PUT requests that's primary used to receive
# artefacts from Jenkins. Received files are placed under the output directory
# under the same path mentioned in the URL.
#
# The script takes two arguments: IP address and a port number to listen to.
# Note that the IP address has to be externally visible.
import argparse
import calendar
import heapq
import http.server
import itertools
import json
import os
import shutil
import socketserver
import threading
import time
import traceback
import urllib
import urllib.request
JENKINS_URL = "http://jenkins.oss.arm.com"
counter = itertools.count()
exiting = False
more_consoles = threading.Event()
pq = []
received = set()
# Class representing a pending job whose console is yet to be downloaded. The
# methods help identify when the job is finished (ready to download console),
# and to download the console along with the received artefacts.
class PendingJob:
def __init__(self, job, build, path):
self.job = job
self.build = build
self.path = path
self.url = "/".join([JENKINS_URL, "job", self.job, self.build])
def download_console(self, more):
console_url = "/".join([self.url, "consoleText"])
try:
with urllib.request.urlopen(console_url) as cfd, \
open(os.path.join(self.path, "console.txt"), "wb") as fd:
shutil.copyfileobj(cfd, fd)
print("{}: {}#{}: console (+{})".format(time_now(), self.job,
self.build, more))
except Exception as e:
traceback.print_exception(Exception, e, e.__traceback__)
def is_ready(self):
# Return true if there were errors as otherwise this job won't ever be
# completed.
ret = True
json_url = "/".join([self.url, "api", "json"])
try:
with urllib.request.urlopen(json_url) as fd:
job_json = json.loads(fd.read().decode())
ret = job_json["building"] == False
except Exception as e:
traceback.print_exception(Exception, e, e.__traceback__)
return ret
# PUT handler for the receiver. When an artefact with a valid job name and build
# number is received, we keep a pending job instance to download its console
# when the job finishes.
class ArtefactsReceiver(http.server.BaseHTTPRequestHandler):
def do_PUT(self):
parsed = urllib.parse.urlparse(self.path)
path = parsed.path.lstrip("/")
relpath = os.path.join(opts.output_dir, os.path.dirname(path))
os.makedirs(relpath, exist_ok=True)
content_length = int(self.headers["Content-Length"])
with open(os.path.join(opts.output_dir, path), "wb") as fd:
fd.write(self.rfile.read(content_length))
self.send_response(200)
self.end_headers()
qs = urllib.parse.parse_qs(parsed.query)
job = qs.get("j", [None])[0]
build = qs.get("b", [None])[0]
print("{}: {}#{}: {}".format(time_now(), job, build, path))
if job and build and (job, build) not in received:
item = (now(), next(counter), PendingJob(job, build, relpath))
heapq.heappush(pq, item)
more_consoles.set()
received.add((job, build))
# Avoid default logging by overriding with a dummy function
def log_message(self, *args):
pass
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer):
pass
def now():
return calendar.timegm(time.gmtime())
def time_now():
return time.strftime("%H:%M:%S")
def console_monitor():
while not exiting:
# Wait here for the queue to be non-empty
try:
ts, count, job = pq[0]
except IndexError:
more_consoles.wait()
continue
# Short nap before next job is available
if ts > now():
time.sleep(2)
continue
ts, count, job = heapq.heappop(pq)
if not job.is_ready():
# Re-queue the job for later
heapq.heappush(pq, (ts + 10, count, job))
continue
job.download_console(len(pq))
more_consoles.clear()
parser = argparse.ArgumentParser()
parser.add_argument("--output-dir", "-o", default="artefacts")
parser.add_argument("ip", help="IP address to listen to")
parser.add_argument("port", help="Port number to listen to")
opts = parser.parse_args()
os.makedirs(opts.output_dir, exist_ok=True)
server = Server((opts.ip, int(opts.port)), ArtefactsReceiver)
print("Trusted Firmware-A artefacts receiver:")
print()
print("\tUse artefacts_receiver=http://{}:{}".format(opts.ip, opts.port))
print("\tArtefacts will be placed under '{}'. Waiting...".format(opts.output_dir))
print()
try:
more_consoles.clear()
console_thread = threading.Thread(target=console_monitor)
console_thread.start()
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
print()
print("Exiting...")
exiting = True
more_consoles.set()
console_thread.join()
server.server_close()