Skip to content
Snippets Groups Projects
Commit b3de8c1d authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Merge remote-tracking branch 'origin/release/WS-0.1' into release/WS-0.1

parents 616aa9b8 4b8ae7f1
No related branches found
No related tags found
No related merge requests found
......@@ -16,7 +16,7 @@ from workspaces.services import CapabilityInfo, CapabilityService, WorkflowInfo
def lookup_request(request):
return request.info.lookup_capability_request(request.matchdict["request_id"])
return request.capability_info.lookup_capability_request(request.matchdict["request_id"])
@view_defaults(route_name="capability_request", renderer="json")
......
......@@ -81,7 +81,7 @@ class Channel(Generic[ChannelDef]):
self.chan: BlockingChannel = None
self.config: CapoConfig = None
def connect(self, **kwargs: Union[int, str]):
def connect(self, threaded: bool, **kwargs: Union[int, str]):
"""
Initialize connection to AMQP server given a CAPO profile
......@@ -97,7 +97,7 @@ class Channel(Generic[ChannelDef]):
"""
global CONN
if not CONN:
if not CONN or threaded is True:
self.config = CapoConfig(profile=kwargs.get("profile", None)).settings(
"edu.nrao.archive.configuration.AmqpServer"
)
......@@ -140,7 +140,7 @@ class Channel(Generic[ChannelDef]):
)
def listen(
self, callback: Optional[Callable], pattern: str = "#", auto_ack: bool = False
self, callback: Optional[Callable], pattern: str = "#", auto_ack: bool = False, threaded: bool = False
):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
......@@ -155,7 +155,7 @@ class Channel(Generic[ChannelDef]):
event = self.definition.schema().loads(message)
callback(event)
self.connect()
self.connect(threaded)
queue = self.chan.queue_declare(queue="", exclusive=True).method.queue
self.chan.queue_bind(
queue=queue, exchange=self.definition.exchange(), routing_key=pattern
......
......@@ -13,7 +13,7 @@ from typing import Dict, List, Optional, NamedTuple
from channels.amqp_helpers import (
workflow_events,
capability_events,
capability_events, Channel, WorkflowEventChannel,
)
from sqlalchemy.orm import Session
from wf_monitor.monitor import (
......@@ -61,7 +61,9 @@ class CapabilityService(CapabilityServiceIF):
The capability service: clients access this to request capability runs
"""
def __init__(self, capability_info: "CapabilityInfo", workflow_info: "WorkflowInfo"):
def __init__(
self, capability_info: "CapabilityInfo", workflow_info: "WorkflowInfo"
):
self.execution_pool = []
self.queues = {}
self.capability_info = capability_info
......@@ -160,7 +162,9 @@ class CapabilityService(CapabilityServiceIF):
# Get correct queue or initialize one
queue = self.queues.get(
execution.capability.id,
CapabilityQueue(self.capability_info, execution.capability.max_jobs),
CapabilityQueue(
self.capability_info, self.workflow_info, execution.capability.max_jobs
),
)
queue.enqueue(execution, priority)
self.queues[execution.capability.id] = queue
......@@ -185,7 +189,9 @@ class CapabilityService(CapabilityServiceIF):
# No corresponding capability event
return None
workflow_request = self.workflow_info.lookup_workflow_request(event.workflow_request_id)
workflow_request = self.workflow_info.lookup_workflow_request(
event.workflow_request_id
)
execution_id = workflow_request.execution_id
return CapabilityEvent(event_type, execution_id)
......@@ -196,7 +202,8 @@ class CapabilityService(CapabilityServiceIF):
to update capability executions
:return:
"""
workflow_events.listen(callback=self.update_execution)
thread_workflow_events = Channel(WorkflowEventChannel())
thread_workflow_events.listen(callback=self.update_execution)
class CapabilityInfo(CapabilityInfoIF):
......@@ -335,11 +342,16 @@ class CapabilityQueue(CapabilityQueueIF):
EngineList = namedtuple("EngineList", ["available", "in_use"])
def __init__(self, capability_info: CapabilityInfo, max_concurrency: int):
def __init__(
self,
capability_info: CapabilityInfo,
workflow_info: "WorkflowInfo",
max_concurrency: int,
):
self.queue = PriorityQueue()
self.max_concurrency = max_concurrency
self.engine_list = self.init_engines(
max_concurrency, capability_info, WorkflowService()
max_concurrency, capability_info, WorkflowService(workflow_info)
)
self.paused = False
......@@ -527,21 +539,25 @@ class WorkflowService(WorkflowServiceIF):
# vulture is a workaround for testing locally without submitting to condor
print("submitting to vulture...")
subprocess.run(["vulture", 'job', 'execute', str(condor)], cwd=str(folder.absolute()))
subprocess.run(
["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute())
)
# return the logfile
return logfile
@staticmethod
def on_workflow_event(
event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(workflow_events.send)
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(
workflow_events.send
)
# 1. send amqp event to workflow channel
decorated_workflow_send(event)
......@@ -558,7 +574,9 @@ class WorkflowService(WorkflowServiceIF):
else:
status = WorkflowRequestState.Running.name
print(f"Updating state on workflow request {request_record.workflow_request_id} to {status}...")
print(
f"Updating state on workflow request {request_record.workflow_request_id} to {status}..."
)
request_record.update_status(status)
......@@ -574,7 +592,11 @@ class WorkflowInfo(WorkflowInfoIF):
return self.session.query(Workflow).get(name)
def lookup_workflow_request(self, request_id: int) -> WorkflowRequest:
return self.session.query(WorkflowRequest).filter_by(workflow_request_id=request_id).one()
return (
self.session.query(WorkflowRequest)
.filter_by(workflow_request_id=request_id)
.one()
)
def all_workflows(self) -> List[Workflow]:
return self.session.query(Workflow).all()
......@@ -593,7 +615,7 @@ class WorkflowInfo(WorkflowInfoIF):
workflow_name=workflow.workflow_name,
argument=argument,
state=WorkflowRequestState.Created.name,
execution_id=execution_id
execution_id=execution_id,
)
self.save_request(request)
return request
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment