From ad5340be90955d5208755a0fe3ea0c0747b7c44b Mon Sep 17 00:00:00 2001
From: nhertz <>
Date: Wed, 16 Sep 2020 12:48:28 -0600
Subject: [PATCH] Added capability interfaces from wksp0 and filled in
 attributes and methods based on the architecture diagrams; copied null
 capability implementation from wksp0

 apps/cli/capabilities/      | 262 ++++++++++++++++++
 .../workspaces/src/workspaces/   |  75 ++++-
 2 files changed, 329 insertions(+), 8 deletions(-)
 create mode 100644 apps/cli/capabilities/

diff --git a/apps/cli/capabilities/ b/apps/cli/capabilities/
new file mode 100644
index 000000000..323bd4d5c
--- /dev/null
+++ b/apps/cli/capabilities/
@@ -0,0 +1,262 @@
+from queue import Queue
+from threading import Semaphore
+from injector import ClassAssistedBuilder, inject
+from wksp.ifaces import *
+import pathlib
+class SearchParameters(Parameter):
+    search: str
+    @staticmethod
+    def fields() -> Dict[FieldName, FieldLabel]:
+        return {'search': 'Search'}
+    def json(self):
+        return {'search':}
+    def load(self, json):
+ = json['search']
+    def __repr__(self):
+        return f"<SearchParameters search='{}'>"
+class QaStatus(Parameter):
+    status: bool
+    @staticmethod
+    def fields() -> Dict[FieldName, FieldLabel]:
+        return {'qa-pass': 'Passes QA'}
+    def json(self):
+        return {'qa-pass': self.status}
+    def load(self, json):
+        self.status = json['qa-pass'].strip().lower() in ['yes', 'y', 'true']
+    def __repr__(self):
+        return f"<QaStatus {'pass' if self.status else 'fail'}>"
+ParameterRegistry = {'search-parameters': SearchParameters,
+                     'qa-status': QaStatus}
+class ConsoleParameterReader:
+    @staticmethod
+    def obtain_parameter(parameter_type: Type[Parameter]) -> Parameter:
+        json = {}
+        for field, label in parameter_type.fields().items():
+            json[field] = input(label + '?> ')
+        result = parameter_type()
+        result.load(json)
+        return result
+class DirectoryCapability(Capability):
+    """
+    Implements a capability by reading files off the filesystem (rather than from a database or whatnot).
+    """
+    max_jobs: int
+    sequence: List[CapabilityStep]
+    def create_request(self, locators: List[ProductLocator]):
+        return CapabilityRequest(capability=self, locators=locators, files=[], id=None, parameters=[])
+    def __init__(self, path: pathlib.Path):
+        self.path = path
+ =
+        self.sequence = self.parse(self.path / 'sequence.txt')
+        self.max_jobs = 2
+    def __hash__(self):
+        return hash({'name': 'DirectoryCapability', 'path': self.path})
+    @staticmethod
+    def parse(path: pathlib.Path):
+        sequence = []
+        with'r') as f:
+            for line in f:
+                if line.startswith('AWAIT PRODUCTS'):
+                    sequence.append(AwaitProduct())
+                elif line.startswith('AWAIT PRODUCT '):
+                    sequence.append(AwaitProduct(line.split('AWAIT PRODUCT ')[1].strip()))
+                elif line.startswith('AWAIT PARAMETER '):
+                    sequence.append(AwaitParameter(ParameterRegistry[line.split('AWAIT PARAMETER ')[1].strip()]))
+                elif line.startswith('PREPARE AND RUN WORKFLOW '):
+                    sequence.append(PrepareAndRunWorkflow(line.split('PREPARE AND RUN WORKFLOW ')[1].strip()))
+        return sequence
+class DirectoryCapabilityInfo(CapabilityInfo):
+    """
+    Finds information about capabilities on the filesystem. Stores requests in memory (in a list).
+    """
+    def __init__(self, path: pathlib.Path):
+        self.path = path
+        self.requests = []
+        self.n_requests = 0
+    def lookup_capability(self, capability_name: str) -> Capability:
+        return DirectoryCapability(self.path / capability_name)
+    def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest:
+        return self.requests[capability_request_id]
+    def save_request(self, request: CapabilityRequest) -> int:
+        # 1. Record this request in our list of requests
+        self.requests.append(request)
+        self.n_requests += 1
+        # 2. Record the ID on the request itself
+ = self.n_requests
+        # return it
+        return
+class PrototypeCapabilityQueue(CapabilityQueue):
+    """
+    Implements the CapabilityQueue API, backed by a simple thread-safe queue.
+    """
+    items: Queue
+    @inject
+    def __init__(self, capability: Capability, runner_builder: ClassAssistedBuilder["PrototypeQueueRunner"]):
+        self.items = Queue()
+        self.runner =, max_jobs=capability.max_jobs)
+        self.runner.start()
+    def enqueue(self, request: CapabilityRequest):
+        # 1. place this request into some kind of queue
+        self.items.put(request)
+class PrototypeCapabilityEngine(CapabilityEngine):
+    request: CapabilityRequest
+    responder: CapabilityEngineResponder
+    @inject
+    def __init__(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
+        self.request = request
+        self.responder = responder
+    def execute(self, request):
+        for step in request.capability.sequence:
+            self._execute_step(step)
+    def _execute_step(self, step: CapabilityStep):
+        step.execute_against(self.request, self.responder)
+class PrototypeCapabilityEngineResponder(CapabilityEngineResponder):
+    workflow_service: WorkflowService
+    product_service: ProductService
+    @inject
+    def __init__(self, workflow_service: WorkflowService, product_service: ProductService):
+        self.workflow_service = workflow_service
+        self.product_service = product_service
+        self.console = ConsoleParameterReader()
+    def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]):
+        # in here I need to find the WorkflowService
+        return self.workflow_service.execute(name, param.json(), files)
+    def await_product(self, step: CapabilityStep, product_locator: ProductLocator):
+        return self.product_service.locate_product(product_locator)
+    def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter:
+        return self.console.obtain_parameter(parameter_type)
+class PrototypeQueueRunner(QueueRunner, Thread):
+    engines: Dict[CapabilityRequest, CapabilityEngine]
+    @inject
+    def __init__(self, queue: Queue, max_jobs: int, engine_builder: ClassAssistedBuilder[PrototypeCapabilityEngine]):
+        super().__init__()
+        self.queue = queue
+        self.semaphore = Semaphore(max_jobs)
+        self.engines = {}
+        self.engine_builder = engine_builder
+    def run(self) -> None:
+        while True:
+            # obtain the semaphore
+            self.semaphore.acquire()
+            # now get a job from the queue
+            request = self.queue.get()
+            # now build an engine and start executing that request
+            self.engines[] =
+            # execute the first step of this capability
+            self.engines[].execute(request)
+            # release the semaphore
+            self.semaphore.release()
+    def complete(self, request):
+        """
+        Sent by the engine when it is done executing a capability
+        :return:
+        """
+        del self.engines[]
+        self.semaphore.release()
+class PrototypeCapabilityService(CapabilityService):
+    queues: Dict[CapabilityName, CapabilityQueue]
+    @inject
+    def __init__(self, info: CapabilityInfo, queue_builder: ClassAssistedBuilder[PrototypeCapabilityQueue]):
+        self.queues = {}
+ = info
+        self.queue_builder = queue_builder
+    def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
+        # 1. Locate the capability
+        capability =
+        # 2. Create a request
+        request = capability.create_request(locators)
+        # 3. Persist the request
+        # 4. Return it
+        return request
+    def _locate_queue(self, request: CapabilityRequest) -> CapabilityQueue:
+        # 1. Create a queue for this capability, if we don't have one currently
+        if not in self.queues:
+            self.queues[] =
+        # 2. Return the queue for this capability
+        return self.queues[]
+    def execute(self, request: CapabilityRequest) -> None:
+        # 1. Locate the proper queue for this request
+        queue = self._locate_queue(request)
+        # 2. Submit the request to that queue
+        queue.enqueue(request)
+class HardcodedProductService(ProductService):
+    def __init__(self):
+        self.products = {'nmtest-capo': pathlib.Path('/home/casa/capo/'),
+                         'readme': pathlib.Path('')}
+    def locate_product(self, product_locator: ProductLocator) -> Path:
+        return self.products[product_locator]
diff --git a/shared/workspaces/src/workspaces/ b/shared/workspaces/src/workspaces/
index 477468d24..2d7dcc570 100644
--- a/shared/workspaces/src/workspaces/
+++ b/shared/workspaces/src/workspaces/
@@ -6,7 +6,8 @@ import inspect
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Dict, List
+from threading import Thread
+from typing import Dict, List, Optional, Type
 ProductLocator = str
 CapabilityName = str
@@ -14,12 +15,12 @@ CapabilityName = str
 class Capability(ABC):
-    name: CapabilityName
-    max_jobs: int
     A capability
+    name: CapabilityName
+    max_jobs: int
     def create_request(self, locators: List[ProductLocator]):
         Create a capability request for this capability
@@ -49,21 +50,32 @@ class CapabilityRequest:
     A particular capability request
+    # From wksp0
     capability: Capability
     locators: List[ProductLocator]
     id: Optional[int]
     parameters: List["Parameter"]
     files: List[Path]
+    # From class diagram
+    # FIXME: Will versions be str or a class of their own?
+    versions: List[str]
+    tickets: List[Ticket]
+    queue: "CapabilityQueue"
     def last_parameter(self) -> "Parameter":
         return self.parameters[-1]
 class CapabilityInfo(ABC):
     Interface to stored capability information.
+    # TODO: Implement these two types
+    matrix: CapabilityMatrix
+    proj_settings: ProjectSettings
     def lookup_capability(self, capability_name: str) -> Capability:
@@ -107,10 +119,22 @@ class CapabilityQueue(ABC):
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
 class CapabilityService(ABC):
     The capability service: clients access this to request capability runs
+    # TODO: Implement EventTranslator and CapabilityFactory
+    event_translator: EventTranslator
+    queue: CapabilityQueue
+    info: CapabilityInfo
+    factory: CapabilityFactory
+    # TODO: Implement two services
+    workflow: "WorkflowServiceIF"
+    estimation_service: EstimationService
+    metrics_service: MetricsService
     def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
@@ -156,20 +180,26 @@ class Parameter(ABC):
     Abstracts parameters needed for running capabilities.
+    @abstractmethod
     def fields() -> Dict[FieldName, FieldLabel]:
         raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}')
+    @abstractmethod
     def json(self) -> Dict[str, str]:
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
+    @abstractmethod
     def load(self, json: Dict[str, str]) -> None:
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
 class CapabilityStep(ABC):
     A step in a capability sequence
+    next: CapabilityStep
     def execute_against(self, request: CapabilityRequest, responder: "CapabilityEngineResponder"):
@@ -179,6 +209,15 @@ class CapabilityStep(ABC):
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
+class CapabilityExecution(ABC):
+    request: WorkflowRequest
+    # FIXME: Will this be a string?
+    version: str
+    current_step: CapabilityStep
+    sequence: CapabilitySequence
 class CapabilityEngineResponder(ABC):
     Abstracts the callbacks for a capability engine
@@ -196,11 +235,12 @@ class CapabilityEngineResponder(ABC):
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
 class CapabilitySequence(ABC):
     Represents the sequence of events in a capability.
-    pass
+    steps: List[CapabilityStep]
 class CapabilityEngine(ABC):
@@ -216,10 +256,10 @@ class AwaitProduct(CapabilityStep, ABC):
     Wait for a product to become available.
+    # FIXME: Does this need to be a class attribute?
     product: ProductLocator
-    def __init__(self, product: Optional[ProductLocator]=None):
+    def __init__(self, product: Optional[ProductLocator] = None):
         self.product = product
     def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
@@ -237,7 +277,7 @@ class AwaitParameter(CapabilityStep, ABC):
     Wait for a certain parameter to arrive (probably from the UI).
+    # FIXME: Does this need to be a class attribute?
     parameter_type: Type[Parameter]
     def __init__(self, parameter_type: Type[Parameter]):
@@ -251,6 +291,7 @@ class PrepareAndRunWorkflow(CapabilityStep, ABC):
     Render templates and execute a workflow, awaiting its completion.
+    # FIXME: Does this need to be a class attribute?
     workflow_name: str
     def __init__(self, workflow_name: str):
@@ -277,10 +318,15 @@ class WorkflowServiceIF(ABC):
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
 class WorkflowInfoIF(ABC):
     Holds information about workflows.
+    # TODO: Implement WorkflowDefinition
+    definition: WorkflowDefinition
+    request: "WorkflowRequest"
     def lookup_workflow_definition(self, name: str) -> 'Workflow':
@@ -290,3 +336,16 @@ class WorkflowInfoIF(ABC):
         :return:      Workflow instance
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
+class WorkflowDefinition(ABC):
+    pass
+class WorkflowRequest(ABC):
+    workflow_definition: WorkflowDefinition
+    name: str
+    argument: str  # JSON-formatted
+    files: List[Path]
+    job_id: str
\ No newline at end of file