Skip to content
Snippets Groups Projects
Commit 68abf872 authored by Janet Goldstein's avatar Janet Goldstein
Browse files

WS-436: Test coverage audit

parent 44c5fcbf
No related branches found
No related tags found
1 merge request!448WS-436: Test coverage audit
Pipeline #2666 passed
# Coverage Audit
This package contains the **coverage-audit**, which fetches and parses the `coverage_report.xml`
test coverage report produced by the gitlab CI, then prints a human-readable summary to the console.
## Usage
```coverage-audit [target_dir]```
the `coverage-audit` command downloads the latest coverage report from gitlab,
as a `.zip`, extracts the coverage report file, parses it, and prints a
summary of workspaces modules whose test coverage is under 100%. If a target
directory is supplied, the coverage report and related files are written
to `target_dir/coverage/`.
### Example output
```
The following workspaces modules have test coverage of < 100%:
apps/cli/executables/pexable/carta_envoy/carta_envoy/carta.py: line-rate = 0.6441; branch-rate = 0.2222
apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py: line-rate = 0.6531; branch-rate = 0.5
apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py: line-rate = 0.6707; branch-rate = 0.45
...
```
___version___ = "4.0.0a1.dev1"
""" Detailed audit of incomplete test coverage reported by gitlab CI """
import ast
import http
import os
import zipfile
from pathlib import Path
from typing import List
from xml.etree import ElementTree as ET
from xml.etree.ElementTree import Element
# pylint: disable=E0401, R0903, R1721
import requests
COVERAGE_REPORT_XML = "coverage.xml"
class CoverageReportSummary:
"""Wraps items parsed out of a coverage report"""
def __init__(self):
self.classes = self.functions = self.other_items = []
def add_item(self, item):
"""
Add this ast item to the appropriate list.
:param item: ast thing, e.g., ClassDef, FunctionDef, If, IfExp
:return:
"""
if isinstance(item, ast.ClassDef):
self.classes.append(item)
elif isinstance(item, ast.FunctionDef):
self.functions.append(item)
else:
self._add_other_item(item)
def _add_other_item(self, item):
"""
Examine this ast item and, if it's something we're interested in,
add it to the other_items list.
:param item: some ast thing that isn't a ClassDef nor a FunctionDef
:return:
"""
if isinstance(item, ast.If) or isinstance(item, ast.IfExp):
self.other_items.append(item)
elif not isinstance(item, ast.Import) and not isinstance(item, ast.ImportFrom):
# (we're not interested in `import` lines)
val = item.value
if hasattr(val, "n"):
content = item.value.n
else:
content = item.value
if not isinstance(content, str) and not isinstance(
item, ast.Assign
):
self.other_items.append(item)
class ModuleMetadata:
"""Represents a Python module to be audited for test coverage."""
def __init__(self, path: Path, summary: CoverageReportSummary):
self.path = path
self.summary = summary
class ElementCoverageMetadata:
"""Encapsulates metadata from the coverage report for a module we're going to audit
for test coverage"""
def __init__(self, path: Path, line_rate: float, branch_rate: float):
self.path = path
self.line_rate = line_rate
self.branch_rate = branch_rate
self.modules = []
def __str__(self):
return f"{self.path}: line-rate = {self.line_rate}; branch-rate = {self.branch_rate}"
def __eq__(self, other):
if isinstance(other, ElementCoverageMetadata):
return (
other.path == self.path
and other.line_rate == self.line_rate
and other.branch_rate == self.branch_rate
and other.modules == self.modules
)
return False
def add_module(self, module: ModuleMetadata):
"""
Add this module to the metadata.
:param module: module to add
:return:
"""
self.modules.append(module)
class CoverageReportParser:
"""Turn a gitlab coverage report into audit targets we can examine"""
def __init__(self, report_path: Path):
self.xml_file = report_path
self.project_root = find_project_root()
self.targets = []
def parse(self):
"""
Find the audit targets in this coverage report
:return: a list of metadata about classes to be audited
"""
for target in self._read_coverage_report():
self.targets.append(target)
def _read_coverage_report(self) -> List[ElementCoverageMetadata]:
"""
parse the coverage report generated by gitlab
:return: a list of metadata about classes to be audited
"""
targets = []
coverage = ET.parse(self.xml_file)
for package in coverage.findall(".//package"):
complexity = int(package.attrib["complexity"])
if complexity > 0:
print(
f">>> (FYI: nonzero complexity of {complexity} for {package.attrib['name']})"
)
for element in package.findall(".//class"):
target = ElementDigger(self.project_root, element).dig()
if target:
targets.append(target)
return targets
class ElementDigger:
"""Excavates an element parsed out of the coverage report"""
def __init__(self, project_root: Path, element: Element):
self.project_root = project_root
self.element = element
def dig(self) -> ElementCoverageMetadata:
"""
Parse the class metadata for this "class" element of the coverage report.
:return:
"""
target = ElementCoverageMetadata(
path=Path(self.element.attrib["filename"]),
line_rate=float(self.element.attrib["line-rate"]),
branch_rate=float(self.element.attrib["branch-rate"]),
)
add_target = False
for lines in self.element.findall(".//lines"):
for line in lines:
attribs = line.attrib
if "branch" in attribs and "condition-coverage" in attribs:
condition_coverage = attribs["condition-coverage"]
# it will be a % string; convert to an int
cov_percent = int(condition_coverage.split("%")[0])
if cov_percent < 100:
add_target = True
module = self._create_audit_module(target)
target.add_module(module)
if add_target:
return target
def _create_audit_module(self, target: ElementCoverageMetadata) -> ModuleMetadata:
"""
Find the Python module at this target's path, scoop up the functions and classes,
and return an AuditModule
:param target: a target of code test audit
:return:
"""
summary = CoverageReportSummary()
module_path = self.project_root / target.path
with open(module_path, "r") as infile:
content = infile.read()
dumped = ast.Dict(ast.parse(content))
for item in dumped.keys.body:
summary.add_item(item)
return ModuleMetadata(path=module_path, summary=summary)
class CoverageReportGrabber:
"""Download gitlab's latest coverage report"""
def __init__(self, destination: Path = None):
if destination:
self.destination = destination
else:
self.destination = Path.cwd()
def grab(self):
"""
Issue http GET request for coverage.xml, which will be contained n a .zip.
Retrieve it to the current directory and unzip it.
:return: the coverage report
"""
url = "https://gitlab.nrao.edu/ssa/workspaces/-/jobs/artifacts/main/download?job=unit+test+coverage"
response = requests.get(url)
if response.status_code == http.HTTPStatus.OK:
# write the zip file, then extract the .xml we want
return self._write_cov_rpt_xml(response.content)
raise RuntimeError(f"RETRIEVAL FAILURE: {response}")
def _write_cov_rpt_xml(self, content: bytes) -> Path:
"""
Write the coverage report extracted from the downloaded .zip
:param content: file contents
:return: COVERAGE_REPORT_XML file at self.destination
"""
target_path = self.destination / "coverage.zip"
with open(target_path, "wb") as cov_zip:
cov_zip.write(content)
xml_file = self.destination / COVERAGE_REPORT_XML
with zipfile.ZipFile(target_path, "r") as zip_ref:
zip_ref.extract(COVERAGE_REPORT_XML, self.destination)
target_path.unlink()
return xml_file
class Reporter:
"""Create a digest of not-fully-covered items found in the coverage report"""
def __init__(self, targets: List[ElementCoverageMetadata]):
self.targets = targets
def report(self):
"""
Print results to the console in human-readable format
:return:
"""
if len(self.targets) == 0:
print("All workspaces modules are fully covered by current tests.")
return
print("The following workspaces modules have test coverage of < 100%:")
for target in self.targets:
print(target)
def we_are_in_docker() -> bool:
"""
Are we executing inside a Docker container?
(If not, we assume it's local testing.)
:return: is she or isn't she? only her hairdresser knows for sure
"""
return Path("/packages").is_dir()
def find_project_root() -> Path:
"""
Under which top level do we start looking for tests?
:return: project root
"""
if we_are_in_docker():
# we're testing in the workflow container
return Path("/packages")
# someone is testing locally
possible_roots = [
file for file in Path(os.environ["HOME"]).rglob("workspaces") if file.is_dir()
]
if len(possible_roots) > 0:
workspaces = possible_roots[0]
return workspaces
raise FileNotFoundError(f"project root not found under {os.environ['HOME' ]}")
def main(argv: List[str] = None):
"""
yes, you -can- have it all
:return:
"""
if argv:
target_path = Path(argv[0])
else:
target_path = Path.cwd()
latest_report = CoverageReportGrabber(target_path).grab()
parser = CoverageReportParser(latest_report)
parser.parse()
reporter = Reporter(parser.targets)
reporter.report()
if __name__ == "main":
main()
"""A setuptools-based setup module for coverage_audit.
See:
https://packaging.python.org/en/latest/distributing.html
https://github.com/pypa/sampleproject
"""
# For matching the version string.
import re
from os import path
# To use a consistent encoding
from codecs import open
# Always prefer setuptools over distutils
from pathlib import Path
from setuptools import find_packages, setup
THIS_MODULE = "coverage_audit"
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
requires = [
"pytest>=5.4,<6.0",
]
def read(*parts):
"""
Read _version.py.
:param parts: paths to search for _version.py
:return:
"""
with open(path.join(here, *parts), "r") as fp:
return fp.read()
def find_version() -> str:
"""
What version are we now?
:return: string representing current version of this app
"""
version_file = read(Path.cwd() / "_version.py")
if not version_file:
raise RuntimeError("Unable to find version file.")
version_match = re.search(r"^___version___ = ['\"]([^'\"]*)['\"]", version_file, re.M)
if version_match:
return version_match.group(1)
raise RuntimeError(f"Unable to find version string in {version_file}.")
setup(
name="ssa-" + THIS_MODULE,
# Versions should comply with PEP440. For a discussion on single-sourcing
# the version across setup.py and the project code, see
# https://packaging.python.org/en/latest/single_source_version.html
version=find_version(),
description="Coverage Audit: the Workspaces Testing Coverage Audit",
long_description=long_description,
# Author details
author="Science Support and Archive",
author_email="ssa-announcements@nrao.edu",
# Choose your license
license="GPL",
# See https://pypi.python.org/pypi?%3Aaction=list_classifiers
classifiers=[
# How mature is this project? Common values are
# 3 - Alpha
# 4 - Beta
# 5 - Production/Stable
"Development Status :: 4 - Beta",
# Indicate who your project is intended for
"Intended Audience :: Developers",
"Topic :: Software Development :: Build Tools",
# Pick your license as you wish (should match "license" above)
"License :: OSI Approved :: GPL License",
# Specify the Python versions you support here. In particular, ensure
# that you indicate whether you support Python 2, Python 3 or both.
"Programming Language :: Python :: 3.8",
],
packages=find_packages(),
entry_points={"console_scripts": ["coverage-audit = coverage_audit.coverage_audit:main"]},
)
This diff is collapsed.
""" Coverage audit proof-of-concept masquerading as tests """
import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import patch
# pylint: disable=E0401, R1721, W0621
import pytest
import coverage_audit
from coverage_audit import (
CoverageReportParser,
find_project_root,
Reporter,
COVERAGE_REPORT_XML,
CoverageReportGrabber,
)
RUN_ALL = True
@pytest.fixture(scope="module")
def project_root() -> Path:
"""
Get the project root once and use it in all tests
:return: top level under which audit targets will be found
"""
return find_project_root()
@pytest.fixture(scope="module")
def test_coverage_report() -> Path:
"""
Find the testing copy of the gitlab coverage report.
:return: path to report file
"""
cur_dir = Path.cwd()
xmls = [file for file in cur_dir.parent.rglob(COVERAGE_REPORT_XML)]
for xml in xmls:
if "coverage_audit/test" in str(xml):
return xml
def test_main_launches_cov_audit(capsys):
"""
Is coverage_audit.main() functional?
:return:
"""
tmpdir = Path(tempfile.mkdtemp())
orig_dir = Path.cwd()
# try it in current directory first
os.chdir(tmpdir)
coverage_audit.main()
cov_rpt_file = tmpdir / COVERAGE_REPORT_XML
assert cov_rpt_file.exists()
result = capsys.readouterr().out
lines = result.strip().split("\n")
num_lines = len(lines)
assert num_lines > 1
# go back to where we were, then pass in the destination
os.chdir(orig_dir)
cov_rpt_file.unlink()
coverage_audit.main([str(tmpdir)])
assert cov_rpt_file.exists()
result = capsys.readouterr().out
lines = result.strip().split("\n")
assert len(lines) == num_lines
shutil.rmtree(tmpdir)
@pytest.mark.skipif(not RUN_ALL, reason="temporary skip")
def test_expected_targets_created(test_coverage_report):
"""
Do the audit targets in the testing coverage.xml have the metadata we expect?
:return:
"""
crp = CoverageReportParser(test_coverage_report)
crp.parse()
# we're expecting this many "classes" to have coverage < 100%
assert len(crp.targets) == 39
for target in crp.targets:
assert target.branch_rate >= 0.0
assert target.line_rate > 0.0
@pytest.mark.skipif(not RUN_ALL, reason="temporary skip")
def test_all_targets_analyzed(test_coverage_report):
"""
Have we dug into every target to get its 411?
:return:
"""
crp = CoverageReportParser(test_coverage_report)
with patch("coverage_audit.ElementDigger.dig") as mock:
crp.parse()
assert mock.call_count == len(crp.targets)
@pytest.mark.skipif(not RUN_ALL, reason="temporary skip")
def test_parses_target_accurately(test_coverage_report):
"""
Do we find the attributes we expect in a given target created from
an element of the coverage report?
:return:
"""
crp = CoverageReportParser(test_coverage_report)
crp.parse()
module_found = False
for target in crp.targets:
if target.path.name.endswith("delivery.py"):
assert target.line_rate == 0.8077
assert target.branch_rate == 0.5
assert len(target.modules) == 2
for module in target.modules:
assert module.path.name.endswith("delivery.py")
module_found = True
break
assert module_found
@pytest.mark.skipif(not RUN_ALL, reason="temporary skip")
def test_creates_expected_digest(test_coverage_report: Path, capsys):
"""
For the test copy of coverage.xml, do we get the output we should?
:return:
"""
parser = CoverageReportParser(test_coverage_report)
parser.parse()
reporter = Reporter(parser.targets)
reporter.report()
result = capsys.readouterr().out
lines = result.strip().split("\n")
assert len(lines) == len(parser.targets) + 1
@pytest.mark.skipif(not RUN_ALL, reason="temporary skip")
def test_fetches_latest_coverage_report():
"""
Can we retrieve the latest coverage report from gitlab with an http request?
:return:
"""
tmpdir = Path(tempfile.mkdtemp())
latest_coverage_report = CoverageReportGrabber(tmpdir).grab()
assert latest_coverage_report.exists()
shutil.rmtree(tmpdir)
@pytest.mark.skipif(not RUN_ALL, reason="temporary skip")
def test_parses_latest_coverage_report(test_coverage_report: Path):
"""
Do we get plausible results from the latest gitlab coverage report
(and are we indeed parsing the real one, not the test copy)?
:return:
"""
tmpdir = Path(tempfile.mkdtemp())
latest_report = CoverageReportGrabber(tmpdir).grab()
parser = CoverageReportParser(latest_report)
parser.parse()
results = parser.targets
assert len(results) > 0
# results should be different from those of parsing our test copy
parser = CoverageReportParser(test_coverage_report)
parser.parse()
test_results = parser.targets
# if number of results is same, be sure at least one of the results is different;
# otherwise, we can be confident we're reading the real coverage report....
diff_found = False
if len(results) == len(test_results):
for result in results:
if result not in test_results:
diff_found = True
break
else:
diff_found = True
assert diff_found
# ... but let's confirm that: make the lists equal length and compare results
len_diff = abs(len(test_results) - len(results))
if len(results) > len(test_results):
results = results[: len(results) - len_diff]
else:
test_results = results[: len(test_results) - len_diff]
diff_found = False
for result in results:
if result not in test_results:
diff_found = True
break
assert diff_found
shutil.rmtree(tmpdir)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment