Tests: Both worlds tests controlled via UART

To reduce the execution time of the SPMC tests, refactor the test driver
'FvpDriverBothWorlds' to communicate with the PVM via UART in order to
control the test flow.
Prior to this change, hftest.py would re-spawn the FVP model for every
test, which seemed to be quite slow. Controlling test execution via
UART allows the next test scenario to start straight after a reboot.
This has been shown to be quicker, and it improves the usability of the
test infrastructure for testing the SPMC.

Change-Id: I508b1cd7a6c26c557be667081e534794435b68c4
Signed-off-by: J-Alves <joao.alves@arm.com>
diff --git a/test/hftest/hftest.py b/test/hftest/hftest.py
index ce30a7f..3328a5c 100755
--- a/test/hftest/hftest.py
+++ b/test/hftest/hftest.py
@@ -26,6 +26,7 @@
 import sys
 import time
 import fdt
+from telnetlib import Telnet
 
 HFTEST_LOG_PREFIX = "[hftest] "
 HFTEST_LOG_FAILURE_PREFIX = "Failure:"
@@ -137,7 +138,8 @@
         "initrd",
         "vm_args",
         "cpu",
-        "partitions"
+        "partitions",
+        "global_run_name",
     ])
 
 # State shared between the common Driver class and its subclasses during
@@ -353,8 +355,6 @@
             "-C", "bp.terminal_3.start_telnet=false",
             "-C", "bp.pl011_uart0.untimed_fifos=1",
             "-C", "bp.pl011_uart0.unbuffered_output=1",
-            "-C", f"bp.pl011_uart0.out_file={uart0_log_path}",
-            "-C", f"bp.pl011_uart1.out_file={uart1_log_path}",
             "-C", f"cluster0.cpu0.RVBAR={self.CPU_START_ADDRESS}",
             "-C", f"cluster0.cpu1.RVBAR={self.CPU_START_ADDRESS}",
             "-C", f"cluster0.cpu2.RVBAR={self.CPU_START_ADDRESS}",
@@ -368,6 +368,12 @@
             "-C", "bp.ve_sysregs.mmbSiteDefault=0",
             "-C", "bp.ve_sysregs.exit_on_shutdown=1",
         ]
+
+        if uart0_log_path and uart1_log_path:
+            fvp_args += [
+                "-C", f"bp.pl011_uart0.out_file={uart0_log_path}",
+                "-C", f"bp.pl011_uart1.out_file={uart1_log_path}",
+            ]
         return fvp_args
 
     def run(self, run_name, test_args, is_long_running):
@@ -504,7 +510,7 @@
 
     def gen_fvp_args(
         self, is_long_running, uart0_log_path, uart1_log_path, dt,
-        call_super = True):
+        call_super = True, secure_ctrl = True):
         """Generate command line arguments for FVP."""
         common_args = (self, is_long_running, uart0_log_path, uart1_log_path, dt.dtb)
         fvp_args = FvpDriver.gen_fvp_args(*common_args) if call_super else []
@@ -518,10 +524,14 @@
             "-C", "cluster1.has_branch_target_exception=1",
             "-C", "cluster0.restriction_on_speculative_execution=2",
             "-C", "cluster1.restriction_on_speculative_execution=2",
-            "-C", f"bp.pl011_uart0.in_file={FvpDriverSPMC.HFTEST_CMD_FILE}",
-            "-C", f"bp.pl011_uart0.shutdown_tag=\"{HFTEST_CTRL_FINISHED}\"",
         ]
 
+        if secure_ctrl:
+            fvp_args += [
+                "-C", f"bp.pl011_uart0.in_file={FvpDriverSPMC.HFTEST_CMD_FILE}",
+                "-C", f"bp.pl011_uart0.shutdown_tag=\"{HFTEST_CTRL_FINISHED}\"",
+            ]
+
         img_ldadd = self.get_img_and_ldadd(self.args.partitions["SPs"])
         for img, ldadd in img_ldadd:
             fvp_args += ["--data", f"cluster0.cpu0={img}@{hex(ldadd)}"]
@@ -543,6 +553,22 @@
         FvpDriverHypervisor.__init__(self, args)
         FvpDriverSPMC.__init__(self, args)
 
+        # Create and build dt
+        dt = self.create_dt(args.global_run_name)
+        self.gen_dts(dt, "")
+        run_state = self.start_run(args.global_run_name + "_dt_compile")
+        self.compile_dt(run_state, dt)
+
+        # Create file to capture model stdout and stderr
+        fvp_out = args.artifacts.create_file(args.global_run_name, ".model.log")
+        self.fvp_out_f = open(fvp_out, "a")
+
+        # Generate the FVP model arguments
+        self.fvp_args = self.gen_fvp_args(None, None, dt)
+
+        self.process = None
+
+
     @property
     def CPU_START_ADDRESS(self):
         return str(0x04010000)
@@ -573,17 +599,79 @@
         FvpDriverHypervisor.gen_dts(self, dt["hypervisor"], test_args)
         FvpDriverSPMC.gen_dts(self, dt["spmc"], test_args)
 
-    def gen_fvp_args(
-        self, is_long_running, uart0_log_path, uart1_log_path, dt):
+    def gen_fvp_args(self, uart0_log_path, uart1_log_path, dt):
         """Generate command line arguments for FVP."""
-        common_args = (self, is_long_running, uart0_log_path, uart1_log_path)
-        fvp_args = FvpDriverSPMC.gen_fvp_args(*common_args, dt["spmc"])
-        fvp_args += FvpDriverHypervisor.gen_fvp_args(*common_args, dt["hypervisor"], False)
-        return fvp_args
+        common_args = (self, None, uart0_log_path, uart1_log_path)
+        fvp_args = FvpDriverHypervisor.gen_fvp_args(*common_args, dt["hypervisor"])
+        fvp_args += FvpDriverSPMC.gen_fvp_args(*common_args, dt["spmc"], False,
+                                               False)
+        # Timeout arguments are expected to be at the beginning of fvp args.
+        # With this driver, the timeouts are going to be managed via the telnet
+        # APIs. Therefore, remove them from the list of command arguments:
+        return fvp_args[3:]
+
+    def process_start(self):
+        self.process = subprocess.Popen(self.fvp_args,
+                                        stdout = self.fvp_out_f,
+                                        stderr = self.fvp_out_f)
+        # Sleep 1 sec so connecting to the model via telnet doesn't fail
+        time.sleep(1.0)
+
+    def process_terminate(self):
+        """ Terminate fvp model's process, and reset internal field """
+        self.process.terminate()
+        # To give the system time to terminate the process
+        time.sleep(1.0)
+        self.process = None
+
+    def run(self, run_name, test_args, is_long_running):
+        """ Run test """
+        run_state = self.start_run(run_name)
+        assert(run_state.ret_code == 0)
+
+        if self.process is None:
+            self.process_start()
+
+        test_log = f"{' '.join(self.fvp_args)}\n"
+
+        try:
+            with Telnet("localhost", 5000) as comm:
+                # Obtaining HFTEST_CTRL_GET_COMMAND_LINE in logs should be quick
+                test_log += comm.read_until(
+                    HFTEST_CTRL_GET_COMMAND_LINE.encode("ascii"),
+                    timeout=5.0).decode("ascii")
+
+                if HFTEST_CTRL_GET_COMMAND_LINE in test_log:
+                    # Send command to instruct partition to execute test
+                    comm.write(f"{test_args}\n".encode("ascii"))
+
+                    timeout = 80.0 if is_long_running else 10.0
+                    test_log += comm.read_until(HFTEST_CTRL_FINISHED.encode("ascii"),
+                                                timeout=timeout).decode("ascii")
+                else:
+                    print("VM not ready to fetch test command")
+        except ConnectionError as e:
+            self.finish()
+            raise e
+
+        # Check whether the test went well:
+        if HFTEST_CTRL_FINISHED not in test_log:
+            # Terminate process, so it is restarted on the next call to this
+            # function. In this case, the test binaries didn't reset/reboot the
+            # system for the execution of the next test.
+            self.process_terminate()
+            run_state.set_ret_code(124)
+
+        with open(run_state.log_path, "a") as f:
+            f.write(test_log)
+
+        return self.finish_run(run_state)
 
     def finish(self):
         """Clean up after running tests."""
-        pass
+        if self.process is not None:
+            self.process_terminate()
+        self.fvp_out_f.close()
 
 class SerialDriver(Driver):
     """Driver which communicates with a device over the serial port."""
@@ -624,6 +712,7 @@
                     elif HFTEST_CTRL_FINISHED in line:
                         # Device has finished running this test and will reboot.
                         break
+
         return self.finish_run(run_state)
 
     def finish(self):
@@ -894,9 +983,18 @@
     vm_args = args.vm_args or ""
 
     partitions = None
-    if args.driver == "fvp" and args.partitions_json is not None:
-        partitions_dir = os.path.join(args.out_partitions, "obj", args.partitions_json)
-        partitions = json.load(open(partitions_dir, "r"))
+    global_run_name = None
+    if args.driver == "fvp":
+        if args.partitions_json is not None:
+            partitions_dir = os.path.join(
+                args.out_partitions, "obj", args.partitions_json)
+            partitions = json.load(open(partitions_dir, "r"))
+            global_run_name = os.path.basename(args.partitions_json).split(".")[0]
+        elif args.hypervisor:
+            if args.initrd:
+                global_run_name = os.path.basename(args.initrd)
+            else:
+                global_run_name = os.path.basename(args.hypervisor).split(".")[0]
 
     # Create class which will manage all test artifacts.
     log_dir = os.path.join(args.log, test_set_up)
@@ -904,7 +1002,7 @@
 
     # Create a driver for the platform we want to test on.
     driver_args = DriverArgs(artifacts, args.hypervisor, args.spmc, initrd,
-                             vm_args, args.cpu, partitions)
+                             vm_args, args.cpu, partitions, global_run_name)
 
     if args.spmc:
         # So far only FVP supports tests for SPMC.