diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index 28ba047d23c0560bb9fe685b6338da34895bcc7a..fba75de36b5c91d3534da6ac8477743ac91782e6 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -114,9 +114,10 @@ class WorkflowRequestRestService: """ print(f"Submitting workflow {self.request.context}") - return self.request.workflows.execute( - self.request.context, self.request.json_body["files"] - ) + # return self.request.workflows.execute( + # self.request.context, self.request.json_body["files"] + # ) + return self.request.workflows.execute(self.request.context, []) @view_defaults(route_name="workflow_request_files", renderer="json") @@ -135,9 +136,7 @@ class WorkflowFilesRestService: Audience: front-end and CLI """ - print( - f"Adding file {self.request.matchdict['filename']} to {self.request.context}" - ) + print(f"Adding file {self.request.matchdict['filename']} to {self.request.context}") file = self.request.info.save_file( request=self.request.context, filename=self.request.matchdict["filename"], @@ -237,9 +236,7 @@ def main(global_config, **settings): reify=True, ) # make workflow_service available for use in Pyramid - config.add_request_method( - lambda r: WorkflowService(r.info), "workflows", reify=True - ) + config.add_request_method(lambda r: WorkflowService(r.info), "workflows", reify=True) # GET /workflows <- list of workflows # GET /workflows/null <- info about the null workflow diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 1aafef7430de88a5c67850c67475e341d0c1ee9c..534ff5b527c1f0177153efdd902bf3be92daf182 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -4,7 +4,7 @@ import subprocess import threading from pathlib import Path from tempfile import mkdtemp -from typing import List, Union, Dict +from typing import Dict, List, Union import requests from channels.amqp_helpers import Channel, WorkflowEventChannel @@ -17,17 +17,13 @@ from workspaces.workflow.schema import ( WorkflowRequest, WorkflowRequestFile, ) -from workspaces.workflow.schema_interfaces import WorkflowRequestIF, WorkflowIF +from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF class WorkflowServiceRESTClient(WorkflowServiceIF): def __init__(self): - self.url = ( - CapoConfig() - .settings("edu.nrao.archive.workspaces.WorkflowSettings") - .serviceUrl - ) + self.url = CapoConfig().settings("edu.nrao.archive.workspaces.WorkflowSettings").serviceUrl def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]): # step 1: if necessary, pass the files up for this request @@ -38,15 +34,16 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): ) # step 2: execute the request - requests.post( - f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit" - ) + requests.post(f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit") - def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF: - return requests.post( - f"{self.url}/workflows/{workflow}/requests/create?args={argument}" + def create_workflow_request( + self, workflow: Union[str, WorkflowIF], argument: Dict + ) -> WorkflowRequestIF: + return requests.post( + f"{self.url}/workflows/requests/create", json={"workflow": workflow, "args": argument} ).json() + class WorkflowService(WorkflowServiceIF): """ Executes workflows; should be a freestanding service. @@ -81,16 +78,12 @@ class WorkflowService(WorkflowServiceIF): # 2. render templates to files, returns list of rendered files workflow_files = definition.render_templates(request.argument, files) for file in workflow_files: - self.info.save_file( - request=request, filename=file.filename, content=file.content - ) + self.info.save_file(request=request, filename=file.filename, content=file.content) # 4. prepare files for condor execution if not request.results_dir: # Create temp results directory if the request doesn't already have one (from a previous execution attempt) - request.results_dir = str( - self._prepare_files_for_condor(workflow_files).absolute() - ) + request.results_dir = str(self._prepare_files_for_condor(workflow_files).absolute()) self.info.save_request(request) # 5. execute condor and retrieve log file @@ -157,9 +150,7 @@ class WorkflowService(WorkflowServiceIF): # vulture is a workaround for testing locally without submitting to condor print("submitting to vulture...") - subprocess.run( - ["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute()) - ) + subprocess.run(["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute())) # return the logfile return logfile @@ -179,7 +170,5 @@ class WorkflowService(WorkflowServiceIF): else: status = WorkflowRequestState.Running.name - print( - f"Updating state on workflow request {request.workflow_request_id} to {status}..." - ) + print(f"Updating state on workflow request {request.workflow_request_id} to {status}...") request.update_status(status)