blob: d49cd6c5e539fdd0c12cf87b415ce56acc87a134 [file] [log] [blame]
Minos Galanakisea421232019-06-20 17:11:28 +01001#!/usr/bin/env python3
2
""" fastmodel_wrapper.py:

    Wraps around Fast Models, executing them in headless mode and
    producing serial output to a defined log file. It spawns two processes
    (the model and a log monitor), each read by its own thread, to monitor
    the output of the simulation and end it when a user defined condition is
    matched. It then performs a set of tests and changes the script exit
    code based on the output of the test """
10
11from __future__ import print_function
12
13__copyright__ = """
14/*
15 * Copyright (c) 2018-2019, Arm Limited. All rights reserved.
16 *
17 * SPDX-License-Identifier: BSD-3-Clause
18 *
19 */
20 """
21__author__ = "Minos Galanakis"
22__email__ = "minos.galanakis@linaro.org"
23__project__ = "Trusted Firmware-M Open CI"
24__status__ = "stable"
25__version__ = "1.1"
26
27import os
28import re
29import sys
30import argparse
31from time import sleep
32from pprint import pprint
33from copy import deepcopy
34from threading import Thread
35from queue import Queue, Empty
36from subprocess import Popen, PIPE, STDOUT
37
38try:
39 from tfm_ci_pylib.utils import find_missing_files, \
40 detect_python3, test, check_pid_status, save_json, save_dict_json, \
41 load_json
42except ImportError:
43 dir_path = os.path.dirname(os.path.realpath(__file__))
44 sys.path.append(os.path.join(dir_path, "../"))
45 from tfm_ci_pylib.utils import find_missing_files, \
46 detect_python3, test, check_pid_status, save_json, save_dict_json, \
47 load_json
48
49
class FastmodelWrapper(object):
    """ Controlling class that wraps around an Arm Fast Model and controls
    execution, adding regex flow control and headless testing.

    Typical use is ``FastmodelWrapper(...).start().block_wait()`` followed by
    ``test()``: ``start`` spawns the model plus a ``tail -f`` monitor on the
    terminal log, ``block_wait`` relays the monitored log to the console
    until a stop condition is met, and ``test`` evaluates the captured log
    against the expected test cases. """

    def __init__(self,
                 fvp_cfg=None,
                 work_dir="./",
                 fvp_dir=None,
                 fvp_binary=None,
                 fvp_app=None,
                 fvp_boot=None,
                 terminal_file=None,
                 fvp_time_out=None,
                 fvp_test_error=None):
        """ Load the configuration and apply the caller's overrides.

        fvp_cfg        -- configuration dictionary or path of a json file
        work_dir       -- working directory; the process chdir()s into it
        fvp_dir        -- overrides the config "directory" entry
        fvp_binary     -- overrides the config "bin" entry
        fvp_app        -- firmware image, must end in .bin
        fvp_boot       -- bootloader image, must end in .axf
        terminal_file  -- overrides the config "terminal_log" entry
        fvp_time_out   -- stored in the config as "simlimit"
        fvp_test_error -- overrides the config "error_on_failed" entry

        Exits the interpreter with code 1 when the configuration cannot be
        loaded or when a firmware/bootloader override has the wrong
        extension. """

        # Required by other methods, always set working directory first
        self.work_dir = os.path.abspath(work_dir)

        # Process handles are only created by run_fpv()/run_monitor().
        # Defining them here lets stop() run safely at any point in time.
        self.fvp_proc = None
        self.monitor_proc = None

        # Load the configuration from object or file
        self.config, self.name = self.load_config(fvp_cfg)

        self.show_config()

        # Print a header
        ln = int((62 - len(self.name) + 1) / 2)
        print("\n%s Running Test: %s %s\n" % ("#" * ln, self.name, "#" * ln))

        # Consume the configuration parameters not related to the model
        # command line, starting with the test cases
        self.test_list = self.config.pop("test_cases")
        self.test_end_string = self.config.pop("test_end_string")
        self.test_rex = self.config.pop("test_rex")

        # Command line arguments overrides
        # When those arguments are provided they override config entries
        f_dir = self.config.pop("directory")
        self.fvp_dir = os.path.abspath(fvp_dir if fvp_dir else f_dir)

        ef = self.config.pop("error_on_failed")
        self.fvp_test_error = fvp_test_error if fvp_test_error else ef

        tf = self.config.pop("terminal_log")
        if terminal_file:
            self.term_file = os.path.abspath(terminal_file)
        else:
            # Config entry is relative to the working directory
            self.term_file = os.path.abspath(os.path.join(self.work_dir, tf))

        # Override config entries directly
        if fvp_binary:
            self.config["bin"] = fvp_binary

        if fvp_boot:
            # The dot is escaped so that only a real ".axf" suffix matches
            if re.match(r'[\S]+\.axf$', fvp_boot):
                self.config["application"] = "cpu0=" +\
                    os.path.abspath(fvp_boot)
            else:
                print("Invalid bootloader %s. Expecting .axf file" % fvp_boot)
                sys.exit(1)

        # Ensure that the firmware is copied at the appropriate memory region
        # perfect match regex for future ref r'^(?:cpu=)[\S]+.bin@0x10080000$'
        # TODO remove that when other platforms are added
        if fvp_app:
            if re.match(r'[\S]+\.bin$', fvp_app):
                self.config["data"] = "cpu0=" +\
                    os.path.abspath(fvp_app) +\
                    "@0x10080000"
            else:
                print("Invalid firmware %s. Expecting .bin file" % fvp_app)
                sys.exit(1)

        # Always define the attribute, even when no limit was requested
        self.fvp_time_out = fvp_time_out
        if fvp_time_out:
            self.config["simlimit"] = fvp_time_out

        self.monitor_q = Queue()
        self.stop_all = False
        self.pids = []
        self.fvp_test_summary = False

        # Asserted only after a complete test run,including end string matching
        self.test_complete = False

        self.test_report = None

        # Change to working directory
        os.chdir(self.work_dir)
        print("Switching to working directory: %s" % self.work_dir)
        # Clear the file if it has been created before
        with open(self.term_file, "w") as F:
            F.write("")

    def show_config(self):
        """ Print the configuration to console """

        print("\n%s config:\n" % self.name)
        pprint(self.config)

    def load_config(self, config):
        """ Load the configuration from a json file or a dictionary.

        Returns a (config, name) tuple where config is a deep copy of the
        loaded data and name is derived from its "terminal_log" entry.
        A file that is not found as given is also looked up, by base name,
        in the working directory. Exits with code 1 when nothing usable
        can be loaded. """

        try:
            # If config is a dictionary object use it as is
            if isinstance(config, dict):
                ret_config = config
            elif isinstance(config, str):
                # if the file provided is not detected attempt to look for it
                # in working directory
                if not os.path.isfile(config):
                    # remove path from file and look in the working directory
                    cfg_file_2 = os.path.join(self.work_dir,
                                              os.path.split(config)[-1])
                    if not os.path.isfile(cfg_file_2):
                        m = "Could not find cfg in %s or %s " % (config,
                                                                 cfg_file_2)
                        raise Exception(m)
                    # File exists in working directory
                    config = cfg_file_2
                # Attempt to load the configuration from File
                ret_config = load_json(config)
            else:
                raise Exception("Need to provide a valid config name or file. "
                                "Please use --config/--config-file parameter.")

        except Exception as e:
            print("Error! Could not load config. Quitting")
            # Surface the reason instead of discarding it
            print("Reason: %s" % e)
            sys.exit(1)

        # Generate Test name (Used in test report) from terminal file.
        tname = ret_config["terminal_log"].replace("terminal_", "")\
            .split(".")[0].lower()

        return deepcopy(ret_config), tname

    def save_config(self, config_file="fvp_tfm_config.json"):
        """ Save current configuration to a json file in the working
        directory, restoring the entries that __init__ stripped from it """

        # Add stripped information to config
        exp_cfg = deepcopy(self.config)

        exp_cfg["terminal_log"] = self.term_file
        exp_cfg["error_on_failed"] = self.fvp_test_error
        exp_cfg["directory"] = self.fvp_dir
        exp_cfg["test_cases"] = self.test_list
        exp_cfg["test_end_string"] = self.test_end_string
        exp_cfg["test_rex"] = self.test_rex

        cfg_f = os.path.join(self.work_dir, config_file)

        # Only configuration objects that expose get_sort_order() can be
        # exported sorted. A plain dictionary has no such method, so it is
        # written as-is rather than failing with AttributeError.
        get_sort_order = getattr(exp_cfg, "get_sort_order", None)
        if callable(get_sort_order):
            save_dict_json(cfg_f, exp_cfg, get_sort_order())
        else:
            save_json(cfg_f, exp_cfg)
        print("Configuration %s exported." % cfg_f)

    def compile_cmd(self):
        """ Compile all the model related information into a command string
        that can be executed manually.

        The "bin" entry is placed first and prefixed with the model
        directory, "parameters" expands to one --parameter per item and
        every other entry becomes "--<name> <value>". The $TERM_FILE
        placeholder is replaced with the terminal log path. """

        executable = ""
        arguments = []
        for name, value in self.config.items():
            # The executable always goes to the beginning of the command
            if name == "bin":
                executable = value
            elif name == "parameters":
                arguments.append(
                    " " + " ".join(["--parameter %s" % p for p in value]))
            # Allows setting a second binary file as data field
            elif name == "application" and ".bin@0x0" in value:
                arguments.append(" --data %s" % value)
            else:
                arguments.append(" --%s %s" % (name, value))

        # Add the path to the command
        cmd = os.path.join(self.fvp_dir, executable + "".join(arguments))

        # Add the log file to the command (optional)
        return cmd.replace("$TERM_FILE", self.term_file)

    def show_cmd(self):
        """ Print the model launch command to console """

        print(self.compile_cmd())

    def _line_has_end_string(self, line):
        """ Return True when the configured end string occurs anywhere in
        line, including at its very first character. An empty end string
        never matches. """

        return bool(self.test_end_string) and \
            self.test_end_string in str(line)

    def run_fpv(self):
        """ Run the Fast Model in a different process, start a daemon thread
        reading its output and return the pid for housekeeping purposes """

        def fpv_stdout_parser(dstream, queue):
            """ THREAD: Read STDOUT/STDERR and stop if process is done """

            for line in iter(dstream.readline, b''):
                if self.stop_all:
                    break
                # Python2 ignores byte literals, P3 requires parsing
                if detect_python3():
                    # Undecodable bytes must not kill the reader thread
                    line = line.decode("utf-8", "replace")
                if "Info: /OSCI/SystemC: Simulation stopped by user" in line:
                    print("/OSCI/SystemC: Simulation stopped")
                    self.stop()
                    break

        # Build the command once, show it, then convert it to a list
        cmd_str = self.compile_cmd()
        print("fvp cmd ", cmd_str)
        cmd = cmd_str.split(" ")

        # Run it as subprocess
        self.fvp_proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=False)
        self._fvp_thread = Thread(target=fpv_stdout_parser,
                                  args=(self.fvp_proc.stdout,
                                        self.monitor_q))
        self._fvp_thread.daemon = True
        self._fvp_thread.start()
        return self.fvp_proc.pid

    def run_monitor(self):
        """ Run "tail -f" on the terminal log as a separate process, with a
        daemon thread that queues its output and stops everything when the
        user specified end string is found.
        It returns the pid of the process for housekeeping """

        def monitor_producer(dstream, queue):
            """ THREAD: Read STDOUT and push data into a queue """

            for line in iter(dstream.readline, b''):
                if self.stop_all:
                    break
                # Python2 ignores byte literals, P3 requires parsing
                if detect_python3():
                    # Undecodable bytes must not kill the reader thread
                    line = line.decode("utf-8", "replace")

                queue.put(line)

                # If the test end string is found terminate
                if self._line_has_end_string(line):

                    queue.put("Found End String \"%s\"" % self.test_end_string)
                    print("Found End String \"%s\"" % self.test_end_string)
                    self.test_complete = True
                    self.stop()
                    break
                # If the model stops by itself (i.e simlimit reached) terminate
                if "SystemC: Simulation stopped by user" in str(line):

                    queue.put("Simulation Ended \"%s\"" % self.test_end_string)
                    self.stop()
                    break

            dstream.close()
            return

        # Run the tail as a separate process
        cmd = ["tail", "-f", self.term_file]
        self.monitor_proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=False)

        self._fvp_mon_thread = Thread(target=monitor_producer,
                                      args=(self.monitor_proc.stdout,
                                            self.monitor_q))
        self._fvp_mon_thread.daemon = True
        self._fvp_mon_thread.start()
        return self.monitor_proc.pid

    def monitor_consumer(self):
        """ Read the output of the monitor thread and print the queue entries
        one entry at the time (One line per call). Does nothing when the
        queue is empty. """
        try:
            line = self.monitor_q.get_nowait()
        except Empty:
            pass
        else:
            print(line.rstrip())

    def has_stopped(self):
        """ Return status of stop flag. True indicates stopped state """

        return self.stop_all

    def start(self):
        """ Start the model and the monitor processes and keep
        track of their pids. Exits with code 1 when any of the model binary,
        bootloader or firmware files is missing. Returns self. """

        # Do not spawn the model unless everything is in place
        model_bin = os.path.join(self.fvp_dir, self.config["bin"])
        boot_img = self.config["application"].replace("cpu0=", "")\
            .replace("@0x0", "")
        firmware_img = self.config["data"].replace("@0x10080000", "")\
            .replace("@0x00100000", "")\
            .replace("cpu0=", "")
        bin_list = [model_bin, boot_img, firmware_img]

        missing = find_missing_files(bin_list)
        if missing:
            print("Could not find all binaries from %s" % ", ".join(bin_list))
            print("Missing Files:", ", ".join(missing))
            sys.exit(1)
        self.show_cmd()
        self.pids.append(self.run_fpv())
        self.pids.append(self.run_monitor())
        print("Spawned Proccesses with PID %s" % repr(self.pids)[1:-1])
        return self

    def stop(self):
        """ Raise the stop flag and terminate the spawned processes.

        Safe to call before the processes exist or after they have already
        exited; in both cases there is simply nothing to terminate. """

        self.stop_all = True

        # Send the graceful shutdown signal to whatever has been spawned
        for proc in (getattr(self, "monitor_proc", None),
                     getattr(self, "fvp_proc", None)):
            if proc is None:
                continue
            try:
                proc.terminate()
            except OSError:
                # Process is already gone
                pass
        sleep(1)
        # List the Zombies
        # TODO remove debug output
        for pid in sorted(self.pids):
            if check_pid_status(pid, ["zombie", ]):
                pass
                # print("Warning. Defunc proccess %s" % pid)

    def test(self):
        """ Parse the output terminal file and evaluate status of tests.

        The configured regex is applied to the whole log. Matches made of
        (test_name, result) tuples are compared against "PASSED"; plain
        string matches only need to exist and are marked "PRESENT". The
        report is stored in self.test_report. Returns self. """

        # read the output file
        with open(self.term_file, "r") as F:
            terminal_log = F.read()

        pass_text = "PASSED"

        # Extract tests status using the filtering regex
        found = re.compile(self.test_rex).findall(terminal_log)

        try:
            if not found:
                raise Exception("Incompatible Test Format")
            # when test regex is in format [(test_name, RESULT),...]
            if isinstance(found[0], tuple):
                # Convert result into a dictionary
                tests = dict(found)
            # when regex is in format [test_name, test_name 2,...]
            # we just need to verify they exist
            elif isinstance(found[0], str):
                pass_text = "PRESENT"
                tests = dict.fromkeys(found, pass_text)
            else:
                raise Exception("Incompatible Test Format")
        except Exception:

            if not self.test_complete:
                print("Warning! Test did not complete.")
            else:
                print("Error", "Invalid tests format: %s type: %s" %
                      (found, type(found)))
            # Pass an empty output to test. Do not exit prematurely
            tests = {}

        # Run the test and store the report
        self.test_report = test(self.test_list,
                                tests,
                                pass_text=pass_text,
                                test_name=self.name,
                                error_on_failed=self.fvp_test_error,
                                summary=self.fvp_test_summary)
        return self

    def get_report(self):
        """ Return the test report object to caller. Raises Exception when
        no report has been produced yet. """

        if not self.test_report:
            raise Exception("Can not create report from incomplete run cycle!")
        return self.test_report

    def save_report(self, rep_f=None):
        """ Export report into a file, named after the test but can be
        overridden by rep_f. Returns self on success and None when there is
        no completed run to report on. """

        if not self.stop_all or not self.test_report:
            print("Can not create report from incomplete run cycle!")
            return

        if not rep_f:
            rep_f = os.path.join(self.work_dir, "report_%s.json" % self.name)
        rep_f = os.path.abspath(rep_f)
        save_json(rep_f, self.test_report)
        print("Exported test report: %s" % rep_f)
        return self

    def block_wait(self):
        """ Block execution flow and wait for the monitor to complete,
        printing queued log lines meanwhile. A dead child process or a
        keyboard interrupt stops everything. Returns self. """
        try:
            while True:
                for pid in sorted(self.pids):

                    if not check_pid_status(pid, ["running",
                                                  "sleeping",
                                                  "disk"]):
                        print("Child proccess of pid: %s has died, exitting!" %
                              pid)
                        self.stop()
                if self.has_stopped():
                    break
                self.monitor_consumer()

        except KeyboardInterrupt:
            print("User initiated interrupt")
            self.stop()
        # Allows method to be chainloaded
        return self
467
468
def get_cmd_args():
    """ Build the command line parser and return the parsed arguments.

    Every option is optional; the value options override the matching
    configuration entries, while the two --print-* flags are booleans. """

    cli = argparse.ArgumentParser(description="TFM Fastmodel wrapper.")

    # One entry per option, in the order they appear in the help output
    option_table = (
        ("--bin", dict(dest="fvp_bin",
                       action="store",
                       help="Fast Model platform binary file")),
        ("--firmware", dict(dest="fvp_firm",
                            action="store",
                            help="Firmware application file to run")),
        ("--boot", dict(dest="fvp_boot",
                        action="store",
                        help="Fast Model bootloader file")),
        ("--fpv-path", dict(dest="fvp_dir",
                            action="store",
                            help="Directory path containing the Fast Models")),
        ("--work-path", dict(dest="work_dir",
                             action="store",
                             default="./",
                             help="Working directory (Where logs are stored)")),
        ("--time-limit", dict(dest="time",
                              action="store",
                              help="Time in seconds to run the simulation")),
        ("--log-file", dict(dest="termf",
                            action="store",
                            help="Set terminal log file name")),
        ("--error", dict(dest="test_err",
                         action="store",
                         help="raise sys.error = 1 if test failed")),
        ("--config-file", dict(dest="config_file",
                               action="store",
                               help="Path of configuration file")),
        ("--print-config", dict(dest="p_config",
                                action="store_true",
                                help="Print the configuration to console")),
        ("--print-command", dict(dest="p_command",
                                 action="store_true",
                                 help="Print the FPV launch command to console")),
    )

    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    return cli.parse_args()
518
519
def main(user_args):
    """ Drive a complete wrapper run from parsed command line arguments.

    The --print-config and --print-command options only display the
    requested information and exit with code 0. Otherwise the model is
    started, the call blocks until it stops, and the log is evaluated if
    the end string was reached. """

    # Map the command line namespace onto the wrapper's keyword arguments
    overrides = dict(fvp_cfg=user_args.config_file,
                     work_dir=user_args.work_dir,
                     fvp_dir=user_args.fvp_dir,
                     fvp_binary=user_args.fvp_bin,
                     fvp_boot=user_args.fvp_boot,
                     fvp_app=user_args.fvp_firm,
                     terminal_file=user_args.termf,
                     fvp_time_out=user_args.time,
                     fvp_test_error=user_args.test_err)
    wrapper = FastmodelWrapper(**overrides)

    if user_args.p_config:
        wrapper.show_config()
        sys.exit(0)

    if user_args.p_command:
        wrapper.show_cmd()
        sys.exit(0)

    # Start the wrapper and wait for it to complete
    wrapper.start().block_wait()

    print("Shutting Down")
    # Test the output of the system only after a full execution
    if wrapper.test_complete:
        wrapper.test()
553
# Script entry point: parse the command line, then run the wrapper
if __name__ == "__main__":
    cli_args = get_cmd_args()
    main(cli_args)