From 25ddef150def98a868c3ab070471b2f67117c1b1 Mon Sep 17 00:00:00 2001 From: Charlotte Hausman <chausman@nrao.edu> Date: Thu, 16 Sep 2021 15:36:10 -0400 Subject: [PATCH] submit to naasc as almapipe --- .../workspaces/test/test_workflow_service.py | 16 ++++----- .../services/remote_processing_service.py | 5 +++ .../workflow/services/workflow_service.py | 36 ++++++++++++------- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/shared/workspaces/test/test_workflow_service.py b/shared/workspaces/test/test_workflow_service.py index d05100095..eb4d3100a 100644 --- a/shared/workspaces/test/test_workflow_service.py +++ b/shared/workspaces/test/test_workflow_service.py @@ -189,8 +189,8 @@ class TestWorkflowService: mock_workflow_service._submit_dag = MagicMock() # Test DAG submission - mock_workflow_service._execute_prepared(fake_folder) - mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile) + mock_workflow_service._execute_prepared(fake_folder, False) + mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile, False) # DAG cleanup mock_workflow_service._submit_dag = orig_submit_dag @@ -204,8 +204,8 @@ class TestWorkflowService: mock_workflow_service._submit_job = MagicMock() # Test job submission - mock_workflow_service._execute_prepared(fake_folder) - mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile) + mock_workflow_service._execute_prepared(fake_folder, False) + mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile, False) # Cleanup mock_workflow_service._submit_job = orig_submit_job @@ -216,11 +216,11 @@ class TestWorkflowService: fake_jobfile = pathlib.Path("fake.submit") fake_jobfile.touch() with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: - log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile) + log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile, False) mock_sp_run.assert_called_with( ["condor_submit", fake_jobfile.name], cwd=str(fake_folder.absolute()), - preexec_fn=mock_workflow_service._switch_to_submituser, + preexec_fn=mock_workflow_service._switch_to_submituser(False), ) assert log_path.name == "fake.log" @@ -230,11 +230,11 @@ class TestWorkflowService: fake_folder = pathlib.Path("fake_folder") fake_dagfile = pathlib.Path("fake.dag") with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: - log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile) + log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile, False) mock_sp_run.assert_called_with( ["condor_submit_dag", "-f", fake_dagfile.name], cwd=str(fake_folder.absolute()), - preexec_fn=mock_workflow_service._switch_to_submituser, + preexec_fn=mock_workflow_service._switch_to_submituser(False), ) assert log_path.name == "fake.dag.dagman.log" diff --git a/shared/workspaces/workspaces/system/services/remote_processing_service.py b/shared/workspaces/workspaces/system/services/remote_processing_service.py index 4c50fb569..873650f33 100644 --- a/shared/workspaces/workspaces/system/services/remote_processing_service.py +++ b/shared/workspaces/workspaces/system/services/remote_processing_service.py @@ -50,6 +50,11 @@ class CapoInjector: wf_name=self.request.workflow_name ) + def is_naasc_workflow(self) -> bool: + parameters = self.request.argument + naasc = RemoteProcessingSites.NAASC.value + return naasc in parameters + def identify_subspace(self) -> ListView: """ Determine specific capo requirements for a remote workflow diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index fea0e5f4e..c323201a4 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF): # Is this a remote execution workflow? injector = CapoInjector(request) remote = injector.is_remote_workflow() + run_cv = injector.is_naasc_workflow() # if remote is true, create a capo subspace file in the request's directory if remote: @@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF): self.info.save_request(request) # execute condor and retrieve log file - log_file = self._execute_prepared(Path(request.results_dir)) + log_file = self._execute_prepared(Path(request.results_dir), run_cv) # start reading the log file (background it) logger.info("Running wf_monitor.") @@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF): logger.info("Making %s executable", file) file.chmod(file.stat().st_mode | stat.S_IEXEC) - def _execute_prepared(self, folder: Path) -> Path: + def _execute_prepared(self, folder: Path, run_cv: bool) -> Path: """ Execute HTCondor using the named folder as the source of the files. @@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF): dag_files = list(folder.glob("*.dag")) if dag_files: logger.info("dagman file %s exists.", dag_files[0]) - return self._submit_dag(folder, dag_files[0]) + return self._submit_dag(folder, dag_files[0], run_cv) else: job_file = list(folder.glob("*.condor"))[0] logger.info("condor file %s exists.", job_file) - return self._submit_job(folder, job_file) + return self._submit_job(folder, job_file, run_cv) - def _submit_job(self, folder: Path, job_file: Path) -> Path: + def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path: """ Submit job file to HTCondor @@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF): subprocess.run( ["condor_submit", str(job_file)], cwd=str(folder.absolute()), - preexec_fn=self._switch_to_submituser, + preexec_fn=self._switch_to_submituser(run_cv), ) # return the logfile return logfile - def _submit_dag(self, folder: Path, dag_file: Path) -> Path: + def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path: """ Submit DAG file to HTCondor @@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF): subprocess.run( ["condor_submit_dag", "-f", str(dag_file)], cwd=str(folder.absolute()), - preexec_fn=self._switch_to_submituser, + preexec_fn=self._switch_to_submituser(run_cv), ) # return the logfile return logfile @staticmethod - def _switch_to_submituser(): + def _switch_to_submituser(run_cv: bool): """ Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`; sets gid and uid of subprocess to those of the submituser so that jobs can be properly submitted """ - submituser_gid = 6000 - submituser_uid = 6000 + if run_cv: + # run as almapipe + submituser_gid = 9233 + submituser_uid = 9233 - os.setgid(submituser_gid) - os.setuid(submituser_uid) + os.setgid(submituser_gid) + os.setuid(submituser_uid) + else: + # run as vlapipe + submituser_gid = 6000 + submituser_uid = 6000 + + os.setgid(submituser_gid) + os.setuid(submituser_uid) @staticmethod def _get_job_logfile_name(jobfile_name: Path) -> Path: -- GitLab