Merge pull request #8560 from lpy4105/issue/8423/optimize-analyze_outcomes_py

Optimize analyze_outcomes.py
commit 18eab984c7
Gilles Peskine 2023-11-29 14:51:41 +00:00, committed by GitHub


@@ -12,9 +12,36 @@ import traceback
 import re
 import subprocess
 import os
+import typing
 
 import check_test_cases
 
+# `ComponentOutcomes` is a named tuple which is defined as:
+# ComponentOutcomes(
+#     successes = {
+#         "<suite_case>",
+#         ...
+#     },
+#     failures = {
+#         "<suite_case>",
+#         ...
+#     }
+# )
+# suite_case = "<suite>;<case>"
+ComponentOutcomes = typing.NamedTuple('ComponentOutcomes',
+                                      [('successes', typing.Set[str]),
+                                       ('failures', typing.Set[str])])
+
+# `Outcomes` is a representation of the outcomes file,
+# which is defined as:
+# Outcomes = {
+#     "<component>": ComponentOutcomes,
+#     ...
+# }
+Outcomes = typing.Dict[str, ComponentOutcomes]
+
 class Results:
     """Process analysis results."""
@@ -40,35 +67,12 @@ class Results:
     def _print_line(fmt, *args, **kwargs):
         sys.stderr.write((fmt + '\n').format(*args, **kwargs))
 
-class TestCaseOutcomes:
-    """The outcomes of one test case across many configurations."""
-    # pylint: disable=too-few-public-methods
-
-    def __init__(self):
-        # Collect a list of witnesses of the test case succeeding or failing.
-        # Currently we don't do anything with witnesses except count them.
-        # The format of a witness is determined by the read_outcome_file
-        # function; it's the platform and configuration joined by ';'.
-        self.successes = []
-        self.failures = []
-
-    def hits(self):
-        """Return the number of times a test case has been run.
-
-        This includes passes and failures, but not skips.
-        """
-        return len(self.successes) + len(self.failures)
-
-def execute_reference_driver_tests(results: Results, ref_component, driver_component, \
-                                   outcome_file):
+def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str, \
+                                   outcome_file: str) -> None:
     """Run the tests specified in ref_component and driver_component. Results
     are stored in the output_file and they will be used for the following
     coverage analysis"""
-    # If the outcome file already exists, we assume that the user wants to
-    # perform the comparison analysis again without repeating the tests.
-    if os.path.exists(outcome_file):
-        results.info("Outcome file ({}) already exists. Tests will be skipped.", outcome_file)
-        return
+    results.new_section("Test {} and {}", ref_component, driver_component)
 
     shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \
                     " " + ref_component + " " + driver_component
@@ -78,24 +82,28 @@ def execute_reference_driver_tests(results: Results, ref_component, driver_compo
     if ret_val != 0:
         results.error("failed to run reference/driver components")
 
-def analyze_coverage(results, outcomes, allow_list, full_coverage):
+def analyze_coverage(results: Results, outcomes: Outcomes,
+                     allow_list: typing.List[str], full_coverage: bool) -> None:
     """Check that all available test cases are executed at least once."""
     available = check_test_cases.collect_available_test_cases()
-    for key in available:
-        hits = outcomes[key].hits() if key in outcomes else 0
-        if hits == 0 and key not in allow_list:
+    for suite_case in available:
+        hit = any(suite_case in comp_outcomes.successes or
+                  suite_case in comp_outcomes.failures
+                  for comp_outcomes in outcomes.values())
+
+        if not hit and suite_case not in allow_list:
             if full_coverage:
-                results.error('Test case not executed: {}', key)
+                results.error('Test case not executed: {}', suite_case)
             else:
-                results.warning('Test case not executed: {}', key)
-        elif hits != 0 and key in allow_list:
+                results.warning('Test case not executed: {}', suite_case)
+        elif hit and suite_case in allow_list:
             # Test Case should be removed from the allow list.
             if full_coverage:
-                results.error('Allow listed test case was executed: {}', key)
+                results.error('Allow listed test case was executed: {}', suite_case)
             else:
-                results.warning('Allow listed test case was executed: {}', key)
+                results.warning('Allow listed test case was executed: {}', suite_case)
 
-def name_matches_pattern(name, str_or_re):
+def name_matches_pattern(name: str, str_or_re) -> bool:
     """Check if name matches a pattern, that may be a string or regex.
     - If the pattern is a string, name must be equal to match.
     - If the pattern is a regex, name must fully match.
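For illustration (not part of the diff): the new hit test walks every component's outcomes, so a test case counts as executed if any component either passed or failed it. A minimal sketch with made-up names:

suite_case = 'test_suite_ecdsa;ECDSA primitive random #1'
outcomes = {
    'component_ref': ComponentOutcomes(successes={suite_case}, failures=set()),
    'component_drv': ComponentOutcomes(successes=set(), failures=set()),
}
hit = any(suite_case in comp_outcomes.successes or
          suite_case in comp_outcomes.failures
          for comp_outcomes in outcomes.values())
assert hit  # executed at least once, in component_ref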
@@ -103,24 +111,34 @@ def name_matches_pattern(name, str_or_re):
     # The CI's python is too old for re.Pattern
     #if isinstance(str_or_re, re.Pattern):
     if not isinstance(str_or_re, str):
-        return str_or_re.fullmatch(name)
+        return str_or_re.fullmatch(name) is not None
     else:
         return str_or_re == name
 
-def analyze_driver_vs_reference(results: Results, outcomes,
-                                component_ref, component_driver,
-                                ignored_suites, ignored_tests=None):
-    """Check that all tests executed in the reference component are also
-    executed in the corresponding driver component.
+def analyze_driver_vs_reference(results: Results, outcomes: Outcomes,
+                                component_ref: str, component_driver: str,
+                                ignored_suites: typing.List[str], ignored_tests=None) -> None:
+    """Check that all tests passing in the reference component are also
+    passing in the corresponding driver component.
     Skip:
     - full test suites provided in ignored_suites list
     - only some specific test inside a test suite, for which the corresponding
       output string is provided
     """
-    seen_reference_passing = False
-    for key in outcomes:
-        # key is like "test_suite_foo.bar;Description of test case"
-        (full_test_suite, test_string) = key.split(';')
+    ref_outcomes = outcomes.get("component_" + component_ref)
+    driver_outcomes = outcomes.get("component_" + component_driver)
+
+    if ref_outcomes is None or driver_outcomes is None:
+        results.error("required components are missing: bad outcome file?")
+        return
+
+    if not ref_outcomes.successes:
+        results.error("no passing test in reference component: bad outcome file?")
+        return
+
+    for suite_case in ref_outcomes.successes:
+        # suite_case is like "test_suite_foo.bar;Description of test case"
+        (full_test_suite, test_string) = suite_case.split(';')
         test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name
 
         # Immediately skip fully-ignored test suites
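For illustration (not part of the diff): name_matches_pattern accepts either form, so all of these hold (test names are made up):

import re
assert name_matches_pattern('ECDSA sign', 'ECDSA sign')            # plain string: exact equality
assert name_matches_pattern('ECDSA sign', re.compile('ECDSA .*'))  # regex: must match the full name
assert not name_matches_pattern('ECDSA sign', re.compile('ECDSA')) # partial regex match is not enough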
@@ -136,67 +154,48 @@ def analyze_driver_vs_reference(results: Results, outcomes,
                 if name_matches_pattern(test_string, str_or_re):
                     ignored = True
 
-        # Search for tests that run in reference component and not in driver component
-        driver_test_passed = False
-        reference_test_passed = False
-        for entry in outcomes[key].successes:
-            if component_driver in entry:
-                driver_test_passed = True
-            if component_ref in entry:
-                reference_test_passed = True
-                seen_reference_passing = True
-        if reference_test_passed and not driver_test_passed and not ignored:
-            results.error("PASS -> SKIP/FAIL: {}", key)
-        if ignored and driver_test_passed:
-            results.error("uselessly ignored: {}", key)
-
-    if not seen_reference_passing:
-        results.error("no passing test in reference component: bad outcome file?")
+        if not ignored and not suite_case in driver_outcomes.successes:
+            results.error("PASS -> SKIP/FAIL: {}", suite_case)
+        if ignored and suite_case in driver_outcomes.successes:
+            results.error("uselessly ignored: {}", suite_case)
 
-def analyze_outcomes(results: Results, outcomes, args):
+def analyze_outcomes(results: Results, outcomes: Outcomes, args) -> None:
     """Run all analyses on the given outcome collection."""
     analyze_coverage(results, outcomes, args['allow_list'],
                      args['full_coverage'])
 
-def read_outcome_file(outcome_file):
+def read_outcome_file(outcome_file: str) -> Outcomes:
     """Parse an outcome file and return an outcome collection.
-
-    An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects.
-    The keys are the test suite name and the test case description, separated
-    by a semicolon.
     """
     outcomes = {}
     with open(outcome_file, 'r', encoding='utf-8') as input_file:
         for line in input_file:
-            (platform, config, suite, case, result, _cause) = line.split(';')
-            key = ';'.join([suite, case])
-            setup = ';'.join([platform, config])
-            if key not in outcomes:
-                outcomes[key] = TestCaseOutcomes()
+            (_platform, component, suite, case, result, _cause) = line.split(';')
+            # Note that `component` is not unique. If a test case passes on Linux
+            # and fails on FreeBSD, it'll end up in both the successes set and
+            # the failures set.
+            suite_case = ';'.join([suite, case])
+            if component not in outcomes:
+                outcomes[component] = ComponentOutcomes(set(), set())
             if result == 'PASS':
-                outcomes[key].successes.append(setup)
+                outcomes[component].successes.add(suite_case)
             elif result == 'FAIL':
-                outcomes[key].failures.append(setup)
+                outcomes[component].failures.add(suite_case)
     return outcomes
 
-def do_analyze_coverage(results: Results, outcome_file, args):
+def do_analyze_coverage(results: Results, outcomes: Outcomes, args) -> None:
     """Perform coverage analysis."""
     results.new_section("Analyze coverage")
-    outcomes = read_outcome_file(outcome_file)
     analyze_outcomes(results, outcomes, args)
 
-def do_analyze_driver_vs_reference(results: Results, outcome_file, args):
+def do_analyze_driver_vs_reference(results: Results, outcomes: Outcomes, args) -> None:
     """Perform driver vs reference analysis."""
     results.new_section("Analyze driver {} vs reference {}",
                         args['component_driver'], args['component_ref'])
-    execute_reference_driver_tests(results, args['component_ref'], \
-                                   args['component_driver'], outcome_file)
     ignored_suites = ['test_suite_' + x for x in args['ignored_suites']]
-    outcomes = read_outcome_file(outcome_file)
     analyze_driver_vs_reference(results, outcomes,
                                 args['component_ref'], args['component_driver'],
                                 ignored_suites, args['ignored_tests'])
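For illustration (not part of the diff): per the split in read_outcome_file, an outcome file line has six semicolon-separated fields, platform;component;suite;case;result;cause. A made-up line such as

Linux-x86_64;component_test_psa_crypto_config_accel_ecc;test_suite_ecdsa;ECDSA primitive random #1;PASS;

would add 'test_suite_ecdsa;ECDSA primitive random #1' to outcomes['component_test_psa_crypto_config_accel_ecc'].successes.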
@@ -542,10 +541,31 @@ def main():
         KNOWN_TASKS['analyze_coverage']['args']['full_coverage'] = options.full_coverage
 
+        # If the outcome file exists, parse it once and share the result
+        # among tasks to improve performance.
+        # Otherwise, it will be generated by execute_reference_driver_tests.
+        if not os.path.exists(options.outcomes):
+            if len(tasks_list) > 1:
+                sys.stderr.write("multiple tasks found, please provide a valid outcomes file.\n")
+                sys.exit(2)
+
+            task_name = tasks_list[0]
+            task = KNOWN_TASKS[task_name]
+            if task['test_function'] != do_analyze_driver_vs_reference: # pylint: disable=comparison-with-callable
+                sys.stderr.write("please provide valid outcomes file for {}.\n".format(task_name))
+                sys.exit(2)
+
+            execute_reference_driver_tests(main_results,
+                                           task['args']['component_ref'],
+                                           task['args']['component_driver'],
+                                           options.outcomes)
+
+        outcomes = read_outcome_file(options.outcomes)
+
         for task in tasks_list:
             test_function = KNOWN_TASKS[task]['test_function']
             test_args = KNOWN_TASKS[task]['args']
-            test_function(main_results, options.outcomes, test_args)
+            test_function(main_results, outcomes, test_args)
 
         main_results.info("Overall results: {} warnings and {} errors",
                           main_results.warning_count, main_results.error_count)
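For illustration (not part of the diff): before this change each do_* task called read_outcome_file itself, so N tasks parsed the file N times; now it is parsed once up front and shared. A schematic comparison:

# before: every task re-parsed the file
for task in tasks_list:
    ...  # read_outcome_file(options.outcomes) inside each do_* function

# after: one parse, shared by all tasks
outcomes = read_outcome_file(options.outcomes)
for task in tasks_list:
    ...  # every task receives the same parsed `outcomes` dict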