Skip to content
Snippets Groups Projects
Commit 25ddef15 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

submit to naasc as almapipe

parent de092317
No related branches found
No related tags found
1 merge request!512submit to naasc as almapipe
Pipeline #2939 passed
...@@ -189,8 +189,8 @@ class TestWorkflowService: ...@@ -189,8 +189,8 @@ class TestWorkflowService:
mock_workflow_service._submit_dag = MagicMock() mock_workflow_service._submit_dag = MagicMock()
# Test DAG submission # Test DAG submission
mock_workflow_service._execute_prepared(fake_folder) mock_workflow_service._execute_prepared(fake_folder, False)
mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile) mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile, False)
# DAG cleanup # DAG cleanup
mock_workflow_service._submit_dag = orig_submit_dag mock_workflow_service._submit_dag = orig_submit_dag
...@@ -204,8 +204,8 @@ class TestWorkflowService: ...@@ -204,8 +204,8 @@ class TestWorkflowService:
mock_workflow_service._submit_job = MagicMock() mock_workflow_service._submit_job = MagicMock()
# Test job submission # Test job submission
mock_workflow_service._execute_prepared(fake_folder) mock_workflow_service._execute_prepared(fake_folder, False)
mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile) mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile, False)
# Cleanup # Cleanup
mock_workflow_service._submit_job = orig_submit_job mock_workflow_service._submit_job = orig_submit_job
...@@ -216,11 +216,11 @@ class TestWorkflowService: ...@@ -216,11 +216,11 @@ class TestWorkflowService:
fake_jobfile = pathlib.Path("fake.submit") fake_jobfile = pathlib.Path("fake.submit")
fake_jobfile.touch() fake_jobfile.touch()
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile) log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile, False)
mock_sp_run.assert_called_with( mock_sp_run.assert_called_with(
["condor_submit", fake_jobfile.name], ["condor_submit", fake_jobfile.name],
cwd=str(fake_folder.absolute()), cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser, preexec_fn=mock_workflow_service._switch_to_submituser(False),
) )
assert log_path.name == "fake.log" assert log_path.name == "fake.log"
...@@ -230,11 +230,11 @@ class TestWorkflowService: ...@@ -230,11 +230,11 @@ class TestWorkflowService:
fake_folder = pathlib.Path("fake_folder") fake_folder = pathlib.Path("fake_folder")
fake_dagfile = pathlib.Path("fake.dag") fake_dagfile = pathlib.Path("fake.dag")
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile) log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile, False)
mock_sp_run.assert_called_with( mock_sp_run.assert_called_with(
["condor_submit_dag", "-f", fake_dagfile.name], ["condor_submit_dag", "-f", fake_dagfile.name],
cwd=str(fake_folder.absolute()), cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser, preexec_fn=mock_workflow_service._switch_to_submituser(False),
) )
assert log_path.name == "fake.dag.dagman.log" assert log_path.name == "fake.dag.dagman.log"
......
...@@ -50,6 +50,11 @@ class CapoInjector: ...@@ -50,6 +50,11 @@ class CapoInjector:
wf_name=self.request.workflow_name wf_name=self.request.workflow_name
) )
def is_naasc_workflow(self) -> bool:
parameters = self.request.argument
naasc = RemoteProcessingSites.NAASC.value
return naasc in parameters
def identify_subspace(self) -> ListView: def identify_subspace(self) -> ListView:
""" """
Determine specific capo requirements for a remote workflow Determine specific capo requirements for a remote workflow
......
...@@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF):
# Is this a remote execution workflow? # Is this a remote execution workflow?
injector = CapoInjector(request) injector = CapoInjector(request)
remote = injector.is_remote_workflow() remote = injector.is_remote_workflow()
run_cv = injector.is_naasc_workflow()
# if remote is true, create a capo subspace file in the request's directory # if remote is true, create a capo subspace file in the request's directory
if remote: if remote:
...@@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF):
self.info.save_request(request) self.info.save_request(request)
# execute condor and retrieve log file # execute condor and retrieve log file
log_file = self._execute_prepared(Path(request.results_dir)) log_file = self._execute_prepared(Path(request.results_dir), run_cv)
# start reading the log file (background it) # start reading the log file (background it)
logger.info("Running wf_monitor.") logger.info("Running wf_monitor.")
...@@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF):
logger.info("Making %s executable", file) logger.info("Making %s executable", file)
file.chmod(file.stat().st_mode | stat.S_IEXEC) file.chmod(file.stat().st_mode | stat.S_IEXEC)
def _execute_prepared(self, folder: Path) -> Path: def _execute_prepared(self, folder: Path, run_cv: bool) -> Path:
""" """
Execute HTCondor using the named folder as the source of the files. Execute HTCondor using the named folder as the source of the files.
...@@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF): ...@@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF):
dag_files = list(folder.glob("*.dag")) dag_files = list(folder.glob("*.dag"))
if dag_files: if dag_files:
logger.info("dagman file %s exists.", dag_files[0]) logger.info("dagman file %s exists.", dag_files[0])
return self._submit_dag(folder, dag_files[0]) return self._submit_dag(folder, dag_files[0], run_cv)
else: else:
job_file = list(folder.glob("*.condor"))[0] job_file = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", job_file) logger.info("condor file %s exists.", job_file)
return self._submit_job(folder, job_file) return self._submit_job(folder, job_file, run_cv)
def _submit_job(self, folder: Path, job_file: Path) -> Path: def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path:
""" """
Submit job file to HTCondor Submit job file to HTCondor
...@@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF): ...@@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run( subprocess.run(
["condor_submit", str(job_file)], ["condor_submit", str(job_file)],
cwd=str(folder.absolute()), cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser, preexec_fn=self._switch_to_submituser(run_cv),
) )
# return the logfile # return the logfile
return logfile return logfile
def _submit_dag(self, folder: Path, dag_file: Path) -> Path: def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path:
""" """
Submit DAG file to HTCondor Submit DAG file to HTCondor
...@@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF): ...@@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run( subprocess.run(
["condor_submit_dag", "-f", str(dag_file)], ["condor_submit_dag", "-f", str(dag_file)],
cwd=str(folder.absolute()), cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser, preexec_fn=self._switch_to_submituser(run_cv),
) )
# return the logfile # return the logfile
return logfile return logfile
@staticmethod @staticmethod
def _switch_to_submituser(): def _switch_to_submituser(run_cv: bool):
""" """
Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`; Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`;
sets gid and uid of subprocess to those of the submituser so that jobs can be sets gid and uid of subprocess to those of the submituser so that jobs can be
properly submitted properly submitted
""" """
submituser_gid = 6000 if run_cv:
submituser_uid = 6000 # run as almapipe
submituser_gid = 9233
submituser_uid = 9233
os.setgid(submituser_gid) os.setgid(submituser_gid)
os.setuid(submituser_uid) os.setuid(submituser_uid)
else:
# run as vlapipe
submituser_gid = 6000
submituser_uid = 6000
os.setgid(submituser_gid)
os.setuid(submituser_uid)
@staticmethod @staticmethod
def _get_job_logfile_name(jobfile_name: Path) -> Path: def _get_job_logfile_name(jobfile_name: Path) -> Path:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment