Skip to content
Snippets Groups Projects
Commit ce689d31 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Schema and CapabilityInfo fixes

parent c8d6f62a
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@ Interfaces used by the Workspaces' capabilities system.
import inspect
from abc import ABC, abstractmethod
from typing import List, Union, Optional, Any
from typing import List, Union, Optional, Any, Type
from .product_interfaces import ProductIF, FutureProductIF
......@@ -107,31 +107,9 @@ class CapabilityInfoIF(ABC):
def lookup_capability(self, capability_name: str) -> CapabilityIF:
raise NotImplementedError
@abstractmethod
def lookup_entity(
self,
entity_id: int,
entity_schema: Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF],
) -> Optional[Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF]]:
"""
Look up entity in database and return object representation of it if found
:param entity_id: ID of entity to be searched for
:param entity_schema: Database schema of the entity
:return: Object representation of entity if found, else None
"""
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def save_entity(
self, entity: Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF]
) -> int:
"""
Save a given entity and return an integer identifier for it
:param entity: The entity to save
:return: The entity's identifier
"""
raise NotImplementedError()
class ProjectSettingsIF(ABC):
"""
......
......@@ -252,7 +252,7 @@ class CapabilityRequest(Base, CapabilityRequestIF):
parameters = sa.Column("parameters", sa.String)
# FIXME: This needs to be changed to properly keep track of product locators.
# future_products = sa.Column('future_products', sa.String)
versions = relationship("CapabilityVersion", back_populates="capability_request")
versions = relationship("CapabilityVersion", back_populates="request")
capability = relationship(Capability, back_populates="requests")
def update_status(self, status: str):
......@@ -285,10 +285,12 @@ class CapabilityVersion(Base, CapabilityVersionIF):
)
version_number = sa.Column("version_number", sa.Integer, primary_key=True)
parameters = sa.Column('parameters', sa.String)
capability_request = relationship(CapabilityRequest, back_populates="versions")
executions = relationship(
"CapabilityExecution", back_populates="capability_version"
)
request = relationship(CapabilityRequest, back_populates="versions")
executions = relationship("CapabilityExecution", back_populates="version")
@property
def capability(self):
return self.request.capability
class CapabilityExecution(Base, CapabilityExecutionIF):
......@@ -308,7 +310,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
version_number = sa.Column("capability_version_number", sa.Integer)
current_step = sa.Column("current_step", sa.Integer)
steps = sa.Column('steps', sa.String)
capability_version = relationship(CapabilityVersion, back_populates="executions")
version = relationship(CapabilityVersion, back_populates="executions")
__table_args__ = (
sa.ForeignKeyConstraint(
......@@ -326,6 +328,10 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
current_step=self.current_step,
)
@property
def capability(self):
return self.version.capability
class Workflow(Base):
"""
......
......@@ -11,7 +11,7 @@ from typing import Dict, List, Union, Optional, Type
from sqlalchemy.orm import Session
from workflow.event_catcher import EventCatcher
from workspaces.capability_interfaces import CapabilityIF
from workspaces.capability_interfaces import CapabilityIF, CapabilityExecutionIF
from .capability_interfaces import (
CapabilityServiceIF,
......@@ -42,7 +42,7 @@ from .schema import (
get_session_factory,
WorkflowRequest,
AbstractFile,
CapabilityEvent, CapabilityVersion,
CapabilityEvent, CapabilityVersion, Base,
)
from channels.amqp_helpers import (
workflow_events,
......@@ -98,9 +98,7 @@ class CapabilityService(CapabilityServiceIF):
:param execution_id: ID of execution record
:param event: Incoming event
"""
execution = self.capability_info.lookup_entity(
execution_id, CapabilityExecution
)
execution = self.capability_info.lookup_execution(execution_id)
step_sequence = CapabilitySequence.from_str(execution.steps)
current_step = step_sequence[execution.current_step]
event_type = CapabilityEventType[event.event_type]
......@@ -119,34 +117,20 @@ class CapabilityService(CapabilityServiceIF):
def enqueue_execution(
self,
execution_record: CapabilityExecution,
execution: CapabilityExecution,
priority: int = ExecutionPriority.Default.value,
):
"""
Move execution record that is ready to execute a workflow into the appropriate capability
queue
"""
request = self.capability_info.lookup_entity(
execution_record.capability_request_id, CapabilityRequest
)
capability = self.capability_info.lookup_entity(Capability, request.capability)
# Get correct queue or initialize one
queue = self.queues.get(capability.id, CapabilityQueue(capability.max_jobs))
queue.enqueue(execution_record, priority)
self.queues[capability.id] = queue
queue = self.queues.get(execution.capability.id, CapabilityQueue(execution.capability.max_jobs))
queue.enqueue(execution, priority)
self.queues[execution.capability.id] = queue
# Remove execution record from pool
self.execution_pool.remove(execution_record)
def get_cap_from_execution(self, execution_id: int) -> int:
execution = self.capability_info.lookup_entity(
execution_id, CapabilityExecution
)
request = self.capability_info.lookup_entity(
execution.capability_request, CapabilityRequest
)
return request.capability.id
self.execution_pool.remove(execution)
class CapabilityEngine(CapabilityEngineIF):
......@@ -174,9 +158,6 @@ class CapabilityInfo(CapabilityInfoIF):
def __init__(self, session: Session):
self.session = session
def lookup_capability(self, capability_name: str) -> CapabilityIF:
return self.session.query(Capability).filter_by(name=capability_name).first()
def create_capability(
self, name: CapabilityName, steps: CapabilitySequence, max_jobs: int
) -> Capability:
......@@ -188,7 +169,7 @@ class CapabilityInfo(CapabilityInfoIF):
:return: Created Capability
"""
capability = Capability(name=name, steps=str(steps), max_jobs=max_jobs)
capability.id = self.save_entity(capability)
self.save_entity(capability)
return capability
def create_capability_request(
......@@ -213,7 +194,7 @@ class CapabilityInfo(CapabilityInfoIF):
# a trick here is to ensure that we always have a first version, with the original parameters
versions=[CapabilityVersion(version_number=1, parameters=str(parameters))]
)
request.id = self.save_entity(request)
self.save_entity(request)
return request
def create_execution(self, request: CapabilityRequest) -> CapabilityExecution:
......@@ -227,47 +208,26 @@ class CapabilityInfo(CapabilityInfoIF):
execution = CapabilityExecution(
state=ExecutionState.Ready.name,
capability_version=most_recent_version,
version=most_recent_version,
current_step=0,
# ensure that we have a copy of the step sequence as it was when the execution started
steps=request.capability.steps
)
execution.id = self.save_entity(execution)
self.save_entity(execution)
return execution
def lookup_entity(
self,
entity_id: int,
entity_schema: Union[
Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution]
],
) -> Optional[Union[Capability, CapabilityRequest, CapabilityExecution]]:
"""
Look up entity in database and return object representation of it if found
:param entity_id: ID of entity to be searched for
:param entity_schema: Database schema of the entity
:return: Object representation of entity if found, else None
"""
return (
self.session.query(entity_schema.__class__)
.filter(entity_schema.id == entity_id)
.one()
)
def lookup_capability(self, capability_name: str) -> CapabilityIF:
return self.session.query(Capability).filter_by(name=capability_name).one()
def save_entity(
self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]
) -> int:
"""
Save a given entity and return an integer identifier for it
:param entity: the entity to save
:return: the entity's identifier
"""
def lookup_capability_request(self, request_id) -> "CapabilityRequestIF":
return self.session.query(CapabilityRequest).filter_by(id=request_id).one()
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
return self.session.query(CapabilityExecution).filter_by(id=execution_id).one()
def save_entity(self, entity: Base):
self.session.add(entity)
self.session.flush()
return self.lookup_entity(entity.id, type(entity)).id
def lookup_capability_request(self, request_id) -> "CapabilityRequestIF":
return self.lookup_entity(request_id, CapabilityRequest)
class CapabilityQueue(CapabilityQueueIF):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment