blob: 6ace23fe9e70f89083d87623e59618a6e5c3eb82 [file] [log] [blame]
Minos Galanakisea421232019-06-20 17:11:28 +01001#!/usr/bin/env python3
2
3""" fastmodel_wrapper.py:
4
 Wraps around Fast Models, which execute in headless mode while
 producing serial output to a defined log file. It spawns two processes
 and two threads to monitor the output of the simulation and ends it when
 a user-defined condition is matched. It then performs a set of tests and
 changes the script exit code based on the output of the tests. """
10
11from __future__ import print_function
12
13__copyright__ = """
14/*
Karl Zhang08681e62020-10-30 13:56:03 +080015 * Copyright (c) 2018-2020, Arm Limited. All rights reserved.
Minos Galanakisea421232019-06-20 17:11:28 +010016 *
17 * SPDX-License-Identifier: BSD-3-Clause
18 *
19 */
20 """
Karl Zhang08681e62020-10-30 13:56:03 +080021
22__author__ = "tf-m@lists.trustedfirmware.org"
Minos Galanakisea421232019-06-20 17:11:28 +010023__project__ = "Trusted Firmware-M Open CI"
Karl Zhang08681e62020-10-30 13:56:03 +080024__version__ = "1.2.0"
Minos Galanakisea421232019-06-20 17:11:28 +010025
26import os
27import re
28import sys
29import argparse
30from time import sleep
31from pprint import pprint
32from copy import deepcopy
33from threading import Thread
34from queue import Queue, Empty
35from subprocess import Popen, PIPE, STDOUT
36
37try:
38 from tfm_ci_pylib.utils import find_missing_files, \
39 detect_python3, test, check_pid_status, save_json, save_dict_json, \
40 load_json
41except ImportError:
42 dir_path = os.path.dirname(os.path.realpath(__file__))
43 sys.path.append(os.path.join(dir_path, "../"))
44 from tfm_ci_pylib.utils import find_missing_files, \
45 detect_python3, test, check_pid_status, save_json, save_dict_json, \
46 load_json
47
48
class FastmodelWrapper(object):
    """ Controlling class that wraps around an Arm Fast Model (FVP) and
    controls execution, adding regex flow controls and headless testing.

    Typical life cycle: construct -> start() -> block_wait() -> test()
    -> save_report(). """

    def __init__(self,
                 fvp_cfg=None,
                 work_dir="./",
                 fvp_dir=None,
                 fvp_binary=None,
                 fvp_app=None,
                 fvp_boot=None,
                 terminal_file=None,
                 fvp_time_out=None,
                 fvp_test_error=None):
        """ Load the configuration and apply the caller's overrides.

        fvp_cfg        -- configuration dictionary or path of a json file
        work_dir       -- working directory, the process chdir()s into it
        fvp_dir        -- directory containing the model (overrides config)
        fvp_binary     -- model executable name (overrides config "bin")
        fvp_app        -- firmware .bin file, loaded at 0x10080000
        fvp_boot       -- bootloader .axf file
        terminal_file  -- terminal log path (overrides config)
        fvp_time_out   -- stored as the "simlimit" model option when set
        fvp_test_error -- overrides config "error_on_failed" when truthy

        Calls sys.exit(1) if the configuration cannot be loaded or if a
        firmware/bootloader file name has the wrong extension. """

        # Required by other methods, always set working directory first
        self.work_dir = os.path.abspath(work_dir)

        # Load the configuration from object or file
        self.config, self.name = self.load_config(fvp_cfg)

        self.show_config()

        # Print a header
        ln = int((62 - len(self.name) + 1) / 2)
        print("\n%s Running Test: %s %s\n" % ("#" * ln, self.name, "#" * ln))

        # Consume the configuration parameters not related to the FVP,
        # whatever is left in self.config ends up on the command line
        self.test_list = self.config.pop("test_cases")
        self.test_end_string = self.config.pop("test_end_string")
        self.test_rex = self.config.pop("test_rex")

        # Command line arguments overrides
        # When those arguments are provided they override config entries
        f_dir = self.config.pop("directory")
        self.fvp_dir = os.path.abspath(fvp_dir if fvp_dir else f_dir)

        ef = self.config.pop("error_on_failed")
        self.fvp_test_error = fvp_test_error if fvp_test_error else ef

        tf = self.config.pop("terminal_log")
        if terminal_file:
            self.term_file = os.path.abspath(terminal_file)
        else:
            self.term_file = os.path.abspath(os.path.join(self.work_dir, tf))

        # Override config entries directly
        if fvp_binary:
            self.config["bin"] = fvp_binary

        if fvp_boot:
            # The dot is escaped so that only a real .axf suffix is accepted
            if re.match(r'\S+\.axf$', fvp_boot):
                self.config["application"] = "cpu0=" +\
                    os.path.abspath(fvp_boot)
            else:
                print("Invalid bootloader %s. Expecting .axf file" % fvp_boot)
                sys.exit(1)

        # Ensure that the firmware is copied at the appropriate memory region
        # perfect match regex for future ref r'^(?:cpu=)[\S]+.bin@0x10080000$'
        # TODO remove that when other platforms are added
        if fvp_app:
            if re.match(r'\S+\.bin$', fvp_app):
                self.config["data"] = "cpu0=" +\
                    os.path.abspath(fvp_app) +\
                    "@0x10080000"
            else:
                print("Invalid firmware %s. Expecting .bin file" % fvp_app)
                sys.exit(1)

        # Always define the attribute so that readers never hit an
        # AttributeError when no time limit was requested
        self.fvp_time_out = fvp_time_out
        if fvp_time_out:
            self.config["simlimit"] = fvp_time_out

        self.monitor_q = Queue()
        self.stop_all = False
        self.pids = []
        self.fvp_test_summary = False

        # Populated by run_fpv()/run_monitor(); stop() copes with None
        self.fvp_proc = None
        self.monitor_proc = None

        # Asserted only after a complete test run,including end string matching
        self.test_complete = False

        self.test_report = None

        # Change to working directory
        os.chdir(self.work_dir)
        print("Switching to working directory: %s" % self.work_dir)
        # Clear the file if it has been created before
        with open(self.term_file, "w"):
            pass

    def show_config(self):
        """ Print the configuration to console """

        print("\n%s config:\n" % self.name)
        pprint(self.config)

    def load_config(self, config):
        """ Load the configuration from a json file or a dictionary.

        Returns a tuple (deep copy of the configuration, test name). The
        test name is derived from the "terminal_log" entry. When config is
        a path that does not exist, a file with the same base name is
        looked up in the working directory. Calls sys.exit(1) on failure.
        """

        try:
            # If config is a dictionary object use it as is
            if isinstance(config, dict):
                ret_config = config
            elif isinstance(config, str):
                # if the file provided is not detected attempt to look for
                # it in the working directory
                if not os.path.isfile(config):
                    cfg_file_2 = os.path.join(self.work_dir,
                                              os.path.split(config)[-1])
                    if not os.path.isfile(cfg_file_2):
                        m = "Could not find cfg in %s or %s " % (config,
                                                                 cfg_file_2)
                        raise Exception(m)
                    config = cfg_file_2
                # Attempt to load the configuration from file
                ret_config = load_json(config)
            else:
                raise Exception("Need to provide a valid config name or "
                                "file. Please use the --config-file "
                                "parameter.")

        except Exception as e:
            print("Error! Could not load config. Quitting")
            # Surface the cause, otherwise the user has nothing to act on
            print("Reason: %s" % e)
            sys.exit(1)

        # Generate Test name (Used in test report) from terminal file.
        tname = ret_config["terminal_log"].replace("terminal_", "")\
            .split(".")[0].lower()

        return deepcopy(ret_config), tname

    def save_config(self, config_file="fvp_tfm_config.json"):
        """ Save current configuration to a json file in the working
        directory, restoring the entries consumed by the constructor """

        # Add stripped information to config
        exp_cfg = deepcopy(self.config)

        exp_cfg["terminal_log"] = self.term_file
        exp_cfg["error_on_failed"] = self.fvp_test_error
        exp_cfg["directory"] = self.fvp_dir
        exp_cfg["test_cases"] = self.test_list
        exp_cfg["test_end_string"] = self.test_end_string
        exp_cfg["test_rex"] = self.test_rex

        # A plain dict has no get_sort_order(); only use it when the
        # configuration object actually provides it.
        # NOTE(review): the fallback assumes save_dict_json accepts a list
        # of keys as its sort order - confirm against tfm_ci_pylib.utils
        sort_order_getter = getattr(exp_cfg, "get_sort_order", None)
        if callable(sort_order_getter):
            sort_order = sort_order_getter()
        else:
            sort_order = sorted(exp_cfg)

        cfg_f = os.path.join(self.work_dir, config_file)
        save_dict_json(cfg_f, exp_cfg, sort_order)
        print("Configuration %s exported." % cfg_f)

    def compile_cmd(self):
        """ Compile all the FVP related information into a command string
        that can be executed manually """

        cmd = ""
        for name, value in self.config.items():
            # Place executable at the beginning of the command
            if name == "bin":
                cmd = value + cmd
            elif name == "parameters":
                cmd += " " + " ".join(["--parameter %s" % p for p in value])
            # Allows setting a second binary file as data field
            elif name == "application" and ".bin@0x0" in value:
                cmd += " --data %s" % value
            else:
                cmd += " --%s %s" % (name, value)

        # Add the path to the command
        cmd = os.path.join(self.fvp_dir, cmd)

        # Add the log file to the command (optional)
        cmd = cmd.replace("$TERM_FILE", self.term_file)
        return cmd

    def show_cmd(self):
        """ Print the FVP command to console """

        print(self.compile_cmd())

    @staticmethod
    def _decode_line(line):
        """ Return line as text; undecodable bytes are replaced instead of
        raising, so odd simulator output cannot kill a reader thread """

        if isinstance(line, bytes):
            return line.decode("utf-8", "replace")
        return line

    def _monitor_line_ends_run(self, line, queue):
        """ Inspect one terminal line. Return True when monitoring must
        end, i.e. the end string was found or the simulation stopped """

        # Substring test: a marker at the very start of the line counts
        # too. An empty end string never matches.
        if self.test_end_string and self.test_end_string in line:
            queue.put("Found End String \"%s\"" % self.test_end_string)
            print("Found End String \"%s\"" % self.test_end_string)
            self.test_complete = True
            return True

        # If the FVP stops by itself (i.e simlimit reached) terminate
        if "SystemC: Simulation stopped by user" in line:
            queue.put("Simulation Ended \"%s\"" % self.test_end_string)
            return True
        return False

    def run_fpv(self):
        """ Run the Fast Model in a different process and return its pid
        for housekeeping purposes """

        def fpv_stdout_parser(dstream, queue):
            """ THREAD: Read STDOUT/STDERR and stop if process is done.
            The queue argument is accepted but not used. """

            for line in iter(dstream.readline, b''):
                if self.stop_all:
                    break
                line = self._decode_line(line)
                if "Info: /OSCI/SystemC: Simulation stopped by user" in line:
                    print("/OSCI/SystemC: Simulation stopped")
                    self.stop()
                    break

        cmd_str = self.compile_cmd()
        print("fvp cmd ", cmd_str)

        # Convert to list; split() drops the empty tokens that repeated
        # spaces would otherwise turn into empty arguments
        cmd = cmd_str.split()

        # Run it as subprocess
        self.fvp_proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=False)
        self._fvp_thread = Thread(target=fpv_stdout_parser,
                                  args=(self.fvp_proc.stdout,
                                        self.monitor_q))
        self._fvp_thread.daemon = True
        self._fvp_thread.start()
        return self.fvp_proc.pid

    def run_monitor(self):
        """ Run a `tail -f` process on the terminal log plus a thread that
        monitors its output and stops the FVP when the user specified
        string is found. Returns the pid of the tail process """

        def monitor_producer(dstream, queue):
            """ THREAD: Read STDOUT and push data into a queue """

            for line in iter(dstream.readline, b''):
                if self.stop_all:
                    break
                line = self._decode_line(line)
                queue.put(line)

                if self._monitor_line_ends_run(line, queue):
                    self.stop()
                    break

            dstream.close()

        # Run the tail as a separate process
        cmd = ["tail", "-f", self.term_file]
        self.monitor_proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=False)

        self._fvp_mon_thread = Thread(target=monitor_producer,
                                      args=(self.monitor_proc.stdout,
                                            self.monitor_q))
        self._fvp_mon_thread.daemon = True
        self._fvp_mon_thread.start()
        return self.monitor_proc.pid

    def monitor_consumer(self):
        """ Print at most one queued monitor line without blocking.
        Returns True if a line was printed, False if the queue was empty """

        try:
            line = self.monitor_q.get_nowait()
        except Empty:
            return False
        print(line.rstrip())
        return True

    def has_stopped(self):
        """ Return status of stop flag. True indicates stopped state """

        return self.stop_all

    def start(self):
        """ Start the FVP and the monitor processes and keep track of
        their pids. Calls sys.exit(1) when a required binary is missing.
        Returns self so that calls can be chained """

        # Do not spawn the FVP unless everything is in place
        bin_list = [os.path.join(self.fvp_dir, self.config["bin"]),
                    self.config["application"].replace("cpu0=", "")
                                              .replace("@0x0", ""),
                    self.config["data"].replace("@0x10080000", "")
                                       .replace("@0x00100000", "")
                                       .replace("cpu0=", "")]

        missing = find_missing_files(bin_list)
        if missing:
            print("Could not find all binaries from %s" % ", ".join(bin_list))
            print("Missing Files:", ", ".join(missing))
            sys.exit(1)
        self.show_cmd()
        self.pids.append(self.run_fpv())
        self.pids.append(self.run_monitor())
        print("Spawned Proccesses with PID %s" % repr(self.pids)[1:-1])
        return self

    def stop(self):
        """ Raise the stop flag and terminate the child processes.

        Safe to call more than once and before the processes have been
        spawned (e.g. on an early user interrupt) """

        self.stop_all = True

        # Send the graceful shutdown signal, monitor first then the FVP
        signalled = False
        for proc_attr in ("monitor_proc", "fvp_proc"):
            proc = getattr(self, proc_attr, None)
            if proc is None:
                continue
            try:
                proc.terminate()
                signalled = True
            except OSError:
                # The process is already gone, nothing left to signal
                pass

        # Give the children a moment to exit, only if any was signalled
        if signalled:
            sleep(1)

    def test(self):
        """ Parse the output terminal file and evaluate status of tests.

        The configured regex must yield either (test_name, RESULT) pairs or
        plain test names; any other outcome is evaluated as "no tests
        found". Stores the report and returns self """

        # read the output file
        with open(self.term_file, "r") as F:
            terminal_log = F.read()

        pass_text = "PASSED"

        # Extract tests status, findall always returns a list
        matches = re.compile(self.test_rex).findall(terminal_log)

        tests = None
        if matches:
            # when test regex is in format [(test_name, RESULT),...]
            if isinstance(matches[0], tuple):
                try:
                    tests = dict(matches)
                except (TypeError, ValueError):
                    # More than two capture groups, not a name/result pair
                    tests = None
            # when regex is in format [test_name, test_name_2,...]
            # we just need to verify they exist
            elif isinstance(matches[0], str):
                pass_text = "PRESENT"
                tests = dict.fromkeys(matches, pass_text)

        if tests is None:
            if not self.test_complete:
                print("Warning! Test did not complete.")
            else:
                print("Error", "Invalid tests format: %s type: %s" %
                      (matches, type(matches)))
            # Pass an empty output to test. Do not exit prematurely
            tests = {}

        # Run the test and store the report
        self.test_report = test(self.test_list,
                                tests,
                                pass_text=pass_text,
                                test_name=self.name,
                                error_on_failed=self.fvp_test_error,
                                summary=self.fvp_test_summary)
        return self

    def get_report(self):
        """ Return the test report object to caller. Raises Exception when
        no report has been produced yet """

        if not self.test_report:
            raise Exception("Can not create report from incomplete run cycle!")
        return self.test_report

    def save_report(self, rep_f=None):
        """ Export report into a file, named after the test unless rep_f
        is given. Returns self, or None when there is nothing to export """

        if not self.stop_all or not self.test_report:
            print("Can not create report from incomplete run cycle!")
            return

        if not rep_f:
            rep_f = os.path.join(self.work_dir, "report_%s.json" % self.name)
        rep_f = os.path.abspath(rep_f)
        save_json(rep_f, self.test_report)
        print("Exported test report: %s" % rep_f)
        return self

    def block_wait(self):
        """ Block execution flow until the monitor completes, a child
        process dies or the user interrupts. Returns self """

        try:
            while True:
                for pid in sorted(self.pids):

                    if not check_pid_status(pid, ["running",
                                                  "sleeping",
                                                  "disk"]):
                        print("Child proccess of pid: %s has died, exitting!" %
                              pid)
                        self.stop()
                        # One stop() is enough, it terminates every child
                        break
                if self.has_stopped():
                    break
                # Yield the CPU while the queue is empty, do not spin
                if not self.monitor_consumer():
                    sleep(0.01)

        except KeyboardInterrupt:
            print("User initiated interrupt")
            self.stop()
        # Allows method to be chainloaded
        return self
467
def get_cmd_args():
    """ Build the wrapper's argument parser and return the parsed
    command line (read from sys.argv) """

    # One row per option, registered in this order so that the generated
    # help text lists them the same way
    option_table = (
        ("--bin", dict(dest="fvp_bin",
                       action="store",
                       help="Fast Model platform binary file")),
        ("--firmware", dict(dest="fvp_firm",
                            action="store",
                            help="Firmware application file to run")),
        ("--boot", dict(dest="fvp_boot",
                        action="store",
                        help="Fast Model bootloader file")),
        ("--fpv-path", dict(dest="fvp_dir",
                            action="store",
                            help="Directory path containing the Fast Models")),
        ("--work-path", dict(dest="work_dir",
                             action="store",
                             default="./",
                             help="Working directory (Where logs are stored)")),
        ("--time-limit", dict(dest="time",
                              action="store",
                              help="Time in seconds to run the simulation")),
        ("--log-file", dict(dest="termf",
                            action="store",
                            help="Set terminal log file name")),
        ("--error", dict(dest="test_err",
                         action="store",
                         help="raise sys.error = 1 if test failed")),
        ("--config-file", dict(dest="config_file",
                               action="store",
                               help="Path of configuration file")),
        ("--print-config", dict(dest="p_config",
                                action="store_true",
                                help="Print the configuration to console")),
        ("--print-command", dict(dest="p_command",
                                 action="store_true",
                                 help="Print the FPV launch command to console")),
    )

    # Command line values override the matching configuration entries
    cli = argparse.ArgumentParser(description="TFM Fastmodel wrapper.")
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
517
518
def main(user_args):
    """ Drive one wrapper run from the parsed command line arguments.

    The two --print-* options only display information and exit with
    status 0 without starting the model """

    # Create FPV handler
    wrapper_kwargs = {
        "fvp_cfg": user_args.config_file,
        "work_dir": user_args.work_dir,
        "fvp_dir": user_args.fvp_dir,
        "fvp_binary": user_args.fvp_bin,
        "fvp_boot": user_args.fvp_boot,
        "fvp_app": user_args.fvp_firm,
        "terminal_file": user_args.termf,
        "fvp_time_out": user_args.time,
        "fvp_test_error": user_args.test_err,
    }
    wrapper = FastmodelWrapper(**wrapper_kwargs)

    if user_args.p_config:
        wrapper.show_config()
        sys.exit(0)

    if user_args.p_command:
        wrapper.show_cmd()
        sys.exit(0)

    # Start the wrapper, then wait for it to complete
    wrapper.start()
    wrapper.block_wait()

    print("Shutting Down")
    # Test the output of the system only after a full execution
    if not wrapper.test_complete:
        return
    wrapper.test()
552
if __name__ == "__main__":
    # Script entry point: parse the command line, then run the wrapper
    cli_args = get_cmd_args()
    main(cli_args)