diff --git a/shared/workspaces/test/test_workflow_service.py b/shared/workspaces/test/test_workflow_service.py index d05100095d7d398d2f26edfd5fc8d0eaa351618b..eb4d3100a13434e1ba19b15b16b8667bbf8b78d2 100644 --- a/shared/workspaces/test/test_workflow_service.py +++ b/shared/workspaces/test/test_workflow_service.py @@ -189,8 +189,8 @@ class TestWorkflowService: mock_workflow_service._submit_dag = MagicMock() # Test DAG submission - mock_workflow_service._execute_prepared(fake_folder) - mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile) + mock_workflow_service._execute_prepared(fake_folder, False) + mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile, False) # DAG cleanup mock_workflow_service._submit_dag = orig_submit_dag @@ -204,8 +204,8 @@ class TestWorkflowService: mock_workflow_service._submit_job = MagicMock() # Test job submission - mock_workflow_service._execute_prepared(fake_folder) - mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile) + mock_workflow_service._execute_prepared(fake_folder, False) + mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile, False) # Cleanup mock_workflow_service._submit_job = orig_submit_job @@ -216,11 +216,11 @@ class TestWorkflowService: fake_jobfile = pathlib.Path("fake.submit") fake_jobfile.touch() with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: - log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile) + log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile, False) mock_sp_run.assert_called_with( ["condor_submit", fake_jobfile.name], cwd=str(fake_folder.absolute()), - preexec_fn=mock_workflow_service._switch_to_submituser, + preexec_fn=mock_workflow_service._switch_to_submituser(False), ) assert log_path.name == "fake.log" @@ -230,11 +230,11 @@ class TestWorkflowService: fake_folder = pathlib.Path("fake_folder") fake_dagfile = pathlib.Path("fake.dag") with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: - log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile) + log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile, False) mock_sp_run.assert_called_with( ["condor_submit_dag", "-f", fake_dagfile.name], cwd=str(fake_folder.absolute()), - preexec_fn=mock_workflow_service._switch_to_submituser, + preexec_fn=mock_workflow_service._switch_to_submituser(False), ) assert log_path.name == "fake.dag.dagman.log" diff --git a/shared/workspaces/workspaces/system/services/remote_processing_service.py b/shared/workspaces/workspaces/system/services/remote_processing_service.py index 4c50fb5695b7356066cb58da811a768cb7b86c80..873650f33b226bcc860afd0db2a0351b6ab98478 100644 --- a/shared/workspaces/workspaces/system/services/remote_processing_service.py +++ b/shared/workspaces/workspaces/system/services/remote_processing_service.py @@ -50,6 +50,11 @@ class CapoInjector: wf_name=self.request.workflow_name ) + def is_naasc_workflow(self) -> bool: + parameters = self.request.argument + naasc = RemoteProcessingSites.NAASC.value + return naasc in parameters + def identify_subspace(self) -> ListView: """ Determine specific capo requirements for a remote workflow diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index fea0e5f4e1ad55c60b4ec626eb3bab47c83ac28a..c323201a48e8f90ad56e737a6d6b0326747ef201 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF): # Is this a remote execution workflow? injector = CapoInjector(request) remote = injector.is_remote_workflow() + run_cv = injector.is_naasc_workflow() # if remote is true, create a capo subspace file in the request's directory if remote: @@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF): self.info.save_request(request) # execute condor and retrieve log file - log_file = self._execute_prepared(Path(request.results_dir)) + log_file = self._execute_prepared(Path(request.results_dir), run_cv) # start reading the log file (background it) logger.info("Running wf_monitor.") @@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF): logger.info("Making %s executable", file) file.chmod(file.stat().st_mode | stat.S_IEXEC) - def _execute_prepared(self, folder: Path) -> Path: + def _execute_prepared(self, folder: Path, run_cv: bool) -> Path: """ Execute HTCondor using the named folder as the source of the files. @@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF): dag_files = list(folder.glob("*.dag")) if dag_files: logger.info("dagman file %s exists.", dag_files[0]) - return self._submit_dag(folder, dag_files[0]) + return self._submit_dag(folder, dag_files[0], run_cv) else: job_file = list(folder.glob("*.condor"))[0] logger.info("condor file %s exists.", job_file) - return self._submit_job(folder, job_file) + return self._submit_job(folder, job_file, run_cv) - def _submit_job(self, folder: Path, job_file: Path) -> Path: + def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path: """ Submit job file to HTCondor @@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF): subprocess.run( ["condor_submit", str(job_file)], cwd=str(folder.absolute()), - preexec_fn=self._switch_to_submituser, + preexec_fn=self._switch_to_submituser(run_cv), ) # return the logfile return logfile - def _submit_dag(self, folder: Path, dag_file: Path) -> Path: + def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path: """ Submit DAG file to HTCondor @@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF): subprocess.run( ["condor_submit_dag", "-f", str(dag_file)], cwd=str(folder.absolute()), - preexec_fn=self._switch_to_submituser, + preexec_fn=self._switch_to_submituser(run_cv), ) # return the logfile return logfile @staticmethod - def _switch_to_submituser(): + def _switch_to_submituser(run_cv: bool): """ Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`; sets gid and uid of subprocess to those of the submituser so that jobs can be properly submitted """ - submituser_gid = 6000 - submituser_uid = 6000 + if run_cv: + # run as almapipe + submituser_gid = 9233 + submituser_uid = 9233 - os.setgid(submituser_gid) - os.setuid(submituser_uid) + os.setgid(submituser_gid) + os.setuid(submituser_uid) + else: + # run as vlapipe + submituser_gid = 6000 + submituser_uid = 6000 + + os.setgid(submituser_gid) + os.setuid(submituser_uid) @staticmethod def _get_job_logfile_name(jobfile_name: Path) -> Path: