diff --git a/apps/cli/capabilities/null_capability.py b/apps/cli/capabilities/null_capability.py new file mode 100644 index 0000000000000000000000000000000000000000..323bd4d5c468a2072a367f887dfb86e4515a236e --- /dev/null +++ b/apps/cli/capabilities/null_capability.py @@ -0,0 +1,262 @@ +from queue import Queue +from threading import Semaphore + +from injector import ClassAssistedBuilder, inject + +from wksp.ifaces import * +import pathlib + + +class SearchParameters(Parameter): + search: str + + @staticmethod + def fields() -> Dict[FieldName, FieldLabel]: + return {'search': 'Search'} + + def json(self): + return {'search': self.search} + + def load(self, json): + self.search = json['search'] + + def __repr__(self): + return f"<SearchParameters search='{self.search}'>" + + +class QaStatus(Parameter): + status: bool + + @staticmethod + def fields() -> Dict[FieldName, FieldLabel]: + return {'qa-pass': 'Passes QA'} + + def json(self): + return {'qa-pass': self.status} + + def load(self, json): + self.status = json['qa-pass'].strip().lower() in ['yes', 'y', 'true'] + + def __repr__(self): + return f"<QaStatus {'pass' if self.status else 'fail'}>" + + +ParameterRegistry = {'search-parameters': SearchParameters, + 'qa-status': QaStatus} + + +class ConsoleParameterReader: + @staticmethod + def obtain_parameter(parameter_type: Type[Parameter]) -> Parameter: + json = {} + + for field, label in parameter_type.fields().items(): + json[field] = input(label + '?> ') + + result = parameter_type() + result.load(json) + return result + + +class DirectoryCapability(Capability): + """ + Implements a capability by reading files off the filesystem (rather than from a database or whatnot). + """ + max_jobs: int + sequence: List[CapabilityStep] + + def create_request(self, locators: List[ProductLocator]): + return CapabilityRequest(capability=self, locators=locators, files=[], id=None, parameters=[]) + + def __init__(self, path: pathlib.Path): + self.path = path + self.name = path.name + self.sequence = self.parse(self.path / 'sequence.txt') + self.max_jobs = 2 + + def __hash__(self): + return hash({'name': 'DirectoryCapability', 'path': self.path}) + + @staticmethod + def parse(path: pathlib.Path): + sequence = [] + with path.open('r') as f: + for line in f: + if line.startswith('AWAIT PRODUCTS'): + sequence.append(AwaitProduct()) + elif line.startswith('AWAIT PRODUCT '): + sequence.append(AwaitProduct(line.split('AWAIT PRODUCT ')[1].strip())) + elif line.startswith('AWAIT PARAMETER '): + sequence.append(AwaitParameter(ParameterRegistry[line.split('AWAIT PARAMETER ')[1].strip()])) + elif line.startswith('PREPARE AND RUN WORKFLOW '): + sequence.append(PrepareAndRunWorkflow(line.split('PREPARE AND RUN WORKFLOW ')[1].strip())) + + return sequence + + +class DirectoryCapabilityInfo(CapabilityInfo): + """ + Finds information about capabilities on the filesystem. Stores requests in memory (in a list). + """ + def __init__(self, path: pathlib.Path): + self.path = path + self.requests = [] + self.n_requests = 0 + + def lookup_capability(self, capability_name: str) -> Capability: + return DirectoryCapability(self.path / capability_name) + + def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest: + return self.requests[capability_request_id] + + def save_request(self, request: CapabilityRequest) -> int: + # 1. Record this request in our list of requests + self.requests.append(request) + self.n_requests += 1 + + # 2. Record the ID on the request itself + request.id = self.n_requests + + # return it + return request.id + + +class PrototypeCapabilityQueue(CapabilityQueue): + """ + Implements the CapabilityQueue API, backed by a simple thread-safe queue. + """ + items: Queue + + @inject + def __init__(self, capability: Capability, runner_builder: ClassAssistedBuilder["PrototypeQueueRunner"]): + self.items = Queue() + self.runner = runner_builder.build(queue=self.items, max_jobs=capability.max_jobs) + self.runner.start() + + def enqueue(self, request: CapabilityRequest): + # 1. place this request into some kind of queue + self.items.put(request) + + +class PrototypeCapabilityEngine(CapabilityEngine): + request: CapabilityRequest + responder: CapabilityEngineResponder + + @inject + def __init__(self, request: CapabilityRequest, responder: CapabilityEngineResponder): + self.request = request + self.responder = responder + + def execute(self, request): + for step in request.capability.sequence: + self._execute_step(step) + + def _execute_step(self, step: CapabilityStep): + step.execute_against(self.request, self.responder) + + +class PrototypeCapabilityEngineResponder(CapabilityEngineResponder): + workflow_service: WorkflowService + product_service: ProductService + + @inject + def __init__(self, workflow_service: WorkflowService, product_service: ProductService): + self.workflow_service = workflow_service + self.product_service = product_service + + self.console = ConsoleParameterReader() + + def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]): + # in here I need to find the WorkflowService + return self.workflow_service.execute(name, param.json(), files) + + def await_product(self, step: CapabilityStep, product_locator: ProductLocator): + return self.product_service.locate_product(product_locator) + + def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter: + return self.console.obtain_parameter(parameter_type) + + +class PrototypeQueueRunner(QueueRunner, Thread): + engines: Dict[CapabilityRequest, CapabilityEngine] + + @inject + def __init__(self, queue: Queue, max_jobs: int, engine_builder: ClassAssistedBuilder[PrototypeCapabilityEngine]): + super().__init__() + self.queue = queue + self.semaphore = Semaphore(max_jobs) + self.engines = {} + self.engine_builder = engine_builder + + def run(self) -> None: + while True: + # obtain the semaphore + self.semaphore.acquire() + + # now get a job from the queue + request = self.queue.get() + + # now build an engine and start executing that request + self.engines[request.id] = self.engine_builder.build(request=request) + + # execute the first step of this capability + self.engines[request.id].execute(request) + + # release the semaphore + self.semaphore.release() + + def complete(self, request): + """ + Sent by the engine when it is done executing a capability + :return: + """ + del self.engines[request.id] + self.semaphore.release() + + +class PrototypeCapabilityService(CapabilityService): + queues: Dict[CapabilityName, CapabilityQueue] + + @inject + def __init__(self, info: CapabilityInfo, queue_builder: ClassAssistedBuilder[PrototypeCapabilityQueue]): + self.queues = {} + self.info = info + self.queue_builder = queue_builder + + def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest: + # 1. Locate the capability + capability = self.info.lookup_capability(name) + + # 2. Create a request + request = capability.create_request(locators) + + # 3. Persist the request + self.info.save_request(request) + + # 4. Return it + return request + + def _locate_queue(self, request: CapabilityRequest) -> CapabilityQueue: + # 1. Create a queue for this capability, if we don't have one currently + if request.capability.name not in self.queues: + self.queues[request.capability.name] = self.queue_builder.build(capability=request.capability) + + # 2. Return the queue for this capability + return self.queues[request.capability.name] + + def execute(self, request: CapabilityRequest) -> None: + # 1. Locate the proper queue for this request + queue = self._locate_queue(request) + + # 2. Submit the request to that queue + queue.enqueue(request) + + +class HardcodedProductService(ProductService): + def __init__(self): + self.products = {'nmtest-capo': pathlib.Path('/home/casa/capo/nmtest.properties'), + 'readme': pathlib.Path('README.md')} + + def locate_product(self, product_locator: ProductLocator) -> Path: + return self.products[product_locator] + diff --git a/shared/workspaces/src/workspaces/interfaces.py b/shared/workspaces/src/workspaces/interfaces.py index 477468d2439d8002f8cf0fab45252c24ce447699..2d7dcc570ca2d14e03cb8f17bc948954b527da76 100644 --- a/shared/workspaces/src/workspaces/interfaces.py +++ b/shared/workspaces/src/workspaces/interfaces.py @@ -6,7 +6,8 @@ import inspect from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path -from typing import Dict, List +from threading import Thread +from typing import Dict, List, Optional, Type ProductLocator = str CapabilityName = str @@ -14,12 +15,12 @@ CapabilityName = str @dataclass class Capability(ABC): - name: CapabilityName - max_jobs: int - """ A capability """ + name: CapabilityName + max_jobs: int + def create_request(self, locators: List[ProductLocator]): """ Create a capability request for this capability @@ -49,21 +50,32 @@ class CapabilityRequest: """ A particular capability request """ + # From wksp0 capability: Capability locators: List[ProductLocator] id: Optional[int] parameters: List["Parameter"] files: List[Path] + # From class diagram + # FIXME: Will versions be str or a class of their own? + versions: List[str] + tickets: List[Ticket] + queue: "CapabilityQueue" @property def last_parameter(self) -> "Parameter": return self.parameters[-1] +@dataclass class CapabilityInfo(ABC): """ Interface to stored capability information. """ + # TODO: Implement these two types + matrix: CapabilityMatrix + proj_settings: ProjectSettings + @abstractmethod def lookup_capability(self, capability_name: str) -> Capability: """ @@ -107,10 +119,22 @@ class CapabilityQueue(ABC): raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') +@dataclass class CapabilityService(ABC): """ The capability service: clients access this to request capability runs """ + # TODO: Implement EventTranslator and CapabilityFactory + event_translator: EventTranslator + queue: CapabilityQueue + info: CapabilityInfo + factory: CapabilityFactory + + # TODO: Implement two services + workflow: "WorkflowServiceIF" + estimation_service: EstimationService + metrics_service: MetricsService + @abstractmethod def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest: """ @@ -156,20 +180,26 @@ class Parameter(ABC): Abstracts parameters needed for running capabilities. """ @staticmethod + @abstractmethod def fields() -> Dict[FieldName, FieldLabel]: raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}') + @abstractmethod def json(self) -> Dict[str, str]: raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + @abstractmethod def load(self, json: Dict[str, str]) -> None: raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') +@dataclass class CapabilityStep(ABC): """ A step in a capability sequence """ + next: CapabilityStep + @abstractmethod def execute_against(self, request: CapabilityRequest, responder: "CapabilityEngineResponder"): """ @@ -179,6 +209,15 @@ class CapabilityStep(ABC): raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') +@dataclass +class CapabilityExecution(ABC): + request: WorkflowRequest + # FIXME: Will this be a string? + version: str + current_step: CapabilityStep + sequence: CapabilitySequence + + class CapabilityEngineResponder(ABC): """ Abstracts the callbacks for a capability engine @@ -196,11 +235,12 @@ class CapabilityEngineResponder(ABC): raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') +@dataclass class CapabilitySequence(ABC): """ Represents the sequence of events in a capability. """ - pass + steps: List[CapabilityStep] class CapabilityEngine(ABC): @@ -216,10 +256,10 @@ class AwaitProduct(CapabilityStep, ABC): """ Wait for a product to become available. """ - + # FIXME: Does this need to be a class attribute? product: ProductLocator - def __init__(self, product: Optional[ProductLocator]=None): + def __init__(self, product: Optional[ProductLocator] = None): self.product = product def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder): @@ -237,7 +277,7 @@ class AwaitParameter(CapabilityStep, ABC): """ Wait for a certain parameter to arrive (probably from the UI). """ - + # FIXME: Does this need to be a class attribute? parameter_type: Type[Parameter] def __init__(self, parameter_type: Type[Parameter]): @@ -251,6 +291,7 @@ class PrepareAndRunWorkflow(CapabilityStep, ABC): """ Render templates and execute a workflow, awaiting its completion. """ + # FIXME: Does this need to be a class attribute? workflow_name: str def __init__(self, workflow_name: str): @@ -277,10 +318,15 @@ class WorkflowServiceIF(ABC): raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') +@dataclass class WorkflowInfoIF(ABC): """ Holds information about workflows. """ + # TODO: Implement WorkflowDefinition + definition: WorkflowDefinition + request: "WorkflowRequest" + @abstractmethod def lookup_workflow_definition(self, name: str) -> 'Workflow': """ @@ -290,3 +336,16 @@ class WorkflowInfoIF(ABC): :return: Workflow instance """ raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class WorkflowDefinition(ABC): + pass + + +@dataclass +class WorkflowRequest(ABC): + workflow_definition: WorkflowDefinition + name: str + argument: str # JSON-formatted + files: List[Path] + job_id: str \ No newline at end of file