Skip to content
Snippets Groups Projects
Commit 49d7f151 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Refactored `ExecutionManager.execution_from_message`

parent 55d688bb
No related branches found
No related tags found
1 merge request!731Refactored `ExecutionManager.execution_from_message`
Pipeline #4075 passed
......@@ -190,6 +190,42 @@ class ExecutionManager(ExecutionManagerIF):
transaction.get().addAfterCommitHook(execute_workflow_request, (workflow_request,))
def _execution_from_child_message(self, **message: Dict) -> Optional[CapabilityExecution]:
"""
Extract a CapabilityExecution from a message sent from a child workflow request
:param message: Message sent from a child workflow request
:return: CapabilityExecution if one can be located for child request; else None
"""
msg_subject = message.get("subject")
if msg_subject.get("execution_wf_id"):
parent_wf_request_id = msg_subject.get("execution_wf_id")
elif message["argument"].get("parent_wf_request_id"):
parent_wf_request_id = message["argument"].get("parent_wf_request_id")
if parent_wf_request_id:
# Workflow request has a parent_wf_request_id; use that as the request ID instead of the true ID
logger.info("Trying to find an execution with parent_wf_request_id=%s", parent_wf_request_id)
return self.capability_info.lookup_execution_by_workflow_request_id(parent_wf_request_id)
return None
def _execution_from_qa_message(self, **message: Dict) -> Optional[CapabilityExecution]:
"""
Extract a CapabilityExecution from a QA message
:param message: QA message
:return: CapabilityExecution if one can be located; else None
"""
msg_subject = message.get("subject")
version_number = message["capability_version"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(msg_subject["workflow_request_id"])
if version_number != execution.version_number:
logger.error("ERROR: Specified version does not match retrieved execution.")
return None
else:
return execution
def execution_from_message(self, **message: Dict) -> Optional[CapabilityExecution]:
"""
Extract a CapabilityExecution from a message
......@@ -197,41 +233,30 @@ class ExecutionManager(ExecutionManagerIF):
:param message: Message
:return: CapabilityExecution if ID is found in message; else None
"""
msg_subject = message.get("subject")
msg_type = msg_subject.get("type")
# first, let's check for a capability execution ID
if message["subject"]["type"] == "CapabilityExecution":
if msg_type == "CapabilityExecution":
# if we found one, we're basically done
return self.capability_info.lookup_execution(message["subject"]["id"])
elif message["subject"]["type"] == "CapabilityRequest":
return self.capability_info.lookup_execution(message["subject"]["current_execution"]["id"])
return self.capability_info.lookup_execution(msg_subject["id"])
elif msg_type == "CapabilityRequest":
return self.capability_info.lookup_execution(msg_subject["current_execution"]["id"])
elif message["service"] == "workflow":
# OK, the message is from the workflow layer
# Look for a parent_wf_request_id first
if message["subject"]:
if parent_workflow_request_id := message["subject"].get("execution_wf_id"):
# Workflow request has a parent_wf_request_id; use that as the request ID instead of the true ID
logger.info("Trying to find an execution with parent_wf_request_id=%s", parent_workflow_request_id)
return self.capability_info.lookup_execution_by_workflow_request_id(parent_workflow_request_id)
# No parent ID found, so simply use the request's own workflow request ID
logger.info(
"Trying to find an execution with workflow_request_id=%s", message["subject"]["workflow_request_id"]
)
if "qa" in message["type"]:
version_number = message["capability_version"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(
message["subject"]["workflow_request_id"]
)
if version_number != execution.version_number:
logger.error("ERROR: Specified version does not match retrieved execution.")
return None
else:
if msg_subject:
if execution := self._execution_from_child_message(**message):
return execution
return self.capability_info.lookup_execution_by_workflow_request_id(
message["subject"]["workflow_request_id"]
)
else:
logger.error("Error: unable to ascertain the capability execution for message %s", message)
return None
# No parent ID found, so simply use the request's own workflow request ID
logger.info("Trying to find an execution with workflow_request_id=%s", msg_subject["workflow_request_id"])
if "qa" in msg_type:
self._execution_from_qa_message(**message)
return self.capability_info.lookup_execution_by_workflow_request_id(msg_subject["workflow_request_id"])
logger.error("Error: unable to ascertain the capability execution for message %s", message)
return None
def _complete_execution(self, execution: CapabilityExecution) -> CapabilityExecution:
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment