diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 31299d0550e251f32ec4746421aa32d65441ffce..c88b9b000d18a993421630f8ec95a22969d94c79 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -907,7 +907,11 @@ class WorkflowMessageHandler: def _process_message_for_request(self, message: Dict, request: WorkflowRequest): # update request record with new status if message["type"] == "workflow-executing": - status = WorkflowRequestState.Running.name + status = ( + WorkflowRequestState.Running.name + if self._verify_state_change(WorkflowRequestState.Running.name) + else None + ) # on initiation, save the HTC id for this workflow: if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]): @@ -922,7 +926,11 @@ class WorkflowMessageHandler: self.messenger.send_message(**iterations_msg) elif message["type"] == "workflow-updated": - status = WorkflowRequestState.Running.name + status = ( + WorkflowRequestState.Running.name + if self._verify_state_change(WorkflowRequestState.Running.name) + else None + ) # update workflow_progress table stage_info = self._update_workflow_progress(request, message) # message relevant control system @@ -939,7 +947,11 @@ class WorkflowMessageHandler: self._post_workflow_cleanup(message, request, "failed") elif message["type"] == "workflow-continuing": - status = WorkflowRequestState.Running.name + status = ( + WorkflowRequestState.Running.name + if self._verify_state_change(WorkflowRequestState.Running.name) + else None + ) elif message["type"] in ("update-wf-metadata", "ingestion-failed"): # no action, keep existing state: @@ -1012,7 +1024,8 @@ class WorkflowMessageHandler: # update request record with new status try: - request.update_status(status) + if status is not None: + request.update_status(status) transaction.commit() except 
@staticmethod
def _verify_state_change(new_state: str, current_state=None) -> bool:
    """
    Decide whether a status change may be persisted.

    Sometimes messages come in out of order. Make sure Complete/Failed/Error
    states don't change once set.

    To honour that, the state the request is *currently* in must be inspected:
    a late "Running" message must not overwrite an already-final state. Pass
    ``current_state`` to get that check. When it is omitted, the historical
    behaviour is kept and only ``new_state`` is examined, which means a call
    such as ``_verify_state_change(WorkflowRequestState.Running.name)`` always
    returns True and protects nothing.

    :param new_state: name of the state change to verify before persisting
    :param current_state: optional name of the state the request is in now;
        when given, the change is rejected if that state is final
    :return: True if the change may be persisted, False otherwise
    """
    # Terminal states; a tuple because the collection is constant.
    final_states = (
        WorkflowRequestState.Error.name,
        WorkflowRequestState.Complete.name,
        WorkflowRequestState.Failed.name,
    )

    if current_state is None:
        # Backward-compatible path: callers that do not supply the current
        # state get the original answer, based on the requested state alone.
        return new_state not in final_states

    # A request that already reached a final state must stay there.
    return current_state not in final_states