Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Showing
with 37 additions and 21 deletions
def test_placeholder():
pass
\ No newline at end of file
# TODO: Write unit tests for AMQP helpers with mocked AMQP # TODO: Write unit tests for AMQP helpers with mocked AMQP
def test_placeholder():
pass
\ No newline at end of file
def test_placeholder():
pass
...@@ -213,10 +213,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF): ...@@ -213,10 +213,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
version = relationship(CapabilityVersion, back_populates="executions") version = relationship(CapabilityVersion, back_populates="executions")
current_workflow_request_id = sa.Column( current_workflow_request_id = sa.Column(
"current_workflow_request_id", "current_workflow_request_id",
sa.Integer, sa.Integer
sa.ForeignKey(WorkflowRequest.workflow_request_id),
) )
current_workflow_request = relationship(WorkflowRequest) # current_workflow_request = relationship(WorkflowRequest)
__table_args__ = ( __table_args__ = (
sa.ForeignKeyConstraint( sa.ForeignKeyConstraint(
...@@ -235,6 +234,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF): ...@@ -235,6 +234,9 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
current_workflow_request_id=self.current_workflow_request_id, current_workflow_request_id=self.current_workflow_request_id,
) )
def update_execution(self, workflow_request_id: int):
self.current_workflow_request_id = workflow_request_id
@property @property
def capability(self): def capability(self):
return self.version.capability return self.version.capability
......
...@@ -43,7 +43,7 @@ class CapabilityExecutionIF: ...@@ -43,7 +43,7 @@ class CapabilityExecutionIF:
state: str state: str
current_step: int current_step: int
version: CapabilityVersionIF version: CapabilityVersionIF
current_workflow_request: WorkflowRequestIF current_workflow_request_id: int
steps: str steps: str
capability: CapabilityIF capability: CapabilityIF
capability_request: CapabilityRequestIF capability_request: CapabilityRequestIF
......
...@@ -7,7 +7,6 @@ from workspaces.capability.services.interfaces import ( ...@@ -7,7 +7,6 @@ from workspaces.capability.services.interfaces import (
CapabilityInfoIF, CapabilityInfoIF,
) )
from workspaces.system.schema import AbstractFile from workspaces.system.schema import AbstractFile
from workspaces.workflow.schema import WorkflowRequest
from workspaces.workflow.services.interfaces import WorkflowServiceIF from workspaces.workflow.services.interfaces import WorkflowServiceIF
...@@ -36,16 +35,21 @@ class CapabilityEngine(CapabilityEngineIF): ...@@ -36,16 +35,21 @@ class CapabilityEngine(CapabilityEngineIF):
cur_step.execute(self, execution) cur_step.execute(self, execution)
def submit_workflow_request( def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile] self, execution_id: int, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
): ):
""" """
Submit a workflow request to the workflow service Submit a workflow request to the workflow service
:param execution_id:
:param workflow_name: :param workflow_name:
:param workflow_args: :param workflow_args:
:param files: :param files:
""" """
workflow_request = WorkflowRequest( workflow_request = self.workflow_service.create_workflow_request(workflow=workflow_name, argument=workflow_args)
workflow_name=workflow_name, argument=workflow_args
) execution = self.capability_info.lookup_execution(execution_id)
execution.update_execution(workflow_request['workflow_request_id'])
self.capability_info.save_execution(execution)
self.workflow_service.execute(workflow_request, files) self.workflow_service.execute(workflow_request, files)
from typing import List from typing import List
import transaction
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
...@@ -143,3 +144,4 @@ class CapabilityInfo(CapabilityInfoIF): ...@@ -143,3 +144,4 @@ class CapabilityInfo(CapabilityInfoIF):
def save_execution(self, execution: CapabilityExecutionIF): def save_execution(self, execution: CapabilityExecutionIF):
self.session.add(execution) self.session.add(execution)
transaction.commit()
...@@ -108,7 +108,7 @@ class CapabilityEngineIF(ABC): ...@@ -108,7 +108,7 @@ class CapabilityEngineIF(ABC):
@abstractmethod @abstractmethod
def submit_workflow_request( def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile] self, execution_id:int, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
): ):
pass pass
......
...@@ -26,6 +26,9 @@ class WorkflowServiceIF(ABC): ...@@ -26,6 +26,9 @@ class WorkflowServiceIF(ABC):
""" """
raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}")
def create_workflow_request(self, workflow, argument) -> WorkflowRequestIF:
raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}")
class WorkflowInfoIF(ABC): class WorkflowInfoIF(ABC):
""" """
......
...@@ -4,7 +4,7 @@ import subprocess ...@@ -4,7 +4,7 @@ import subprocess
import threading import threading
from pathlib import Path from pathlib import Path
from tempfile import mkdtemp from tempfile import mkdtemp
from typing import List from typing import List, Union, Dict
import requests import requests
from channels.amqp_helpers import Channel, WorkflowEventChannel from channels.amqp_helpers import Channel, WorkflowEventChannel
...@@ -17,7 +17,7 @@ from workspaces.workflow.schema import ( ...@@ -17,7 +17,7 @@ from workspaces.workflow.schema import (
WorkflowRequest, WorkflowRequest,
WorkflowRequestFile, WorkflowRequestFile,
) )
from workspaces.workflow.schema_interfaces import WorkflowRequestIF from workspaces.workflow.schema_interfaces import WorkflowRequestIF, WorkflowIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
...@@ -30,23 +30,22 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): ...@@ -30,23 +30,22 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
) )
def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]): def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]):
# step 1: create the request # step 1: if necessary, pass the files up for this request
req = requests.post(
f"{self.url}/workflows/{request.workflow_name}/requests/create?args={request.argument}"
).json()
# step 2: if necessary, pass the files up for this request
for file in files: for file in files:
requests.post( requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/files/{file.filename}", f"{self.url}/workflows/requests/{request['workflow_request_id']}/files/{file.filename}",
body=file.content, body=file.content,
) )
# step 3: execute the request # step 2: execute the request
requests.post( requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/submit" f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit"
) )
def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
return requests.post(
f"{self.url}/workflows/{workflow}/requests/create?args={argument}"
).json()
class WorkflowService(WorkflowServiceIF): class WorkflowService(WorkflowServiceIF):
""" """
......