Skip to content
Snippets Groups Projects
Commit e27a99ec authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Replace workflow event monitor internal call with executable.

Fix another execution_id lookup.
Change workflow event listening to a thread.
parent 30ce1a17
No related branches found
No related tags found
No related merge requests found
......@@ -111,9 +111,16 @@ class CapabilityInfoIF(ABC):
def lookup_capability(self, capability_name: str) -> CapabilityIF:
raise NotImplementedError
@abstractmethod
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
class ProjectSettingsIF(ABC):
"""
......
......@@ -191,11 +191,13 @@ class CapabilityService(CapabilityServiceIF):
# No corresponding capability event
return None
self.capability_info.lookup_execution_by_workflow_request_id(
execution = self.capability_info.lookup_execution_by_workflow_request_id(
event.workflow_request_id
)
return CapabilityEvent(event_type, execution_id)
return CapabilityEvent(
event_type, execution.capability_request_id, execution.id
)
def listen_for_events(self):
"""
......@@ -287,7 +289,9 @@ class CapabilityInfo(CapabilityInfoIF):
self.session.add(entity)
self.session.flush()
def lookup_execution_by_workflow_request_id(self, workflow_request_id: int):
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecution:
return (
self.session.query(CapabilityExecution)
.filter_by(workflow_request_id=workflow_request_id)
......@@ -461,10 +465,14 @@ class WorkflowService(WorkflowServiceIF):
# execution evolves. So we have to set up listening at some point
# in this class
def __init__(self, info: "WorkflowInfo"):
# 1. Start listening for events from the wf_monitor stream
# self.channel = workflow_events.listen(self.on_workflow_event)
self.info = info
# Start listening for events from the wf_monitor stream
self.listener = threading.Thread(
target=lambda: workflow_events.listen(self.on_workflow_event)
)
self.listener.start()
def execute(self, request: WorkflowRequest, files: List[Path] = None):
"""
Execute a workflow per the supplied parameters.
......@@ -485,14 +493,11 @@ class WorkflowService(WorkflowServiceIF):
# 5. execute condor and retrieve log file
log_file = self._execute_prepared(temp_folder)
# 6. start reading log file
monitor = WorkflowMonitor(logfile_path=str(log_file), workflow_request_id=request.workflow_request_id)
# parse events from log
events = monitor.events
with workflow_events:
for e in events:
# send amqp event and update database
self.on_workflow_event(e, request, temp_folder)
# 6. start reading the log file (background it)
subprocess.Popen(
["wf_monitor", log_file, str(request.workflow_request_id)],
cwd=str(temp_folder.absolute()),
)
@staticmethod
def _prepare_files_for_condor(files: List[WorkflowRequestFile]) -> Path:
......@@ -554,21 +559,8 @@ class WorkflowService(WorkflowServiceIF):
# return the logfile
return logfile
@staticmethod
def on_workflow_event(
event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(
workflow_events.send
)
# 1. send amqp event to workflow channel
decorated_workflow_send(event)
def on_workflow_event(self, event: WorkflowEvent):
request = self.info.lookup_workflow_request(event.workflow_request_id)
status: str
# 2. update request record with new status
......@@ -583,9 +575,9 @@ class WorkflowService(WorkflowServiceIF):
status = WorkflowRequestState.Running.name
print(
f"Updating state on workflow request {request_record.workflow_request_id} to {status}..."
f"Updating state on workflow request {request.workflow_request_id} to {status}..."
)
request_record.update_status(status)
request.update_status(status)
class WorkflowInfo(WorkflowInfoIF):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment