diff --git a/shared/workspaces/src/workspaces/capability_interfaces.py b/shared/workspaces/src/workspaces/capability_interfaces.py index b525dc790368486ac46073b319053f03c5c9f7d2..5f6d949f2b37978571b1cf985d6a229324742673 100644 --- a/shared/workspaces/src/workspaces/capability_interfaces.py +++ b/shared/workspaces/src/workspaces/capability_interfaces.py @@ -18,7 +18,9 @@ class CapabilityIF: steps: List["CapabilityStepIF"] max_jobs: int - def create_request(self, parameters: "ParameterIF", future_products: FutureProductIF) -> "CapabilityRequestIF": + def create_request( + self, parameters: "ParameterIF", future_products: FutureProductIF + ) -> "CapabilityRequestIF": raise NotImplementedError @@ -48,6 +50,7 @@ class CapabilityExecutionIF: def __json__(self, request: Any) -> dict: raise NotImplementedError + """ Capability System / Services @@ -60,13 +63,11 @@ class CapabilityInfoIF(ABC): Data access object that can look up and record information about capabilities and capability requests. """ + @abstractmethod def create_capability( - self, - name: CapabilityName, - steps: List["CapabilityStepType"], - max_jobs: int - ) -> int: + self, name: CapabilityName, steps: List["CapabilityStepType"], max_jobs: int + ) -> CapabilityIF: """ Create new capability and save it in the database :param name: Name of new capability @@ -78,12 +79,12 @@ class CapabilityInfoIF(ABC): @abstractmethod def create_capability_request( - self, - capability_name: str, - parameters: List["ParameterIF"] = None, - future_products: List[FutureProductIF] = None, - versions: List[str] = None - ) -> int: + self, + capability_id: int, + parameters: List["ParameterIF"] = None, + future_products: List[FutureProductIF] = None, + versions: List[str] = None, + ) -> CapabilityRequestIF: """ Create new capability request and save it in the database :param capability_id: ID of the requested capability @@ -95,7 +96,7 @@ class CapabilityInfoIF(ABC): raise NotImplementedError @abstractmethod - def create_execution_record(self, request_id: int) -> int: + def create_execution_record(self, request_id: int) -> CapabilityExecutionIF: """ Create new execution record for a request and save it in the database :param request_id: ID of the capability request @@ -109,9 +110,9 @@ class CapabilityInfoIF(ABC): @abstractmethod def lookup_entity( - self, - entity_id: int, - entity_schema: Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF] + self, + entity_id: int, + entity_schema: Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF], ) -> Optional[Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF]]: """ Look up entity in database and return object representation of it if found @@ -122,7 +123,9 @@ class CapabilityInfoIF(ABC): raise NotImplementedError @abstractmethod - def save_entity(self, entity: Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF]) -> int: + def save_entity( + self, entity: Union[CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF] + ) -> int: """ Save a given entity and return an integer identifier for it :param entity: The entity to save @@ -136,6 +139,7 @@ class ProjectSettingsIF(ABC): Class that deals with projects overriding capability settings and making multiple instances of a capability with the same name """ + pass @@ -144,6 +148,7 @@ class CapabilityMatrixIF(ABC): Maps CASA versions to version-specific templates, allowing us to support a variety of CASA versions """ + pass @@ -152,23 +157,25 @@ class CapabilityQueueIF(ABC): Organizes capability executions in a priority order and makes it possible to control the number of concurrent executions or pause execution altogether """ + @abstractmethod def enqueue(self, request: CapabilityRequestIF, priority: int): - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") @abstractmethod def dequeue(self): - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") @abstractmethod def pause(self): - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") class CapabilityEngineIF(ABC): """ Executes a prepare and run workflow step of a capability """ + @abstractmethod def execute(self): pass @@ -178,6 +185,7 @@ class CapabilityServiceIF(ABC): """ The capability service: clients access this to request capability runs """ + @abstractmethod def run_capability(self, request: CapabilityRequestIF): """ @@ -191,6 +199,7 @@ class EstimationServiceIF(ABC): """ Service that estimates how long a capability will take to complete """ + pass @@ -198,6 +207,7 @@ class MetricsServiceIF(ABC): """ Service that records metrics """ + pass @@ -205,6 +215,7 @@ class ArchiveServiceIF(ABC): """ Abstracts services that are needed from the archive system. """ + @abstractmethod def lookup_product(self, locator: ProductLocator) -> "ProductIF": """ @@ -212,7 +223,7 @@ class ArchiveServiceIF(ABC): :param locator: science product locator for this product :return: science product """ - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") FieldName = str @@ -234,6 +245,7 @@ class EventTranslatorIF(ABC): """ Translates a Workflow event into a Capability event """ + pass @@ -241,6 +253,7 @@ class CapabilityStepIF(ABC): """ A step in a capability sequence """ + pass @@ -249,6 +262,7 @@ class FutureCapabilityResultIF(ProductIF): Class that represents a product that will in the future result from a completed capability execution """ + pass @@ -257,6 +271,7 @@ class ParameterIF(ABC): Class that represents a set of capability parameters, a required input for the correct and complete execution of a capability """ + def json(self) -> str: """ Convert to json-formatted string diff --git a/shared/workspaces/src/workspaces/helpers.py b/shared/workspaces/src/workspaces/helpers.py index 1d1236e948426437f765e41a183d814200c9c390..3dab3efacdf0ef9b9df35a5a44cd629db5101f41 100644 --- a/shared/workspaces/src/workspaces/helpers.py +++ b/shared/workspaces/src/workspaces/helpers.py @@ -17,6 +17,21 @@ class CapabilityStep(CapabilityStepIF): self.step_type = step_type self.step_value = step_value + @classmethod + def from_str(cls, step_string: str): + """ + Create CapabilityStep from string containing space-separated step type and step value + :param step_string: String of capability step, e.g. "PrepareAndRunWorkflow null" + :return: CapabilityStep of given string + """ + step_list = step_string.split(' ') + step_type = CapabilityStepType[step_list[0]] + if step_list[1]: + step_value = step_list[1] + else: + step_value = None + return cls(step_type, step_value) + def __str__(self): if self.step_value: return f'{self.step_type.name} {self.step_value}' @@ -31,6 +46,19 @@ class CapabilitySequence(CapabilitySequenceIF): def __init__(self, steps: List[CapabilityStep]): self.steps = steps + @classmethod + def from_str(cls, sequence_str: str): + """ + Create CapabilitySequence from comma-separated str of capability steps + :param sequence_str: String sequence of steps + :return: CapabilitySequence of given steps + """ + steps = [] + for step in sequence_str.split(','): + steps.append(CapabilityStep.from_str(step)) + + return cls(steps) + def __iter__(self) -> Iterator: """ Allows CapabilitySequence objects to be iterated over diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index f7037b2ef01bb65c97a754433e015d601144a794..8d657e519de6aae7a69bf67138df953d7d49d8b1 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -129,7 +129,7 @@ class CapabilityEvent: """ # TODO: determine shape of Capability Event information - event_type = sa.Column('event_type', sa.String) + event_type = sa.Column("event_type", sa.String) @property def request_id(self): @@ -164,13 +164,14 @@ class Capability(Base, CapabilityIF): A capability, which is a particular workflow setup, intended to accept a certain kind of product and some parameters and produce another product """ - __tablename__ = 'capabilities' - id = sa.Column('capability_id', sa.Integer, primary_key=True) - name = sa.Column('capability_name', sa.String) - steps = sa.Column('capability_steps', sa.String) - max_jobs = sa.Column('max_jobs', sa.Integer) - requests = relationship("CapabilityRequest", back_populates='capability') - executions = relationship("CapabilityExecution", back_populates='steps') + + __tablename__ = "capabilities" + id = sa.Column("capability_id", sa.Integer, primary_key=True) + name = sa.Column("capability_name", sa.String) + steps = sa.Column("capability_steps", sa.String) + max_jobs = sa.Column("max_jobs", sa.Integer) + requests = relationship("CapabilityRequest", back_populates="capability") + executions = relationship("CapabilityExecution", back_populates="steps") @classmethod def from_file(cls, filename: str): @@ -245,19 +246,18 @@ class CapabilityRequest(Base, CapabilityRequestIF): A capability request, which couples a capability to a product, representing the expectation of a new product given a set of parameters """ - __tablename__ = 'capability_requests' - id = sa.Column('capability_request_id', sa.Integer, primary_key=True) - state = sa.Column('state', sa.String) + + __tablename__ = "capability_requests" + id = sa.Column("capability_request_id", sa.Integer, primary_key=True) + state = sa.Column("state", sa.String) capability_id = sa.Column( - 'capability_id', - sa.Integer, - sa.ForeignKey('capabilities.capability_id') + "capability_id", sa.Integer, sa.ForeignKey("capabilities.capability_id") ) parameters = sa.Column("parameters", sa.String) # FIXME: This needs to be changed to properly keep track of product locators. # future_products = sa.Column('future_products', sa.String) versions = relationship("CapabilityVersion", back_populates="capability_request") - capability = relationship(Capability, back_populates='requests') + capability = relationship(Capability, back_populates="requests") def update_status(self, status: str): # TODO: create field in table @@ -267,8 +267,12 @@ class CapabilityRequest(Base, CapabilityRequestIF): return f"CapabilityRequest object: {self.__dict__}" def __json__(self, request) -> dict: - return dict(id=self.id, capability_id=self.capability_id, - state=self.state, parameters=self.parameters) + return dict( + id=self.id, + capability_id=self.capability_id, + state=self.state, + parameters=self.parameters, + ) class CapabilityVersion(Base, CapabilityVersionIF): @@ -294,6 +298,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF): """ Schema representation of a capability request's execution record """ + __tablename__ = "capability_executions" __tablename__ = "capability_executions" @@ -318,8 +323,13 @@ class CapabilityExecution(Base, CapabilityExecutionIF): ) def __json__(self, request: Any) -> dict: - return dict(id=self.id, state=self.state, capability_request_id=self.capability_request_id, - version_number=self.version_number, current_step=self.current_step) + return dict( + id=self.id, + state=self.state, + capability_request_id=self.capability_request_id, + version_number=self.version_number, + current_step=self.current_step, + ) class Workflow(Base): diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index fa1e830f6b45e7825ab79532e1b9ee4c3969e72c..5a3262476636c84c60184aaeb1c4f07c80629fa4 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -21,7 +21,14 @@ from .capability_interfaces import ( CapabilityName, ParameterIF, ) -from .helpers import CapabilitySequence, ExecutionPriority, RequestState, ExecutionState +from .helpers import ( + CapabilitySequence, + ExecutionPriority, + RequestState, + ExecutionState, + CapabilityEventType, + CapabilityStepType, +) from .product_interfaces import FutureProductIF from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF from .schema import ( @@ -35,6 +42,7 @@ from .schema import ( get_session_factory, WorkflowRequest, AbstractFile, + CapabilityEvent, ) from channels.amqp_helpers import ( workflow_events, @@ -75,9 +83,41 @@ class CapabilityService(CapabilityServiceIF): """ # FIXME: need request versions in between requests and executions execution_record = self.capability_info.create_execution_record(request.id) - self.execution_pool.append(execution_record) + steps = CapabilitySequence.from_str(execution_record.steps) + + if steps[0].step_type == CapabilityStepType.PrepareAndRunWorkflow: + # First step is to run workflow, move to capability queue + # FIXME: Priority needs to be dynamic + self.enqueue_execution(execution_record, ExecutionPriority.Default.value) + else: + self.execution_pool.append(execution_record) return execution_record + def update_execution(self, execution_id: int, event: CapabilityEvent): + """ + Update capability execution given a received event + :param execution_id: ID of execution record + :param event: Incoming event + """ + execution = self.capability_info.lookup_entity( + execution_id, CapabilityExecution + ) + step_sequence = CapabilitySequence.from_str(execution.steps) + current_step = step_sequence[execution.current_step] + event_type = CapabilityEventType[event.event_type] + + # Check to make sure event type is correct + if current_step.step_type.value == event_type.value: + execution.current_step += 1 + if ( + step_sequence[execution.current_step] + == CapabilityStepType.PrepareAndRunWorkflow + ): + # FIXME: Priority needs to be dynamic + self.enqueue_execution(execution, ExecutionPriority.Default.value) + else: + print(f"Mismatched event type {event_type} for execution {execution_id}") + def enqueue_execution( self, execution_record: CapabilityExecution, @@ -100,6 +140,15 @@ class CapabilityService(CapabilityServiceIF): # Remove execution record from pool self.execution_pool.remove(execution_record) + def get_cap_from_execution(self, execution_id: int) -> int: + execution = self.capability_info.lookup_entity( + execution_id, CapabilityExecution + ) + request = self.capability_info.lookup_entity( + execution.capability_request, CapabilityRequest + ) + return request.capability.id + class CapabilityEngine(CapabilityEngineIF): """ @@ -131,53 +180,54 @@ class CapabilityInfo(CapabilityInfoIF): def create_capability( self, name: CapabilityName, steps: CapabilitySequence, max_jobs: int - ) -> int: + ) -> Capability: """ Create new capability and save it in the database :param name: Name of new capability :param steps: List of capability steps required to execute the capability :param max_jobs: Max allowed number of concurrent executions of this kind of capability - :return: Integer identifier for the capability + :return: Created Capability """ capability = Capability(name=name, steps=str(steps), max_jobs=max_jobs) - return self.save_entity(capability) + capability.id = self.save_entity(capability) + return capability def create_capability_request( self, - capability_name: str, + capability_id: int, parameters: List[ParameterIF] = None, future_products: List[FutureProductIF] = None, versions: List[str] = None, - ) -> int: + ) -> CapabilityRequest: """ Create new capability request and save it in the database :param capability_id: ID of the requested capability :param parameters: List :param future_products: :param versions: - :return: Integer identifier of the request + :return: Created CapabilityRequest """ - capability = self.lookup_capability(capability_name) + capability = self.lookup_entity(capability_id, Capability) request = CapabilityRequest( state=RequestState.Ready.name, capability=capability, - parameters=str(parameters) + parameters=str(parameters), ) - self.save_entity(request) + request.id = self.save_entity(request) return request - def create_execution_record(self, request_id: int) -> int: + def create_execution_record(self, request_id: int) -> CapabilityExecution: """ Create new execution record for a request and save it in the database :param request_id: ID of the capability request - :return: Integer identifier for the record + :return: Created CapabilityExecution """ record = CapabilityExecution( state=ExecutionState.Ready.name, capability_request_id=request_id, - current_step=0 + current_step=0, ) - self.save_entity(record) + record.id = self.save_entity(record) return record def lookup_entity( @@ -193,7 +243,11 @@ class CapabilityInfo(CapabilityInfoIF): :param entity_schema: Database schema of the entity :return: Object representation of entity if found, else None """ - return self.session.query(entity_schema).filter(entity_schema.id == entity_id).one() + return ( + self.session.query(entity_schema) + .filter(entity_schema.id == entity_id) + .one() + ) def save_entity( self, entity: Union[Capability, CapabilityRequest, CapabilityExecution] @@ -205,6 +259,7 @@ class CapabilityInfo(CapabilityInfoIF): """ self.session.add(entity) self.session.flush() + return self.lookup_entity(entity.id, type(entity)).id def lookup_capability_request(self, request_id) -> "CapabilityRequestIF": return self.lookup_entity(request_id, CapabilityRequest)