Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (2)
...@@ -6,10 +6,18 @@ the NRAO Archive or upon completion of a Workspaces image processing request. ...@@ -6,10 +6,18 @@ the NRAO Archive or upon completion of a Workspaces image processing request.
## Usage ## Usage
`carta_envoy [-h] -d DIRECTORY [-n NOTIFY] [-r READY] [--parallel] [--single] [-wf WORKFLOW_REQUEST] [-i IMAGE]` `carta_envoy [-h] -d DIRECTORY [-n NOTIFY] [-r READY] [--parallel] [--single] [-wf WORKFLOW_REQUEST] [-i IMAGE]`
## Example ## Examples
To run carta envoy from command line: example on wirth-vml
### To run carta envoy from command line: example on wirth-vml
1. download image(s) from webtest 1. download image(s) from webtest
2. `ssh vlapipe@wirth-vml` 2. `ssh vlapipe@wirth-vml`
3. `activate_profile dsoc-dev` 3. `activate_profile dsoc-dev`
4. cd download location on lustre, e.g., */lustre/aoc/cluster/pipeline/dsoc-dev/spool/820190220* 4. cd download location on lustre, e.g., */lustre/aoc/cluster/pipeline/dsoc-dev/spool/820190220*
5. `/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/sbin/carta_envoy -d ./VLASS1.1.ql.T01t01.J000228-363000.10.2048.v1/rawdata -r true -n jgoldste@nrao.edu` 5. `/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/sbin/carta_envoy -d ./VLASS1.1.ql.T01t01.J000228-363000.10.2048.v1/rawdata -r true -n jgoldste@nrao.edu`
### To run CARTA from the workspaces UI on dev
Pick a completed capability request from the database that has image products and open the request page for it;
there will be a "launch carta" button.
https://ws-dev.nrao.edu/workspaces/request-status/21
![img.png](img.png)
\ No newline at end of file
apps/cli/executables/pexable/carta_envoy/img.png

159 KiB

...@@ -59,13 +59,13 @@ class TestCapabilityService: ...@@ -59,13 +59,13 @@ class TestCapabilityService:
:return: :return:
""" """
wf_request_id = -1 wf_request_id = -2
carta_url = "decartes_image_carta_url" carta_url = "decartes_image_carta_url"
fake_carta_ready_msg = { fake_carta_ready_msg = {
"service": "capability", "service": "capability",
"routing_key": "capability", "routing_key": "capability",
"carta_url": carta_url, "carta_url": carta_url,
"subject": {"workflow_request_id": wf_request_id}, "subject": {"workflow_request_id": wf_request_id, "argument": {"parent_wf_request_id": -1}},
"type": "carta-ready", "type": "carta-ready",
} }
......
...@@ -189,8 +189,8 @@ class TestWorkflowService: ...@@ -189,8 +189,8 @@ class TestWorkflowService:
mock_workflow_service._submit_dag = MagicMock() mock_workflow_service._submit_dag = MagicMock()
# Test DAG submission # Test DAG submission
mock_workflow_service._execute_prepared(fake_folder) mock_workflow_service._execute_prepared(fake_folder, False)
mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile) mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile, False)
# DAG cleanup # DAG cleanup
mock_workflow_service._submit_dag = orig_submit_dag mock_workflow_service._submit_dag = orig_submit_dag
...@@ -204,8 +204,8 @@ class TestWorkflowService: ...@@ -204,8 +204,8 @@ class TestWorkflowService:
mock_workflow_service._submit_job = MagicMock() mock_workflow_service._submit_job = MagicMock()
# Test job submission # Test job submission
mock_workflow_service._execute_prepared(fake_folder) mock_workflow_service._execute_prepared(fake_folder, False)
mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile) mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile, False)
# Cleanup # Cleanup
mock_workflow_service._submit_job = orig_submit_job mock_workflow_service._submit_job = orig_submit_job
...@@ -216,11 +216,11 @@ class TestWorkflowService: ...@@ -216,11 +216,11 @@ class TestWorkflowService:
fake_jobfile = pathlib.Path("fake.submit") fake_jobfile = pathlib.Path("fake.submit")
fake_jobfile.touch() fake_jobfile.touch()
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile) log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile, False)
mock_sp_run.assert_called_with( mock_sp_run.assert_called_with(
["condor_submit", fake_jobfile.name], ["condor_submit", fake_jobfile.name],
cwd=str(fake_folder.absolute()), cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser, preexec_fn=mock_workflow_service._switch_to_submituser(False),
) )
assert log_path.name == "fake.log" assert log_path.name == "fake.log"
...@@ -230,11 +230,11 @@ class TestWorkflowService: ...@@ -230,11 +230,11 @@ class TestWorkflowService:
fake_folder = pathlib.Path("fake_folder") fake_folder = pathlib.Path("fake_folder")
fake_dagfile = pathlib.Path("fake.dag") fake_dagfile = pathlib.Path("fake.dag")
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run: with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile) log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile, False)
mock_sp_run.assert_called_with( mock_sp_run.assert_called_with(
["condor_submit_dag", "-f", fake_dagfile.name], ["condor_submit_dag", "-f", fake_dagfile.name],
cwd=str(fake_folder.absolute()), cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser, preexec_fn=mock_workflow_service._switch_to_submituser(False),
) )
assert log_path.name == "fake.dag.dagman.log" assert log_path.name == "fake.dag.dagman.log"
......
...@@ -171,9 +171,10 @@ class CapabilityService(CapabilityServiceIF): ...@@ -171,9 +171,10 @@ class CapabilityService(CapabilityServiceIF):
""" """
logger.info(f"RECEIVED CARTA READY MESSAGE: {message}") logger.info(f"RECEIVED CARTA READY MESSAGE: {message}")
wf_request_id = int(message["subject"]["workflow_request_id"]) subject = message["subject"]
parent_wf_request_id = int(subject["argument"]["parent_wf_request_id"])
execution = self.capability_info.lookup_execution_by_workflow_request_id(wf_request_id) execution = self.capability_info.lookup_execution_by_workflow_request_id(parent_wf_request_id)
request_version = execution.version request_version = execution.version
request_version.workflow_metadata = {"carta_url": message["carta_url"]} request_version.workflow_metadata = {"carta_url": message["carta_url"]}
......
...@@ -173,6 +173,12 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta): ...@@ -173,6 +173,12 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta):
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod @abstractmethod
def save_execution(self, execution: CapabilityExecutionIF): def save_execution(self, execution: CapabilityExecutionIF):
pass pass
......
...@@ -50,6 +50,11 @@ class CapoInjector: ...@@ -50,6 +50,11 @@ class CapoInjector:
wf_name=self.request.workflow_name wf_name=self.request.workflow_name
) )
def is_naasc_workflow(self) -> bool:
parameters = self.request.argument
naasc = RemoteProcessingSites.NAASC.value
return naasc in parameters
def identify_subspace(self) -> ListView: def identify_subspace(self) -> ListView:
""" """
Determine specific capo requirements for a remote workflow Determine specific capo requirements for a remote workflow
......
...@@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF):
# Is this a remote execution workflow? # Is this a remote execution workflow?
injector = CapoInjector(request) injector = CapoInjector(request)
remote = injector.is_remote_workflow() remote = injector.is_remote_workflow()
run_cv = injector.is_naasc_workflow()
# if remote is true, create a capo subspace file in the request's directory # if remote is true, create a capo subspace file in the request's directory
if remote: if remote:
...@@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF):
self.info.save_request(request) self.info.save_request(request)
# execute condor and retrieve log file # execute condor and retrieve log file
log_file = self._execute_prepared(Path(request.results_dir)) log_file = self._execute_prepared(Path(request.results_dir), run_cv)
# start reading the log file (background it) # start reading the log file (background it)
logger.info("Running wf_monitor.") logger.info("Running wf_monitor.")
...@@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF):
logger.info("Making %s executable", file) logger.info("Making %s executable", file)
file.chmod(file.stat().st_mode | stat.S_IEXEC) file.chmod(file.stat().st_mode | stat.S_IEXEC)
def _execute_prepared(self, folder: Path) -> Path: def _execute_prepared(self, folder: Path, run_cv: bool) -> Path:
""" """
Execute HTCondor using the named folder as the source of the files. Execute HTCondor using the named folder as the source of the files.
...@@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF): ...@@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF):
dag_files = list(folder.glob("*.dag")) dag_files = list(folder.glob("*.dag"))
if dag_files: if dag_files:
logger.info("dagman file %s exists.", dag_files[0]) logger.info("dagman file %s exists.", dag_files[0])
return self._submit_dag(folder, dag_files[0]) return self._submit_dag(folder, dag_files[0], run_cv)
else: else:
job_file = list(folder.glob("*.condor"))[0] job_file = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", job_file) logger.info("condor file %s exists.", job_file)
return self._submit_job(folder, job_file) return self._submit_job(folder, job_file, run_cv)
def _submit_job(self, folder: Path, job_file: Path) -> Path: def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path:
""" """
Submit job file to HTCondor Submit job file to HTCondor
...@@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF): ...@@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run( subprocess.run(
["condor_submit", str(job_file)], ["condor_submit", str(job_file)],
cwd=str(folder.absolute()), cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser, preexec_fn=self._switch_to_submituser(run_cv),
) )
# return the logfile # return the logfile
return logfile return logfile
def _submit_dag(self, folder: Path, dag_file: Path) -> Path: def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path:
""" """
Submit DAG file to HTCondor Submit DAG file to HTCondor
...@@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF): ...@@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run( subprocess.run(
["condor_submit_dag", "-f", str(dag_file)], ["condor_submit_dag", "-f", str(dag_file)],
cwd=str(folder.absolute()), cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser, preexec_fn=self._switch_to_submituser(run_cv),
) )
# return the logfile # return the logfile
return logfile return logfile
@staticmethod @staticmethod
def _switch_to_submituser(): def _switch_to_submituser(run_cv: bool):
""" """
Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`; Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`;
sets gid and uid of subprocess to those of the submituser so that jobs can be sets gid and uid of subprocess to those of the submituser so that jobs can be
properly submitted properly submitted
""" """
submituser_gid = 6000 if run_cv:
submituser_uid = 6000 # run as almapipe
submituser_gid = 9233
submituser_uid = 9233
os.setgid(submituser_gid) os.setgid(submituser_gid)
os.setuid(submituser_uid) os.setuid(submituser_uid)
else:
# run as vlapipe
submituser_gid = 6000
submituser_uid = 6000
os.setgid(submituser_gid)
os.setuid(submituser_uid)
@staticmethod @staticmethod
def _get_job_logfile_name(jobfile_name: Path) -> Path: def _get_job_logfile_name(jobfile_name: Path) -> Path:
......