Skip to content
Snippets Groups Projects
Commit 0123cd08 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

ok, now this should be right

parent 2f22b75d
No related branches found
No related tags found
1 merge request!1319make sure finished workflows stay finished
Pipeline #9540 passed
...@@ -907,11 +907,7 @@ class WorkflowMessageHandler: ...@@ -907,11 +907,7 @@ class WorkflowMessageHandler:
def _process_message_for_request(self, message: Dict, request: WorkflowRequest): def _process_message_for_request(self, message: Dict, request: WorkflowRequest):
# update request record with new status # update request record with new status
if message["type"] == "workflow-executing": if message["type"] == "workflow-executing":
status = ( status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
# on initiation, save the HTC id for this workflow: # on initiation, save the HTC id for this workflow:
if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]): if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
...@@ -926,11 +922,7 @@ class WorkflowMessageHandler: ...@@ -926,11 +922,7 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg) self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-updated": elif message["type"] == "workflow-updated":
status = ( status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
# update workflow_progress table # update workflow_progress table
stage_info = self._update_workflow_progress(request, message) stage_info = self._update_workflow_progress(request, message)
# message relevant control system # message relevant control system
...@@ -947,11 +939,7 @@ class WorkflowMessageHandler: ...@@ -947,11 +939,7 @@ class WorkflowMessageHandler:
self._post_workflow_cleanup(message, request, "failed") self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-continuing": elif message["type"] == "workflow-continuing":
status = ( status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
WorkflowRequestState.Running.name
if self._verify_state_change(WorkflowRequestState.Running.name)
else None
)
elif message["type"] in ("update-wf-metadata", "ingestion-failed"): elif message["type"] in ("update-wf-metadata", "ingestion-failed"):
# no action, keep existing state: # no action, keep existing state:
...@@ -1034,11 +1022,11 @@ class WorkflowMessageHandler: ...@@ -1034,11 +1022,11 @@ class WorkflowMessageHandler:
) )
@staticmethod @staticmethod
def _verify_state_change(new_state: str) -> bool: def _verify_state_change(current_state: str) -> bool:
""" """
Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set. Sometimes messages come in out of order. Make sure Complete/Failed/Error states don't change once set.
:param new_state: state change to verify before persisting :param current_state: Verify this request in not in a final state before persisting a state change
:return: boolean :return: boolean
""" """
final_list = [ final_list = [
...@@ -1046,7 +1034,8 @@ class WorkflowMessageHandler: ...@@ -1046,7 +1034,8 @@ class WorkflowMessageHandler:
WorkflowRequestState.Complete.name, WorkflowRequestState.Complete.name,
WorkflowRequestState.Failed.name, WorkflowRequestState.Failed.name,
] ]
return new_state not in final_list
return current_state not in final_list
@staticmethod @staticmethod
def _complete_no_cleanup(message: Dict): def _complete_no_cleanup(message: Dict):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment