Skip to content
Snippets Groups Projects
Commit 3061b197 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

New errors!

parent 7779b2e0
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,8 @@ def upgrade():
primary_key=True),
sa.Column('version_number',
sa.Integer,
primary_key=True))
primary_key=True),
sa.Column('parameters', sa.String))
op.create_table('capability_executions',
sa.Column('execution_id', sa.Integer, primary_key=True),
......@@ -45,6 +46,7 @@ def upgrade():
sa.Column('capability_request_id', sa.Integer),
sa.Column('capability_version_number', sa.Integer),
sa.Column('current_step', sa.Integer),
sa.Column('steps', sa.String),
sa.ForeignKeyConstraint(['capability_request_id', 'capability_version_number'],
['capability_versions.capability_request_id', 'capability_versions.version_number']))
......
......@@ -80,10 +80,9 @@ class CapabilityInfoIF(ABC):
@abstractmethod
def create_capability_request(
self,
capability_id: int,
capability_name: str,
parameters: List["ParameterIF"] = None,
future_products: List[FutureProductIF] = None,
versions: List[str] = None,
) -> CapabilityRequestIF:
"""
Create new capability request and save it in the database
......@@ -96,10 +95,10 @@ class CapabilityInfoIF(ABC):
raise NotImplementedError
@abstractmethod
def create_execution_record(self, request_id: int) -> CapabilityExecutionIF:
def create_execution(self, request: CapabilityRequestIF) -> CapabilityExecutionIF:
"""
Create new execution record for a request and save it in the database
:param request_id: ID of the capability request
:param request: ID of the capability request
:return: Integer identifier for the record
"""
raise NotImplementedError
......
......@@ -171,7 +171,6 @@ class Capability(Base, CapabilityIF):
steps = sa.Column("capability_steps", sa.String)
max_jobs = sa.Column("max_jobs", sa.Integer)
requests = relationship("CapabilityRequest", back_populates="capability")
executions = relationship("CapabilityExecution", back_populates="steps")
@classmethod
def from_file(cls, filename: str):
......@@ -192,9 +191,7 @@ class Capability(Base, CapabilityIF):
return self
@staticmethod
def parse_capability_file(
filename: str,
) -> Tuple[int, str, int, CapabilitySequenceIF]:
def parse_capability_file(filename: str) -> Tuple[int, str, int, CapabilitySequenceIF]:
with open(filename, "r") as f:
cap_id, name, max_jobs = f.readline().split(" ")
steps = []
......@@ -288,6 +285,7 @@ class CapabilityVersion(Base, CapabilityVersionIF):
primary_key=True,
)
version_number = sa.Column("version_number", sa.Integer, primary_key=True)
parameters = sa.Column('parameters', sa.String)
capability_request = relationship(CapabilityRequest, back_populates="versions")
executions = relationship(
"CapabilityExecution", back_populates="capability_version"
......@@ -312,7 +310,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
)
version_number = sa.Column("capability_version_number", sa.Integer)
current_step = sa.Column("current_step", sa.Integer)
steps = relationship("Capability", back_populates="capability_steps")
steps = sa.Column('steps', sa.String)
capability_version = relationship(CapabilityVersion, back_populates="executions")
__table_args__ = (
......
......@@ -42,7 +42,7 @@ from .schema import (
get_session_factory,
WorkflowRequest,
AbstractFile,
CapabilityEvent,
CapabilityEvent, CapabilityVersion,
)
from channels.amqp_helpers import (
workflow_events,
......@@ -82,16 +82,16 @@ class CapabilityService(CapabilityServiceIF):
:param request: Capability request
"""
# FIXME: need request versions in between requests and executions
execution_record = self.capability_info.create_execution_record(request.id)
steps = CapabilitySequence.from_str(execution_record.steps)
execution = self.capability_info.create_execution(request)
steps = CapabilitySequence.from_str(execution.steps)
if steps[0].step_type == CapabilityStepType.PrepareAndRunWorkflow:
# First step is to run workflow, move to capability queue
# FIXME: Priority needs to be dynamic
self.enqueue_execution(execution_record, ExecutionPriority.Default.value)
self.enqueue_execution(execution, ExecutionPriority.Default.value)
else:
self.execution_pool.append(execution_record)
return execution_record
self.execution_pool.append(execution)
return execution
def update_execution(self, execution_id: int, event: CapabilityEvent):
"""
......@@ -194,10 +194,9 @@ class CapabilityInfo(CapabilityInfoIF):
def create_capability_request(
self,
capability_id: int,
capability_name: str,
parameters: List[ParameterIF] = None,
future_products: List[FutureProductIF] = None,
versions: List[str] = None,
) -> CapabilityRequest:
"""
Create new capability request and save it in the database
......@@ -207,28 +206,35 @@ class CapabilityInfo(CapabilityInfoIF):
:param versions:
:return: Created CapabilityRequest
"""
capability = self.lookup_entity(capability_id, Capability)
capability = self.lookup_capability(capability_name)
request = CapabilityRequest(
state=RequestState.Ready.name,
capability=capability,
parameters=str(parameters),
# a trick here is to ensure that we always have a first version, with the original parameters
versions=[CapabilityVersion(version_number=1, parameters=str(parameters))]
)
request.id = self.save_entity(request)
return request
def create_execution_record(self, request_id: int) -> CapabilityExecution:
def create_execution(self, request: CapabilityRequest) -> CapabilityExecution:
"""
Create new execution record for a request and save it in the database
:param request_id: ID of the capability request
:param request: ID of the capability request
:return: Created CapabilityExecution
"""
record = CapabilityExecution(
# look up the most recent version
most_recent_version = request.versions[-1]
execution = CapabilityExecution(
state=ExecutionState.Ready.name,
capability_request_id=request_id,
capability_version=most_recent_version,
current_step=0,
# ensure that we have a copy of the step sequence as it was when the execution started
steps=request.capability.steps
)
record.id = self.save_entity(record)
return record
execution.id = self.save_entity(execution)
return execution
def lookup_entity(
self,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment