Skip to content
Snippets Groups Projects
Commit de4f4aad authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Refactor to make Capability use workflow REST API

parent 397d8ade
No related branches found
No related tags found
No related merge requests found
......@@ -5,7 +5,12 @@ from pyramid.request import Request
from pyramid.view import view_config, view_defaults
from pyramid_beaker import session_factory_from_settings
from workspaces.schema import get_session_factory, get_engine
from workspaces.services import CapabilityInfo, CapabilityService, WorkflowInfo
from workspaces.services import (
CapabilityInfo,
CapabilityService,
WorkflowInfo,
WorkflowServiceRESTClient,
)
# ---------------------------------------------------------
......@@ -16,7 +21,9 @@ from workspaces.services import CapabilityInfo, CapabilityService, WorkflowInfo
def lookup_request(request):
return request.capability_info.lookup_capability_request(request.matchdict["request_id"])
return request.capability_info.lookup_capability_request(
request.matchdict["request_id"]
)
@view_defaults(route_name="capability_request", renderer="json")
......@@ -95,13 +102,15 @@ def main(global_config, **settings):
# make workflow_info available for use in Pyramid
config.add_request_method(
# r.tm is the transaction manager used by pyramid_tm
lambda request: WorkflowInfo(get_tm_session(session_factory, request.tm)),
"workflow_info",
lambda request: WorkflowServiceRESTClient(),
"workflow_service",
reify=True,
)
# make workflow_service available for use in Pyramid
config.add_request_method(
lambda r: CapabilityService(r.capability_info, r.workflow_info), "capabilities", reify=True
lambda r: CapabilityService(r.capability_info, r.workflow_service),
"capabilities",
reify=True,
)
# add some routes
......
......@@ -45,11 +45,15 @@ class CapabilityVersionIF:
class CapabilityExecutionIF:
id: int
capability_version: CapabilityVersionIF
state: str
current_step: int
version: CapabilityVersionIF
current_workflow_request: WorkflowRequestIF
steps: str
capability: CapabilityIF
capability_request: CapabilityRequestIF
def __json__(self, request: Any) -> dict:
raise NotImplementedError
......@@ -120,6 +124,10 @@ class CapabilityInfoIF(ABC):
) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def save_execution(self, execution: CapabilityExecutionIF):
pass
class ProjectSettingsIF(ABC):
"""
......
......@@ -340,6 +340,10 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
def capability(self):
return self.version.capability
@property
def capability_request(self):
return self.version.request
class Workflow(Base, WorkflowIF):
"""
......
......@@ -10,17 +10,21 @@ from queue import PriorityQueue, Queue, Empty
from tempfile import mkdtemp
from typing import Dict, List, Optional, NamedTuple, Union
import requests
from channels.amqp_helpers import (
workflow_events,
capability_events,
Channel,
WorkflowEventChannel,
)
from pycapo import CapoConfig
from sqlalchemy.orm import Session
from wf_monitor.monitor import (
log_decorator_factory,
WorkflowMonitor,
)
from workspaces.workflow_interfaces import WorkflowRequestIF
from .capability_interfaces import (
CapabilityServiceIF,
CapabilityQueueIF,
......@@ -29,6 +33,7 @@ from .capability_interfaces import (
CapabilityName,
ParameterIF,
CapabilityExecutionIF,
CapabilityRequestIF,
)
from .helpers import (
CapabilitySequence,
......@@ -65,12 +70,12 @@ class CapabilityService(CapabilityServiceIF):
"""
def __init__(
self, capability_info: "CapabilityInfoIF", workflow_info: "WorkflowInfoIF"
self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF
):
self.execution_pool = []
self.queues = {}
self.capability_info = capability_info
self.workflow_info = workflow_info
self.workflow_service = workflow_service
self.listener = threading.Thread(target=self.listen_for_events, daemon=True)
self.listener.start()
......@@ -79,12 +84,12 @@ class CapabilityService(CapabilityServiceIF):
capability_name: str,
parameters: List[Parameter] = None,
products: List[FutureProduct] = None,
) -> CapabilityRequest:
) -> CapabilityRequestIF:
return self.capability_info.create_capability_request(
capability_name, parameters, products
)
def run_capability(self, request: CapabilityRequest) -> CapabilityExecution:
def run_capability(self, request: CapabilityRequest) -> CapabilityExecutionIF:
"""
Run a capability with the requested settings
:param request: Capability request
......@@ -130,7 +135,7 @@ class CapabilityService(CapabilityServiceIF):
else:
# Execution on its last step and received the proper event
execution.state = ExecutionState.Complete.name
self.capability_info.save_entity(execution)
self.capability_info.save_execution(execution)
return
# Check if upcoming step is run workflow step
......@@ -146,7 +151,7 @@ class CapabilityService(CapabilityServiceIF):
# Update execution record's state
execution.state = ExecutionState.Ready.name
self.capability_info.save_entity(execution)
self.capability_info.save_execution(execution)
else:
# TODO: Logging
print(
......@@ -155,7 +160,7 @@ class CapabilityService(CapabilityServiceIF):
def enqueue_execution(
self,
execution: CapabilityExecution,
execution: CapabilityExecutionIF,
priority: int = ExecutionPriority.Default.value,
):
"""
......@@ -166,7 +171,9 @@ class CapabilityService(CapabilityServiceIF):
queue = self.queues.get(
execution.capability.id,
CapabilityQueue(
self.capability_info, self.workflow_info, execution.capability.max_jobs
self.capability_info,
self.workflow_service,
execution.capability.max_jobs,
),
)
queue.enqueue(execution, priority)
......@@ -197,7 +204,7 @@ class CapabilityService(CapabilityServiceIF):
)
return CapabilityEvent(
event_type, execution.capability_request_id, execution.id
event_type, execution.capability_request.id, execution.id
)
def listen_for_events(self):
......@@ -312,6 +319,9 @@ class CapabilityInfo(CapabilityInfoIF):
.all()
)
def save_execution(self, execution: CapabilityExecutionIF):
self.session.add(execution)
class CapabilityEngine(CapabilityEngineIF):
"""
......@@ -338,8 +348,8 @@ class CapabilityEngine(CapabilityEngineIF):
def submit_workflow_request(
self, workflow_name: str, workflow_args: dict, files: List[AbstractFile]
):
workflow_request = self.workflow_service.info.create_workflow_request(
workflow_name, workflow_args
workflow_request = WorkflowRequest(
workflow_name=workflow_name, argument=workflow_args
)
self.workflow_service.execute(workflow_request, files)
......@@ -354,16 +364,15 @@ class CapabilityQueue(CapabilityQueueIF):
def __init__(
self,
capability_info: CapabilityInfo,
workflow_info: "WorkflowInfo",
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
max_concurrency: int,
):
self.capability_info = capability_info
self.workflow_info = workflow_info
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.engine_list = self.init_engines(
max_concurrency, capability_info, WorkflowService(workflow_info)
max_concurrency, capability_info, workflow_service
)
self.paused = False
......@@ -371,8 +380,8 @@ class CapabilityQueue(CapabilityQueueIF):
def init_engines(
cls,
num_engines: int,
capability_info: CapabilityInfo,
workflow_service: "WorkflowService",
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
) -> NamedTuple:
"""
Initialize available engine queue and list of in-use engines
......@@ -436,10 +445,10 @@ class CapabilityQueue(CapabilityQueueIF):
except Empty:
return None
def enqueue(self, request: CapabilityRequest, priority: int):
def enqueue(self, request: CapabilityExecutionIF, priority: int):
self.queue.put((priority, request))
def dequeue(self) -> CapabilityRequest:
def dequeue(self) -> CapabilityExecutionIF:
return self.queue.get()
def pause(self):
......@@ -454,6 +463,33 @@ class CapabilityQueue(CapabilityQueueIF):
return self.queue.qsize()
class WorkflowServiceRESTClient(WorkflowServiceIF):
def __init__(self):
self.url = (
CapoConfig()
.settings("edu.nrao.archive.workspaces.WorkflowSettings")
.serviceUrl
)
def execute(self, request: WorkflowRequestIF, files: List["AbstractFile"]):
# step 1: create the request
req = requests.post(
f"{self.url}/workflows/{request.workflow_name}/requests/create"
).json()
# step 2: if necessary, pass the files up for this request
for file in files:
requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/files/{file.filename}",
body=file.content,
)
# step 3: execute the request
requests.post(
f"{self.url}/workflows/requests/{req['workflow_request_id']}/submit"
)
class WorkflowService(WorkflowServiceIF):
"""
Executes workflows; should be a freestanding service.
......
......@@ -31,7 +31,7 @@ class WorkflowIF:
class WorkflowRequestIF:
workflow_request_id: int
workflow: WorkflowIF
name: str
workflow_name: str
argument: str # JSON-formatted
files: List[WorkflowRequestFileIF]
job_id: str
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment