diff --git a/shared/workspaces/src/workspaces/interfaces.py b/shared/workspaces/src/workspaces/interfaces.py index 4108d8717d6d9b96ae6c0a495ba513197c23da1e..477468d2439d8002f8cf0fab45252c24ce447699 100644 --- a/shared/workspaces/src/workspaces/interfaces.py +++ b/shared/workspaces/src/workspaces/interfaces.py @@ -4,9 +4,261 @@ Interfaces used by the Workspace system. import inspect from abc import ABC, abstractmethod +from dataclasses import dataclass from pathlib import Path from typing import Dict, List +ProductLocator = str +CapabilityName = str + + +@dataclass +class Capability(ABC): + name: CapabilityName + max_jobs: int + + """ + A capability + """ + def create_request(self, locators: List[ProductLocator]): + """ + Create a capability request for this capability + + :param locators: product locators for the new request + :return: a capability request + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class ProductService(ABC): + """ + Locate products and realize them on disk (haha). + """ + @abstractmethod + def locate_product(self, product_locator: ProductLocator) -> Path: + """ + Locates a given product and produces a file path to it. + :param product_locator: the locator to this product + :return: a path to this product + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +@dataclass +class CapabilityRequest: + """ + A particular capability request + """ + capability: Capability + locators: List[ProductLocator] + id: Optional[int] + parameters: List["Parameter"] + files: List[Path] + + @property + def last_parameter(self) -> "Parameter": + return self.parameters[-1] + + +class CapabilityInfo(ABC): + """ + Interface to stored capability information. + """ + @abstractmethod + def lookup_capability(self, capability_name: str) -> Capability: + """ + Look up the definition of a capability. + + :param capability_name: the name of the capability to find + :return: a capability + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + @abstractmethod + def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest: + """ + Look up a particular request + :param capability_request_id: the request identifier + :return: a capability request + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + @abstractmethod + def save_request(self, request: CapabilityRequest) -> int: + """ + Save a capability request and return an integer identifier for it. + + :param request: the request to save + :return: the request identifier + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class QueueRunner(Thread, ABC): + pass + + +class CapabilityQueue(ABC): + """ + Holds capability requests until they can be executed. + """ + @abstractmethod + def enqueue(self, request: CapabilityRequest): + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class CapabilityService(ABC): + """ + The capability service: clients access this to request capability runs + """ + @abstractmethod + def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest: + """ + Start a capability request with the given capability name and product locators. + + :param name: the capability name to look things up with + :param locators: the products to start the capability with + :return: a new capability request + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + @abstractmethod + def execute(self, request: CapabilityRequest) -> None: + """ + Begin executing a capability request + + :param request: the request to execute + :return: None + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class ArchiveService(ABC): + """ + Abstracts services that are needed from the archive system. + """ + @abstractmethod + def lookup_product(self, locator: ProductLocator) -> ScienceProduct: + """ + Look up a science product by its locator + :param locator: science product locator for this product + :return: science product + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +FieldName = str +FieldLabel = str + + +class Parameter(ABC): + """ + Abstracts parameters needed for running capabilities. + """ + @staticmethod + def fields() -> Dict[FieldName, FieldLabel]: + raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}') + + def json(self) -> Dict[str, str]: + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + def load(self, json: Dict[str, str]) -> None: + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class CapabilityStep(ABC): + """ + A step in a capability sequence + """ + @abstractmethod + def execute_against(self, request: CapabilityRequest, responder: "CapabilityEngineResponder"): + """ + Execute this capability step + :return: None + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class CapabilityEngineResponder(ABC): + """ + Abstracts the callbacks for a capability engine + """ + @abstractmethod + def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter: + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + @abstractmethod + def await_product(self, step: CapabilityStep, product_locator: ProductLocator): + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + @abstractmethod + def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]): + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class CapabilitySequence(ABC): + """ + Represents the sequence of events in a capability. + """ + pass + + +class CapabilityEngine(ABC): + """ + Executes a capability. + """ + @abstractmethod + def execute(self, request): + pass + + +class AwaitProduct(CapabilityStep, ABC): + """ + Wait for a product to become available. + """ + + product: ProductLocator + + def __init__(self, product: Optional[ProductLocator]=None): + self.product = product + + def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder): + # if we have a product, await it + if self.product: + request.files.append(responder.await_product(self, self.product)) + + # if we do not, await the locators on the request itself + else: + for locator in request.locators: + request.files.append(responder.await_product(self, locator)) + + +class AwaitParameter(CapabilityStep, ABC): + """ + Wait for a certain parameter to arrive (probably from the UI). + """ + + parameter_type: Type[Parameter] + + def __init__(self, parameter_type: Type[Parameter]): + self.parameter_type = parameter_type + + def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder): + request.parameters.append(responder.await_parameter(self, self.parameter_type)) + + +class PrepareAndRunWorkflow(CapabilityStep, ABC): + """ + Render templates and execute a workflow, awaiting its completion. + """ + workflow_name: str + + def __init__(self, workflow_name: str): + self.workflow_name = workflow_name + + def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder): + responder.prepare_and_run_workflow(self, self.workflow_name, request.last_parameter, request.files) + class WorkflowServiceIF(ABC): """