Skip to content
Snippets Groups Projects
test_workflow_service.py 973 B
Newer Older
import pytest
import requests
from pycapo import CapoConfig


@pytest.mark.skip(
    reason="Test needs to be refactored to NOT use wf_monitor (i.e. make it a unit test)"
)
def test_workflow_request_execution():
    """
    Test whether a null workflow request can properly be executed
    """
    settings = CapoConfig().settings("edu.nrao.archive.workspaces.WorkflowSettings")
    r1 = requests.post(f"{settings.serviceUrl}/workflows/null/requests/create?args=-g")
    request_id = r1.json()["workflow_request_id"]
    r2 = requests.post(f"{settings.serviceUrl}/workflows/requests/{request_id}/submit")
    # Assert workflow request execution was received with no error
    assert r2.status_code == 200
    r3 = requests.get(f"{settings.serviceUrl}/workflows/requests/{request_id}")
    # Assert results_dir was properly assigned
    assert len(r3.json()["results_dir"]) > 0
    # Assert workflow request is in completed state
    assert r3.json()["state"] == "Complete"