#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
import traceback
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin


class PluginMgrTestFail(Exception):
    def __init__(self, stage, output, message):
        self.stage = stage
        self.output = output
        self.message = message

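# PluginMgrTestFail is raised by prepare_env() when a setup or teardown
# command exits with an unexpected code; test_runner() catches it and marks
# the remaining tests as skipped in the TAP output.
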
class PluginMgr:
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

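    # Each discovered plugin module is expected to provide a SubPlugin class
    # (typically derived from TdcPlugin.TdcPlugin); one instance per plugin is
    # kept for the whole run, and the call_* wrappers below fan out to every
    # instance.
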
    def call_pre_suite(self, testcount, testidlist):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

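    # The pre_* hooks run in plugin load order, while the post_* hooks run in
    # reverse order so that plugins are wound down symmetrically.
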
    def call_add_args(self, parser):
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    @staticmethod
    def _make_argparser(args):
        # a staticmethod has no self to attach the parser to, so build and
        # return one instead
        return argparse.ArgumentParser(
            description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd


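# Illustrative example (actual values come from tdc_config.py): with
# NAMES = {'TC': '/sbin/tc', 'DEV1': 'v0p1'},
# replace_keywords('$TC qdisc show dev $DEV1') yields
# '/sbin/tc qdisc show dev v0p1'; placeholders missing from NAMES are left
# untouched by safe_substitute().
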
def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8", errors="ignore")
    else:
        foutput = rawout.decode("utf-8", errors="ignore")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput


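# exec_cmd() returns (None, None) for an empty command; otherwise the caller
# gets the finished Popen object (for its returncode) plus decoded stderr when
# the command failed with error output, or decoded stdout otherwise.
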
def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("returncode {}; expected {}".format(proc.returncode,
                                                      exit_codes))
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))

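# A setup/teardown entry may be a plain command string (expected to exit 0) or
# a list of the form [cmd, allowed_exit_code, ...], e.g. (illustrative)
# ["$TC qdisc del dev $DEV1 ingress", 0, 2].
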
def run_one_test(pm, args, index, tidx):
    global NAMES
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    if p:
        exit_code = p.returncode
    else:
        exit_code = None

    pm.call_post_execute()

    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
        result = False
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))
        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                result = False
        elif int(tidx["matchCount"]) != 0:
            result = False

    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if not result:
        if procout:
            tap += procout
        else:
            tap += 'No output!\n'

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    index += 1

    # remove TESTID from NAMES
    del NAMES['TESTID']
    return tap

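# Each test contributes one TAP line such as (illustrative)
#   ok 4 - d052 # Add 1M filters with the same action
# with 'not ok' and the captured command output appended on failure.
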
def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    if args.notap:
        if args.verbose:
            tap = 'notap requested: omitting test plan\n'
    else:
        tap = str(index) + ".." + str(tcount) + "\n"
    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        # when the extra print statements are uncommented,
        # the traceback does not appear between them
        # (it appears way earlier in the tdc.py output)
        # so don't bother ...
        # print('--------------------(')
        # print('traceback')
        traceback.print_tb(ex_tb)
        # print('--------------------)')
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    if emergency_exit:
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        if "flower" in tidx["category"] and args.device is None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    if not args.notap:
        tap += 'about to flush the tap output if tests need to be skipped\n'
        if tcount + 1 != index:
            for tidx in testlist[index - 1:]:
                msg = 'skipped - previous {} failed'.format(stage)
                tap += 'ok {} - {} # {} {} {}\n'.format(
                    count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
                count += 1

        tap += 'done flushing skipped test tap output\n'

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input(''):
            print('got something on stdin')

    pm.call_post_suite(index)

    return tap

def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(k for k in idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if (has_blank_ids(idlist)):
            for k in testlist:
                k['filename'] = filename
    return testlist


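# Note: the 'filename' key is attached only when a file still contains blank
# IDs, so generate_case_ids() later knows which files need to be rewritten.
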
def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '-N', '--notap', action='store_true',
        help='Suppress tap results for command under test')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser


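# Typical invocations (illustrative):
#   ./tdc.py -l                 # list all test cases
#   ./tdc.py -c gact -v         # run a single category, verbosely
#   ./tdc.py -e 1234 abcd       # run specific test case IDs
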
def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)


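# NAMES comes in via the star import from tdc_config; -p/--path and
# -d/--device simply override its TC and DEV2 entries before any command
# templates are expanded.
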
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.
    """
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return (any(newid == x for x in idl))


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.
    """
    import random
    for c in alltests:
        if (c["id"] == ""):
            while True:
                newid = str('{:04x}'.format(random.randrange(16**4)))
                if (does_id_exist(alltests, newid)):
                    continue
                else:
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        outfile = open(f, "w")
        json.dump(testlist, outfile, indent=4)
        outfile.write("\n")
        outfile.close()

    # hand the updated list back so callers that reassign the result
    # (set_operation_mode) keep a usable reference
    return alltests

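# Generated IDs are four random hex digits; collisions with existing IDs are
# re-rolled, and the rewritten JSON keeps 4-space indentation plus a trailing
# newline.
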
def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist

def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if args.category and testlist:
        test_ids = list()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.append(tc['id'])

    return answer

def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases


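# Selection summary: --execute wins (optionally unioned with --category
# matches); otherwise --category filters the loaded set; with neither option
# every test found under the chosen files/directories is used.
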
def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        list_test_cases(alltests)
        exit(0)

    if len(alltests):
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    if args.notap:
        print('Tap output suppression requested\n')
    else:
        print('All test results: \n\n{}'.format(catresults))

def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)


if __name__ == "__main__":
    main()