blob: 7566c2eab07da69c0d209f1b49b67927de056adc [file] [log] [blame]
Minos Galanakisea421232019-06-20 17:11:28 +01001#!/usr/bin/env python3
2
3""" fastmodel_wrapper.py:
4
5 Wraps around Fast Models, which execute in headless mode
6 producing serial output to a defined log file. It will spawn two processes
7 and two threads to monitor the output of the simulation and end it when a
8 user defined condition is matched. It will perform a set of tests and will
9 change the script exit code based on the output of the test """
10
11from __future__ import print_function
12
13__copyright__ = """
14/*
15 * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
16 *
17 * SPDX-License-Identifier: BSD-3-Clause
18 *
19 */
20 """
21__author__ = "Minos Galanakis"
22__email__ = "minos.galanakis@linaro.org"
23__project__ = "Trusted Firmware-M Open CI"
24__status__ = "stable"
25__version__ = "1.1"
26
27import os
28import re
29import sys
30import argparse
31from time import sleep
32from pprint import pprint
33from copy import deepcopy
34from threading import Thread
35from queue import Queue, Empty
36from subprocess import Popen, PIPE, STDOUT
37
38try:
39 from tfm_ci_pylib.utils import find_missing_files, \
40 detect_python3, test, check_pid_status, save_json, save_dict_json, \
41 load_json
42except ImportError:
43 dir_path = os.path.dirname(os.path.realpath(__file__))
44 sys.path.append(os.path.join(dir_path, "../"))
45 from tfm_ci_pylib.utils import find_missing_files, \
46 detect_python3, test, check_pid_status, save_json, save_dict_json, \
47 load_json
48
49
class FastmodelWrapper(object):
    """ Controlling Class that wraps around an ARM Fastmodel and controls
    execution, adding regex flow controls, and headless testing.

    Life cycle: construct -> start() -> block_wait() -> test() ->
    get_report()/save_report(). start() launches the Fast Model and a
    `tail -f` on the terminal log file; one reader thread per process
    watches the output and calls stop() when the end string or the
    simulator's own stop message is seen. """

    # Pause used by block_wait() when the monitor queue is empty, so the
    # polling loop does not spin at full speed next to the simulation.
    _POLL_INTERVAL = 0.01

    def __init__(self,
                 fvp_cfg=None,
                 work_dir="./",
                 fvp_dir=None,
                 fvp_binary=None,
                 fvp_app=None,
                 fvp_boot=None,
                 terminal_file=None,
                 fvp_time_out=None,
                 fvp_test_error=None):
        """ Load the configuration, apply the overrides and prepare the
        working directory.

        fvp_cfg:        configuration dictionary or path to a json file
        work_dir:       working directory, the process chdir()s into it
        fvp_dir:        overrides the config "directory" entry
        fvp_binary:     overrides the config "bin" entry
        fvp_app:        firmware .bin file, stored as the "data" entry
        fvp_boot:       bootloader .axf file, stored as "application"
        terminal_file:  overrides the config "terminal_log" entry
        fvp_time_out:   stored as the config "simlimit" entry
        fvp_test_error: overrides the config "error_on_failed" entry

        Calls sys.exit(1) when the configuration can not be loaded or when
        fvp_boot / fvp_app do not carry the expected file extension.
        Truncates the terminal log file as a side effect. """

        # Required by other methods, always set working directory first
        self.work_dir = os.path.abspath(work_dir)

        # Created by run_fpv()/run_monitor(). Defined up front so that
        # stop() is safe even if it is called before both exist.
        self.fvp_proc = None
        self.monitor_proc = None

        # Load the configuration from object or file
        self.config, self.name = self.load_config(fvp_cfg)

        self.show_config()

        # Print a header
        ln = int((62 - len(self.name) + 1) / 2)
        print("\n%s Running Test: %s %s\n" % ("#" * ln, self.name, "#" * ln))

        # Consume the configuration parameters not related to FPV
        # Extract test cases
        self.test_list = self.config.pop("test_cases")
        self.test_end_string = self.config.pop("test_end_string")
        self.test_rex = self.config.pop("test_rex")

        # Command line arguments overrides
        # When those arguments are provided they override config entries
        f_dir = self.config.pop("directory")
        self.fvp_dir = os.path.abspath(fvp_dir if fvp_dir else f_dir)

        ef = self.config.pop("error_on_failed")
        self.fvp_test_error = fvp_test_error if fvp_test_error else ef

        tf = self.config.pop("terminal_log")
        if terminal_file:
            self.term_file = os.path.abspath(terminal_file)
        else:
            self.term_file = os.path.abspath(os.path.join(self.work_dir, tf))

        # Override config entries directly
        if fvp_binary:
            self.config["bin"] = fvp_binary

        if fvp_boot:
            # The dot is escaped so only a real ".axf" suffix is accepted
            if re.match(r'\S+\.axf$', fvp_boot):
                self.config["application"] = "cpu0=" +\
                    os.path.abspath(fvp_boot)
            else:
                print("Invalid bootloader %s. Expecting .axf file" % fvp_boot)
                sys.exit(1)

        # Ensure that the firmware is copied at the appropriate memory region
        # perfect match regex for future ref r'^(?:cpu=)[\S]+.bin@0x10080000$'
        # TODO remove that when other platforms are added
        if fvp_app:
            if re.match(r'\S+\.bin$', fvp_app):
                self.config["data"] = "cpu0=" +\
                    os.path.abspath(fvp_app) +\
                    "@0x10080000"
            else:
                print("Invalid firmware %s. Expecting .bin file" % fvp_app)
                sys.exit(1)

        # Always defined; None when no time limit has been requested
        self.fvp_time_out = fvp_time_out
        if fvp_time_out:
            self.config["simlimit"] = fvp_time_out

        self.monitor_q = Queue()
        self.stop_all = False
        self.pids = []
        self.fvp_test_summary = False

        # Asserted only after a complete test run,including end string matching
        self.test_complete = False

        self.test_report = None

        # Change to working directory
        os.chdir(self.work_dir)
        print("Switching to working directory: %s" % self.work_dir)
        # Clear the file if it has been created before
        with open(self.term_file, "w") as F:
            F.write("")

    def show_config(self):
        """ Print the configuration to console """

        print("\n%s config:\n" % self.name)
        pprint(self.config)

    def _locate_config_file(self, config):
        """ Return config if it is an existing file, otherwise the file of
        the same base name inside the working directory. Raises Exception
        when neither exists. """

        if os.path.isfile(config):
            return config

        # Remove the path from the file and look in the working directory
        fallback = os.path.join(self.work_dir, os.path.split(config)[-1])
        if not os.path.isfile(fallback):
            raise Exception("Could not find cfg in %s or %s " % (config,
                                                                 fallback))
        return fallback

    def load_config(self, config):
        """ Load the configuration from a json file or a memory map.

        config is either a dictionary (used as is) or a path to a json
        file. Returns a (deep copy of the configuration, test name) tuple,
        the test name being derived from the "terminal_log" entry.
        Prints the reason and calls sys.exit(1) when loading fails. """

        try:
            # If config is a dictionary object use it as is
            if isinstance(config, dict):
                ret_config = config
            elif isinstance(config, str):
                # Attempt to load the configuration from File
                ret_config = load_json(self._locate_config_file(config))
            else:
                raise Exception("Need to provide a valid config name or file. "
                                "Please use --config/--config-file parameter.")

        except Exception as e:
            print("Error! Could not load config. Quitting")
            # Report the cause, otherwise the failure can not be diagnosed
            print("Reason: %s" % e)
            sys.exit(1)

        # Generate Test name (Used in test report) from terminal file.
        tname = ret_config["terminal_log"].replace("terminal_", "")\
            .split(".")[0].lower()

        return deepcopy(ret_config), tname

    def save_config(self, config_file="fvp_tfm_config.json"):
        """ Save current configuration to a json file inside the working
        directory, restoring the entries consumed by the constructor """

        # Add stripped information to config
        exp_cfg = deepcopy(self.config)

        exp_cfg["terminal_log"] = self.term_file
        exp_cfg["error_on_failed"] = self.fvp_test_error
        exp_cfg["directory"] = self.fvp_dir
        exp_cfg["test_cases"] = self.test_list
        exp_cfg["test_end_string"] = self.test_end_string
        exp_cfg["test_rex"] = self.test_rex

        cfg_f = os.path.join(self.work_dir, config_file)
        # NOTE(review): get_sort_order() is not a method of a plain dict, so
        # this presumably expects a project specific mapping type. Confirm
        # what load_json() returns and what save_dict_json() expects.
        save_dict_json(cfg_f, exp_cfg, exp_cfg.get_sort_order())
        print("Configuration %s exported." % cfg_f)

    def compile_cmd(self):
        """ Compile all the FPV related information into a command that can
        be executed manually. Returns the command as a single string. """

        binary = ""
        options = []
        for name, value in self.config.items():
            # The executable always goes to the beginning of the command
            if name == "bin":
                binary = value
            elif name == "parameters":
                options.append(
                    " " + " ".join(["--parameter %s" % p for p in value]))
            # Allows setting a second binary file as data field
            elif name == "application" and ".bin@0x0" in value:
                options.append(" --data %s" % value)
            else:
                options.append(" --%s %s" % (name, value))

        # Add the path to the command
        cmd = os.path.join(self.fvp_dir, binary + "".join(options))

        # Add the log file to the command (optional)
        return cmd.replace("$TERM_FILE", self.term_file)

    def show_cmd(self):
        """ Print the FPV command to console """

        print(self.compile_cmd())

    @staticmethod
    def _to_text(raw_line):
        """ Convert a line read from a subprocess pipe to text. Undecodable
        bytes are replaced instead of raising inside a reader thread. """

        # Python2 ignores byte literals, P3 requires parsing
        if detect_python3():
            return raw_line.decode("utf-8", "replace")
        return raw_line

    def run_fpv(self):
        """ Run the Fast Model test in a different process and return
        the pid for housekeeping purposes """

        def fpv_stdout_parser(dstream, queue):
            """ THREAD: Read STDOUT/STDERR and stop if process is done """

            for line in iter(dstream.readline, b''):
                if self.stop_all:
                    break
                line = self._to_text(line)
                if "Info: /OSCI/SystemC: Simulation stopped by user" in line:
                    print("/OSCI/SystemC: Simulation stopped")
                    self.stop()
                    break

        # Convert to list, dropping the empty tokens produced by doubled
        # spaces so they are not handed over as empty arguments
        cmd = [arg for arg in self.compile_cmd().split(" ") if arg]

        # Run it as subprocess
        self.fvp_proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=False)
        self._fvp_thread = Thread(target=fpv_stdout_parser,
                                  args=(self.fvp_proc.stdout,
                                        self.monitor_q))
        self._fvp_thread.daemon = True
        self._fvp_thread.start()
        return self.fvp_proc.pid

    def run_monitor(self):
        """ Run a parallel threaded process that monitors the output of
        the FPV and stops it when the user specified string is found.
        It returns the pid of the process for housekeeping """

        def monitor_producer(dstream, queue):
            """ THREAD: Read STDOUT and push data into a queue """

            for line in iter(dstream.readline, b''):
                if self.stop_all:
                    break
                line = self._to_text(line)
                queue.put(line)

                # If the test end string is found terminate
                if self.test_end_string in str(line):
                    queue.put("Found End String \"%s\"" % self.test_end_string)
                    self.test_complete = True
                    self.stop()
                    break
                # If the FPV stops by itself (i.e simlimit reached) terminate
                if "SystemC: Simulation stopped by user" in str(line):
                    queue.put("Simulation Ended \"%s\"" % self.test_end_string)
                    self.stop()
                    break

            dstream.close()
            return

        # Run the tail as a separate process
        cmd = ["tail", "-f", self.term_file]
        self.monitor_proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=False)

        self._fvp_mon_thread = Thread(target=monitor_producer,
                                      args=(self.monitor_proc.stdout,
                                            self.monitor_q))
        self._fvp_mon_thread.daemon = True
        self._fvp_mon_thread.start()
        return self.monitor_proc.pid

    def monitor_consumer(self):
        """ Read the output of the monitor thread and print the queue entries
        one entry at the time (One line per call). Never blocks.

        Returns True when a line has been printed, False when the queue
        was empty. """

        try:
            line = self.monitor_q.get_nowait()
        except Empty:
            return False
        print(line.rstrip())
        return True

    def has_stopped(self):
        """ Return status of stop flag. True indicates stopped state """

        return self.stop_all

    def start(self):
        """ Start the FPV and the monitor processes and keep
        track of their pids. Calls sys.exit(1) when a binary is missing.
        Returns self so that calls can be chained. """

        # Do not spawn fpv unless everything is in place
        bin_list = [os.path.join(self.fvp_dir, self.config["bin"]),
                    self.config["application"].replace("cpu0=", "")
                                              .replace("@0x0", ""),
                    self.config["data"].replace("@0x10080000", "")
                                       .replace("@0x00100000", "")
                                       .replace("cpu0=", "")]

        # Look the files up once and reuse the result for the report
        missing = find_missing_files(bin_list)
        if missing:
            print("Could not find all binaries from %s" % ", ".join(bin_list))
            print("Missing Files:", ", ".join(missing))
            sys.exit(1)

        self.pids.append(self.run_fpv())
        self.pids.append(self.run_monitor())
        print("Spawned Proccesses with PID %s" % repr(self.pids)[1:-1])
        return self

    def stop(self):
        """ Stop all threads, processes and make sure there are no leaks.

        Can be called from the reader threads as well as from the main
        thread, and before both processes have been spawned. """

        self.stop_all = True

        # Send the graceful shutdown signal, monitor first
        for proc in (getattr(self, "monitor_proc", None),
                     getattr(self, "fvp_proc", None)):
            if proc is None:
                # Not spawned yet, nothing to terminate
                continue
            try:
                proc.terminate()
            except OSError:
                # The process is already gone, which is the desired state
                pass
        sleep(1)
        # List the Zombies
        # TODO remove debug output
        for pid in sorted(self.pids):
            check_pid_status(pid, ["zombie", ])

    def test(self):
        """ Parse the output terminal file and evaluate status of tests.

        Applies the configured regex to the terminal log, converts the
        matches to a {test_name: result} dictionary and stores the outcome
        of the `test` helper in self.test_report. Returns self. """

        # Read the output file
        with open(self.term_file, "r") as F:
            terminal_log = F.read()

        pass_text = "PASSED"
        # Create a filtering regex
        rex = re.compile(self.test_rex)

        # Extract tests status as a list of tuples or strings
        matches = rex.findall(terminal_log)

        try:
            if not matches:
                raise ValueError("Incompatible Test Format")
            # When test regex is in format [(test_name, RESULT),...]
            if isinstance(matches[0], tuple):
                # dict() raises ValueError unless every match is a pair
                tests = dict(matches)
            # When regex is in format [test_name, test_name 2,...]
            # we just need to verify they exist
            elif isinstance(matches[0], str):
                pass_text = "PRESENT"
                tests = dict.fromkeys(matches, pass_text)
            else:
                raise ValueError("Incompatible Test Format")
        except (ValueError, TypeError):

            if not self.test_complete:
                print("Warning! Test did not complete.")
            else:
                print("Error", "Invalid tests format: %s type: %s" %
                      (matches, type(matches)))
            # Pass an empty output to test. Do not exit prematurely
            tests = {}

        # Run the test and store the report
        self.test_report = test(self.test_list,
                                tests,
                                pass_text=pass_text,
                                test_name=self.name,
                                error_on_failed=self.fvp_test_error,
                                summary=self.fvp_test_summary)
        return self

    def get_report(self):
        """ Return the test report object to caller. Raises Exception when
        no report has been generated yet. """

        if not self.test_report:
            raise Exception("Can not create report from incomplete run cycle!")
        return self.test_report

    def save_report(self, rep_f=None):
        """ Export report into a file, set by test name but can be overridden
        by rep_f. Returns self on success, None when there is no report or
        the run has not been stopped. """

        if not self.stop_all or not self.test_report:
            print("Can not create report from incomplete run cycle!")
            return

        if not rep_f:
            rep_f = os.path.join(self.work_dir, "report_%s.json" % self.name)
        rep_f = os.path.abspath(rep_f)
        save_json(rep_f, self.test_report)
        print("Exported test report: %s" % rep_f)
        return self

    def block_wait(self):
        """ Block execution flow and wait for the monitor to complete.
        Returns self so that calls can be chained. """

        try:
            while True:
                for pid in sorted(self.pids):

                    if not check_pid_status(pid, ["running",
                                                  "sleeping",
                                                  "disk"]):
                        print("Child proccess of pid: %s has died, exitting!" %
                              pid)
                        self.stop()
                if self.has_stopped():
                    break
                # Back off briefly when there is nothing to print
                if not self.monitor_consumer():
                    sleep(self._POLL_INTERVAL)

        except KeyboardInterrupt:
            print("User initiated interrupt")
            self.stop()

        # Print what the reader thread queued before the stop flag was set,
        # otherwise the last lines of the log never reach the console
        while self.monitor_consumer():
            pass

        # Allows method to be chainloaded
        return self
465
466
def get_cmd_args():
    """ Build the command line parser and return the parsed arguments.

    Every option is optional; the ones left out keep their argparse
    default so that the configuration file value is used instead. """

    # (flag, add_argument keyword arguments), in the order shown by --help
    option_table = (
        ("--bin", dict(dest="fvp_bin",
                       action="store",
                       help="Fast Model platform binary file")),
        ("--firmware", dict(dest="fvp_firm",
                            action="store",
                            help="Firmware application file to run")),
        ("--boot", dict(dest="fvp_boot",
                        action="store",
                        help="Fast Model bootloader file")),
        ("--fpv-path", dict(dest="fvp_dir",
                            action="store",
                            help="Directory path containing the Fast Models")),
        ("--work-path", dict(dest="work_dir",
                             action="store",
                             default="./",
                             help="Working directory (Where logs are stored)")),
        ("--time-limit", dict(dest="time",
                              action="store",
                              help="Time in seconds to run the simulation")),
        ("--log-file", dict(dest="termf",
                            action="store",
                            help="Set terminal log file name")),
        ("--error", dict(dest="test_err",
                         action="store",
                         help="raise sys.error = 1 if test failed")),
        ("--config-file", dict(dest="config_file",
                               action="store",
                               help="Path of configuration file")),
        ("--print-config", dict(dest="p_config",
                                action="store_true",
                                help="Print the configuration to console")),
        ("--print-command", dict(dest="p_command",
                                 action="store_true",
                                 help="Print the FPV launch command to console")),
    )

    cli = argparse.ArgumentParser(description="TFM Fastmodel wrapper.")
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
516
517
def main(user_args):
    """ Main logic: build the wrapper from the parsed command line
    arguments, serve the print-only options, otherwise run the model and
    evaluate the tests of a completed run """

    # Map the command line namespace to the constructor arguments
    wrapper_kwargs = {
        "fvp_cfg": user_args.config_file,
        "work_dir": user_args.work_dir,
        "fvp_dir": user_args.fvp_dir,
        "fvp_binary": user_args.fvp_bin,
        "fvp_boot": user_args.fvp_boot,
        "fvp_app": user_args.fvp_firm,
        "terminal_file": user_args.termf,
        "fvp_time_out": user_args.time,
        "fvp_test_error": user_args.test_err,
    }

    # Create FPV handler
    wrapper = FastmodelWrapper(**wrapper_kwargs)

    # Print-only requests exit right after the first one that is served
    print_requests = ((user_args.p_config, wrapper.show_config),
                      (user_args.p_command, wrapper.show_cmd))
    for requested, show in print_requests:
        if requested:
            show()
            sys.exit(0)

    # Start the wrapper
    wrapper.start()

    # Wait for the wrapper to complete
    wrapper.block_wait()

    print("Shutting Down")
    # Test the output of the system only after a full execution
    if wrapper.test_complete:
        wrapper.test()
550
551
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the wrapper
    cli_args = get_cmd_args()
    main(cli_args)