Newer
Older

Nathan Hertz
committed
import pytest
import requests
from pycapo import CapoConfig
@pytest.mark.skip(
    reason="Test needs to be refactored to NOT use wf_monitor (i.e. make it a unit test)"
)
def test_workflow_request_execution():
    """
    Test whether a null workflow request can properly be executed.

    The test talks to the live workflow service whose base URL is read from
    the ``edu.nrao.archive.workspaces.WorkflowSettings`` Capo settings:

    1. POST to create a request for the ``null`` workflow (``args=-g``).
    2. POST to submit that request and assert the service answers 200.
    3. GET the request back and assert that ``results_dir`` is non-empty
       and that ``state`` is ``"Complete"``.

    Currently skipped; see the skip reason on the decorator.
    """
    settings = CapoConfig().settings("edu.nrao.archive.workspaces.WorkflowSettings")
    base_url = settings.serviceUrl

    # requests never times out by default, so an unresponsive service would
    # hang the whole test run. NOTE(review): 30 seconds is a guess; raise it if
    # the submit call blocks for the duration of the workflow -- TODO confirm.
    timeout_s = 30

    created = requests.post(
        f"{base_url}/workflows/null/requests/create?args=-g", timeout=timeout_s
    )
    # Fail with the HTTP error itself rather than a confusing KeyError or
    # JSON decode error when the request could not be created.
    created.raise_for_status()
    request_id = created.json()["workflow_request_id"]

    submitted = requests.post(
        f"{base_url}/workflows/requests/{request_id}/submit", timeout=timeout_s
    )
    # Assert workflow request execution was received with no error
    assert submitted.status_code == 200

    fetched = requests.get(
        f"{base_url}/workflows/requests/{request_id}", timeout=timeout_s
    )
    # Decode the body once and reuse it for both assertions.
    request_info = fetched.json()
    # Assert results_dir was properly assigned
    assert len(request_info["results_dir"]) > 0
    # Assert workflow request is in completed state
    assert request_info["state"] == "Complete"