| #!/usr/bin/env python3 |
| # |
| # Copyright 2018 The Hafnium Authors. |
| # |
| # Use of this source code is governed by a BSD-style |
| # license that can be found in the LICENSE file or at |
| # https://opensource.org/licenses/BSD-3-Clause. |
| |
| """Script which drives invocation of tests and parsing their output to produce |
| a results report. |
| """ |
| |
| from __future__ import print_function |
| |
| import xml.etree.ElementTree as ET |
| |
| import argparse |
| from abc import ABC, abstractmethod |
| import collections |
| import datetime |
| import importlib |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| import time |
| import fdt |
| from telnetlib import Telnet |
| |
| HFTEST_LOG_PREFIX = "[hftest] " |
| HFTEST_LOG_FAILURE_PREFIX = "Failure:" |
| HFTEST_LOG_FINISHED = "FINISHED" |
| |
| HFTEST_CTRL_GET_COMMAND_LINE = "[hftest_ctrl:get_command_line]" |
| HFTEST_CTRL_FINISHED = "[hftest_ctrl:finished]" |
| |
| HF_ROOT = os.path.dirname(os.path.dirname(os.path.dirname( |
| os.path.abspath(__file__)))) |
| DTC_SCRIPT = os.path.join(HF_ROOT, "build", "image", "dtc.py") |
| FVP_BINARY = os.path.join( |
| os.path.dirname(HF_ROOT), "fvp", "Base_RevC_AEMvA_pkg", "models", |
| "Linux64_GCC-9.3", "FVP_Base_RevC-2xAEMvA") |
| HF_PREBUILTS = os.path.join(HF_ROOT, "prebuilts") |
| FVP_PREBUILTS_TFA_TRUSTY_ROOT = os.path.join( |
| HF_PREBUILTS, "linux-aarch64", "trusted-firmware-a-trusty", "fvp") |
| FVP_PREBUILT_DTS = os.path.join( |
| FVP_PREBUILTS_TFA_TRUSTY_ROOT, "fvp-base-gicv3-psci-1t.dts") |
| |
| FVP_PREBUILT_TFA_ROOT = os.path.join( |
| HF_PREBUILTS, "linux-aarch64", "trusted-firmware-a", "fvp") |
| |
| VM_NODE_REGEX = "vm[1-9]" |
| |
| def read_file(path): |
| with open(path, "r") as f: |
| return f.read() |
| |
| def write_file(path, to_write, append=False): |
| with open(path, "a" if append else "w") as f: |
| f.write(to_write) |
| |
| def append_file(path, to_write): |
| write_file(path, to_write, append=True) |
| |
| def join_if_not_None(*args): |
| return " ".join(filter(lambda x: x, args)) |
| |
| def get_vm_node_from_manifest(dts : str): |
| """ Get VM node string from Partition's extension to Partition Manager's |
| manifest.""" |
| match = re.search(VM_NODE_REGEX, dts) |
| if not match: |
| raise Exception("Partition's node is not defined in its manifest.") |
| return match.group() |
| |
| def correct_vm_node(dts: str, node_index : int): |
| """ The vm node is being appended to the Partition Manager manifests. |
| Ideally, these files would be reused accross various test set-ups.""" |
| return dts.replace(get_vm_node_from_manifest(dts), f"vm{node_index}") |
| |
| DT = collections.namedtuple("DT", ["dts", "dtb"]) |
| |
| class ArtifactsManager: |
| """Class which manages folder with test artifacts.""" |
| |
| def __init__(self, log_dir): |
| self.created_files = [] |
| self.log_dir = log_dir |
| |
| # Create directory. |
| try: |
| os.makedirs(self.log_dir) |
| except OSError: |
| if not os.path.isdir(self.log_dir): |
| raise |
| print("Logs saved under", log_dir) |
| |
| # Create files expected by the Sponge test result parser. |
| self.sponge_log_path = self.create_file("sponge_log", ".log") |
| self.sponge_xml_path = self.create_file("sponge_log", ".xml") |
| |
| def gen_file_path(self, basename, extension): |
| """Generate path to a file in the log directory.""" |
| return os.path.join(self.log_dir, basename + extension) |
| |
| def create_file(self, basename, extension): |
| """Create and touch a new file in the log folder. Ensure that no other |
| file of the same name was created by this instance of ArtifactsManager. |
| """ |
| # Determine the path of the file. |
| path = self.gen_file_path(basename, extension) |
| |
| # Check that the path is unique. |
| assert(path not in self.created_files) |
| self.created_files += [ path ] |
| |
| # Touch file. |
| with open(path, "w") as f: |
| pass |
| |
| return path |
| |
| def get_file(self, basename, extension): |
| """Return path to a file in the log folder. Assert that it was created |
| by this instance of ArtifactsManager.""" |
| path = self.gen_file_path(basename, extension) |
| assert(path in self.created_files) |
| return path |
| |
| |
| # Tuple holding the arguments common to all driver constructors. |
| # This is to avoid having to pass arguments from subclasses to superclasses. |
| DriverArgs = collections.namedtuple("DriverArgs", [ |
| "artifacts", |
| "hypervisor", |
| "spmc", |
| "initrd", |
| "vm_args", |
| "cpu", |
| "partitions", |
| "global_run_name", |
| ]) |
| |
| # State shared between the common Driver class and its subclasses during |
| # a single invocation of the target platform. |
| class DriverRunState: |
| def __init__(self, log_path): |
| self.log_path = log_path |
| self.ret_code = 0 |
| |
| def set_ret_code(self, ret_code): |
| self.ret_code = ret_code |
| |
| class DriverRunException(Exception): |
| """Exception thrown if subprocess invoked by a driver returned non-zero |
| status code. Used to fast-exit from a driver command sequence.""" |
| pass |
| |
| |
| class Driver: |
| """Parent class of drivers for all testable platforms.""" |
| |
| def __init__(self, args): |
| self.args = args |
| |
| def get_run_log(self, run_name): |
| """Return path to the main log of a given test run.""" |
| return self.args.artifacts.get_file(run_name, ".log") |
| |
| def start_run(self, run_name): |
| """Hook called by Driver subclasses before they invoke the target |
| platform.""" |
| return DriverRunState(self.args.artifacts.create_file(run_name, ".log")) |
| |
| def exec_logged(self, run_state, exec_args, cwd=None): |
| """Run a subprocess on behalf of a Driver subclass and append its |
| stdout and stderr to the main log.""" |
| assert(run_state.ret_code == 0) |
| with open(run_state.log_path, "a") as f: |
| f.write("$ {}\r\n".format(" ".join(exec_args))) |
| f.flush() |
| ret_code = subprocess.call(exec_args, stdout=f, stderr=f, cwd=cwd) |
| if ret_code != 0: |
| run_state.set_ret_code(ret_code) |
| raise DriverRunException() |
| |
| def finish_run(self, run_state): |
| """Hook called by Driver subclasses after they finished running the |
| target platform. `ret_code` argument is the return code of the main |
| command run by the driver. A corresponding log message is printed.""" |
| # Decode return code and add a message to the log. |
| with open(run_state.log_path, "a") as f: |
| if run_state.ret_code == 124: |
| f.write("\r\n{}{} timed out\r\n".format( |
| HFTEST_LOG_PREFIX, HFTEST_LOG_FAILURE_PREFIX)) |
| elif run_state.ret_code != 0: |
| f.write("\r\n{}{} process return code {}\r\n".format( |
| HFTEST_LOG_PREFIX, HFTEST_LOG_FAILURE_PREFIX, |
| run_state.ret_code)) |
| |
| # Append log of this run to full test log. |
| log_content = read_file(run_state.log_path) |
| append_file( |
| self.args.artifacts.sponge_log_path, |
| log_content + "\r\n\r\n") |
| return log_content |
| |
| |
| class QemuDriver(Driver): |
| """Driver which runs tests in QEMU.""" |
| |
| def __init__(self, args, qemu_wd, tfa): |
| Driver.__init__(self, args) |
| self.qemu_wd = qemu_wd |
| self.tfa = tfa |
| |
| def gen_exec_args(self, test_args, is_long_running): |
| """Generate command line arguments for QEMU.""" |
| time_limit = "120s" if is_long_running else "10s" |
| # If no CPU configuration is selected, then test against the maximum |
| # configuration, "max", supported by QEMU. |
| cpu = self.args.cpu or "max" |
| exec_args = [ |
| "timeout", "--foreground", time_limit, |
| os.path.abspath("prebuilts/linux-x64/qemu/qemu-system-aarch64"), |
| "-no-reboot", "-machine", "virt,virtualization=on,gic-version=3", |
| "-cpu", cpu, "-smp", "4", "-m", "1G", |
| "-nographic", "-nodefaults", "-serial", "stdio", |
| "-d", "unimp", "-kernel", os.path.abspath(self.args.hypervisor), |
| ] |
| |
| if self.tfa: |
| bl1_path = os.path.join( |
| HF_PREBUILTS, "linux-aarch64", "trusted-firmware-a-trusty", |
| "qemu", "bl1.bin") |
| exec_args += ["-bios", |
| os.path.abspath(bl1_path), |
| "-machine", "secure=on", "-semihosting-config", |
| "enable=on,target=native"] |
| |
| if self.args.initrd: |
| exec_args += ["-initrd", os.path.abspath(self.args.initrd)] |
| |
| vm_args = join_if_not_None(self.args.vm_args, test_args) |
| if vm_args: |
| exec_args += ["-append", vm_args] |
| |
| return exec_args |
| |
| def run(self, run_name, test_args, is_long_running): |
| """Run test given by `test_args` in QEMU.""" |
| run_state = self.start_run(run_name) |
| |
| try: |
| # Execute test in QEMU.. |
| exec_args = self.gen_exec_args(test_args, is_long_running) |
| self.exec_logged(run_state, exec_args, |
| cwd=self.qemu_wd) |
| except DriverRunException: |
| pass |
| |
| return self.finish_run(run_state) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| pass |
| |
| class FvpDriver(Driver, ABC): |
| """Base class for driver which runs tests in Arm FVP emulator.""" |
| |
| def __init__(self, args): |
| if args.cpu: |
| raise ValueError("FVP emulator does not support the --cpu option.") |
| super().__init__(args) |
| |
| @property |
| @abstractmethod |
| def CPU_START_ADDRESS(self): |
| pass |
| |
| @property |
| @abstractmethod |
| def FVP_PREBUILT_BL31(self): |
| pass |
| |
| def create_dt(self, run_name : str): |
| """Create DT related files, and return respective paths in a tuple |
| (dts,dtb)""" |
| return DT(self.args.artifacts.create_file(run_name, ".dts"), |
| self.args.artifacts.create_file(run_name, ".dtb")) |
| |
| def compile_dt(self, run_state, dt : DT): |
| """Compile DT calling dtc.""" |
| dtc_args = [ |
| DTC_SCRIPT, "compile", "-i", dt.dts, "-o", dt.dtb, |
| ] |
| self.exec_logged(run_state, dtc_args) |
| |
| def create_uart_log(self, run_name : str, file_name : str): |
| """Create uart log file, and return path""" |
| return self.args.artifacts.create_file(run_name, file_name) |
| |
| def get_img_and_ldadd(self, partitions : dict): |
| ret = [] |
| for i, p in enumerate(partitions): |
| with open(p["dts"], "r") as dt: |
| dts = dt.read() |
| manifest = fdt.parse_dts(dts) |
| vm_node = get_vm_node_from_manifest(dts) |
| load_address = manifest.get_property("load_address", |
| f"/hypervisor/{vm_node}").value |
| ret.append((p["img"], load_address)) |
| return ret |
| |
| def get_manifests_from_json(self, partitions : list): |
| manifests = "" |
| if partitions is not None: |
| for i, p in enumerate(partitions): |
| manifests += correct_vm_node(read_file(p["dts"]), i + 1) |
| return manifests |
| |
| @abstractmethod |
| def gen_dts(self, dt, test_args): |
| """Abstract method to generate dts file. This specific to the use case |
| so should be implemented within derived driver""" |
| pass |
| |
| @abstractmethod |
| def gen_fvp_args( |
| self, is_long_running, uart0_log_path, uart1_log_path, dt): |
| """Generate command line arguments for FVP.""" |
| time_limit = "80s" if is_long_running else "40s" |
| fvp_args = [ |
| "timeout", "--foreground", time_limit, |
| FVP_BINARY, |
| "-C", "pci.pci_smmuv3.mmu.SMMU_AIDR=2", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_IDR0=0x0046123B", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_IDR1=0x00600002", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_IDR3=0x1714", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_IDR5=0xFFFF0472", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_S_IDR1=0xA0000002", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_S_IDR2=0", |
| "-C", "pci.pci_smmuv3.mmu.SMMU_S_IDR3=0", |
| "-C", "pctl.startup=0.0.0.0", |
| "-C", "bp.secure_memory=1", |
| "-C", "cluster0.NUM_CORES=4", |
| "-C", "cluster1.NUM_CORES=4", |
| "-C", "cache_state_modelled=0", |
| "-C", "bp.vis.disable_visualisation=true", |
| "-C", "bp.vis.rate_limit-enable=false", |
| "-C", "bp.terminal_0.start_telnet=false", |
| "-C", "bp.terminal_1.start_telnet=false", |
| "-C", "bp.terminal_2.start_telnet=false", |
| "-C", "bp.terminal_3.start_telnet=false", |
| "-C", "bp.pl011_uart0.untimed_fifos=1", |
| "-C", "bp.pl011_uart0.unbuffered_output=1", |
| "-C", f"cluster0.cpu0.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster0.cpu1.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster0.cpu2.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster0.cpu3.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster1.cpu0.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster1.cpu1.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster1.cpu2.RVBAR={self.CPU_START_ADDRESS}", |
| "-C", f"cluster1.cpu3.RVBAR={self.CPU_START_ADDRESS}", |
| "--data", |
| f"cluster0.cpu0={self.FVP_PREBUILT_BL31}@{self.CPU_START_ADDRESS}", |
| "-C", "bp.ve_sysregs.mmbSiteDefault=0", |
| "-C", "bp.ve_sysregs.exit_on_shutdown=1", |
| "-C", "cluster0.has_arm_v8-5=1", |
| "-C", "cluster1.has_arm_v8-5=1", |
| "-C", "cluster0.has_branch_target_exception=1", |
| "-C", "cluster1.has_branch_target_exception=1", |
| "-C", "cluster0.memory_tagging_support_level=2", |
| "-C", "cluster1.memory_tagging_support_level=2", |
| "-C", "bp.dram_metadata.is_enabled=1", |
| ] |
| |
| if uart0_log_path and uart1_log_path: |
| fvp_args += [ |
| "-C", f"bp.pl011_uart0.out_file={uart0_log_path}", |
| "-C", f"bp.pl011_uart1.out_file={uart1_log_path}", |
| ] |
| return fvp_args |
| |
| def run(self, run_name, test_args, is_long_running): |
| """ Run test """ |
| run_state = self.start_run(run_name) |
| dt = self.create_dt(run_name) |
| uart0_log_path = self.create_uart_log(run_name, ".uart0.log") |
| uart1_log_path = self.create_uart_log(run_name, ".uart1.log") |
| |
| try: |
| self.gen_dts(dt, test_args) |
| self.compile_dt(run_state, dt) |
| fvp_args = self.gen_fvp_args(is_long_running, uart0_log_path, |
| uart1_log_path, dt) |
| self.exec_logged(run_state, fvp_args) |
| except DriverRunException: |
| pass |
| |
| # Append UART0 output to main log. |
| append_file(run_state.log_path, read_file(uart0_log_path)) |
| return self.finish_run(run_state) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| pass |
| |
| class FvpDriverHypervisor(FvpDriver): |
| """ |
| Driver which runs tests in Arm FVP emulator, with hafnium as hypervisor |
| """ |
| INITRD_START= 0x84000000 |
| INITRD_END = 0x86000000 #Default value, however may change if initrd in args |
| |
| def __init__(self, args): |
| self.vms_in_partitions_json = args.partitions and args.partitions["VMs"] |
| super().__init__(args) |
| |
| @property |
| def CPU_START_ADDRESS(self): |
| return "0x04020000" |
| |
| @property |
| def FVP_PREBUILT_BL31(self): |
| return os.path.join(FVP_PREBUILTS_TFA_TRUSTY_ROOT, "bl31.bin") |
| |
| @property |
| def HYPERVISOR_ADDRESS(self): |
| return "0x80000000" |
| |
| @property |
| def HYPERVISOR_DTB_ADDRESS(self): |
| return "0x82000000" |
| |
| def gen_dts(self, dt, test_args): |
| """Create a DeviceTree source which will be compiled into a DTB and |
| passed to FVP for a test run.""" |
| |
| vm_args = join_if_not_None(self.args.vm_args, test_args) |
| write_file(dt.dts, read_file(FVP_PREBUILT_DTS)) |
| |
| # Write the vm arguments to the partition manifest |
| to_append = f""" |
| / {{ |
| chosen {{ |
| bootargs = "{vm_args}"; |
| stdout-path = "serial0:115200n8"; |
| linux,initrd-start = <{self.INITRD_START if self.args.initrd else 0}>; |
| linux,initrd-end = <{self.INITRD_END if self.args.initrd else 0}>; |
| }}; |
| }};""" |
| if self.vms_in_partitions_json: |
| to_append += self.get_manifests_from_json(self.args.partitions["VMs"]) |
| |
| append_file(dt.dts, to_append) |
| |
| def gen_fvp_args( |
| self, is_long_running, uart0_log_path, uart1_log_path, dt, call_super = True): |
| """Generate command line arguments for FVP.""" |
| common_args = (self, is_long_running, uart0_log_path, uart1_log_path, dt) |
| fvp_args = FvpDriver.gen_fvp_args(*common_args) if call_super else [] |
| |
| fvp_args += [ |
| "--data", f"cluster0.cpu0={dt.dtb}@{self.HYPERVISOR_DTB_ADDRESS}", |
| "--data", f"cluster0.cpu0={self.args.hypervisor}@{self.HYPERVISOR_ADDRESS}", |
| ] |
| |
| if self.vms_in_partitions_json: |
| img_ldadd = self.get_img_and_ldadd(self.args.partitions["VMs"]) |
| for img, ldadd in img_ldadd: |
| fvp_args += ["--data", f"cluster0.cpu0={img}@{hex(ldadd)}"] |
| |
| if self.args.initrd: |
| fvp_args += [ |
| "--data", |
| f"cluster0.cpu0={self.args.initrd}@{self.INITRD_START}" |
| ] |
| return fvp_args |
| |
| class FvpDriverSPMC(FvpDriver): |
| """ |
| Driver which runs tests in Arm FVP emulator, with hafnium as SPMC |
| """ |
| FVP_PREBUILT_SECURE_DTS = os.path.join( |
| HF_ROOT, "test", "vmapi", "fvp-base-spmc.dts") |
| HFTEST_CMD_FILE = os.path.join("/tmp/", "hftest_cmds") |
| |
| def __init__(self, args): |
| if args.partitions is None or args.partitions["SPs"] is None: |
| raise Exception("Need to provide SPs in partitions_json") |
| super().__init__(args) |
| |
| @property |
| def CPU_START_ADDRESS(self): |
| return "0x04010000" |
| |
| @property |
| def FVP_PREBUILT_BL31(self): |
| return os.path.join(FVP_PREBUILT_TFA_ROOT, "bl31_spmd.bin") |
| |
| @property |
| def SPMC_ADDRESS(self): |
| return "0x6000000" |
| |
| @property |
| def SPMC_DTB_ADDRESS(self): |
| return "0x0403f000" |
| |
| def gen_dts(self, dt, test_args): |
| """Create a DeviceTree source which will be compiled into a DTB and |
| passed to FVP for a test run.""" |
| to_append = self.get_manifests_from_json(self.args.partitions["SPs"]) |
| write_file(dt.dts, read_file(FvpDriverSPMC.FVP_PREBUILT_SECURE_DTS)) |
| append_file(dt.dts, to_append) |
| |
| def gen_fvp_args( |
| self, is_long_running, uart0_log_path, uart1_log_path, dt, |
| call_super = True, secure_ctrl = True): |
| """Generate command line arguments for FVP.""" |
| common_args = (self, is_long_running, uart0_log_path, uart1_log_path, dt.dtb) |
| fvp_args = FvpDriver.gen_fvp_args(*common_args) if call_super else [] |
| |
| fvp_args += [ |
| "--data", f"cluster0.cpu0={dt.dtb}@{self.SPMC_DTB_ADDRESS}", |
| "--data", f"cluster0.cpu0={self.args.spmc}@{self.SPMC_ADDRESS}", |
| ] |
| |
| if secure_ctrl: |
| fvp_args += [ |
| "-C", f"bp.pl011_uart0.in_file={FvpDriverSPMC.HFTEST_CMD_FILE}", |
| "-C", f"bp.pl011_uart0.shutdown_tag=\"{HFTEST_CTRL_FINISHED}\"", |
| ] |
| |
| img_ldadd = self.get_img_and_ldadd(self.args.partitions["SPs"]) |
| for img, ldadd in img_ldadd: |
| fvp_args += ["--data", f"cluster0.cpu0={img}@{hex(ldadd)}"] |
| |
| return fvp_args |
| |
| def run(self, run_name, test_args, is_long_running): |
| with open(FvpDriverSPMC.HFTEST_CMD_FILE, "w+") as f: |
| vm_args = join_if_not_None(self.args.vm_args, test_args) |
| f.write(f"{vm_args}\n") |
| return super().run(run_name, test_args, is_long_running) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| os.remove(FvpDriverSPMC.HFTEST_CMD_FILE) |
| |
| class FvpDriverBothWorlds(FvpDriverHypervisor, FvpDriverSPMC): |
| def __init__(self, args): |
| FvpDriverHypervisor.__init__(self, args) |
| FvpDriverSPMC.__init__(self, args) |
| |
| @property |
| def CPU_START_ADDRESS(self): |
| return str(0x04010000) |
| |
| @property |
| def FVP_PREBUILT_BL31(self): |
| return str(os.path.join(FVP_PREBUILT_TFA_ROOT, "bl31_spmd.bin")) |
| |
| def create_dt(self, run_name): |
| dt = dict() |
| dt["hypervisor"] = FvpDriver.create_dt(self, run_name + "_hypervisor") |
| dt["spmc"] = FvpDriver.create_dt(self, run_name + "_spmc") |
| return dt |
| |
| @property |
| def HYPERVISOR_ADDRESS(self): |
| return "0x88000000" |
| |
| @property |
| def HYPERVISOR_DTB_ADDRESS(self): |
| return "0x82000000" |
| |
| def compile_dt(self, run_state, dt): |
| FvpDriver.compile_dt(self, run_state, dt["hypervisor"]) |
| FvpDriver.compile_dt(self, run_state, dt["spmc"]) |
| |
| def gen_dts(self, dt, test_args): |
| FvpDriverHypervisor.gen_dts(self, dt["hypervisor"], test_args) |
| FvpDriverSPMC.gen_dts(self, dt["spmc"], test_args) |
| |
| def gen_fvp_args(self, is_long_running, uart0_log_path, uart1_log_path, dt): |
| """Generate command line arguments for FVP.""" |
| common_args = (self, is_long_running, uart0_log_path, uart1_log_path) |
| fvp_args = FvpDriverHypervisor.gen_fvp_args(*common_args, dt["hypervisor"]) |
| fvp_args += FvpDriverSPMC.gen_fvp_args(*common_args, dt["spmc"], False, |
| False) |
| return fvp_args |
| |
| def run(self, run_name, test_args, is_long_running): |
| return FvpDriver.run(self, run_name, test_args, is_long_running) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| FvpDriver.finish(self) |
| |
| class SerialDriver(Driver): |
| """Driver which communicates with a device over the serial port.""" |
| |
| def __init__(self, args, tty_file, baudrate, init_wait): |
| Driver.__init__(self, args) |
| self.tty_file = tty_file |
| self.baudrate = baudrate |
| self.pyserial = importlib.import_module("serial") |
| |
| if init_wait: |
| input("Press ENTER and then reset the device...") |
| |
| def connect(self): |
| return self.pyserial.Serial(self.tty_file, self.baudrate, timeout=10) |
| |
| def run(self, run_name, test_args, is_long_running): |
| """Communicate `test_args` to the device over the serial port.""" |
| run_state = self.start_run(run_name) |
| |
| with self.connect() as ser: |
| with open(run_state.log_path, "a") as f: |
| while True: |
| # Read one line from the serial port. |
| line = ser.readline().decode('utf-8') |
| if len(line) == 0: |
| # Timeout |
| run_state.set_ret_code(124) |
| input("Timeout. " + |
| "Press ENTER and then reset the device...") |
| break |
| # Write the line to the log file. |
| f.write(line) |
| if HFTEST_CTRL_GET_COMMAND_LINE in line: |
| # Device is waiting for `test_args`. |
| ser.write(test_args.encode('ascii')) |
| ser.write(b'\r') |
| elif HFTEST_CTRL_FINISHED in line: |
| # Device has finished running this test and will reboot. |
| break |
| |
| return self.finish_run(run_state) |
| |
| def finish(self): |
| """Clean up after running tests.""" |
| with self.connect() as ser: |
| while True: |
| line = ser.readline().decode('utf-8') |
| if len(line) == 0: |
| input("Timeout. Press ENTER and then reset the device...") |
| elif HFTEST_CTRL_GET_COMMAND_LINE in line: |
| # Device is waiting for a command. Instruct it to exit |
| # the test environment. |
| ser.write("exit".encode('ascii')) |
| ser.write(b'\r') |
| break |
| |
| # Tuple used to return information about the results of running a set of tests. |
| TestRunnerResult = collections.namedtuple("TestRunnerResult", [ |
| "tests_run", |
| "tests_failed", |
| "tests_skipped", |
| ]) |
| |
| class TestRunner: |
| """Class which communicates with a test platform to obtain a list of |
| available tests and driving their execution.""" |
| |
| def __init__(self, artifacts, driver, test_set_up, suite_regex, test_regex, |
| skip_long_running_tests, force_long_running): |
| self.artifacts = artifacts |
| self.driver = driver |
| self.test_set_up = test_set_up |
| self.skip_long_running_tests = skip_long_running_tests |
| self.force_long_running = force_long_running |
| |
| self.suite_re = re.compile(suite_regex or ".*") |
| self.test_re = re.compile(test_regex or ".*") |
| |
| def extract_hftest_lines(self, raw): |
| """Extract hftest-specific lines from a raw output from an invocation |
| of the test platform.""" |
| lines = [] |
| lines_to_process = raw.splitlines() |
| |
| try: |
| # If logs have logs of more than one VM, the loop below to extract |
| # lines won't work. Thus, extracting between starting and ending |
| # logs: HFTEST_CTRL_GET_COMMAND_LINE and HFTEST_CTRL_FINISHED. |
| hftest_start = lines_to_process.index(HFTEST_CTRL_GET_COMMAND_LINE) + 1 |
| hftest_end = lines_to_process.index(HFTEST_CTRL_FINISHED) |
| except ValueError: |
| hftest_start = 0 |
| hftest_end = len(lines_to_process) |
| |
| lines_to_process = lines_to_process[hftest_start : hftest_end] |
| |
| for line in lines_to_process: |
| match = re.search(f"^VM \d+: ", line) |
| if match is not None: |
| line = line[match.end():] |
| if line.startswith(HFTEST_LOG_PREFIX): |
| lines.append(line[len(HFTEST_LOG_PREFIX):]) |
| return lines |
| |
| def get_test_json(self): |
| """Invoke the test platform and request a JSON of available test and |
| test suites.""" |
| out = self.driver.run("json", "json", self.force_long_running) |
| hf_out = "\n".join(self.extract_hftest_lines(out)) |
| try: |
| return json.loads(hf_out) |
| except ValueError as e: |
| print(out) |
| raise e |
| |
| def collect_results(self, fn, it, xml_node): |
| """Run `fn` on every entry in `it` and collect their TestRunnerResults. |
| Insert "tests" and "failures" nodes to `xml_node`.""" |
| tests_run = 0 |
| tests_failed = 0 |
| tests_skipped = 0 |
| start_time = time.perf_counter() |
| for i in it: |
| sub_result = fn(i) |
| assert(sub_result.tests_run >= sub_result.tests_failed) |
| tests_run += sub_result.tests_run |
| tests_failed += sub_result.tests_failed |
| tests_skipped += sub_result.tests_skipped |
| elapsed_time = time.perf_counter() - start_time |
| |
| xml_node.set("tests", str(tests_run + tests_skipped)) |
| xml_node.set("failures", str(tests_failed)) |
| xml_node.set("skipped", str(tests_skipped)) |
| xml_node.set("time", str(elapsed_time)) |
| return TestRunnerResult(tests_run, tests_failed, tests_skipped) |
| |
| def is_passed_test(self, test_out): |
| """Parse the output of a test and return True if it passed.""" |
| return \ |
| len(test_out) > 0 and \ |
| test_out[-1] == HFTEST_LOG_FINISHED and \ |
| not any(l.startswith(HFTEST_LOG_FAILURE_PREFIX) for l in test_out) |
| |
| def get_failure_message(self, test_out): |
| """Parse the output of a test and return the message of the first |
| assertion failure.""" |
| for i, line in enumerate(test_out): |
| if line.startswith(HFTEST_LOG_FAILURE_PREFIX) and i + 1 < len(test_out): |
| # The assertion message is on the line after the 'Failure:' |
| return test_out[i + 1].strip() |
| |
| return None |
| |
| def get_log_name(self, suite, test): |
| """Returns a string with a generated log name for the test.""" |
| log_name = "" |
| |
| cpu = self.driver.args.cpu |
| if cpu: |
| log_name += cpu + "." |
| |
| log_name += suite["name"] + "." + test["name"] |
| |
| return log_name |
| |
| def run_test(self, suite, test, suite_xml): |
| """Invoke the test platform and request to run a given `test` in given |
| `suite`. Create a new XML node with results under `suite_xml`. |
| Test only invoked if it matches the regex given to constructor.""" |
| if not self.test_re.match(test["name"]): |
| return TestRunnerResult(tests_run=0, tests_failed=0, tests_skipped=0) |
| |
| test_xml = ET.SubElement(suite_xml, "testcase") |
| test_xml.set("name", test["name"]) |
| test_xml.set("classname", suite["name"]) |
| |
| if self.skip_long_running_tests and test["is_long_running"]: |
| print(" SKIP", test["name"]) |
| test_xml.set("status", "notrun") |
| skipped_xml = ET.SubElement(test_xml, "skipped") |
| skipped_xml.set("message", "Long running") |
| return TestRunnerResult(tests_run=0, tests_failed=0, tests_skipped=1) |
| |
| print(" RUN", test["name"]) |
| log_name = self.get_log_name(suite, test) |
| |
| test_xml.set("status", "run") |
| |
| start_time = time.perf_counter() |
| out = self.driver.run( |
| log_name, "run {} {}".format(suite["name"], test["name"]), |
| test["is_long_running"] or self.force_long_running) |
| hftest_out = self.extract_hftest_lines(out) |
| elapsed_time = time.perf_counter() - start_time |
| |
| test_xml.set("time", str(elapsed_time)) |
| |
| system_out_xml = ET.SubElement(test_xml, "system-out") |
| system_out_xml.text = out |
| |
| if self.is_passed_test(hftest_out): |
| print(" PASS") |
| return TestRunnerResult(tests_run=1, tests_failed=0, tests_skipped=0) |
| else: |
| print("[x] FAIL --", self.driver.get_run_log(log_name)) |
| failure_xml = ET.SubElement(test_xml, "failure") |
| failure_message = self.get_failure_message(hftest_out) or "Test failed" |
| failure_xml.set("message", failure_message) |
| failure_xml.text = '\n'.join(hftest_out) |
| return TestRunnerResult(tests_run=1, tests_failed=1, tests_skipped=0) |
| |
| def run_suite(self, suite, xml): |
| """Invoke the test platform and request to run all matching tests in |
| `suite`. Create new XML nodes with results under `xml`. |
| Suite skipped if it does not match the regex given to constructor.""" |
| if not self.suite_re.match(suite["name"]): |
| return TestRunnerResult(tests_run=0, tests_failed=0, tests_skipped=0) |
| |
| print(" SUITE", suite["name"]) |
| suite_xml = ET.SubElement(xml, "testsuite") |
| suite_xml.set("name", suite["name"]) |
| properties_xml = ET.SubElement(suite_xml, "properties") |
| |
| property_xml = ET.SubElement(properties_xml, "property") |
| property_xml.set("name", "driver") |
| property_xml.set("value", type(self.driver).__name__) |
| |
| if self.driver.args.cpu: |
| property_xml = ET.SubElement(properties_xml, "property") |
| property_xml.set("name", "cpu") |
| property_xml.set("value", self.driver.args.cpu) |
| |
| return self.collect_results( |
| lambda test: self.run_test(suite, test, suite_xml), |
| suite["tests"], |
| suite_xml) |
| |
| def run_tests(self): |
| """Run all suites and tests matching regexes given to constructor. |
| Write results to sponge log XML. Return the number of run and failed |
| tests.""" |
| |
| test_spec = self.get_test_json() |
| timestamp = datetime.datetime.now().replace(microsecond=0).isoformat() |
| |
| xml = ET.Element("testsuites") |
| xml.set("name", self.test_set_up) |
| xml.set("timestamp", timestamp) |
| |
| result = self.collect_results( |
| lambda suite: self.run_suite(suite, xml), |
| test_spec["suites"], |
| xml) |
| |
| # Write XML to file. |
| ET.ElementTree(xml).write(self.artifacts.sponge_xml_path, |
| encoding='utf-8', xml_declaration=True) |
| |
| if result.tests_failed > 0: |
| print("[x] FAIL:", result.tests_failed, "of", result.tests_run, |
| "tests failed") |
| elif result.tests_run > 0: |
| print(" PASS: all", result.tests_run, "tests passed") |
| |
| # Let the driver clean up. |
| self.driver.finish() |
| |
| return result |
| |
| def Main(): |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--hypervisor") |
| parser.add_argument("--spmc") |
| parser.add_argument("--log", required=True) |
| parser.add_argument("--out_initrd") |
| parser.add_argument("--out_partitions") |
| parser.add_argument("--initrd") |
| parser.add_argument("--partitions_json") |
| parser.add_argument("--suite") |
| parser.add_argument("--test") |
| parser.add_argument("--vm_args") |
| parser.add_argument("--driver", default="qemu") |
| parser.add_argument("--serial-dev", default="/dev/ttyUSB0") |
| parser.add_argument("--serial-baudrate", type=int, default=115200) |
| parser.add_argument("--serial-no-init-wait", action="store_true") |
| parser.add_argument("--skip-long-running-tests", action="store_true") |
| parser.add_argument("--force-long-running", action="store_true") |
| parser.add_argument("--cpu", |
| help="Selects the CPU configuration for the run environment.") |
| parser.add_argument("--tfa", action="store_true") |
| args = parser.parse_args() |
| |
| # Create class which will manage all test artifacts. |
| if args.hypervisor and args.spmc: |
| test_set_up = "hypervisor_and_spmc" |
| elif args.hypervisor: |
| test_set_up = "hypervisor" |
| elif args.spmc: |
| test_set_up = "spmc" |
| else: |
| raise Exception("No Hafnium image provided!\n") |
| |
| initrd = None |
| if args.hypervisor and args.initrd: |
| initrd_dir = os.path.join(args.out_initrd, "obj", args.initrd) |
| initrd = os.path.join(initrd_dir, "initrd.img") |
| test_set_up += "_" + args.initrd |
| vm_args = args.vm_args or "" |
| |
| partitions = None |
| global_run_name = None |
| if args.driver == "fvp": |
| if args.partitions_json is not None: |
| partitions_dir = os.path.join( |
| args.out_partitions, "obj", args.partitions_json) |
| partitions = json.load(open(partitions_dir, "r")) |
| global_run_name = os.path.basename(args.partitions_json).split(".")[0] |
| elif args.hypervisor: |
| if args.initrd: |
| global_run_name = os.path.basename(args.initrd) |
| else: |
| global_run_name = os.path.basename(args.hypervisor).split(".")[0] |
| |
| # Create class which will manage all test artifacts. |
| log_dir = os.path.join(args.log, test_set_up) |
| artifacts = ArtifactsManager(log_dir) |
| |
| # Create a driver for the platform we want to test on. |
| driver_args = DriverArgs(artifacts, args.hypervisor, args.spmc, initrd, |
| vm_args, args.cpu, partitions, global_run_name) |
| |
| if args.spmc: |
| # So far only FVP supports tests for SPMC. |
| if args.driver != "fvp": |
| raise Exception("Secure tests can only run with fvp driver") |
| |
| if args.hypervisor: |
| driver = FvpDriverBothWorlds(driver_args) |
| else: |
| driver = FvpDriverSPMC(driver_args) |
| elif args.hypervisor: |
| if args.driver == "qemu": |
| out = os.path.dirname(args.hypervisor) |
| driver = QemuDriver(driver_args, out, args.tfa) |
| elif args.driver == "fvp": |
| driver = FvpDriverHypervisor(driver_args) |
| elif args.driver == "serial": |
| driver = SerialDriver(driver_args, args.serial_dev, |
| args.serial_baudrate, not args.serial_no_init_wait) |
| else: |
| raise Exception("Unknown driver name: {}".format(args.driver)) |
| else: |
| raise Exception("No Hafnium image provided!\n") |
| |
| # Create class which will drive test execution. |
| runner = TestRunner(artifacts, driver, test_set_up, args.suite, args.test, |
| args.skip_long_running_tests, args.force_long_running) |
| |
| # Run tests. |
| runner_result = runner.run_tests() |
| |
| # Print error message if no tests were run as this is probably unexpected. |
| # Return suitable error code. |
| if runner_result.tests_run == 0: |
| print("Error: no tests match") |
| return 10 |
| elif runner_result.tests_failed > 0: |
| return 1 |
| else: |
| return 0 |
| |
| if __name__ == "__main__": |
| sys.exit(Main()) |