Skip to content
Snippets Groups Projects
Commit b0822517 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

ensure operations are on the request's current data

parent 49c655e3
No related branches found
No related tags found
3 merge requests!1605Merge 2.8.2.3 work to main,!1581catch up with 2.8.2.3,!1578Try fixing condor id issues
...@@ -276,9 +276,6 @@ class WorkflowRequest(JSONSerializable): ...@@ -276,9 +276,6 @@ class WorkflowRequest(JSONSerializable):
def update_cleaned(self, cleaned: bool): def update_cleaned(self, cleaned: bool):
self.cleaned = cleaned self.cleaned = cleaned
def update_htcondor_job_id(self, job_id: int | None):
self.htcondor_job_id = job_id
def __getitem__(self, item): def __getitem__(self, item):
return self.files return self.files
......
...@@ -180,3 +180,12 @@ class WorkflowInfoIF(ABC): ...@@ -180,3 +180,12 @@ class WorkflowInfoIF(ABC):
:return: :return:
""" """
pass pass
def refresh_request(self, request: WorkflowRequest) -> WorkflowRequest:
"""
Refresh an in use request object with current data
:param request: the request to be refreshed
:return: the updated WorkflowRequest object
"""
pass
...@@ -280,3 +280,13 @@ class WorkflowInfo(WorkflowInfoIF): ...@@ -280,3 +280,13 @@ class WorkflowInfo(WorkflowInfoIF):
""" """
request.update_cleaned(update_flag) request.update_cleaned(update_flag)
self.save_request(request) self.save_request(request)
def refresh_request(self, request: WorkflowRequest) -> WorkflowRequest:
"""
Ensure we are operating on an up-to-date object
:return: updated WorkflowRequest object
"""
request = self.session.merge(request)
return request
...@@ -944,10 +944,12 @@ class WorkflowMessageHandler: ...@@ -944,10 +944,12 @@ class WorkflowMessageHandler:
self.send_external_event("update", **tack_on) self.send_external_event("update", **tack_on)
elif message["type"] == "workflow-failed": elif message["type"] == "workflow-failed":
request = self.info.refresh_request(request)
status = WorkflowRequestState.Error.name status = WorkflowRequestState.Error.name
self._post_workflow_cleanup(message, request, "failed") self._post_workflow_cleanup(message, request, "failed")
elif message["type"] == "workflow-aborted": elif message["type"] == "workflow-aborted":
request = self.info.refresh_request(request)
status = WorkflowRequestState.Failed.name status = WorkflowRequestState.Failed.name
self._post_workflow_cleanup(message, request, "failed") self._post_workflow_cleanup(message, request, "failed")
...@@ -1009,6 +1011,7 @@ class WorkflowMessageHandler: ...@@ -1009,6 +1011,7 @@ class WorkflowMessageHandler:
self.messenger.send_message(**iterations_msg) self.messenger.send_message(**iterations_msg)
elif message["type"] == "workflow-complete": elif message["type"] == "workflow-complete":
request = self.info.refresh_request(request)
status = WorkflowRequestState.Complete.name status = WorkflowRequestState.Complete.name
self._post_workflow_cleanup(message, request, "complete") self._post_workflow_cleanup(message, request, "complete")
...@@ -1180,21 +1183,19 @@ class WorkflowMessageHandler: ...@@ -1180,21 +1183,19 @@ class WorkflowMessageHandler:
result = injector.clear_subspace() result = injector.clear_subspace()
if result is False: if result is False:
# the processing directory somehow disappeared, mark as cleaned to avoid further errors # the processing directory somehow disappeared, mark as cleaned to avoid further errors
request.update_cleaned(True) request.cleaned = True
def clean_workflow(self, request: WorkflowRequest): @staticmethod
def clean_workflow(request: WorkflowRequest):
if request.htcondor_job_id is not None: if request.htcondor_job_id is not None:
# Request has a set condor_job_id; unset it # Request has a set condor_job_id; unset it
logger.info(f"Workflow #{request.workflow_request_id} finished; unsetting HTCondor ID") logger.info(f"Workflow #{request.workflow_request_id} finished; unsetting HTCondor ID")
request.update_htcondor_job_id(None) request.htcondor_job_id = None
if request.controller == "VLASS" and "vlass_" in request.results_dir: if request.controller == "VLASS" and "vlass_" in request.results_dir:
# request is initiated and controlled by VLASS Manager cleanup system. # request is initiated and controlled by VLASS Manager cleanup system.
# Mark request as cleaned to bypass WS Annihilator and prevent clogging the logs # Mark request as cleaned to bypass WS Annihilator and prevent clogging the logs
request.update_cleaned(True) request.cleaned = True
# let's try forcing save here too...
self.info.save_request(request)
@staticmethod @staticmethod
def write_vlass_result_file(wf_request: WorkflowRequest, message: dict): def write_vlass_result_file(wf_request: WorkflowRequest, message: dict):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment