Skip to content
Snippets Groups Projects

make sure finished workflows stay finished

Merged Charlotte Hausman requested to merge fix_states into main
1 file
+ 21
4
Compare changes
  • Side-by-side
  • Inline
@@ -907,7 +907,7 @@ class WorkflowMessageHandler:
def _process_message_for_request(self, message: Dict, request: WorkflowRequest):
# update request record with new status
if message["type"] == "workflow-executing":
status = WorkflowRequestState.Running.name
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
# on initiation, save the HTC id for this workflow:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
@@ -922,7 +922,7 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-updated":
status = WorkflowRequestState.Running.name
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
# update workflow_progress table
stage_info = self._update_workflow_progress(request, message)
# message relevant control system
@@ -939,7 +939,7 @@ class WorkflowMessageHandler:
self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-continuing":
status = WorkflowRequestState.Running.name
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
elif message["type"] in ("update-wf-metadata", "ingestion-failed"):
# no action, keep existing state:
@@ -1012,7 +1012,8 @@ class WorkflowMessageHandler:
# update request record with new status
try:
request.update_status(status)
if status is not None:
request.update_status(status)
transaction.commit()
except Exception as exc:
transaction.abort()
@@ -1020,6 +1021,22 @@ class WorkflowMessageHandler:
f"Failed to update status on workflow request {request.workflow_request_id} to {status}: {exc}"
)
@staticmethod
def _verify_state_change(current_state: str) -> bool:
"""
Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set.
:param current_state: Verify this request in not in a final state before persisting a state change
:return: boolean
"""
final_list = [
WorkflowRequestState.Error.name,
WorkflowRequestState.Complete.name,
WorkflowRequestState.Failed.name,
]
return current_state not in final_list
@staticmethod
def _complete_no_cleanup(message: Dict):
no_cleanup_messages = ["delivery", "ingestion-complete"]
Loading