Skip to content
Snippets Groups Projects
Commit 7295ccd4 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Merge branch 'release/WS-0.1' of https://open-bitbucket.nrao.edu/scm/ssa/data into release/WS-0.1

parents ef77b00a 05dbdb5f
No related branches found
No related tags found
No related merge requests found
......@@ -96,6 +96,23 @@ class WorkflowRequestRestService:
request = self.request.info.create_workflow_request(
self.request.context, self.request.GET.getall("args")
)
return request\
@view_config(request_method="POST", route_name="create_execution_workflow_request")
def create_execution_workflow_request(self):
"""
Create a new workflow request from the name/arguments supplied.
Audience: front-end and CLI
:return:
"""
# all we should have to do here is take the WorkflowRequest from the context and
# hand it to WorkflowInfo to save it, but we're still conflating
# workflows and workflow requests right now
request = self.request.info.create_workflow_request(
self.request.context, self.request.GET.getall("args"), self.request.matchdict['id']
)
return request
@view_config(request_method="POST", route_name="submit_workflow_request")
......@@ -237,6 +254,7 @@ def main(global_config, **settings):
# GET /workflows <- list of workflows
# GET /workflows/null <- info about the null workflow
# POST /workflows/null/requests/create <- create a request for the null workflow
# POST /workflows/null/execution/123/create <- create a request for the null workflow for execution #123
# PUT /workflows/requests/23/files/foo.txt <- attach foo.txt to request #23 on workflow null
# POST /workflows/requests/23/submit <- launch request #23
......@@ -247,6 +265,11 @@ def main(global_config, **settings):
"/workflows/{name}/requests/create",
factory=lookup_workflow,
)
config.add_route(
"create_execution_workflow_request",
"/workflows/{name}/execution/{id}/create",
factory=lookup_workflow,
)
config.add_route(
"workflow_request",
"/workflows/requests/{request_id}",
......
......@@ -351,7 +351,7 @@ class Workflow(Base, WorkflowIF):
def __repr__(self):
return f"<Workflow workflow_name={self.workflow_name}>"
def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]:
def render_templates(self, argument: Dict, files: List[Path] = None) -> List[AbstractFile]:
"""
Render the templates associated with this workflow
:param argument: the workflow argument JSON
......@@ -429,9 +429,7 @@ class WorkflowRequest(Base, WorkflowRequestIF):
)
argument = sa.Column("argument", sa.JSON)
state = sa.Column("state", sa.String)
execution_id = sa.Column(
"execution_id", sa.Integer, sa.ForeignKey("capability_executions.id")
)
execution_id = sa.Column("execution_id", sa.Integer, sa.ForeignKey("capability_execution.execution_id"))
files = relationship("WorkflowRequestFile", backref="request")
@property
......@@ -442,10 +440,10 @@ class WorkflowRequest(Base, WorkflowRequestIF):
def create_request(self, request: AbstractRequest):
self.workflow_name, self.argument = request.name, request.argument
# TODO: create following fields in table
def update_status(self, status: str):
self.state = status
# TODO: create following fields in table
def set_start_time(self, time: str):
self.start_time = time
......@@ -459,8 +457,12 @@ class WorkflowRequest(Base, WorkflowRequestIF):
argument=self.argument,
state=self.state,
execution_id=self.execution_id,
files=self.files
)
def __getitem__(self, item):
return self.files
def __repr__(self):
return f"<WorkflowRequest workflow_request_id= {self.workflow_request_id}>"
......
......@@ -445,21 +445,15 @@ class WorkflowService(WorkflowServiceIF):
# self.channel = workflow_events.listen(self.on_workflow_event)
self.info = info
def execute(self, request: WorkflowRequest):
def execute(self, request: WorkflowRequest, files: List[Path] = None):
"""
Execute a workflow per the supplied parameters.
"""
# 1. look up workflow, returns workflow
definition = self.info.lookup_workflow_definition(request.workflow_name)
# # 2. create and save request
request = self.info.create_workflow_request(
request.workflow_name, request.execution_id, request.argument, request.files
)
# 3. render templates to files, returns list of rendered files
files_list = request.files.split(" ")
workflow_files = definition.render_templates(request.argument, files_list)
# 2. render templates to files, returns list of rendered files
workflow_files = definition.render_templates(request.argument, files)
for file in workflow_files:
self.info.save_file(
request=request, filename=file.filename, content=file.content
......@@ -533,47 +527,38 @@ class WorkflowService(WorkflowServiceIF):
# vulture is a workaround for testing locally without submitting to condor
print("submitting to vulture...")
subprocess.run(
["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute())
)
subprocess.run(["vulture", 'job', 'execute', str(condor)], cwd=str(folder.absolute()))
# return the logfile
return logfile
@staticmethod
def on_workflow_event(
event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(
workflow_events.send
)
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(workflow_events.send)
# 1. send amqp event to workflow channel
decorated_workflow_send(event)
# 2. update request record with corresponding status
if event.type == WorkflowEventType.EXECUTING.name:
# Workflow has begun executing
status: str
# 2. update request record with new status
if event.type == WorkflowEventType.EXECUTING:
status = WorkflowRequestState.Running.name
elif event.type == WorkflowEventType.TERMINATED.name:
# Workflow has terminated
elif event.type == WorkflowEventType.TERMINATED:
if event.retval == 0:
# Workflow completed successfully
status = WorkflowRequestState.Complete.name
else:
# Workflow failed
status = WorkflowRequestState.Failed.name
else:
# Workflow request state does not need to be updated
return
print(
f"Updating state on workflow request {request_record.workflow_request_id} to {WorkflowRequestState.Running}..."
)
status = WorkflowRequestState.Running.name
print(f"Updating state on workflow request {request_record.workflow_request_id} to {status}...")
request_record.update_status(status)
......@@ -595,26 +580,20 @@ class WorkflowInfo(WorkflowInfoIF):
return self.session.query(Workflow).all()
def create_workflow_request(
self,
workflow_name: str,
execution_id: int,
argument: str = None,
files: List[Path] = [],
self, workflow: Workflow, argument: Dict, execution_id: int = None
) -> WorkflowRequest:
"""
Create new workflow request and save to database
:param workflow_name: Name of workflow to run
:param execution_id: ID of the execution that made this request
:param argument: Workflow arguments
:param files: Files that the workflow will use
:param workflow: workflow to run
:param argument: workflow arguments
:param execution_id: execution id of parent capability execution (optional for now)
:return: new WorkflowRequest
"""
request = WorkflowRequest(
workflow_name=workflow_name,
workflow_name=workflow.workflow_name,
argument=argument,
state=WorkflowRequestState.Created.name,
execution_id=execution_id,
files=" ".join(files),
execution_id=execution_id
)
self.save_request(request)
return request
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment