Skip to content
Snippets Groups Projects
Commit edf361d7 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Continued implementing capability submission use case via

CapabilityService, CapabilityInfo, CapabilityQueue, and
CapabilityEngine; updated some schema and interfaces
parent e2e84088
No related branches found
No related tags found
No related merge requests found
......@@ -130,10 +130,10 @@ class CapabilityQueueIF(ABC):
class CapabilityEngineIF(ABC):
"""
Executes a capability.
Executes a prepare and run workflow step of a capability
"""
@abstractmethod
def execute(self, execution: CapabilityExecution):
def execute(self):
pass
......
......@@ -131,6 +131,7 @@ class CapabilityRequest(Base, CapabilityRequestIF):
"""
__tablename__ = 'capability_requests'
id = sa.Column('request_id', sa.Integer, primary_key=True)
state = sa.Column('state', sa.String)
capability = sa.Column(
'capability',
sa.Integer,
......@@ -148,6 +149,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
"""
__tablename__ = 'capability_executions'
id = sa.Column('execution_id', sa.Integer, primary_key=True)
state = sa.Column('state', sa.String)
capability_request = sa.Column(
'capability_request_id',
sa.Integer,
......
......@@ -11,7 +11,10 @@ from typing import Dict, List, Union, Optional
from sqlalchemy.orm import Session
from .capability_interfaces import CapabilityServiceIF, CapabilityQueueIF, CapabilityInfoIF, \
CapabilityEngineIF, CapabilityName, CapabilityStepType
CapabilityEngineIF, CapabilityName, CapabilityStepType, ParameterIF, RequestState, \
ExecutionState, ExecutionPriority
from .helpers import CapabilitySequence
from .product_interfaces import FutureProductIF
from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF
from .schema import Workflow, WorkflowEvent, WorkflowEventType, CapabilityRequest, Capability, \
CapabilityExecution, get_engine, get_session_factory
......@@ -22,6 +25,7 @@ class CapabilityService(CapabilityServiceIF):
The capability service: clients access this to request capability runs
"""
def __init__(self):
# FIXME: Needs an instance of CapabilityInfo
self.execution_pool = []
self.queues = {}
......@@ -33,29 +37,41 @@ class CapabilityService(CapabilityServiceIF):
execution_record = CapabilityExecution(capability_request=request.id, current_step=0)
self.execution_pool.append(execution_record)
def enqueue_execution(self, execution_record: CapabilityExecution):
def enqueue_execution(
self,
execution_record: CapabilityExecution,
priority: int = ExecutionPriority.Default.value
):
"""
Move execution record that is ready to execute a workflow into the appropriate capability
queue
"""
capability = CapabilityInfo.lookup_capability(
CapabilityInfo.lookup_capability_request(
execution_record.capability_request
).capability
request = capability_info.lookup_entity(
CapabilityRequest, execution_record.capability_request
)
capability = capability_info.lookup_entity(Capability, request.capability)
# Get correct queue or initialize one
queue = self.queues.get(capability.name, CapabilityQueue(capability.max_jobs))
queue.enqueue(execution_record)
self.queues[capability.name] = queue
queue = self.queues.get(capability.id, CapabilityQueue(capability.max_jobs))
queue.enqueue(execution_record, priority)
self.queues[capability.id] = queue
# Remove execution record from pool
self.execution_pool.remove(execution_record)
class CapabilityEngine(CapabilityEngineIF):
def execute(self, execution: CapabilityExecution):
# Execute as many steps as possible of a given execution
pass
"""
Executes a prepare and run workflow step of a capability
"""
def __init__(self, execution: CapabilityExecution):
self.execution = execution
def execute(self):
"""
Communicate with workflow service and send a request to run a workflow with given settings
"""
# workflow_service.execute(workflow_name, arguments, files)
class CapabilityInfo(CapabilityInfoIF):
......@@ -69,7 +85,7 @@ class CapabilityInfo(CapabilityInfoIF):
def create_capability(
self,
name: CapabilityName,
steps: List["CapabilityStepType"],
steps: CapabilitySequence,
max_jobs: int
) -> int:
"""
......@@ -79,10 +95,44 @@ class CapabilityInfo(CapabilityInfoIF):
:param max_jobs: Max allowed number of concurrent executions of this kind of capability
:return: Integer identifier for the capability
"""
# TODO: Convert list of steps to DB-ready string
# TODO: steps_str = ...
capability = Capability(name=name, steps=steps_str, max_jobs=max_jobs)
# TODO: Commit capability to the database using save_entity
capability = Capability(name=name, steps=str(steps), max_jobs=max_jobs)
return self.save_entity(capability)
def create_capability_request(
self,
capability_id: int,
parameters: List[ParameterIF],
future_products: List[FutureProductIF],
versions: List[str]
) -> int:
"""
Create new capability request and save it in the database
:param capability_id: ID of the requested capability
:param parameters: List
:param future_products:
:param versions:
:return: Integer identifier of the request
"""
# FIXME: Parameters, future products and versions need valid str reprs
request = CapabilityRequest(
state=RequestState.Ready.name,
capability=capability_id,
parameters=str(parameters),
future_products=str(future_products),
versions=str(versions)
)
return self.save_entity(request)
def create_execution_record(self, request_id: int) -> int:
"""
Create new execution record for a request and save it in the database
:param request_id: ID of the capability request
:return: Integer identifier for the record
"""
record = CapabilityExecution(
state=ExecutionState.Ready.name, capability_request=request_id, current_step=0
)
return self.save_entity(record)
def lookup_entity(
self,
......@@ -95,7 +145,7 @@ class CapabilityInfo(CapabilityInfoIF):
:param entity_schema: Database schema of the entity
:return: Object representation of entity if found, else None
"""
pass
return self.session.query(entity_schema).filter(entity_schema.id == entity_id)
def save_entity(self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]) -> int:
"""
......@@ -103,23 +153,31 @@ class CapabilityInfo(CapabilityInfoIF):
:param entity: the entity to save
:return: the entity's identifier
"""
# TODO: Save entity information in correct table
# TODO: If the entity in database, update it
# TODO: Else save as new entry
pass
self.session.add(entity)
self.session.commit()
class CapabilityQueue(CapabilityQueueIF):
def __init__(self, max_concurrency: int):
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.paused = False
def enqueue(self, request: CapabilityRequest):
self.queue.put(request)
def enqueue(self, request: CapabilityRequest, priority: int):
self.queue.put((priority, request))
def dequeue(self) -> CapabilityRequest:
return self.queue.get()
def pause(self):
self.paused = True
def unpause(self):
self.paused = False
def __len__(self):
return self.queue.qsize()
class WorkflowService(WorkflowServiceIF):
def execute(self, workflow_name: str, argument: Dict, files: List[Path]):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment