Skip to content
Snippets Groups Projects
Commit ad5340be authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Added capability interfaces from wksp0 and filled in attributes and

methods based on the architecture diagrams; copied null capability
implementation from wksp0
parent 77322a17
No related branches found
No related tags found
No related merge requests found
from queue import Queue
from threading import Semaphore
from injector import ClassAssistedBuilder, inject
from wksp.ifaces import *
import pathlib
class SearchParameters(Parameter):
search: str
@staticmethod
def fields() -> Dict[FieldName, FieldLabel]:
return {'search': 'Search'}
def json(self):
return {'search': self.search}
def load(self, json):
self.search = json['search']
def __repr__(self):
return f"<SearchParameters search='{self.search}'>"
class QaStatus(Parameter):
status: bool
@staticmethod
def fields() -> Dict[FieldName, FieldLabel]:
return {'qa-pass': 'Passes QA'}
def json(self):
return {'qa-pass': self.status}
def load(self, json):
self.status = json['qa-pass'].strip().lower() in ['yes', 'y', 'true']
def __repr__(self):
return f"<QaStatus {'pass' if self.status else 'fail'}>"
ParameterRegistry = {'search-parameters': SearchParameters,
'qa-status': QaStatus}
class ConsoleParameterReader:
@staticmethod
def obtain_parameter(parameter_type: Type[Parameter]) -> Parameter:
json = {}
for field, label in parameter_type.fields().items():
json[field] = input(label + '?> ')
result = parameter_type()
result.load(json)
return result
class DirectoryCapability(Capability):
"""
Implements a capability by reading files off the filesystem (rather than from a database or whatnot).
"""
max_jobs: int
sequence: List[CapabilityStep]
def create_request(self, locators: List[ProductLocator]):
return CapabilityRequest(capability=self, locators=locators, files=[], id=None, parameters=[])
def __init__(self, path: pathlib.Path):
self.path = path
self.name = path.name
self.sequence = self.parse(self.path / 'sequence.txt')
self.max_jobs = 2
def __hash__(self):
return hash({'name': 'DirectoryCapability', 'path': self.path})
@staticmethod
def parse(path: pathlib.Path):
sequence = []
with path.open('r') as f:
for line in f:
if line.startswith('AWAIT PRODUCTS'):
sequence.append(AwaitProduct())
elif line.startswith('AWAIT PRODUCT '):
sequence.append(AwaitProduct(line.split('AWAIT PRODUCT ')[1].strip()))
elif line.startswith('AWAIT PARAMETER '):
sequence.append(AwaitParameter(ParameterRegistry[line.split('AWAIT PARAMETER ')[1].strip()]))
elif line.startswith('PREPARE AND RUN WORKFLOW '):
sequence.append(PrepareAndRunWorkflow(line.split('PREPARE AND RUN WORKFLOW ')[1].strip()))
return sequence
class DirectoryCapabilityInfo(CapabilityInfo):
"""
Finds information about capabilities on the filesystem. Stores requests in memory (in a list).
"""
def __init__(self, path: pathlib.Path):
self.path = path
self.requests = []
self.n_requests = 0
def lookup_capability(self, capability_name: str) -> Capability:
return DirectoryCapability(self.path / capability_name)
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest:
return self.requests[capability_request_id]
def save_request(self, request: CapabilityRequest) -> int:
# 1. Record this request in our list of requests
self.requests.append(request)
self.n_requests += 1
# 2. Record the ID on the request itself
request.id = self.n_requests
# return it
return request.id
class PrototypeCapabilityQueue(CapabilityQueue):
"""
Implements the CapabilityQueue API, backed by a simple thread-safe queue.
"""
items: Queue
@inject
def __init__(self, capability: Capability, runner_builder: ClassAssistedBuilder["PrototypeQueueRunner"]):
self.items = Queue()
self.runner = runner_builder.build(queue=self.items, max_jobs=capability.max_jobs)
self.runner.start()
def enqueue(self, request: CapabilityRequest):
# 1. place this request into some kind of queue
self.items.put(request)
class PrototypeCapabilityEngine(CapabilityEngine):
request: CapabilityRequest
responder: CapabilityEngineResponder
@inject
def __init__(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
self.request = request
self.responder = responder
def execute(self, request):
for step in request.capability.sequence:
self._execute_step(step)
def _execute_step(self, step: CapabilityStep):
step.execute_against(self.request, self.responder)
class PrototypeCapabilityEngineResponder(CapabilityEngineResponder):
workflow_service: WorkflowService
product_service: ProductService
@inject
def __init__(self, workflow_service: WorkflowService, product_service: ProductService):
self.workflow_service = workflow_service
self.product_service = product_service
self.console = ConsoleParameterReader()
def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]):
# in here I need to find the WorkflowService
return self.workflow_service.execute(name, param.json(), files)
def await_product(self, step: CapabilityStep, product_locator: ProductLocator):
return self.product_service.locate_product(product_locator)
def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter:
return self.console.obtain_parameter(parameter_type)
class PrototypeQueueRunner(QueueRunner, Thread):
engines: Dict[CapabilityRequest, CapabilityEngine]
@inject
def __init__(self, queue: Queue, max_jobs: int, engine_builder: ClassAssistedBuilder[PrototypeCapabilityEngine]):
super().__init__()
self.queue = queue
self.semaphore = Semaphore(max_jobs)
self.engines = {}
self.engine_builder = engine_builder
def run(self) -> None:
while True:
# obtain the semaphore
self.semaphore.acquire()
# now get a job from the queue
request = self.queue.get()
# now build an engine and start executing that request
self.engines[request.id] = self.engine_builder.build(request=request)
# execute the first step of this capability
self.engines[request.id].execute(request)
# release the semaphore
self.semaphore.release()
def complete(self, request):
"""
Sent by the engine when it is done executing a capability
:return:
"""
del self.engines[request.id]
self.semaphore.release()
class PrototypeCapabilityService(CapabilityService):
queues: Dict[CapabilityName, CapabilityQueue]
@inject
def __init__(self, info: CapabilityInfo, queue_builder: ClassAssistedBuilder[PrototypeCapabilityQueue]):
self.queues = {}
self.info = info
self.queue_builder = queue_builder
def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
# 1. Locate the capability
capability = self.info.lookup_capability(name)
# 2. Create a request
request = capability.create_request(locators)
# 3. Persist the request
self.info.save_request(request)
# 4. Return it
return request
def _locate_queue(self, request: CapabilityRequest) -> CapabilityQueue:
# 1. Create a queue for this capability, if we don't have one currently
if request.capability.name not in self.queues:
self.queues[request.capability.name] = self.queue_builder.build(capability=request.capability)
# 2. Return the queue for this capability
return self.queues[request.capability.name]
def execute(self, request: CapabilityRequest) -> None:
# 1. Locate the proper queue for this request
queue = self._locate_queue(request)
# 2. Submit the request to that queue
queue.enqueue(request)
class HardcodedProductService(ProductService):
def __init__(self):
self.products = {'nmtest-capo': pathlib.Path('/home/casa/capo/nmtest.properties'),
'readme': pathlib.Path('README.md')}
def locate_product(self, product_locator: ProductLocator) -> Path:
return self.products[product_locator]
......@@ -6,7 +6,8 @@ import inspect
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List
from threading import Thread
from typing import Dict, List, Optional, Type
ProductLocator = str
CapabilityName = str
......@@ -14,12 +15,12 @@ CapabilityName = str
@dataclass
class Capability(ABC):
name: CapabilityName
max_jobs: int
"""
A capability
"""
name: CapabilityName
max_jobs: int
def create_request(self, locators: List[ProductLocator]):
"""
Create a capability request for this capability
......@@ -49,21 +50,32 @@ class CapabilityRequest:
"""
A particular capability request
"""
# From wksp0
capability: Capability
locators: List[ProductLocator]
id: Optional[int]
parameters: List["Parameter"]
files: List[Path]
# From class diagram
# FIXME: Will versions be str or a class of their own?
versions: List[str]
tickets: List[Ticket]
queue: "CapabilityQueue"
@property
def last_parameter(self) -> "Parameter":
return self.parameters[-1]
@dataclass
class CapabilityInfo(ABC):
"""
Interface to stored capability information.
"""
# TODO: Implement these two types
matrix: CapabilityMatrix
proj_settings: ProjectSettings
@abstractmethod
def lookup_capability(self, capability_name: str) -> Capability:
"""
......@@ -107,10 +119,22 @@ class CapabilityQueue(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class CapabilityService(ABC):
"""
The capability service: clients access this to request capability runs
"""
# TODO: Implement EventTranslator and CapabilityFactory
event_translator: EventTranslator
queue: CapabilityQueue
info: CapabilityInfo
factory: CapabilityFactory
# TODO: Implement two services
workflow: "WorkflowServiceIF"
estimation_service: EstimationService
metrics_service: MetricsService
@abstractmethod
def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
"""
......@@ -156,20 +180,26 @@ class Parameter(ABC):
Abstracts parameters needed for running capabilities.
"""
@staticmethod
@abstractmethod
def fields() -> Dict[FieldName, FieldLabel]:
raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}')
@abstractmethod
def json(self) -> Dict[str, str]:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def load(self, json: Dict[str, str]) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class CapabilityStep(ABC):
"""
A step in a capability sequence
"""
next: CapabilityStep
@abstractmethod
def execute_against(self, request: CapabilityRequest, responder: "CapabilityEngineResponder"):
"""
......@@ -179,6 +209,15 @@ class CapabilityStep(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class CapabilityExecution(ABC):
request: WorkflowRequest
# FIXME: Will this be a string?
version: str
current_step: CapabilityStep
sequence: CapabilitySequence
class CapabilityEngineResponder(ABC):
"""
Abstracts the callbacks for a capability engine
......@@ -196,11 +235,12 @@ class CapabilityEngineResponder(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class CapabilitySequence(ABC):
"""
Represents the sequence of events in a capability.
"""
pass
steps: List[CapabilityStep]
class CapabilityEngine(ABC):
......@@ -216,10 +256,10 @@ class AwaitProduct(CapabilityStep, ABC):
"""
Wait for a product to become available.
"""
# FIXME: Does this need to be a class attribute?
product: ProductLocator
def __init__(self, product: Optional[ProductLocator]=None):
def __init__(self, product: Optional[ProductLocator] = None):
self.product = product
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
......@@ -237,7 +277,7 @@ class AwaitParameter(CapabilityStep, ABC):
"""
Wait for a certain parameter to arrive (probably from the UI).
"""
# FIXME: Does this need to be a class attribute?
parameter_type: Type[Parameter]
def __init__(self, parameter_type: Type[Parameter]):
......@@ -251,6 +291,7 @@ class PrepareAndRunWorkflow(CapabilityStep, ABC):
"""
Render templates and execute a workflow, awaiting its completion.
"""
# FIXME: Does this need to be a class attribute?
workflow_name: str
def __init__(self, workflow_name: str):
......@@ -277,10 +318,15 @@ class WorkflowServiceIF(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class WorkflowInfoIF(ABC):
"""
Holds information about workflows.
"""
# TODO: Implement WorkflowDefinition
definition: WorkflowDefinition
request: "WorkflowRequest"
@abstractmethod
def lookup_workflow_definition(self, name: str) -> 'Workflow':
"""
......@@ -290,3 +336,16 @@ class WorkflowInfoIF(ABC):
:return: Workflow instance
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class WorkflowDefinition(ABC):
pass
@dataclass
class WorkflowRequest(ABC):
workflow_definition: WorkflowDefinition
name: str
argument: str # JSON-formatted
files: List[Path]
job_id: str
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment