Skip to content
Snippets Groups Projects

WS-677: Multi-stage workflow research & development

Merged Nathan Hertz requested to merge WS-677-dag-support-for-workflows into main
1 file
+ 82
0
Compare changes
  • Side-by-side
  • Inline
""" Tests for the workflow service """
import json
import os
import pathlib
import shutil
from typing import List
# pylint: disable=E0401, R0201, W0212
@@ -133,6 +135,7 @@ class TestWorkflowService:
},
"type": "workflow-complete",
}
assert request.state == "Ready"
with patch("workspaces.workflow.schema.WorkflowRequest.update_status") as mock_update:
@@ -175,6 +178,85 @@ class TestWorkflowService:
usable = mock_workflow_service._determine_usable_files(request, files)
assert len(usable) == 3
def test_execute_prepared(self, mock_workflow_service: WorkflowService, tmp_path: pathlib.Path):
# Create fake folder and DAG file
fake_folder = tmp_path / "test"
fake_folder.mkdir()
fake_dagfile = fake_folder / "test.dag"
fake_dagfile.touch()
orig_submit_dag = mock_workflow_service._submit_dag
mock_workflow_service._submit_dag = MagicMock()
# Test DAG submission
mock_workflow_service._execute_prepared(fake_folder)
mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile)
# DAG cleanup
mock_workflow_service._submit_dag = orig_submit_dag
os.remove(fake_dagfile)
# Create fake submit file
fake_jobfile = fake_folder / "test.condor"
fake_jobfile.touch()
orig_submit_job = mock_workflow_service._submit_job
mock_workflow_service._submit_job = MagicMock()
# Test job submission
mock_workflow_service._execute_prepared(fake_folder)
mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile)
# Cleanup
mock_workflow_service._submit_job = orig_submit_job
shutil.rmtree(tmp_path)
def test_submit_job(self, mock_workflow_service: WorkflowService):
fake_folder = pathlib.Path("fake_folder")
fake_jobfile = pathlib.Path("fake.submit")
fake_jobfile.touch()
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile)
mock_sp_run.assert_called_with(
["condor_submit", fake_jobfile.name],
cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser,
)
assert log_path.name == "fake.log"
os.remove(fake_jobfile)
def test_submit_dag(self, mock_workflow_service: WorkflowService):
fake_folder = pathlib.Path("fake_folder")
fake_dagfile = pathlib.Path("fake.dag")
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile)
mock_sp_run.assert_called_with(
["condor_submit_dag", "-f", fake_dagfile.name],
cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser,
)
assert log_path.name == "fake.dag.dagman.log"
def test_get_jobfile_log(self, tmp_path: pathlib.Path, mock_workflow_service: WorkflowService):
with open(tmp_path / "fake.submit", "w") as fake_jobfile:
# Test with file that doesn't containe "log = " entry
log_path = mock_workflow_service._get_job_logfile_name(tmp_path / "fake.submit")
assert log_path.name == "fake.log"
# Write "log = " entry
fake_jobfile.write("log = test.log")
# Test with same file that now contains "log = " entry
log_path = mock_workflow_service._get_job_logfile_name(tmp_path / "fake.submit")
assert log_path.name == "test.log"
# Remove tmp_path just in case
shutil.rmtree(tmp_path)
def test_get_dagfile_log(self, mock_workflow_service: WorkflowService):
log_path = mock_workflow_service._get_dag_logfile_name(pathlib.Path("fake.dag"))
assert log_path.name == "fake.dag.dagman.log"
@pytest.mark.skip("test broken")
@patch("workspaces.workflow.services.workflow_service.Router")
def test_send_carta_url_to_aat(self, mock_router: MagicMock, mock_workflow_service: WorkflowService):
Loading