Skip to content
Snippets Groups Projects
Commit 16c10e5f authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Changed the way workflow requests are created and submitted slightly to

make things work
parent 0faf6789
No related branches found
No related tags found
1 merge request!70Turned T H E B U T T O N into two buttons
Pipeline #479 passed
......@@ -114,9 +114,10 @@ class WorkflowRequestRestService:
"""
print(f"Submitting workflow {self.request.context}")
return self.request.workflows.execute(
self.request.context, self.request.json_body["files"]
)
# return self.request.workflows.execute(
# self.request.context, self.request.json_body["files"]
# )
return self.request.workflows.execute(self.request.context, [])
@view_defaults(route_name="workflow_request_files", renderer="json")
......@@ -135,9 +136,7 @@ class WorkflowFilesRestService:
Audience: front-end and CLI
"""
print(
f"Adding file {self.request.matchdict['filename']} to {self.request.context}"
)
print(f"Adding file {self.request.matchdict['filename']} to {self.request.context}")
file = self.request.info.save_file(
request=self.request.context,
filename=self.request.matchdict["filename"],
......@@ -237,9 +236,7 @@ def main(global_config, **settings):
reify=True,
)
# make workflow_service available for use in Pyramid
config.add_request_method(
lambda r: WorkflowService(r.info), "workflows", reify=True
)
config.add_request_method(lambda r: WorkflowService(r.info), "workflows", reify=True)
# GET /workflows <- list of workflows
# GET /workflows/null <- info about the null workflow
......
......@@ -4,7 +4,7 @@ import subprocess
import threading
from pathlib import Path
from tempfile import mkdtemp
from typing import List, Union, Dict
from typing import Dict, List, Union
import requests
from channels.amqp_helpers import Channel, WorkflowEventChannel
......@@ -17,17 +17,13 @@ from workspaces.workflow.schema import (
WorkflowRequest,
WorkflowRequestFile,
)
from workspaces.workflow.schema_interfaces import WorkflowRequestIF, WorkflowIF
from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
class WorkflowServiceRESTClient(WorkflowServiceIF):
def __init__(self):
self.url = (
CapoConfig()
.settings("edu.nrao.archive.workspaces.WorkflowSettings")
.serviceUrl
)
self.url = CapoConfig().settings("edu.nrao.archive.workspaces.WorkflowSettings").serviceUrl
def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]):
# step 1: if necessary, pass the files up for this request
......@@ -38,15 +34,16 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
)
# step 2: execute the request
requests.post(
f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit"
)
requests.post(f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit")
def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
return requests.post(
f"{self.url}/workflows/{workflow}/requests/create?args={argument}"
def create_workflow_request(
self, workflow: Union[str, WorkflowIF], argument: Dict
) -> WorkflowRequestIF:
return requests.post(
f"{self.url}/workflows/requests/create", json={"workflow": workflow, "args": argument}
).json()
class WorkflowService(WorkflowServiceIF):
"""
Executes workflows; should be a freestanding service.
......@@ -81,16 +78,12 @@ class WorkflowService(WorkflowServiceIF):
# 2. render templates to files, returns list of rendered files
workflow_files = definition.render_templates(request.argument, files)
for file in workflow_files:
self.info.save_file(
request=request, filename=file.filename, content=file.content
)
self.info.save_file(request=request, filename=file.filename, content=file.content)
# 4. prepare files for condor execution
if not request.results_dir:
# Create temp results directory if the request doesn't already have one (from a previous execution attempt)
request.results_dir = str(
self._prepare_files_for_condor(workflow_files).absolute()
)
request.results_dir = str(self._prepare_files_for_condor(workflow_files).absolute())
self.info.save_request(request)
# 5. execute condor and retrieve log file
......@@ -157,9 +150,7 @@ class WorkflowService(WorkflowServiceIF):
# vulture is a workaround for testing locally without submitting to condor
print("submitting to vulture...")
subprocess.run(
["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute())
)
subprocess.run(["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute()))
# return the logfile
return logfile
......@@ -179,7 +170,5 @@ class WorkflowService(WorkflowServiceIF):
else:
status = WorkflowRequestState.Running.name
print(
f"Updating state on workflow request {request.workflow_request_id} to {status}..."
)
print(f"Updating state on workflow request {request.workflow_request_id} to {status}...")
request.update_status(status)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment