Skip to content
Snippets Groups Projects
Commit 49fcd1db authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

make sure finished workflows stay finished

parent 8b8b7e7a
No related branches found
Tags end-of-sprint-46
2 merge requests!1390Catch up with Main,!1319make sure finished workflows stay finished
Pipeline #9541 passed
......@@ -907,7 +907,7 @@ class WorkflowMessageHandler:
def _process_message_for_request(self, message: Dict, request: WorkflowRequest):
# update request record with new status
if message["type"] == "workflow-executing":
status = WorkflowRequestState.Running.name
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
# on initiation, save the HTC id for this workflow:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
......@@ -922,7 +922,7 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-updated":
status = WorkflowRequestState.Running.name
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
# update workflow_progress table
stage_info = self._update_workflow_progress(request, message)
# message relevant control system
......@@ -939,7 +939,7 @@ class WorkflowMessageHandler:
self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-continuing":
status = WorkflowRequestState.Running.name
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
elif message["type"] in ("update-wf-metadata", "ingestion-failed"):
# no action, keep existing state:
......@@ -1012,7 +1012,8 @@ class WorkflowMessageHandler:
# update request record with new status
try:
request.update_status(status)
if status is not None:
request.update_status(status)
transaction.commit()
except Exception as exc:
transaction.abort()
......@@ -1020,6 +1021,22 @@ class WorkflowMessageHandler:
f"Failed to update status on workflow request {request.workflow_request_id} to {status}: {exc}"
)
@staticmethod
def _verify_state_change(current_state: str) -> bool:
"""
Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set.
:param current_state: Verify this request in not in a final state before persisting a state change
:return: boolean
"""
final_list = [
WorkflowRequestState.Error.name,
WorkflowRequestState.Complete.name,
WorkflowRequestState.Failed.name,
]
return current_state not in final_list
@staticmethod
def _complete_no_cleanup(message: Dict):
no_cleanup_messages = ["delivery", "ingestion-complete"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment