Skip to content
Snippets Groups Projects
Commit 86d198d9 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

SWS-5: fix capability persistence

parent 5e59c109
No related branches found
No related tags found
1 merge request!61SWS-5: fix capability persistence
Pipeline #448 passed
......@@ -213,10 +213,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
version = relationship(CapabilityVersion, back_populates="executions")
current_workflow_request_id = sa.Column(
"current_workflow_request_id",
sa.Integer,
sa.ForeignKey(WorkflowRequest.workflow_request_id),
sa.Integer
)
current_workflow_request = relationship(WorkflowRequest)
# current_workflow_request = relationship(WorkflowRequest)
__table_args__ = (
sa.ForeignKeyConstraint(
......@@ -235,6 +234,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
current_workflow_request_id=self.current_workflow_request_id,
)
def update_execution(self, workflow_request_id: int):
self.current_workflow_request_id = workflow_request_id
@property
def capability(self):
return self.version.capability
......
......@@ -43,7 +43,7 @@ class CapabilityExecutionIF:
state: str
current_step: int
version: CapabilityVersionIF
current_workflow_request: WorkflowRequestIF
current_workflow_request_id: int
steps: str
capability: CapabilityIF
capability_request: CapabilityRequestIF
......
......@@ -7,7 +7,6 @@ from workspaces.capability.services.interfaces import (
CapabilityInfoIF,
)
from workspaces.system.schema import AbstractFile
from workspaces.workflow.schema import WorkflowRequest
from workspaces.workflow.services.interfaces import WorkflowServiceIF
......@@ -36,16 +35,21 @@ class CapabilityEngine(CapabilityEngineIF):
cur_step.execute(self, execution)
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
self, execution_id: int, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
):
"""
Submit a workflow request to the workflow service
:param execution_id:
:param workflow_name:
:param workflow_args:
:param files:
"""
workflow_request = WorkflowRequest(
workflow_name=workflow_name, argument=workflow_args
)
workflow_request = self.workflow_service.create_workflow_request(workflow=workflow_name, argument=workflow_args)
execution = self.capability_info.lookup_execution(execution_id)
execution.update_execution(workflow_request['workflow_request_id'])
self.capability_info.save_execution(execution)
self.workflow_service.execute(workflow_request, files)
from typing import List
import transaction
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
......@@ -143,3 +144,4 @@ class CapabilityInfo(CapabilityInfoIF):
def save_execution(self, execution: CapabilityExecutionIF):
self.session.add(execution)
transaction.commit()
......@@ -108,7 +108,7 @@ class CapabilityEngineIF(ABC):
@abstractmethod
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
self, execution_id:int, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
):
pass
......
......@@ -26,6 +26,9 @@ class WorkflowServiceIF(ABC):
"""
raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}")
def create_workflow_request(self, workflow, argument) -> WorkflowRequestIF:
raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}")
class WorkflowInfoIF(ABC):
"""
......
......@@ -4,7 +4,7 @@ import subprocess
import threading
from pathlib import Path
from tempfile import mkdtemp
from typing import List
from typing import List, Union, Dict
import requests
from channels.amqp_helpers import Channel, WorkflowEventChannel
......@@ -17,7 +17,7 @@ from workspaces.workflow.schema import (
WorkflowRequest,
WorkflowRequestFile,
)
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
from workspaces.workflow.schema_interfaces import WorkflowRequestIF, WorkflowIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
......@@ -30,23 +30,22 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
)
def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]):
# step 1: create the request
req = requests.post(
f"{self.url}/workflows/{request.workflow_name}/requests/create?args={request.argument}"
).json()
# step 2: if necessary, pass the files up for this request
# step 1: if necessary, pass the files up for this request
for file in files:
requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/files/{file.filename}",
f"{self.url}/workflows/requests/{request['workflow_request_id']}/files/{file.filename}",
body=file.content,
)
# step 3: execute the request
# step 2: execute the request
requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/submit"
f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit"
)
def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
return requests.post(
f"{self.url}/workflows/{workflow}/requests/create?args={argument}"
).json()
class WorkflowService(WorkflowServiceIF):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment