Commit 2934902a authored by Daniel Lyons

removing unused testing stuff

parent 8d082ebb
1 merge request: !77 removing unused testing stuff
Pipeline #542 passed
Showing 0 additions and 868 deletions
import pytest


def test_croak_and_die():
    pytest.fail("I'm melting! I'm melting!")


def test_but_but_its_true():
    assert 'Janet' == 'Codemeister'


def test_this_one_should_pass():
    print('"I am the walrus"')


def test_this_is_obvious():
    assert 3 == 3


def test_this_is_obvious_too():
    assert "You, sir" != "Jack Kennedy"


import random


def test_numbers_dont_lie():
    assert 2 > 1.5


def test_whats_for_dinner():
    entrees = ['taco plate', 'spinach salad', 'spam special',
               'chocolate fondue', 'fried grasshopper']
    index = random.randint(0, len(entrees) - 1)
    dinner = entrees[index]
    assert dinner in entrees
# Integration Tests
Here lie integration tests for various components of the system. They were born disguised as unit tests throughout the repo in our various `test` directories and were quarantined here in The Great [SSA-6826](https://open-jira.nrao.edu/browse/SSA-6826?filter=-1) Test Audit of 2021.
For now, these tests are in limbo: they should not be executed, and they exist only as a starting point for when (or if) we formally decide to begin writing integration tests.
-- Nathan
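Every file gathered below follows the same quarantine convention: the test body stays intact but is decorated with `@pytest.mark.skip` and a reason, so an ordinary `pytest` run collects the test and reports it as skipped rather than executing it. A minimal sketch of the pattern (the test name and reason here are illustrative):

```python
import pytest


@pytest.mark.skip(reason="Requires external services, therefore it's an integration test")
def test_example_integration():
    ...  # body would exercise AMQP, the database, or another live service
```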
import pytest

from channels.amqp_helpers import WorkflowEventChannel, workflow_events
from workspaces.workflow.enum import WorkflowEventType
from workspaces.workflow.schema import WorkflowEvent


@pytest.mark.skip(reason="Test requires AMQP, therefore it's an integration test")
def test_workflow_event_sending():
    """
    Tests that a workflow event can be properly sent to the AMQP exchange, routed
    through a queue, and received on the other side
    """
    workflow_events.connect(profile="docker")
    channel = workflow_events.chan.connection.channel()
    queue = channel.queue_declare(queue="", exclusive=True).method.queue
    channel.queue_bind(
        queue=queue, exchange=WorkflowEventChannel.EXCHANGE, routing_key="#"
    )
    workflow_events.send(
        WorkflowEvent(
            "-1", "name", "-1", WorkflowEventType.OTHER, "2020-01-01", "Nothing to log"
        )
    )
    method, header, body = channel.basic_get(queue)
    if method:
        print(body)
from typing import List

import pytest

from workspaces.capability.enums import CapabilityStepType, ExecutionState
from workspaces.capability.helpers import CapabilityStep
from workspaces.capability.helpers_interfaces import ParameterIF
from workspaces.capability.schema import Capability, CapabilityExecution
from workspaces.capability.schema_interfaces import (
    CapabilityExecutionIF,
    CapabilityIF,
    CapabilityRequestIF,
)
from workspaces.capability.services.capability_engine import CapabilityEngine
from workspaces.capability.services.capability_service import CapabilityService
from workspaces.capability.services.interfaces import CapabilityInfoIF, CapabilityName
from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.workflow.services.workflow_service import WorkflowService

from shared.workspaces.test.test_workflow_service import FakeWorkflowInfo


class FakeCapabilityInfo(CapabilityInfoIF):
    def lookup_execution_by_workflow_request_id(
        self, workflow_request_id: int
    ) -> CapabilityExecutionIF:
        raise NotImplementedError

    capabilities = [
        Capability(
            id=1,
            name="null",
            steps="prepare and run workflow null\nawait workflow\n",
            max_jobs=2,
        )
    ]

    def create_capability(
        self, name: CapabilityName, steps: List[CapabilityStepType], max_jobs: int
    ) -> CapabilityIF:
        capability = Capability(len(self.capabilities), name, steps, max_jobs)
        self.capabilities.append(capability)
        return capability

    def create_capability_request(
        self,
        capability_name: str,
        parameters: List[ParameterIF] = None,
        future_products: List[FutureProductIF] = None,
    ) -> CapabilityRequestIF:
        capability = self.lookup_capability(capability_name)
        request = capability.create_request(parameters, future_products)
        return request

    def create_execution(self, request: CapabilityRequestIF) -> CapabilityExecutionIF:
        most_recent_version = request.versions[-1]
        execution = CapabilityExecution(
            id=len(most_recent_version.executions) + 1,
            state=ExecutionState.Ready.name,
            version=most_recent_version,
            current_step=0,
            # ensure that we have a copy of the step sequence as it was when the execution started
            steps=request.capability.steps,
        )
        most_recent_version.executions.append(execution)
        return execution

    def lookup_capability(self, capability_name: str) -> CapabilityIF:
        return next(c for c in self.capabilities if c.name == capability_name)

    def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
        for capability in self.capabilities:
            for version in capability.versions:
                for execution in version.executions:
                    if execution_id == execution.id:
                        return execution

    def save_execution(self):
        pass


capability_info = FakeCapabilityInfo()
workflow_info = FakeWorkflowInfo()


@pytest.mark.skip(
    reason="Using a raw CapabilityService like this utilizes wf_monitor, making it an integration test"
)
def test_capability_request_creation():
    """
    Tests that a capability request is properly created and persisted
    """
    cs = CapabilityService(capability_info, workflow_info)
    req = cs.create_request("null")
    assert req in capability_info.capabilities[0].requests


@pytest.mark.skip(
    reason="Test needs refactoring; also, we're dealing with several different objects, making it an integration test"
)
def test_prepare_and_run():
    """
    Tests that a capability request can be submitted and executed
    :return:
    """
    ws = WorkflowService(workflow_info)
    engine = CapabilityEngine(capability_info, ws)
    step = CapabilityStep.from_str("prepare-and-run-workflow null {}")
    print(step)
    execution = CapabilityExecution()
    step.execute(engine, execution)
import pendulum
import pytest

from workspaces.workflow.enum import WorkflowEventType
from workspaces.workflow.json import WorkflowEventSchema
from workspaces.workflow.schema import WorkflowEvent


@pytest.mark.skip(
    reason="This is testing the entire WF event system, not a single module of that system; therefore, it's an integration test"
)
def test_schema_dumpload():
    """
    Tests that a WorkflowEvent can be created, dumped to JSON, and loaded into a schema without losing any information
    """
    e = WorkflowEvent(
        workflow_request_id=0,
        job_name="foo",
        condor_job_id=1,
        event_type=WorkflowEventType.SUBMITTED,
        timestamp=str(pendulum.now()),
        log="nothing to speak of",
    )
    schema = WorkflowEventSchema()
    event_text = schema.dump(e)
    r = schema.load(event_text)
    assert e == r
import pytest

from schema import create_session
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.interfaces import CapabilityInfoIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF
from workspaces.workflow.services.workflow_info import WorkflowInfo

SESSION = create_session("SDM")


def get_workflow_info() -> WorkflowInfoIF:
    """Return a WorkflowInfo based on a real database connection"""
    return WorkflowInfo(SESSION)


def get_capability_info() -> CapabilityInfoIF:
    """Return a CapabilityInfo based on a real database connection"""
    return CapabilityInfo(SESSION)


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_get_requests_for_capability():
    """Ensure we can get all requests for a certain capability"""
    info = get_capability_info()
    info.create_capability("test_get_requests", None, 0)
    info.create_capability_request("test_get_requests")
    assert len(info.requests_for_capability("test_get_requests")) > 0


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_persisting_workflow_request():
    """Ensure we can persist a workflow request"""
    info = get_workflow_info()
    null = info.lookup_workflow_definition("null")
    req = info.create_workflow_request(null, {})
    assert req.workflow_request_id is not None
    assert req.results_dir is not None


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_reading_workflows():
    """Ensure we can get workflows"""
    info = get_workflow_info()
    workflows = info.all_workflows()
    assert len(workflows) > 0


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_reading_workflow_requests():
    """Ensure we can get workflow requests"""
    info = get_workflow_info()
    workflow_requests = info.all_workflow_requests()
    assert len(workflow_requests) > 0


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_reading_capability():
    """Ensure we can get a capability"""
    info = get_capability_info()
    null = info.lookup_capability("null")
    assert null is not None


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_capability_associated_to_workflow():
    """
    Tests that the relationship between capability requests and workflows is sound
    FIXME: Needs an assertion to actually test something
    """
    cinfo = get_capability_info()
    winfo = get_workflow_info()
    req = cinfo.create_capability_request("null", {})
    exec = cinfo.create_execution(req)
    workflow_req = winfo.create_workflow_request(
        winfo.lookup_workflow_definition("null"), {}
    )
    exec.current_workflow_request = workflow_req
    SESSION.add(exec)


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_download_capability_persisted():
    """
    Tests that the download capability is successfully added by its alembic upgrade
    """
    cinfo = get_capability_info()
    capability = cinfo.lookup_capability("download")
    assert capability.name == "download"


@pytest.mark.skip(
    reason="This test queries the database, making it an integration test and unsuitable for CI."
)
def test_download_workflow_persisted():
    """
    Tests that the download workflow is successfully added by its alembic upgrade
    """
    winfo = get_workflow_info()
    workflow = winfo.lookup_workflow_definition("download")
    templates = winfo.lookup_workflow_templates_for("download")
    assert workflow.workflow_name == "download"
    assert len(templates) == 3
import pytest
import requests
from pycapo import CapoConfig


@pytest.mark.skip(
    reason="Test needs to be refactored to NOT use wf_monitor (i.e. make it a unit test)"
)
def test_workflow_request_execution():
    """
    Test whether a null workflow request can properly be executed
    """
    settings = CapoConfig().settings("edu.nrao.archive.workspaces.WorkflowSettings")

    r1 = requests.post(f"{settings.serviceUrl}/workflows/null/requests/create?args=-g")
    request_id = r1.json()["workflow_request_id"]

    r2 = requests.post(f"{settings.serviceUrl}/workflows/requests/{request_id}/submit")
    # Assert workflow request execution was received with no error
    assert r2.status_code == 200

    r3 = requests.get(f"{settings.serviceUrl}/workflows/requests/{request_id}")
    # Assert results_dir was properly assigned
    assert len(r3.json()["results_dir"]) > 0
    # Assert workflow request is in completed state
    assert r3.json()["state"] == "Complete"
# Package Tester
This application executes pytest on specified packages, returning a summary of
passes, failures, skips, and errors. A "verbose" command-line option includes
detail for individual tests.
### A word to the wise
If you're going to execute `pytest` from the command line,
be sure to ignore tests that will always fail (these are
used in PackageTester's own tests):
test/$ pytest -v --showlocals --ignore=dummy_tests
## Usage
Package Tester is invoked by the **package-tester** console script
defined in `package_tester/setup.py`:
package-tester pkg1 [-v|--verbose] [pkg2 pkg3 ... pkg N]
*pkg* arguments are names of workspaces packages to be tested;
e.g., **datafetcher** or **shared.channels**
Package names need not be fully qualified. If more than one package is found
for a given package name, the user will be notified that they must be more specific.
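For example (package names and counts here are illustrative; the ambiguous case raises a `PackageTesterException`, shown below without its traceback):

```
$ package-tester channels
One or more of your packages is ambiguous; please try again with a more qualified package.

$ package-tester shared.channels
4 passes, 0 errors, 0 failures, 0 skipped; elapsed time: 0.03s
```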
The *verbose* flag results in more detailed output. For example:
regular output:
$ package-tester dummy_tests
5 passes, 0 errors, 2 failures, 0 skipped; elapsed time: 0.04s
verbose output:
$ package-tester -v dummy_tests
============================= test session starts ==============================
platform darwin -- Python 3.8.6, pytest-5.4.3, py-1.10.0, pluggy-0.13.1
rootdir: /Users/jgoldste/Projects/data, inifile: setup.cfg
collected 7 items
../dummy_tests/always_fail/test_always_fails.py FF"I am the walrus"
.
../dummy_tests/always_pass/test_always_passes.py ..
../dummy_tests/always_pass/test_always_passes_too.py ..
=================================== FAILURES ===================================
______________________________ test_croak_and_die ______________________________
def test_croak_and_die():
> pytest.fail("I'm melting! I'm melting!")
E Failed: I'm melting! I'm melting!
../dummy_tests/always_fail/test_always_fails.py:5: Failed
____________________________ test_but_but_its_true _____________________________
def test_but_but_its_true():
> assert 'Janet' == 'Codemeister'
E AssertionError: assert 'Janet' == 'Codemeister'
E - Codemeister
E + Janet
../dummy_tests/always_fail/test_always_fails.py:9: AssertionError
=========================== short test summary info ============================
FAILED ../dummy_tests/always_fail/test_always_fails.py::test_croak_and_die - ...
FAILED ../dummy_tests/always_fail/test_always_fails.py::test_but_but_its_true
========================= 2 failed, 5 passed in 0.04s ==========================
5 passes, 0 errors, 2 failures, 0 skipped; elapsed time: 0.04s
Note that both options print the summary line.
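The summary line is not computed independently: PackageTester parses pytest's own final results line with the `PytestResultsParser` helper included later in this commit. A minimal sketch of that round trip, assuming the helper module is importable as `package_tester.package_tester_utils`:

```python
from package_tester.package_tester_utils import PytestResultsParser

# pytest's own results line from the dummy_tests run above
line = "2 failed, 5 passed in 0.04s"
result = PytestResultsParser(line).parse()
print(result)
# -> 5 passes, 0 errors, 2 failures, 0 skipped; elapsed time: 0.04s
```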
""" Version information for this package; don't put anything else here. """
___version___ = "1.0.0a1.dev1"
#!/usr/bin/python
# -*- coding: utf-8 -*-

""" The Test Runner """

import logging
import sys
from pathlib import Path
from typing import Dict, List

# pylint: disable=C0303, R0903, R1721, W0703, W1203

sys.path.insert(0, str(Path("../src").absolute()))

# Imports may differ on how and from where PackageTester is invoked.
try:
    from .package_tester_utils import (
        ENTRYPOINT,
        PytestResultsParser,
        TestedPackageInfo,
        execute_shell_command,
    )
except ImportError:
    print(">>> FYI: using alternative imports")
    # fall back to a non-relative import when PackageTester is run as a plain script
    from package_tester_utils import (
        ENTRYPOINT,
        PytestResultsParser,
        TestedPackageInfo,
        execute_shell_command,
    )

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class PackageTester:
    """Runs pytest for every package specified.

    This version has been refactored to be more self-contained and
    manageable than the previous version. It is invoked from an
    entrypoint rather than a shell script.
    """

    def __init__(self, packages: List[str], verbose=False):
        self.package_names = packages
        self.verbose = verbose or False
        # to be List[TestedPackageInfo]
        self.pytested_packages = []
        # to be List[str]
        self.output = []
        # to be Dict[str, Path]
        self.locations_by_package = {}

    def __str__(self):
        """
        If there's output, don't include me
        :return:
        """
        return ""
    def execute(self):
        """
        This is where the magic happens. Find the packages the user submitted;
        `pytest` each; organize, report, and store results.
        :return:
        """
        self.locations_by_package = PackageFinder(self.package_names).find()
        if len(self.locations_by_package) == 0:
            raise PackageTesterException("One or more packages not found.")
        if len(self.locations_by_package) < len(self.package_names):
            message = "\nOne or more of your packages is ambiguous; "
            message += "please try again with a more qualified package."
            raise PackageTesterException(message)

        self.output.append(
            f"{len(self.locations_by_package)} Package locations found:"
        )
        for package, location in self.locations_by_package.items():
            self.output.append(f"\t{package}: {location}")

        self.pytested_packages = self.process_packages()
        self.summarize()
        self.report()

    def process_packages(self) -> List[TestedPackageInfo]:
        """
        Run pytest on each of the packages.
        :return:
        """
        packages_info = []
        for package, location in self.locations_by_package.items():
            package_info = TestedPackageInfo(package, location)
            # cd to package location, -then- run pytest
            command = f'cd {location}; echo "Current directory: $(pwd)"; pytest -v'
            output = execute_shell_command(command)
            for line in output:
                package_info.test_output.append(line)
            packages_info.append(package_info)
        return packages_info

    def summarize(self):
        """
        Summarize test results for each package tested.
        :return:
        """
        for package_info in self.pytested_packages:
            result_line = package_info.test_output[-1]
            result = PytestResultsParser(result_line).parse()
            package_info.summary = result

    def report(self):
        """
        Report results.
        :return:
        """
        for package_info in self.pytested_packages:
            if self.verbose:
                details = [f"{line}" for line in package_info.test_output]
                for line in details:
                    print(line)
            print(f"{package_info.summary}")
def build_usage_message() -> str:
    """
    Tell 'em how to use this thing.
    :return:
    """
    message = f"Usage: {ENTRYPOINT} pkg1 [pkg2 pkg3 ... pkg N]\n\n"
    message += "pkg arguments are names of workspaces packages to be tested;\n"
    message += 'e.g., "src.apps.cli.executables.datafetcher" or\n'
    message += '"shared.channels"'
    return message


class PackageFinder:
    """ Finds paths corresponding to each submitted package name. """

    def __init__(self, packages: List[str]):
        # make sure we have something to work with
        failure_message = "One or more package names required\n" + build_usage_message()
        if (not packages) or len(packages) == 0:
            raise PackageTesterException(failure_message)
        for package in packages:
            if package is None:
                raise PackageTesterException(failure_message)
        self.packages = packages

    def find(self) -> Dict[str, Path]:
        """
        Get each package's path.
        :return:
        """
        locations_by_package = {}
        for package in self.packages:
            for file in Path("/").rglob(package.replace(".", "/")):
                # we don't want the built packages; just the source.
                if "build" not in str(file):
                    locations_by_package[package] = file
                    break
        if len(locations_by_package) != len(self.packages):
            raise PackageTesterException(
                f"expected {len(self.packages)} "
                f"packages; got "
                f"{len(locations_by_package)}"
            )
        return locations_by_package


class PackageTesterException(Exception):
    """ Throw this whenever things go south. """


def main():
    """
    `package-tester` entrypoint invokes this to launch the tester.
    """
    args = sys.argv[1:]
    if args is None or len(args) == 0:
        failure_message = "One or more package names required\n" + build_usage_message()
        logger.error(failure_message)
        raise PackageTesterException(failure_message)

    # let verbose be first OR last arg
    package_names = []
    verbose = False
    for arg in args:
        if arg in ("-v", "--verbose"):
            verbose = True
        else:
            package_names.append(arg)

    tester = PackageTester(package_names, verbose)
    tester.execute()


if __name__ == "__main__":
    main()
""" Utilities and helper classes for PackageTester """
import os
import re
import subprocess
import sys
from enum import Enum
from pathlib import Path
from typing import List
TIMESTAMP_FORMAT = "%Y-%m-%dT%H-%M-%S"
PACKAGE_TESTER = "package_tester.py"
ENTRYPOINT = "package-tester"
# pylint: disable=R0903
sys.path.insert(0, str(Path("../src").absolute()))
def execute_shell_command(command: str) -> List[str]:
"""
Run command in shell and return its output.
:param command:
:return: lines of command output
"""
output_lines = []
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=True,
shell=True,
) as proc:
for line in proc.stdout.readlines():
line = line.strip()
if len(line) > 0:
output_lines.append(line)
return output_lines
def get_project_root() -> Path:
"""
Get the root of this project, TODO: without depending on name of repo
or of Docker container top level.
:return:
"""
my_path = Path(__file__)
path = my_path
while not path.name.endswith("workspaces") and not path.name.endswith("code"):
path = path.parent
return path
def running_in_docker() -> bool:
"""
Are we executing in shell (rather than in a Docker container or IDE)?
:return:
"""
return "SHELL" not in os.environ.keys()
class TestedPackageInfo:
    """ Holds metadata about a package's test(s) """

    def __init__(self, package_name: str, location: Path):
        self.package_name = package_name
        self.location = location
        self.summary = None
        self.test_output = []


class ResultType(Enum):
    """ These are statuses that pytest reports. """

    # The success string may be "passes" or "passed". Thanks, pytest.
    PASSES = "passes"
    PASSED = "passed"
    # Failure string may be "failed" or "failures". Thanks again.
    FAILED = "failed"
    FAILURES = "failures"
    SKIP = "skipped"
    ERROR = "error"


class PytestResult:
    """ Encapsulates final line of pytest output, which has the results. """

    def __init__(self, **kwargs):
        self.failures = (
            kwargs.get(ResultType.FAILED.value)
            or kwargs.get(ResultType.FAILURES.value)
            or 0
        )
        self.skips = kwargs.get(ResultType.SKIP.value) or 0
        self.errors = kwargs.get(ResultType.ERROR.value) or 0
        self.elapsed_time = kwargs.get("elapsed_time") or ""
        self.passes = (
            kwargs.get(ResultType.PASSED.value)
            or kwargs.get(ResultType.PASSES.value)
            or 0
        )

    def __str__(self):
        return (
            f"{self.passes} passes, "
            f"{self.errors} errors, "
            f"{self.failures} failures, "
            f"{self.skips} skipped; elapsed time: {self.elapsed_time}"
        )

    def __eq__(self, other):
        if not isinstance(other, PytestResult):
            return False
        if (
            other.passes == self.passes
            and other.failures == self.failures
            and other.errors == self.errors
            and other.skips == self.skips
        ):
            self_elapsed = self.elapsed_time
            other_elapsed = other.elapsed_time
            # We don't expect an exact match on elapsed time, just agreement on presence
            if self_elapsed is None:
                return other_elapsed is None
            return other_elapsed is not None
        return False

    def elapsed_time_numeric(self) -> float:
        """
        Grab numeric portion of "elapsed time" string
        :return:
        """
        match = re.search("([0-9.]+)", self.elapsed_time)
        return float(match.group(1)) if match else -1.0

    def is_empty(self) -> bool:
        """
        Did we actually get results for this package?
        :return:
        """
        return self.failures == self.passes == self.skips == self.errors == 0
class PytestResultsParser:
    """ Trawls final line of pytest output to get the meat. """

    def __init__(self, pytest_results_line: str):
        self.pytest_results_line = pytest_results_line

    def parse(self) -> PytestResult:
        """
        Get the 411 out of the pytest summary line.
        :return:
        """
        counts = {}
        for result_type in ResultType:
            count = self.get_count(result_type)
            if count < 0:
                count = 0
            counts[result_type] = count

        result = PytestResult(
            passed=counts[ResultType.PASSED] + counts[ResultType.PASSES],
            failed=counts[ResultType.FAILED] + counts[ResultType.FAILURES],
            skipped=counts[ResultType.SKIP],
            error=counts[ResultType.ERROR],
            elapsed_time=self.get_elapsed_time(),
        )
        return result

    def get_count(self, result_type: ResultType) -> int:
        """
        Find the count that pytest has reported for this result type.
        :param result_type:
        :return:
        """
        key = result_type.value
        if key not in self.pytest_results_line:
            return 0
        match = re.search(f"([0-9]+) {key}", self.pytest_results_line)
        if match:
            return int(match.group(1))
        return -1

    def get_elapsed_time(self) -> str:
        """
        Parse elapsed time out of pytest result line.
        :return:
        """
        match = re.search(r"in ([.0-9]+[a-z])", self.pytest_results_line)
        return match.group(1) if match else "-1.0"
#!/usr/bin/python
# -*- coding: utf-8 -*-

""" This is the setup for the PackageTester. """

from pathlib import Path

from setuptools import setup

VERSION = open("package_tester/_version.py").readlines()[-1].split()[-1].strip("\"'")
README = Path("README.md").read_text()

requires = ["pytest>=5.4,<6.0"]

setup(
    name="ssa-" + Path().absolute().name,
    version=VERSION,
    description="NRAO Archive Test Runner",
    long_description=README,
    author="NRAO SSA Team",
    author_email="dms-ssa@nrao.edu",
    url="TBD",
    license="GPL",
    install_requires=requires,
    keywords=["test"],
    packages=["package_tester"],
    classifiers=["Programming Language :: Python :: 3.8"],
    entry_points={
        "console_scripts": ["package-tester = package_tester.package_tester:main"]
    },
)