Skip to content
Snippets Groups Projects

make sure finished workflows stay finished

Merged Charlotte Hausman requested to merge fix_states into main
1 file
+ 32
4
Compare changes
  • Side-by-side
  • Inline
@@ -907,7 +907,11 @@ class WorkflowMessageHandler:
def _process_message_for_request(self, message: Dict, request: WorkflowRequest):
# update request record with new status
if message["type"] == "workflow-executing":
status = WorkflowRequestState.Running.name
status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
# on initiation, save the HTC id for this workflow:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
@@ -922,7 +926,11 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-updated":
status = WorkflowRequestState.Running.name
status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
# update workflow_progress table
stage_info = self._update_workflow_progress(request, message)
# message relevant control system
@@ -939,7 +947,11 @@ class WorkflowMessageHandler:
self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-continuing":
status = WorkflowRequestState.Running.name
status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
elif message["type"] in ("update-wf-metadata", "ingestion-failed"):
# no action, keep existing state:
@@ -1012,7 +1024,8 @@ class WorkflowMessageHandler:
# update request record with new status
try:
request.update_status(status)
if status is not None:
request.update_status(status)
transaction.commit()
except Exception as exc:
transaction.abort()
@@ -1020,6 +1033,21 @@ class WorkflowMessageHandler:
f"Failed to update status on workflow request {request.workflow_request_id} to {status}: {exc}"
)
@staticmethod
def _verify_state_change(new_state: str) -> bool:
"""
Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set.
:param new_state: state change to verify before persisting
:return: boolean
"""
final_list = [
WorkflowRequestState.Error.name,
WorkflowRequestState.Complete.name,
WorkflowRequestState.Failed.name,
]
return new_state not in final_list
@staticmethod
def _complete_no_cleanup(message: Dict):
no_cleanup_messages = ["delivery", "ingestion-complete"]
Loading