Skip to content
Snippets Groups Projects
Commit cee93f54 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Continued implementing capability service; started implementing

capability info
parent be4a49b0
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@ import inspect
from abc import ABC, abstractmethod
from enum import Enum, auto
from threading import Thread
from typing import List, Union, Optional
from .product_interfaces import ProductIF, FutureProductIF
from .schema import CapabilityRequest, Capability, CapabilityExecution
......@@ -25,35 +26,70 @@ class CapabilityInfoIF(ABC):
"""
Data access object that can look up and record information about capabilities
and capability requests.
# FIXME: I opted to make all the methods static. Is that going to work?
"""
@staticmethod
@abstractmethod
def lookup_capability(capability_name: str) -> Capability:
def create_capability(
self,
name: CapabilityName,
steps: List["CapabilityStepType"],
max_jobs: int
) -> int:
"""
Look up the definition of a capability
:param capability_name: the name of the capability to find
:return: a capability
Create new capability and save it in the database
:param name: Name of new capability
:param steps: List of capability steps required to execute the capability
:param max_jobs: Max allowed number of concurrent executions of this kind of capability
:return: Integer identifier for the capability
"""
raise NotImplementedError()
raise NotImplementedError
@staticmethod
@abstractmethod
def lookup_capability_request(capability_request_id: int) -> CapabilityRequest:
def create_capability_request(
self,
capability_id: int,
parameters: List["ParameterIF"],
future_products: List[FutureProductIF],
versions: List[str]
) -> int:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
Create new capability request and save it in the database
:param capability_id: ID of the requested capability
:param parameters: List
:param future_products:
:param versions:
:return: Integer identifier of the request
"""
raise NotImplementedError()
raise NotImplementedError
@abstractmethod
def create_execution_record(self, request_id: int) -> int:
"""
Create new execution record for a request and save it in the database
:param request_id: ID of the capability request
:return: Integer identifier for the record
"""
raise NotImplementedError
@abstractmethod
def lookup_entity(
self,
entity_id: int,
entity_schema: Union[Capability, CapabilityRequest, CapabilityExecution]
) -> Optional[Capability, CapabilityRequest, CapabilityExecution]:
"""
Look up entity in database and return object representation of it if found
:param entity_id: ID of entity to be searched for
:param entity_schema: Database schema of the entity
:return: Object representation of entity if found, else None
"""
raise NotImplementedError
@staticmethod
@abstractmethod
def save_request(request: CapabilityRequest) -> int:
def save_entity(self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]) -> int:
"""
Save a capability request and return an integer identifier for it
:param request: the request to save
:return: the request identifier
Save a given entity and return an integer identifier for it
:param entity: The entity to save
:return: The entity's identifier
"""
raise NotImplementedError()
......@@ -190,10 +226,10 @@ class FutureCapabilityResultIF(ABC, ProductIF):
pass
class ParametersIF(ABC):
class ParameterIF(ABC):
"""
Class that represents a set of capability parameters, a required input for the correct
and complete execution of said capability
and complete execution of a capability
"""
def json(self) -> str:
"""
......
......@@ -6,13 +6,15 @@ import subprocess
from pathlib import Path
from queue import PriorityQueue
from tempfile import mkdtemp
from typing import Dict, List
from typing import Dict, List, Union, Optional
from sqlalchemy.orm import Session
from .capability_interfaces import CapabilityServiceIF, CapabilityQueueIF, CapabilityInfoIF, \
CapabilityEngineIF
CapabilityEngineIF, CapabilityName, CapabilityStepType
from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF
from .schema import Workflow, WorkflowEvent, WorkflowEventType, CapabilityRequest, Capability, \
CapabilityExecution
CapabilityExecution, get_engine, get_session_factory
class CapabilityService(CapabilityServiceIF):
......@@ -28,9 +30,27 @@ class CapabilityService(CapabilityServiceIF):
Run a capability with the requested settings
:param request: Capability request
"""
execution_record = CapabilityExecution(workflow_request=request.id)
execution_record = CapabilityExecution(capability_request=request.id, current_step=0)
self.execution_pool.append(execution_record)
def enqueue_execution(self, execution_record: CapabilityExecution):
"""
Move execution record that is ready to execute a workflow into the appropriate capability
queue
"""
capability = CapabilityInfo.lookup_capability(
CapabilityInfo.lookup_capability_request(
execution_record.capability_request
).capability
)
# Get correct queue or initialize one
queue = self.queues.get(capability.name, CapabilityQueue(capability.max_jobs))
queue.enqueue(execution_record)
self.queues[capability.name] = queue
# Remove execution record from pool
self.execution_pool.remove(execution_record)
class CapabilityEngine(CapabilityEngineIF):
def execute(self, execution: CapabilityExecution):
......@@ -43,41 +63,54 @@ class CapabilityInfo(CapabilityInfoIF):
Interface for data access object that can look up and record information about capabilities
and capability requests. Accesses and modifies the database.
"""
@staticmethod
def lookup_capability(capability_name: str) -> Capability:
def __init__(self, session: Session):
self.session = session
def create_capability(
self,
name: CapabilityName,
steps: List["CapabilityStepType"],
max_jobs: int
) -> int:
"""
Look up the definition of a capability
:param capability_name: the name of the capability to find
:return: a capability
Create new capability and save it in the database
:param name: Name of new capability
:param steps: List of capability steps required to execute the capability
:param max_jobs: Max allowed number of concurrent executions of this kind of capability
:return: Integer identifier for the capability
"""
# TODO: Query the database and retrieve capability info
pass
@staticmethod
def lookup_capability_request(capability_request_id: int) -> CapabilityRequest:
# TODO: Convert list of steps to DB-ready string
# TODO: steps_str = ...
capability = Capability(name=name, steps=steps_str, max_jobs=max_jobs)
# TODO: Commit capability to the database using save_entity
def lookup_entity(
self,
entity_id: int,
entity_schema: Union[Capability, CapabilityRequest, CapabilityExecution]
) -> Optional[Capability, CapabilityRequest, CapabilityExecution]:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
Look up entity in database and return object representation of it if found
:param entity_id: ID of entity to be searched for
:param entity_schema: Database schema of the entity
:return: Object representation of entity if found, else None
"""
# TODO: Query the database and retrieve capability request info
pass
@staticmethod
def save_request(request: CapabilityRequest) -> int:
def save_entity(self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]) -> int:
"""
Save a capability request and return an integer identifier for it
:param request: the request to save
:return: the request identifier
Save a given entity and return an integer identifier for it
:param entity: the entity to save
:return: the entity's identifier
"""
# TODO: Save capability request information in database
# TODO: If the request in database, update it
# TODO: Save entity information in correct table
# TODO: If the entity in database, update it
# TODO: Else save as new entry
pass
class CapabilityQueue(CapabilityQueueIF):
def __init__(self, queue: PriorityQueue[CapabilityRequest], max_concurrency: int):
def __init__(self, max_concurrency: int):
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment