Skip to content
Snippets Groups Projects
Commit 502542e2 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

submit to naasc as almapipe

parent de092317
No related branches found
No related tags found
1 merge request!512submit to naasc as almapipe
This commit is part of merge request !512. Comments created here will be created in the context of that merge request.
......@@ -50,6 +50,11 @@ class CapoInjector:
wf_name=self.request.workflow_name
)
def is_naasc_workflow(self) -> bool:
parameters = self.request.argument
naasc = RemoteProcessingSites.NAASC.value
return naasc in parameters
def identify_subspace(self) -> ListView:
"""
Determine specific capo requirements for a remote workflow
......
......@@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF):
# Is this a remote execution workflow?
injector = CapoInjector(request)
remote = injector.is_remote_workflow()
run_cv = injector.is_naasc_workflow()
# if remote is true, create a capo subspace file in the request's directory
if remote:
......@@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF):
self.info.save_request(request)
# execute condor and retrieve log file
log_file = self._execute_prepared(Path(request.results_dir))
log_file = self._execute_prepared(Path(request.results_dir), run_cv)
# start reading the log file (background it)
logger.info("Running wf_monitor.")
......@@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF):
logger.info("Making %s executable", file)
file.chmod(file.stat().st_mode | stat.S_IEXEC)
def _execute_prepared(self, folder: Path) -> Path:
def _execute_prepared(self, folder: Path, run_cv: bool) -> Path:
"""
Execute HTCondor using the named folder as the source of the files.
......@@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF):
dag_files = list(folder.glob("*.dag"))
if dag_files:
logger.info("dagman file %s exists.", dag_files[0])
return self._submit_dag(folder, dag_files[0])
return self._submit_dag(folder, dag_files[0], run_cv)
else:
job_file = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", job_file)
return self._submit_job(folder, job_file)
return self._submit_job(folder, job_file, run_cv)
def _submit_job(self, folder: Path, job_file: Path) -> Path:
def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path:
"""
Submit job file to HTCondor
......@@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run(
["condor_submit", str(job_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
preexec_fn=self._switch_to_submituser(run_cv),
)
# return the logfile
return logfile
def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path:
"""
Submit DAG file to HTCondor
......@@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run(
["condor_submit_dag", "-f", str(dag_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
preexec_fn=self._switch_to_submituser(run_cv),
)
# return the logfile
return logfile
@staticmethod
def _switch_to_submituser():
def _switch_to_submituser(run_cv: bool):
"""
Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`;
sets gid and uid of subprocess to those of the submituser so that jobs can be
properly submitted
"""
submituser_gid = 6000
submituser_uid = 6000
if run_cv:
# run as almapipe
submituser_gid = 9233
submituser_uid = 9233
os.setgid(submituser_gid)
os.setuid(submituser_uid)
os.setgid(submituser_gid)
os.setuid(submituser_uid)
else:
# run as vlapipe
submituser_gid = 6000
submituser_uid = 6000
os.setgid(submituser_gid)
os.setuid(submituser_uid)
@staticmethod
def _get_job_logfile_name(jobfile_name: Path) -> Path:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment