Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (2)
......@@ -6,10 +6,18 @@ the NRAO Archive or upon completion of a Workspaces image processing request.
## Usage
`carta_envoy [-h] -d DIRECTORY [-n NOTIFY] [-r READY] [--parallel] [--single] [-wf WORKFLOW_REQUEST] [-i IMAGE]`
## Example
To run carta envoy from command line: example on wirth-vml
## Examples
### To run carta envoy from command line: example on wirth-vml
1. download image(s) from webtest
2. `ssh vlapipe@wirth-vml`
3. `activate_profile dsoc-dev`
4. cd download location on lustre, e.g., */lustre/aoc/cluster/pipeline/dsoc-dev/spool/820190220*
5. `/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/sbin/carta_envoy -d ./VLASS1.1.ql.T01t01.J000228-363000.10.2048.v1/rawdata -r true -n jgoldste@nrao.edu`
### To run CARTA from the workspaces UI on dev
Pick a completed capability request from the database that has image products and open the request page for it;
there will be a "launch carta" button.
https://ws-dev.nrao.edu/workspaces/request-status/21
![img.png](img.png)
\ No newline at end of file
apps/cli/executables/pexable/carta_envoy/img.png

159 KiB

......@@ -59,13 +59,13 @@ class TestCapabilityService:
:return:
"""
wf_request_id = -1
wf_request_id = -2
carta_url = "decartes_image_carta_url"
fake_carta_ready_msg = {
"service": "capability",
"routing_key": "capability",
"carta_url": carta_url,
"subject": {"workflow_request_id": wf_request_id},
"subject": {"workflow_request_id": wf_request_id, "argument": {"parent_wf_request_id": -1}},
"type": "carta-ready",
}
......
......@@ -189,8 +189,8 @@ class TestWorkflowService:
mock_workflow_service._submit_dag = MagicMock()
# Test DAG submission
mock_workflow_service._execute_prepared(fake_folder)
mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile)
mock_workflow_service._execute_prepared(fake_folder, False)
mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile, False)
# DAG cleanup
mock_workflow_service._submit_dag = orig_submit_dag
......@@ -204,8 +204,8 @@ class TestWorkflowService:
mock_workflow_service._submit_job = MagicMock()
# Test job submission
mock_workflow_service._execute_prepared(fake_folder)
mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile)
mock_workflow_service._execute_prepared(fake_folder, False)
mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile, False)
# Cleanup
mock_workflow_service._submit_job = orig_submit_job
......@@ -216,11 +216,11 @@ class TestWorkflowService:
fake_jobfile = pathlib.Path("fake.submit")
fake_jobfile.touch()
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile)
log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile, False)
mock_sp_run.assert_called_with(
["condor_submit", fake_jobfile.name],
cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser,
preexec_fn=mock_workflow_service._switch_to_submituser(False),
)
assert log_path.name == "fake.log"
......@@ -230,11 +230,11 @@ class TestWorkflowService:
fake_folder = pathlib.Path("fake_folder")
fake_dagfile = pathlib.Path("fake.dag")
with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile)
log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile, False)
mock_sp_run.assert_called_with(
["condor_submit_dag", "-f", fake_dagfile.name],
cwd=str(fake_folder.absolute()),
preexec_fn=mock_workflow_service._switch_to_submituser,
preexec_fn=mock_workflow_service._switch_to_submituser(False),
)
assert log_path.name == "fake.dag.dagman.log"
......
......@@ -171,9 +171,10 @@ class CapabilityService(CapabilityServiceIF):
"""
logger.info(f"RECEIVED CARTA READY MESSAGE: {message}")
wf_request_id = int(message["subject"]["workflow_request_id"])
subject = message["subject"]
parent_wf_request_id = int(subject["argument"]["parent_wf_request_id"])
execution = self.capability_info.lookup_execution_by_workflow_request_id(wf_request_id)
execution = self.capability_info.lookup_execution_by_workflow_request_id(parent_wf_request_id)
request_version = execution.version
request_version.workflow_metadata = {"carta_url": message["carta_url"]}
......
......@@ -173,6 +173,12 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta):
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def save_execution(self, execution: CapabilityExecutionIF):
pass
......
......@@ -50,6 +50,11 @@ class CapoInjector:
wf_name=self.request.workflow_name
)
def is_naasc_workflow(self) -> bool:
parameters = self.request.argument
naasc = RemoteProcessingSites.NAASC.value
return naasc in parameters
def identify_subspace(self) -> ListView:
"""
Determine specific capo requirements for a remote workflow
......
......@@ -227,6 +227,7 @@ class WorkflowService(WorkflowServiceIF):
# Is this a remote execution workflow?
injector = CapoInjector(request)
remote = injector.is_remote_workflow()
run_cv = injector.is_naasc_workflow()
# if remote is true, create a capo subspace file in the request's directory
if remote:
......@@ -262,7 +263,7 @@ class WorkflowService(WorkflowServiceIF):
self.info.save_request(request)
# execute condor and retrieve log file
log_file = self._execute_prepared(Path(request.results_dir))
log_file = self._execute_prepared(Path(request.results_dir), run_cv)
# start reading the log file (background it)
logger.info("Running wf_monitor.")
......@@ -410,7 +411,7 @@ class WorkflowService(WorkflowServiceIF):
logger.info("Making %s executable", file)
file.chmod(file.stat().st_mode | stat.S_IEXEC)
def _execute_prepared(self, folder: Path) -> Path:
def _execute_prepared(self, folder: Path, run_cv: bool) -> Path:
"""
Execute HTCondor using the named folder as the source of the files.
......@@ -423,13 +424,13 @@ class WorkflowService(WorkflowServiceIF):
dag_files = list(folder.glob("*.dag"))
if dag_files:
logger.info("dagman file %s exists.", dag_files[0])
return self._submit_dag(folder, dag_files[0])
return self._submit_dag(folder, dag_files[0], run_cv)
else:
job_file = list(folder.glob("*.condor"))[0]
logger.info("condor file %s exists.", job_file)
return self._submit_job(folder, job_file)
return self._submit_job(folder, job_file, run_cv)
def _submit_job(self, folder: Path, job_file: Path) -> Path:
def _submit_job(self, folder: Path, job_file: Path, run_cv: bool) -> Path:
"""
Submit job file to HTCondor
......@@ -447,13 +448,13 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run(
["condor_submit", str(job_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
preexec_fn=self._switch_to_submituser(run_cv),
)
# return the logfile
return logfile
def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
def _submit_dag(self, folder: Path, dag_file: Path, run_cv: bool) -> Path:
"""
Submit DAG file to HTCondor
......@@ -471,24 +472,33 @@ class WorkflowService(WorkflowServiceIF):
subprocess.run(
["condor_submit_dag", "-f", str(dag_file)],
cwd=str(folder.absolute()),
preexec_fn=self._switch_to_submituser,
preexec_fn=self._switch_to_submituser(run_cv),
)
# return the logfile
return logfile
@staticmethod
def _switch_to_submituser():
def _switch_to_submituser(run_cv: bool):
"""
Helper function that is meant to be called as a `preexec_fn` for `subprocess.run`;
sets gid and uid of subprocess to those of the submituser so that jobs can be
properly submitted
"""
submituser_gid = 6000
submituser_uid = 6000
if run_cv:
# run as almapipe
submituser_gid = 9233
submituser_uid = 9233
os.setgid(submituser_gid)
os.setuid(submituser_uid)
os.setgid(submituser_gid)
os.setuid(submituser_uid)
else:
# run as vlapipe
submituser_gid = 6000
submituser_uid = 6000
os.setgid(submituser_gid)
os.setuid(submituser_uid)
@staticmethod
def _get_job_logfile_name(jobfile_name: Path) -> Path:
......