From 3061b1975519e39c032eeca1079e1a3c6882f63c Mon Sep 17 00:00:00 2001 From: Daniel K Lyons <dlyons@nrao.edu> Date: Fri, 9 Oct 2020 10:50:53 -0600 Subject: [PATCH] New errors! --- .../57c38b5f012e_capabilities_init.py | 4 ++- .../src/workspaces/capability_interfaces.py | 7 ++-- shared/workspaces/src/workspaces/schema.py | 8 ++--- shared/workspaces/src/workspaces/services.py | 36 +++++++++++-------- 4 files changed, 30 insertions(+), 25 deletions(-) diff --git a/schema/versions/57c38b5f012e_capabilities_init.py b/schema/versions/57c38b5f012e_capabilities_init.py index acc5231d4..f08292c38 100644 --- a/schema/versions/57c38b5f012e_capabilities_init.py +++ b/schema/versions/57c38b5f012e_capabilities_init.py @@ -37,7 +37,8 @@ def upgrade(): primary_key=True), sa.Column('version_number', sa.Integer, - primary_key=True)) + primary_key=True), + sa.Column('parameters', sa.String)) op.create_table('capability_executions', sa.Column('execution_id', sa.Integer, primary_key=True), @@ -45,6 +46,7 @@ def upgrade(): sa.Column('capability_request_id', sa.Integer), sa.Column('capability_version_number', sa.Integer), sa.Column('current_step', sa.Integer), + sa.Column('steps', sa.String), sa.ForeignKeyConstraint(['capability_request_id', 'capability_version_number'], ['capability_versions.capability_request_id', 'capability_versions.version_number'])) diff --git a/shared/workspaces/src/workspaces/capability_interfaces.py b/shared/workspaces/src/workspaces/capability_interfaces.py index 5f6d949f2..0ebabce65 100644 --- a/shared/workspaces/src/workspaces/capability_interfaces.py +++ b/shared/workspaces/src/workspaces/capability_interfaces.py @@ -80,10 +80,9 @@ class CapabilityInfoIF(ABC): @abstractmethod def create_capability_request( self, - capability_id: int, + capability_name: str, parameters: List["ParameterIF"] = None, future_products: List[FutureProductIF] = None, - versions: List[str] = None, ) -> CapabilityRequestIF: """ Create new capability request and save it in the database @@ -96,10 +95,10 @@ class CapabilityInfoIF(ABC): raise NotImplementedError @abstractmethod - def create_execution_record(self, request_id: int) -> CapabilityExecutionIF: + def create_execution(self, request: CapabilityRequestIF) -> CapabilityExecutionIF: """ Create new execution record for a request and save it in the database - :param request_id: ID of the capability request + :param request: ID of the capability request :return: Integer identifier for the record """ raise NotImplementedError diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index 8d657e519..0fc2f00a6 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -171,7 +171,6 @@ class Capability(Base, CapabilityIF): steps = sa.Column("capability_steps", sa.String) max_jobs = sa.Column("max_jobs", sa.Integer) requests = relationship("CapabilityRequest", back_populates="capability") - executions = relationship("CapabilityExecution", back_populates="steps") @classmethod def from_file(cls, filename: str): @@ -192,9 +191,7 @@ class Capability(Base, CapabilityIF): return self @staticmethod - def parse_capability_file( - filename: str, - ) -> Tuple[int, str, int, CapabilitySequenceIF]: + def parse_capability_file(filename: str) -> Tuple[int, str, int, CapabilitySequenceIF]: with open(filename, "r") as f: cap_id, name, max_jobs = f.readline().split(" ") steps = [] @@ -288,6 +285,7 @@ class CapabilityVersion(Base, CapabilityVersionIF): primary_key=True, ) version_number = sa.Column("version_number", sa.Integer, primary_key=True) + parameters = sa.Column('parameters', sa.String) capability_request = relationship(CapabilityRequest, back_populates="versions") executions = relationship( "CapabilityExecution", back_populates="capability_version" @@ -312,7 +310,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF): ) version_number = sa.Column("capability_version_number", sa.Integer) current_step = sa.Column("current_step", sa.Integer) - steps = relationship("Capability", back_populates="capability_steps") + steps = sa.Column('steps', sa.String) capability_version = relationship(CapabilityVersion, back_populates="executions") __table_args__ = ( diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index 5a3262476..634dadda1 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -42,7 +42,7 @@ from .schema import ( get_session_factory, WorkflowRequest, AbstractFile, - CapabilityEvent, + CapabilityEvent, CapabilityVersion, ) from channels.amqp_helpers import ( workflow_events, @@ -82,16 +82,16 @@ class CapabilityService(CapabilityServiceIF): :param request: Capability request """ # FIXME: need request versions in between requests and executions - execution_record = self.capability_info.create_execution_record(request.id) - steps = CapabilitySequence.from_str(execution_record.steps) + execution = self.capability_info.create_execution(request) + steps = CapabilitySequence.from_str(execution.steps) if steps[0].step_type == CapabilityStepType.PrepareAndRunWorkflow: # First step is to run workflow, move to capability queue # FIXME: Priority needs to be dynamic - self.enqueue_execution(execution_record, ExecutionPriority.Default.value) + self.enqueue_execution(execution, ExecutionPriority.Default.value) else: - self.execution_pool.append(execution_record) - return execution_record + self.execution_pool.append(execution) + return execution def update_execution(self, execution_id: int, event: CapabilityEvent): """ @@ -194,10 +194,9 @@ class CapabilityInfo(CapabilityInfoIF): def create_capability_request( self, - capability_id: int, + capability_name: str, parameters: List[ParameterIF] = None, future_products: List[FutureProductIF] = None, - versions: List[str] = None, ) -> CapabilityRequest: """ Create new capability request and save it in the database @@ -207,28 +206,35 @@ class CapabilityInfo(CapabilityInfoIF): :param versions: :return: Created CapabilityRequest """ - capability = self.lookup_entity(capability_id, Capability) + capability = self.lookup_capability(capability_name) request = CapabilityRequest( state=RequestState.Ready.name, capability=capability, parameters=str(parameters), + # a trick here is to ensure that we always have a first version, with the original parameters + versions=[CapabilityVersion(version_number=1, parameters=str(parameters))] ) request.id = self.save_entity(request) return request - def create_execution_record(self, request_id: int) -> CapabilityExecution: + def create_execution(self, request: CapabilityRequest) -> CapabilityExecution: """ Create new execution record for a request and save it in the database - :param request_id: ID of the capability request + :param request: ID of the capability request :return: Created CapabilityExecution """ - record = CapabilityExecution( + # look up the most recent version + most_recent_version = request.versions[-1] + + execution = CapabilityExecution( state=ExecutionState.Ready.name, - capability_request_id=request_id, + capability_version=most_recent_version, current_step=0, + # ensure that we have a copy of the step sequence as it was when the execution started + steps=request.capability.steps ) - record.id = self.save_entity(record) - return record + execution.id = self.save_entity(execution) + return execution def lookup_entity( self, -- GitLab