Skip to content
Snippets Groups Projects
Commit 0013d1b0 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

found and fixed two types of common message handling errors from when...

found and fixed two types of common message handling errors from when workflows are not attached to capability requests
parent 714e4c4c
No related branches found
No related tags found
1 merge request!1611found and fixed two types of common message handling errors
Pipeline #14859 passed
......@@ -88,6 +88,8 @@ class CapabilityService(CapabilityServiceIF):
@on_message(type="execution-failed")
def fail_request(self, **message: Dict):
logger.info(f"RECEIVED EXECUTION-FAILED: {message}")
cap_request_id = None
cap_version_number = None
if message["type"] == "workflow-aborted":
workflow_req = message["subject"]
......@@ -104,18 +106,20 @@ class CapabilityService(CapabilityServiceIF):
cap_request_id = execution["capability_request_id"]
cap_version_number = execution["version_number"]
capability_request = self.capability_info.lookup_capability_request(cap_request_id)
version = self.capability_info.lookup_version(capability_request.id, cap_version_number)
# if there isn't a capability version or request, don't try to run anything
if cap_request_id and cap_version_number:
capability_request = self.capability_info.lookup_capability_request(cap_request_id)
version = self.capability_info.lookup_version(capability_request.id, cap_version_number)
# Set version state to Failed
version.state = CapabilityVersionState.Failed.name
self.capability_info.save_entity(version)
# Set version state to Failed
version.state = CapabilityVersionState.Failed.name
self.capability_info.save_entity(version)
# Update request state
capability_request.determine_state()
self.capability_info.save_entity(capability_request)
capability_complete_msg = CapabilityMessageArchitect(version=version).compose_message("capability_failed")
self.messenger.send_message(**capability_complete_msg)
# Update request state
capability_request.determine_state()
self.capability_info.save_entity(capability_request)
capability_complete_msg = CapabilityMessageArchitect(version=version).compose_message("capability_failed")
self.messenger.send_message(**capability_complete_msg)
@on_message(type="capability-cancelled")
def cancel_request(self, **message: Dict):
......@@ -292,28 +296,30 @@ class CapabilityService(CapabilityServiceIF):
# Since we care, we should look up the execution for this
workflow_request_id = msg_subject["workflow_request_id"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(workflow_request_id)
version = execution.version
# Pull out the metadata, or make a blank dictionary because we're about to add to it
# sqlalchemy does not detect in-place mutations of JSON
# https://docs.sqlalchemy.org/en/14/core/type_basics.html#sqlalchemy.types.JSON
# to get past this we use a deepcopy of version.workflow_metadata to create a "new" JSON obj
metadata = copy.deepcopy(version.workflow_metadata) if version.workflow_metadata else {}
# Figure out which message "type" this is (Started or Ended)
msg_type = parse_log[1].split("at", 1)[0].strip()
# retrieve the timestamp
timestamp = message["condor_metadata"]["timestamp"]
# record it
metadata[f"fetch_{'start' if msg_type == 'Started' else 'end'}_time"] = timestamp
logger.info("Updating FETCH times in metadata %s on capability version %s", metadata, version)
# update the metadata (this is a no-op unless we just created the dictionary)
version.workflow_metadata = metadata
# save it
self.capability_info.save_version(version)
# if this workflow is not attached to an execution, don't do anything
if execution:
version = execution.version
# Pull out the metadata, or make a blank dictionary because we're about to add to it
# sqlalchemy does not detect in-place mutations of JSON
# https://docs.sqlalchemy.org/en/14/core/type_basics.html#sqlalchemy.types.JSON
# to get past this we use a deepcopy of version.workflow_metadata to create a "new" JSON obj
metadata = copy.deepcopy(version.workflow_metadata) if version.workflow_metadata else {}
# Figure out which message "type" this is (Started or Ended)
msg_type = parse_log[1].split("at", 1)[0].strip()
# retrieve the timestamp
timestamp = message["condor_metadata"]["timestamp"]
# record it
metadata[f"fetch_{'start' if msg_type == 'Started' else 'end'}_time"] = timestamp
logger.info("Updating FETCH times in metadata %s on capability version %s", metadata, version)
# update the metadata (this is a no-op unless we just created the dictionary)
version.workflow_metadata = metadata
# save it
self.capability_info.save_version(version)
class CapabilityLauncher:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment