blob: 227d047bac0e47b7557760c387df7b7d5e2a6164 [file] [log] [blame]
Fathi Boudra422bf772019-12-02 11:10:16 +02001#!/usr/bin/env python3
2#
Leonardo Sandoval579c7372020-10-23 15:23:32 -05003# Copyright (c) 2019-2020 Arm Limited. All rights reserved.
Fathi Boudra422bf772019-12-02 11:10:16 +02004#
5# SPDX-License-Identifier: BSD-3-Clause
6#
7
# This is a server that accepts PUT requests, primarily used to receive
# artefacts from Jenkins. Received files are placed under the output directory
# at the same path as given in the request URL.
11#
12# The script takes two arguments: IP address and a port number to listen to.
13# Note that the IP address has to be externally visible.
14
15import argparse
16import calendar
17import heapq
18import http.server
19import itertools
20import json
21import os
22import shutil
23import socketserver
24import threading
25import time
26import traceback
27import urllib
28import urllib.request
29
30
31JENKINS_URL = "http://jenkins.oss.arm.com"
32counter = itertools.count()
33exiting = False
34more_consoles = threading.Event()
35pq = []
36received = set()
37
38
39# Class representing a pending job whose console is yet to be downloaded. The
40# methods help identify when the job is finished (ready to download console),
41# and to download the console along with the received artefacts.
class PendingJob:
    """A Jenkins build whose console log is yet to be downloaded.

    Instances record the job name, build number and the local directory where
    the build's artefacts were stored. The methods tell whether the build has
    finished and fetch its console text into that directory.
    """

    # Upper bound, in seconds, for any single request made to Jenkins. Without
    # it a stalled connection would block the calling thread indefinitely.
    TIMEOUT = 30

    def __init__(self, job, build, path):
        """Remember the build identity and derive its Jenkins URL.

        job and build are the job name and build number as strings; path is
        the directory in which console.txt will be written.
        """
        self.job = job
        self.build = build
        self.path = path
        self.url = "/".join([JENKINS_URL, "job", self.job, self.build])

    def download_console(self, more):
        """Save the build's console text as console.txt under self.path.

        more is only used in the progress message (printed as "+<more>").
        Failures are reported with a traceback and otherwise ignored, so a
        single bad download never propagates to the caller.
        """
        console_url = "/".join([self.url, "consoleText"])
        console_path = os.path.join(self.path, "console.txt")
        try:
            with urllib.request.urlopen(console_url,
                                        timeout=self.TIMEOUT) as cfd, \
                    open(console_path, "wb") as fd:
                shutil.copyfileobj(cfd, fd)

            print("{}: {}#{}: console (+{})".format(time_now(), self.job,
                                                    self.build, more))
        except Exception:
            traceback.print_exc()

    def is_ready(self):
        """Return True when the build is no longer running.

        Any error while querying or decoding the build status also yields
        True, as otherwise this job would never be considered complete.
        """
        json_url = "/".join([self.url, "api", "json"])
        try:
            with urllib.request.urlopen(json_url,
                                        timeout=self.TIMEOUT) as fd:
                job_json = json.loads(fd.read().decode())
            return not job_json["building"]
        except Exception:
            traceback.print_exc()
            return True
75
76
77# PUT handler for the receiver. When an artefact with a valid job name and build
78# number is received, we keep a pending job instance to download its console
79# when the job finishes.
class ArtefactsReceiver(http.server.BaseHTTPRequestHandler):
    """PUT handler that stores uploaded artefacts under the output directory.

    The request path selects the destination file below opts.output_dir. When
    the query string carries a job name ("j") and build number ("b") that
    have not been seen before, a PendingJob is queued so that the build's
    console is downloaded once the job finishes.
    """

    # Request bodies are copied to disk in pieces of at most this many bytes
    # so that a large artefact is never held in memory as a whole.
    CHUNK_SIZE = 64 * 1024

    @staticmethod
    def _is_within(root, target):
        """Return True if target resolves to a location strictly below root."""
        root = os.path.realpath(root)
        target = os.path.realpath(target)
        return target != root and os.path.commonpath([root, target]) == root

    def do_PUT(self):
        """Store the request body and, if needed, queue a console download.

        Responds 400 when the path is empty, names a directory or escapes the
        output directory; 411 when Content-Length is missing or invalid; 500
        when the file cannot be written; 200 otherwise.
        """
        parsed = urllib.parse.urlparse(self.path)
        path = parsed.path.lstrip("/")
        target = os.path.join(opts.output_dir, path)

        # The path comes straight from the client: refuse anything that does
        # not name a file inside the output directory (e.g. via "..").
        if (not path or path.endswith("/")
                or not self._is_within(opts.output_dir, target)):
            self.send_error(400, "Invalid artefact path")
            return

        try:
            remaining = int(self.headers["Content-Length"])
        except (TypeError, ValueError):
            # Header absent (None) or not a number.
            remaining = -1
        if remaining < 0:
            self.send_error(411, "Valid Content-Length required")
            return

        relpath = os.path.join(opts.output_dir, os.path.dirname(path))
        try:
            os.makedirs(relpath, exist_ok=True)
            with open(target, "wb") as fd:
                while remaining > 0:
                    chunk = self.rfile.read(min(remaining, self.CHUNK_SIZE))
                    if not chunk:
                        # Client closed early; keep what was received.
                        break
                    fd.write(chunk)
                    remaining -= len(chunk)
        except OSError as e:
            self.send_error(500, "Cannot store artefact", str(e))
            return

        self.send_response(200)
        self.end_headers()

        qs = urllib.parse.parse_qs(parsed.query)
        job = qs.get("j", [None])[0]
        build = qs.get("b", [None])[0]

        print("{}: {}#{}: {}".format(time_now(), job, build, path))

        if job and build and (job, build) not in received:
            item = (now(), next(counter), PendingJob(job, build, relpath))
            heapq.heappush(pq, item)
            more_consoles.set()
            received.add((job, build))

    # Avoid default logging by overriding with a dummy function
    def log_message(self, *args):
        pass
110
111
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn; adds no behaviour of its own."""
114
115
def now():
    """Return the current UTC time as whole seconds since the epoch."""
    utc_fields = time.gmtime()
    return calendar.timegm(utc_fields)
118
119
def time_now():
    """Return the current local time formatted as HH:MM:SS."""
    clock_format = "%H:%M:%S"
    return time.strftime(clock_format)
122
123
def console_monitor():
    """Download consoles of queued jobs as they finish.

    Loops until the module-level exiting flag becomes true. Entries of pq are
    (timestamp, sequence number, PendingJob) tuples; an entry is examined only
    once its timestamp is no longer in the future.
    """
    while not exiting:
        # Nothing queued: block until more_consoles is set
        if not pq:
            more_consoles.wait()
            continue

        # The earliest entry is not due yet: short nap, then look again
        due = pq[0][0]
        if now() < due:
            time.sleep(2)
            continue

        due, seq, pending = heapq.heappop(pq)
        if pending.is_ready():
            pending.download_console(len(pq))
            more_consoles.clear()
        else:
            # Still building: put it back to be checked 10 seconds later
            heapq.heappush(pq, (due + 10, seq, pending))
146
147
# Command line: an optional output directory, plus the address and port to
# listen on.
parser = argparse.ArgumentParser()

parser.add_argument("--output-dir", "-o", default="artefacts")
parser.add_argument("ip", help="IP address to listen to")
# Let argparse convert the port so that a non-numeric value is reported as a
# usage error rather than a ValueError traceback.
parser.add_argument("port", type=int, help="Port number to listen to")

opts = parser.parse_args()

os.makedirs(opts.output_dir, exist_ok=True)

server = Server((opts.ip, opts.port), ArtefactsReceiver)
print("Trusted Firmware-A artefacts receiver:")
print()
print("\tUse artefacts_receiver=http://{}:{}".format(opts.ip, opts.port))
print("\tArtefacts will be placed under '{}'. Waiting...".format(opts.output_dir))
print()

# Create the thread object up front so that the cleanup code below can always
# refer to it, even if starting it fails.
console_thread = threading.Thread(target=console_monitor)

try:
    more_consoles.clear()
    console_thread.start()
    server.serve_forever()
except KeyboardInterrupt:
    pass
finally:
    print()
    print("Exiting...")
    # Ask the monitor loop to stop, and wake it in case it is waiting for work
    exiting = True
    more_consoles.set()
    # Joining a thread that was never started raises RuntimeError
    if console_thread.ident is not None:
        console_thread.join()
    server.server_close()