import pathlib
from pathlib import Path
from queue import Queue
from threading import Semaphore, Thread
from typing import Dict, List, Type

# FIXME: This package doesn't seem installable via conda
from injector import ClassAssistedBuilder, inject

# Kept after the explicit standard-library imports so that any name the
# project interfaces module exports still takes precedence.
from workspaces.interfaces import *


class SearchParameters(Parameter):
    """Parameter holding a single search string."""

    search: str

    @staticmethod
    def fields() -> Dict[FieldName, FieldLabel]:
        """Return the field-name -> label mapping used when prompting."""
        return {'search': 'Search'}

    def json(self):
        """Return this parameter as a plain dict keyed by field name."""
        return {'search': self.search}

    def load(self, json):
        """Populate this parameter from a dict keyed by field name."""
        self.search = json['search']

    def __repr__(self):
        return f"<SearchParameters search='{self.search}'>"


class QaStatus(Parameter):
    """Parameter holding a boolean QA pass/fail flag."""

    status: bool

    @staticmethod
    def fields() -> Dict[FieldName, FieldLabel]:
        """Return the field-name -> label mapping used when prompting."""
        return {'qa-pass': 'Passes QA'}

    def json(self):
        """Return this parameter as a plain dict keyed by field name."""
        return {'qa-pass': self.status}

    def load(self, json):
        """
        Populate this parameter from a dict keyed by field name.

        The value may be a bool (as produced by ``json()``) or text; text is
        treated as a pass only when it is 'yes', 'y' or 'true', ignoring case
        and surrounding whitespace.
        """
        raw = json['qa-pass']
        if isinstance(raw, bool):
            # json() emits a real bool, so load(json()) must round-trip
            self.status = raw
        else:
            self.status = str(raw).strip().lower() in ('yes', 'y', 'true')

    def __repr__(self):
        return f"<QaStatus {'pass' if self.status else 'fail'}>"


# Maps the parameter names usable in a sequence file to their classes
ParameterRegistry = {'search-parameters': SearchParameters, 'qa-status': QaStatus}


class ConsoleParameterReader:
    """Obtains parameter values by prompting on the console."""

    @staticmethod
    def obtain_parameter(parameter_type: Type[Parameter]) -> Parameter:
        """
        Prompt for every field of ``parameter_type`` and return a loaded instance.

        :param parameter_type: parameter class to instantiate
        :return: a new instance populated through its ``load`` method
        """
        # one prompt per field, asked in the order fields() lists them
        answers = {
            field: input(label + '?> ')
            for field, label in parameter_type.fields().items()
        }
        result = parameter_type()
        result.load(answers)
        return result


class DirectoryCapability(Capability):
    """
    Implements a capability by reading files off the filesystem (rather than
    from a database or whatnot).
    """

    max_jobs: int
    sequence: List[CapabilityStep]

    def create_request(self, locators: List[ProductLocator]):
        """Create a new, unsaved request (no id yet) for this capability."""
        return CapabilityRequest(capability=self, locators=locators, files=[], id=None, parameters=[])

    def __init__(self, path: pathlib.Path):
        """
        :param path: directory of the capability; its final component becomes
            the capability name and its ``sequence.txt`` supplies the steps
        """
        self.path = path
        self.name = path.name
        self.sequence = self.parse(self.path / 'sequence.txt')
        self.max_jobs = 2

    def __hash__(self):
        # A dict is unhashable, so hash an equivalent tuple instead
        return hash(('DirectoryCapability', self.path))

    @staticmethod
    def parse(path: pathlib.Path):
        """
        Parse a sequence file into a list of capability steps.

        Each line is matched against the known directive prefixes; lines that
        match none of them are ignored.

        :param path: path of the sequence file
        :return: the steps, in file order
        :raises KeyError: if an AWAIT PARAMETER line names a parameter that is
            not in ParameterRegistry
        """
        product_prefix = 'AWAIT PRODUCT '
        parameter_prefix = 'AWAIT PARAMETER '
        workflow_prefix = 'PREPARE AND RUN WORKFLOW '

        sequence = []
        with path.open('r') as f:
            for line in f:
                if line.startswith('AWAIT PRODUCTS'):
                    sequence.append(AwaitProduct())
                elif line.startswith(product_prefix):
                    sequence.append(AwaitProduct(line.split(product_prefix)[1].strip()))
                elif line.startswith(parameter_prefix):
                    parameter_name = line.split(parameter_prefix)[1].strip()
                    sequence.append(AwaitParameter(ParameterRegistry[parameter_name]))
                elif line.startswith(workflow_prefix):
                    sequence.append(PrepareAndRunWorkflow(line.split(workflow_prefix)[1].strip()))
        return sequence


class DirectoryCapabilityInfo(CapabilityInfo):
    """
    Finds information about capabilities on the filesystem. Stores requests in
    memory (in a list).
    """

    def __init__(self, path: pathlib.Path):
        """
        :param path: directory containing one subdirectory per capability
        """
        self.path = path
        self.requests = []
        self.n_requests = 0

    def lookup_capability(self, capability_name: str) -> Capability:
        """Build the capability stored in the subdirectory ``capability_name``."""
        return DirectoryCapability(self.path / capability_name)

    def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest:
        """
        Return the request previously saved under ``capability_request_id``.

        :raises IndexError: if no request was saved under that id
        """
        # save_request hands out 1-based ids, while the list is 0-based
        if capability_request_id < 1:
            raise IndexError(f'no capability request with id {capability_request_id}')
        return self.requests[capability_request_id - 1]

    def save_request(self, request: CapabilityRequest) -> int:
        """
        Store ``request`` and assign it the next id (ids start at 1).

        :return: the id written onto the request
        """
        # 1. Record this request in our list of requests
        self.requests.append(request)
        self.n_requests += 1

        # 2. Record the ID on the request itself
        request.id = self.n_requests

        # return it
        return request.id


class PrototypeCapabilityQueue(CapabilityQueue):
    """
    Implements the CapabilityQueue API, backed by a simple thread-safe queue.
    """

    items: Queue

    @inject
    def __init__(self, capability: Capability, runner_builder: ClassAssistedBuilder["PrototypeQueueRunner"]):
        """
        Create the queue and start a runner that consumes it.

        :param capability: capability whose ``max_jobs`` is passed to the runner
        :param runner_builder: builder used to create the runner
        """
        self.items = Queue()
        self.runner = runner_builder.build(queue=self.items, max_jobs=capability.max_jobs)
        self.runner.start()

    def enqueue(self, request: CapabilityRequest):
        """Add ``request`` to the queue consumed by the runner."""
        # 1. place this request into some kind of queue
        self.items.put(request)


class PrototypeCapabilityEngine(CapabilityEngine):
    """Executes the steps of a capability request in sequence."""

    request: CapabilityRequest
    responder: CapabilityEngineResponder

    @inject
    def __init__(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
        self.request = request
        self.responder = responder

    def execute(self, request):
        """
        Run every step of ``request``'s capability, in order.

        The steps come from the ``request`` argument, but each one is executed
        against the request this engine was constructed with.
        """
        for step in request.capability.sequence:
            self._execute_step(step)

    def _execute_step(self, step: CapabilityStep):
        step.execute_against(self.request, self.responder)


class PrototypeCapabilityEngineResponder(CapabilityEngineResponder):
    """Responds to step callbacks using the workflow service, the product service and the console."""

    workflow_service: WorkflowService
    product_service: ProductService

    @inject
    def __init__(self, workflow_service: WorkflowService, product_service: ProductService):
        self.workflow_service = workflow_service
        self.product_service = product_service
        self.console = ConsoleParameterReader()

    def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]):
        """Execute workflow ``name`` with the parameter's JSON form and the given files."""
        # in here I need to find the WorkflowService
        return self.workflow_service.execute(name, param.json(), files)

    def await_product(self, step: CapabilityStep, product_locator: ProductLocator):
        """Resolve ``product_locator`` through the product service."""
        return self.product_service.locate_product(product_locator)

    def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter:
        """Prompt on the console for a parameter of ``parameter_type``."""
        return self.console.obtain_parameter(parameter_type)


class PrototypeQueueRunner(QueueRunner, Thread):
    """Thread that takes requests off a queue and executes each with a freshly built engine."""

    engines: Dict[CapabilityRequest, CapabilityEngine]

    @inject
    def __init__(self, queue: Queue, max_jobs: int, engine_builder: ClassAssistedBuilder[PrototypeCapabilityEngine]):
        """
        :param queue: queue the requests are read from
        :param max_jobs: initial value of the semaphore
        :param engine_builder: builder used to create one engine per request
        """
        super().__init__()
        self.queue = queue
        self.semaphore = Semaphore(max_jobs)
        self.engines = {}
        self.engine_builder = engine_builder

    def run(self) -> None:
        """Loop forever: take a semaphore slot, fetch a request, execute it, free the slot."""
        while True:
            # hold the semaphore for the whole job; the with-block releases it
            # even when building or executing the engine raises
            with self.semaphore:
                # now get a job from the queue
                request = self.queue.get()

                # now build an engine and start executing that request
                engine = self.engine_builder.build(request=request)
                self.engines[request.id] = engine

                # execute the steps of this capability
                engine.execute(request)

    def complete(self, request):
        """
        Sent by the engine when it is done executing a capability

        :return:
        """
        # NOTE(review): run() already releases the semaphore once execute()
        # returns, so calling this for the same request releases it a second
        # time - confirm which of the two is meant to own the release.
        del self.engines[request.id]
        self.semaphore.release()


class PrototypeCapabilityService(CapabilityService):
    """Creates capability requests and submits them to one queue per capability name."""

    queues: Dict[CapabilityName, CapabilityQueue]

    @inject
    def __init__(self, info: CapabilityInfo, queue_builder: ClassAssistedBuilder[PrototypeCapabilityQueue]):
        self.queues = {}
        self.info = info
        self.queue_builder = queue_builder

    def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
        """
        Create and save a request for the capability called ``name``.

        The request is only saved here; ``execute`` submits it to a queue.
        """
        # 1. Locate the capability
        capability = self.info.lookup_capability(name)

        # 2. Create a request
        request = capability.create_request(locators)

        # 3. Persist the request
        self.info.save_request(request)

        # 4. Return it
        return request

    def _locate_queue(self, request: CapabilityRequest) -> CapabilityQueue:
        """Return the queue for the request's capability, creating it on first use."""
        name = request.capability.name

        # 1. Create a queue for this capability, if we don't have one currently
        if name not in self.queues:
            self.queues[name] = self.queue_builder.build(capability=request.capability)

        # 2. Return the queue for this capability
        return self.queues[name]

    def execute(self, request: CapabilityRequest) -> None:
        """Submit ``request`` to the queue of its capability."""
        # 1. Locate the proper queue for this request
        queue = self._locate_queue(request)

        # 2. Submit the request to that queue
        queue.enqueue(request)


class HardcodedProductService(ProductService):
    """Product service backed by a fixed, in-code table of locator -> path."""

    def __init__(self):
        self.products = {
            'nmtest-capo': pathlib.Path('/home/casa/capo/nmtest.properties'),
            'readme': pathlib.Path('README.md'),
        }

    def locate_product(self, product_locator: ProductLocator) -> Path:
        """
        Return the path registered for ``product_locator``.

        :raises KeyError: if the locator is not in the table
        """
        return self.products[product_locator]