Skip to content
Snippets Groups Projects
Commit 2f22b75d authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

make sure finished workflows stay finished

parent 8b8b7e7a
No related branches found
No related tags found
1 merge request!1319make sure finished workflows stay finished
Pipeline #9539 passed
...@@ -907,7 +907,11 @@ class WorkflowMessageHandler: ...@@ -907,7 +907,11 @@ class WorkflowMessageHandler:
def _process_message_for_request(self, message: Dict, request: WorkflowRequest): def _process_message_for_request(self, message: Dict, request: WorkflowRequest):
# update request record with new status # update request record with new status
if message["type"] == "workflow-executing": if message["type"] == "workflow-executing":
status = WorkflowRequestState.Running.name status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
# on initiation, save the HTC id for this workflow: # on initiation, save the HTC id for this workflow:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]): if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
...@@ -922,7 +926,11 @@ class WorkflowMessageHandler: ...@@ -922,7 +926,11 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg) self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-updated": elif message["type"] == "workflow-updated":
status = WorkflowRequestState.Running.name status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
# update workflow_progress table # update workflow_progress table
stage_info = self._update_workflow_progress(request, message) stage_info = self._update_workflow_progress(request, message)
# message relevant control system # message relevant control system
...@@ -939,7 +947,11 @@ class WorkflowMessageHandler: ...@@ -939,7 +947,11 @@ class WorkflowMessageHandler:
self._post_workflow_cleanup(message, request, "failed") self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-continuing": elif message["type"] == "workflow-continuing":
status = WorkflowRequestState.Running.name status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
elif message["type"] in ("update-wf-metadata", "ingestion-failed"): elif message["type"] in ("update-wf-metadata", "ingestion-failed"):
# no action, keep existing state: # no action, keep existing state:
...@@ -1012,7 +1024,8 @@ class WorkflowMessageHandler: ...@@ -1012,7 +1024,8 @@ class WorkflowMessageHandler:
# update request record with new status # update request record with new status
try: try:
request.update_status(status) if status is not None:
request.update_status(status)
transaction.commit() transaction.commit()
except Exception as exc: except Exception as exc:
transaction.abort() transaction.abort()
...@@ -1020,6 +1033,21 @@ class WorkflowMessageHandler: ...@@ -1020,6 +1033,21 @@ class WorkflowMessageHandler:
f"Failed to update status on workflow request {request.workflow_request_id} to {status}: {exc}" f"Failed to update status on workflow request {request.workflow_request_id} to {status}: {exc}"
) )
@staticmethod
def _verify_state_change(new_state: str) -> bool:
"""
Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set.
:param new_state: state change to verify before persisting
:return: boolean
"""
final_list = [
WorkflowRequestState.Error.name,
WorkflowRequestState.Complete.name,
WorkflowRequestState.Failed.name,
]
return new_state not in final_list
@staticmethod @staticmethod
def _complete_no_cleanup(message: Dict): def _complete_no_cleanup(message: Dict):
no_cleanup_messages = ["delivery", "ingestion-complete"] no_cleanup_messages = ["delivery", "ingestion-complete"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment