Skip to content
Snippets Groups Projects

make sure finished workflows stay finished

Merged Charlotte Hausman requested to merge fix_states into main
1 file
+ 7
18
Compare changes
  • Side-by-side
  • Inline
@@ -907,11 +907,7 @@ class WorkflowMessageHandler:
def _process_message_for_request(self, message: Dict, request: WorkflowRequest):
# update request record with new status
if message["type"] == "workflow-executing":
status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
# on initiation, save the HTC id for this workflow:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
@@ -926,11 +922,7 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-updated":
status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
# update workflow_progress table
stage_info = self._update_workflow_progress(request, message)
# message relevant control system
@@ -947,11 +939,7 @@ class WorkflowMessageHandler:
self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-continuing":
status = (
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
elif message["type"] in ("update-wf-metadata", "ingestion-failed"):
# no action, keep existing state:
@@ -1034,11 +1022,11 @@ class WorkflowMessageHandler:
)
@staticmethod
def _verify_state_change(new_state: str) -> bool:
def _verify_state_change(current_state: str) -> bool:
"""
Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set.
:param new_state: state change to verify before persisting
:param current_state: Verify this request in not in a final state before persisting a state change
:return: boolean
"""
final_list = [
@@ -1046,7 +1034,8 @@ class WorkflowMessageHandler:
WorkflowRequestState.Complete.name,
WorkflowRequestState.Failed.name,
]
return new_state not in final_list
return current_state not in final_list
@staticmethod
def _complete_no_cleanup(message: Dict):
Loading