From 502542e23141ea49fae5e79d9ea30e3af59ffe28 Mon Sep 17 00:00:00 2001 From: chausman <chausman@nrao.edu> Date: Thu, 16 Sep 2021 11:16:47 -0600 Subject: [PATCH] submit to naasc as almapipe --- .../services/remote_processing_service.py | 5 +++ .../workflow/services/workflow_service.py | 36 ++++++++++++------- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/shared/workspaces/workspaces/system/services/remote_processing_service.py b/shared/workspaces/workspaces/system/services/remote_processing_service.py index 4c50fb569..873650f33 100644 --- a/shared/workspaces/workspaces/system/services/remote_processing_service.py +++ b/shared/workspaces/workspaces/system/services/remote_processing_service.py @@ -50,6 +50,11 @@ class CapoInjector: wf_name=self.request.workflow_name ) + def is_naasc_workflow(self) -> bool: + parameters = self.request.argument + naasc = RemoteProcessingSites.NAASC.value + return naasc in parameters + def identify_subspace(self) -> ListView: """ Determine specific capo requirements for a remote workflow diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index fea0e5f4e..c323201a4 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF): # Is this a remote execution workflow? injector = CapoInjector(request) remote = injector.is_remote_workflow() + run_cv = injector.is_naasc_workflow() # if remote is true, create a capo subspace file in the request's directory if remote: @@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF): self.info.save_request(request) # execute condor and retrieve log file - log_file = self._execute_prepared(Path(request.results_dir)) + log_file = self._execute_prepared(Path(request.results_dir), run_cv) # start reading the log file (background it) logger.info("Running wf_monitor.") @@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF): logger.info("Making %s executable", file) file.chmod(file.stat().st_mode | stat.S_IEXEC) - def _execute_prepared(self, folder: Path) -> Path: + def _execute_prepared(self, folder: Path, run_cv: bool) -> Path: """ Execute HTCondor using the named folder as the source of the files. @@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF): dag_files = list(folder.glob("*.dag")) if dag_files: logger.info("dagman file %s exists.", dag_files[0]) - return self._submit_dag(folder, dag_files[0]) + return self._submit_dag(folder, dag_files[0], run_cv) else: job_file = list(folder.glob("*.condor"))[0] logger.info("condor file %s exists.", job_file) - return self._submit_job(folder, job_file) + return self._submit_job(folder, job_file, run_cv) - def _submit_job(self, folder: Path, job_file: Path) -> Path: + def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path: """ Submit job file to HTCondor @@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF): subprocess.run( ["condor_submit", str(job_file)], cwd=str(folder.absolute()), - preexec_fn=self._switch_to_submituser, + preexec_fn=self._switch_to_submituser(run_cv), ) # return the logfile return logfile - def _submit_dag(self, folder: Path, dag_file: Path) -> Path: + def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path: """ Submit DAG file to HTCondor @@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF): subprocess.run( ["condor_submit_dag", "-f", str(dag_file)], cwd=str(folder.absolute()), - preexec_fn=self._switch_to_submituser, + preexec_fn=self._switch_to_submituser(run_cv), ) # return the logfile return logfile @staticmethod - def _switch_to_submituser(): + def _switch_to_submituser(run_cv: bool): """ Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`; sets gid and uid of subprocess to those of the submituser so that jobs can be properly submitted """ - submituser_gid = 6000 - submituser_uid = 6000 + if run_cv: + # run as almapipe + submituser_gid = 9233 + submituser_uid = 9233 - os.setgid(submituser_gid) - os.setuid(submituser_uid) + os.setgid(submituser_gid) + os.setuid(submituser_uid) + else: + # run as vlapipe + submituser_gid = 6000 + submituser_uid = 6000 + + os.setgid(submituser_gid) + os.setuid(submituser_uid) @staticmethod def _get_job_logfile_name(jobfile_name: Path) -> Path: -- GitLab