diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index 0a57049fb55fc7e41c618174af6448e64764e429..b8a03c1f5215145f1ed16c97be560bb0db03dccd 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -46,7 +46,7 @@ class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]): ) def routing_key_for(self, event: WorkflowEvent) -> str: - return f"{event.job_name}.{event.job_id}.{event.type.name.lower()}" + return f"{event.job_name}.{event.workflow_request_id}.{event.type.name.lower()}" def exchange(self) -> str: return self.EXCHANGE diff --git a/shared/workspaces/src/workspaces/capability_interfaces.py b/shared/workspaces/src/workspaces/capability_interfaces.py index 2a2fbdc7f1f240650d2fecfea0917aa73d0eea8a..e6a231b46bc2eaaeb377fdd3dfa8bf90284bfd50 100644 --- a/shared/workspaces/src/workspaces/capability_interfaces.py +++ b/shared/workspaces/src/workspaces/capability_interfaces.py @@ -111,9 +111,16 @@ class CapabilityInfoIF(ABC): def lookup_capability(self, capability_name: str) -> CapabilityIF: raise NotImplementedError + @abstractmethod def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: raise NotImplementedError + @abstractmethod + def lookup_execution_by_workflow_request_id( + self, workflow_request_id: int + ) -> CapabilityExecutionIF: + raise NotImplementedError + class ProjectSettingsIF(ABC): """ diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index 0cb7657aec4998cf50c6510a51d8067260dce65a..db605f51e2bd9bd39bb19fb7a9ede89b47a619bf 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -190,11 +190,13 @@ class CapabilityService(CapabilityServiceIF): # No corresponding capability event return None - self.capability_info.lookup_execution_by_workflow_request_id( + execution = self.capability_info.lookup_execution_by_workflow_request_id( event.workflow_request_id ) - return CapabilityEvent(event_type, execution_id) + return CapabilityEvent( + event_type, execution.capability_request_id, execution.id + ) def listen_for_events(self): """ @@ -286,7 +288,9 @@ class CapabilityInfo(CapabilityInfoIF): self.session.add(entity) self.session.flush() - def lookup_execution_by_workflow_request_id(self, workflow_request_id: int): + def lookup_execution_by_workflow_request_id( + self, workflow_request_id: int + ) -> CapabilityExecution: return ( self.session.query(CapabilityExecution) .filter_by(workflow_request_id=workflow_request_id) @@ -473,10 +477,14 @@ class WorkflowService(WorkflowServiceIF): # execution evolves. So we have to set up listening at some point # in this class def __init__(self, info: "WorkflowInfo"): - # 1. Start listening for events from the wf_monitor stream - # self.channel = workflow_events.listen(self.on_workflow_event) self.info = info + # Start listening for events from the wf_monitor stream + self.listener = threading.Thread( + target=lambda: workflow_events.listen(self.on_workflow_event) + ) + self.listener.start() + def execute(self, request: WorkflowRequest, files: List[Path] = None): """ Execute a workflow per the supplied parameters. @@ -497,16 +505,11 @@ class WorkflowService(WorkflowServiceIF): # 5. execute condor and retrieve log file log_file = self._execute_prepared(temp_folder) - # 6. start reading log file - monitor = WorkflowMonitor( - logfile_path=(log_file), workflow_request_id=request.workflow_request_id + # 6. start reading the log file (background it) + subprocess.Popen( + ["wf_monitor", log_file, str(request.workflow_request_id)], + cwd=str(temp_folder.absolute()), ) - # parse events from log - events = monitor.events - with workflow_events: - for e in events: - # send amqp event and update database - self.on_workflow_event(e, request, temp_folder) @staticmethod def _prepare_files_for_condor(files: List[WorkflowRequestFile]) -> Path: @@ -568,21 +571,8 @@ class WorkflowService(WorkflowServiceIF): # return the logfile return logfile - @staticmethod - def on_workflow_event( - event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path - ): - # 1. log that we received this event, somehow - # 2. update the WorkflowRequest record with the state we got - # 3. do per-event-type stuff, such as level change events, database - # updates, and logging - - decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")( - workflow_events.send - ) - - # 1. send amqp event to workflow channel - decorated_workflow_send(event) + def on_workflow_event(self, event: WorkflowEvent): + request = self.info.lookup_workflow_request(event.workflow_request_id) status: str # 2. update request record with new status @@ -597,9 +587,9 @@ class WorkflowService(WorkflowServiceIF): status = WorkflowRequestState.Running.name print( - f"Updating state on workflow request {request_record.workflow_request_id} to {status}..." + f"Updating state on workflow request {request.workflow_request_id} to {status}..." ) - request_record.update_status(status) + request.update_status(status) class WorkflowInfo(WorkflowInfoIF):