Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Showing
with 37 additions and 21 deletions
def test_placeholder():
pass
\ No newline at end of file
# TODO: Write unit tests for AMQP helpers with mocked AMQP
def test_placeholder():
pass
\ No newline at end of file
def test_placeholder():
pass
......@@ -213,10 +213,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
version = relationship(CapabilityVersion, back_populates="executions")
current_workflow_request_id = sa.Column(
"current_workflow_request_id",
sa.Integer,
sa.ForeignKey(WorkflowRequest.workflow_request_id),
sa.Integer
)
current_workflow_request = relationship(WorkflowRequest)
# current_workflow_request = relationship(WorkflowRequest)
__table_args__ = (
sa.ForeignKeyConstraint(
......@@ -235,6 +234,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
current_workflow_request_id=self.current_workflow_request_id,
)
def update_execution(self, workflow_request_id: int):
self.current_workflow_request_id = workflow_request_id
@property
def capability(self):
return self.version.capability
......
......@@ -43,7 +43,7 @@ class CapabilityExecutionIF:
state: str
current_step: int
version: CapabilityVersionIF
current_workflow_request: WorkflowRequestIF
current_workflow_request_id: int
steps: str
capability: CapabilityIF
capability_request: CapabilityRequestIF
......
......@@ -7,7 +7,6 @@ from workspaces.capability.services.interfaces import (
CapabilityInfoIF,
)
from workspaces.system.schema import AbstractFile
from workspaces.workflow.schema import WorkflowRequest
from workspaces.workflow.services.interfaces import WorkflowServiceIF
......@@ -36,16 +35,21 @@ class CapabilityEngine(CapabilityEngineIF):
cur_step.execute(self, execution)
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
self, execution_id: int, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
):
"""
Submit a workflow request to the workflow service
:param execution_id:
:param workflow_name:
:param workflow_args:
:param files:
"""
workflow_request = WorkflowRequest(
workflow_name=workflow_name, argument=workflow_args
)
workflow_request = self.workflow_service.create_workflow_request(workflow=workflow_name, argument=workflow_args)
execution = self.capability_info.lookup_execution(execution_id)
execution.update_execution(workflow_request['workflow_request_id'])
self.capability_info.save_execution(execution)
self.workflow_service.execute(workflow_request, files)
from typing import List
import transaction
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
......@@ -143,3 +144,4 @@ class CapabilityInfo(CapabilityInfoIF):
def save_execution(self, execution: CapabilityExecutionIF):
self.session.add(execution)
transaction.commit()
......@@ -108,7 +108,7 @@ class CapabilityEngineIF(ABC):
@abstractmethod
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
self, execution_id:int, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
):
pass
......
......@@ -26,6 +26,9 @@ class WorkflowServiceIF(ABC):
"""
raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}")
def create_workflow_request(self, workflow, argument) -> WorkflowRequestIF:
raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}")
class WorkflowInfoIF(ABC):
"""
......
......@@ -4,7 +4,7 @@ import subprocess
import threading
from pathlib import Path
from tempfile import mkdtemp
from typing import List
from typing import List, Union, Dict
import requests
from channels.amqp_helpers import Channel, WorkflowEventChannel
......@@ -17,7 +17,7 @@ from workspaces.workflow.schema import (
WorkflowRequest,
WorkflowRequestFile,
)
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
from workspaces.workflow.schema_interfaces import WorkflowRequestIF, WorkflowIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
......@@ -30,23 +30,22 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
)
def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]):
# step 1: create the request
req = requests.post(
f"{self.url}/workflows/{request.workflow_name}/requests/create?args={request.argument}"
).json()
# step 2: if necessary, pass the files up for this request
# step 1: if necessary, pass the files up for this request
for file in files:
requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/files/{file.filename}",
f"{self.url}/workflows/requests/{request['workflow_request_id']}/files/{file.filename}",
body=file.content,
)
# step 3: execute the request
# step 2: execute the request
requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/submit"
f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit"
)
def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
return requests.post(
f"{self.url}/workflows/{workflow}/requests/create?args={argument}"
).json()
class WorkflowService(WorkflowServiceIF):
"""
......