diff --git a/schema/versions/57c38b5f012e_capabilities_init.py b/schema/versions/57c38b5f012e_capabilities_init.py index 009e3014745be448a6308525ee5b54c5886e9d19..e550d9dbe8e4d0f745c9dde919dcecb3a40d1636 100644 --- a/schema/versions/57c38b5f012e_capabilities_init.py +++ b/schema/versions/57c38b5f012e_capabilities_init.py @@ -5,9 +5,8 @@ Revises: 44d5bbbf2615 Create Date: 2020-10-07 23:01:25.765027 """ -from alembic import op import sqlalchemy as sa - +from alembic import op # revision identifiers, used by Alembic. revision = "57c38b5f012e" @@ -20,8 +19,7 @@ def upgrade(): print("creating capabilities") op.create_table( "capabilities", - sa.Column("capability_id", sa.Integer, primary_key=True), - sa.Column("capability_name", sa.String), + sa.Column("capability_name", sa.String, primary_key=True), sa.Column("capability_steps", sa.String), sa.Column("max_jobs", sa.Integer), ) @@ -30,9 +28,7 @@ def upgrade(): "capability_requests", sa.Column("capability_request_id", sa.Integer, primary_key=True), sa.Column("state", sa.String), - sa.Column( - "capability_id", sa.Integer, sa.ForeignKey("capabilities.capability_id") - ), + sa.Column("capability_name", sa.String, sa.ForeignKey("capabilities.capability_name")), sa.Column("parameters", sa.String), ) @@ -62,7 +58,7 @@ def upgrade(): sa.ForeignKey("workflow_requests.workflow_request_id"), ), sa.ForeignKeyConstraint( - ["capability_request_id", "capability_version_number"], + ("capability_request_id", "capability_version_number"), [ "capability_versions.capability_request_id", "capability_versions.version_number", diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index d6ca7d9c1f594bceefc0604b8bee0643501c7011..33a91f67bb644d730256e5bbc093cf3336b1d37f 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json from typing import Tuple @@ -5,17 +7,17 @@ import sqlalchemy as sa from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import relationship -from workspaces.workflow.schema import WorkflowRequest from workspaces.capability.enums import CapabilityEventType, CapabilityStepType -from workspaces.capability.helpers import CapabilityStep, CapabilitySequence +from workspaces.capability.helpers import CapabilitySequence, CapabilityStep from workspaces.capability.helpers_interfaces import CapabilitySequenceIF, ParameterIF from workspaces.capability.schema_interfaces import ( + CapabilityExecutionIF, CapabilityIF, CapabilityRequestIF, CapabilityVersionIF, - CapabilityExecutionIF, ) from workspaces.products.schema_interfaces import FutureProductIF +from workspaces.workflow.schema import WorkflowRequest class CapabilityEvent: @@ -23,9 +25,7 @@ class CapabilityEvent: An event sent from the underlying workflow execution system. """ - def __init__( - self, event_type: CapabilityEventType, request_id: int, execution_id: int - ): + def __init__(self, event_type: CapabilityEventType, request_id: int, execution_id: int): self.event_type = event_type self.execution_id = execution_id @@ -52,14 +52,13 @@ class Capability(Base, CapabilityIF): """ __tablename__ = "capabilities" - id = sa.Column("capability_id", sa.Integer, primary_key=True) - name = sa.Column("capability_name", sa.String) + name = sa.Column("capability_name", sa.String, primary_key=True) steps = sa.Column("capability_steps", sa.String) max_jobs = sa.Column("max_jobs", sa.Integer) requests = relationship("CapabilityRequest", back_populates="capability") @classmethod - def from_file(cls, filename: str): + def from_file(cls, filename: str) -> Capability: """ Method that creates an instance of a Capability from a given file @@ -67,13 +66,11 @@ class Capability(Base, CapabilityIF): :return: Capability instance """ self = cls() - self.id, self.name, self.max_jobs, self.steps = cls.parse_capability_file( - filename - ) + self.name, self.max_jobs, self.steps = cls.parse_capability_file(filename) return self @classmethod - def from_json(cls, json_data: str): + def from_json(cls, json_data: str) -> Capability: """ Method that creates an instance of a Capability from given JSON @@ -82,7 +79,6 @@ class Capability(Base, CapabilityIF): """ self = cls() json_dict = json.loads(json_data) - self.id = json_dict.get("id", None) self.name = json_dict["name"] self.max_jobs = json_dict["max_jobs"] self.steps = json_dict["steps"] @@ -91,7 +87,7 @@ class Capability(Base, CapabilityIF): @staticmethod def parse_capability_file( filename: str, - ) -> Tuple[int, str, int, CapabilitySequenceIF]: + ) -> Tuple[str, int, CapabilitySequenceIF]: """ Parse a file containing metadata for a capability @@ -99,18 +95,14 @@ class Capability(Base, CapabilityIF): :return: Tuple containing capability's ID, name, max concurrent jobs, and sequence of steps """ with open(filename, "r") as f: - cap_id, name, max_jobs = f.readline().split(" ") + name, max_jobs = f.readline().split(" ") steps = [] for line in f.readlines(): step_type, step_value = line.strip().split(" ") - steps.append( - CapabilityStep( - CapabilityStepType.from_string(step_type), step_value - ) - ) + steps.append(CapabilityStep(CapabilityStepType.from_string(step_type), step_value)) - return int(cap_id), name, int(max_jobs), CapabilitySequence(steps) + return name, int(max_jobs), CapabilitySequence(steps) def create_request(self, parameters: ParameterIF, future_products: FutureProductIF): """ @@ -124,7 +116,6 @@ class Capability(Base, CapabilityIF): def to_dict(self): return { - "id": self.id, "name": self.name, "max_jobs": self.max_jobs, "steps": str(self.steps), @@ -132,7 +123,7 @@ class Capability(Base, CapabilityIF): def __str__(self): return ( - f"Capability object with ID {self.id}, name {self.name} and max simultaneous jobs of {self.max_jobs}" + f"Capability object with name {self.name} and max simultaneous jobs of {self.max_jobs}" f"\nSequence: {self.steps}" ) @@ -146,8 +137,8 @@ class CapabilityRequest(Base, CapabilityRequestIF): __tablename__ = "capability_requests" id = sa.Column("capability_request_id", sa.Integer, primary_key=True) state = sa.Column("state", sa.String) - capability_id = sa.Column( - "capability_id", sa.Integer, sa.ForeignKey("capabilities.capability_id") + capability_name = sa.Column( + "capability_name", sa.String, sa.ForeignKey("capabilities.capability_name") ) parameters = sa.Column("parameters", sa.String) # FIXME: This needs to be changed to properly keep track of product locators. @@ -165,7 +156,7 @@ class CapabilityRequest(Base, CapabilityRequestIF): def __json__(self, request) -> dict: return dict( id=self.id, - capability_id=self.capability_id, + capability_name=self.capability_name, state=self.state, parameters=self.parameters, ) @@ -211,15 +202,11 @@ class CapabilityExecution(Base, CapabilityExecutionIF): current_step = sa.Column("current_step", sa.Integer) steps = sa.Column("steps", sa.String) version = relationship(CapabilityVersion, back_populates="executions") - current_workflow_request_id = sa.Column( - "current_workflow_request_id", - sa.Integer - ) - # current_workflow_request = relationship(WorkflowRequest) + current_workflow_request_id = sa.Column("current_workflow_request_id", sa.Integer) __table_args__ = ( sa.ForeignKeyConstraint( - [capability_request_id, version_number], + (capability_request_id, version_number), [CapabilityVersion.capability_request_id, CapabilityVersion.version_number], ), ) diff --git a/shared/workspaces/workspaces/capability/services/capability_info.py b/shared/workspaces/workspaces/capability/services/capability_info.py index 09d95b527c160119fcf0e1c804e2a230763fcbbe..16f90306cd58e65dcc70c21fe787cd46ab21605b 100644 --- a/shared/workspaces/workspaces/capability/services/capability_info.py +++ b/shared/workspaces/workspaces/capability/services/capability_info.py @@ -100,9 +100,7 @@ class CapabilityInfo(CapabilityInfoIF): return self.session.query(CapabilityRequest).filter_by(id=request_id).first() def lookup_execution(self, execution_id: int) -> CapabilityExecution: - return ( - self.session.query(CapabilityExecution).filter_by(id=execution_id).first() - ) + return self.session.query(CapabilityExecution).filter_by(id=execution_id).first() def save_entity(self, entity: Base): """ @@ -137,9 +135,7 @@ class CapabilityInfo(CapabilityInfoIF): """ capability = self.lookup_capability(capability_name) return ( - self.session.query(CapabilityRequest) - .filter_by(capability_id=capability.id) - .all() + self.session.query(CapabilityRequest).filter_by(capability_name=capability.name).all() ) def save_execution(self, execution: CapabilityExecutionIF): diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 322b97ce3bd1ca845f872f46e2a69074fafd784d..0ba617286bf1662b8aa921f1f827329abe794b38 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -31,9 +31,7 @@ class CapabilityService(CapabilityServiceIF): The capability service: clients access this to request capability runs """ - def __init__( - self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF - ): + def __init__(self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF): self.execution_pool = [] self.queues = {} self.capability_info = capability_info @@ -55,9 +53,7 @@ class CapabilityService(CapabilityServiceIF): :param products: Products the request will create upon success :return: Capability request """ - return self.capability_info.create_capability_request( - capability_name, parameters, products - ) + return self.capability_info.create_capability_request(capability_name, parameters, products) def run_capability(self, request: CapabilityRequest) -> CapabilityExecutionIF: """ @@ -103,7 +99,7 @@ class CapabilityService(CapabilityServiceIF): == CapabilityStepType.PrepareAndRunWorkflow.name ): # Return capability engine to available state - self.queues[execution.capability.id].complete_execution( + self.queues[execution.capability.name].complete_execution( capability_event.execution_id ) @@ -150,7 +146,7 @@ class CapabilityService(CapabilityServiceIF): """ # Get correct queue or initialize one queue = self.queues.get( - execution.capability.id, + execution.capability.name, CapabilityQueue( self.capability_info, self.workflow_service, @@ -158,7 +154,7 @@ class CapabilityService(CapabilityServiceIF): ), ) queue.enqueue(execution, priority) - self.queues[execution.capability.id] = queue + self.queues[execution.capability.name] = queue # Remove execution record from pool if execution in self.execution_pool: @@ -185,9 +181,7 @@ class CapabilityService(CapabilityServiceIF): event.workflow_request_id ) - return CapabilityEvent( - event_type, execution.capability_request.id, execution.id - ) + return CapabilityEvent(event_type, execution.capability_request.id, execution.id) def listen_for_events(self): """