From 5c7b59d70fcafeeb9ae3b934a30ce4f4e9937c9c Mon Sep 17 00:00:00 2001 From: Sam Kagan <skagan@nrao.edu> Date: Mon, 22 Jul 2024 11:44:18 -0600 Subject: [PATCH] Updated cap. service to send messages to AAT Only on workflow-update, capability-cancelled, delivery, and capability-failed messages, and only if the CR's controller is AAT --- .../capability/message_architect.py | 100 ++++++++++++++++- .../workspaces/capability/schema.py | 4 +- .../capability/services/capability_service.py | 103 ++++++++++++------ 3 files changed, 168 insertions(+), 39 deletions(-) diff --git a/shared/workspaces/workspaces/capability/message_architect.py b/shared/workspaces/workspaces/capability/message_architect.py index 33ab79a07..7ba3fcfc3 100644 --- a/shared/workspaces/workspaces/capability/message_architect.py +++ b/shared/workspaces/workspaces/capability/message_architect.py @@ -24,6 +24,7 @@ import logging from immutable_views import DictView +from workspaces.capability.schema import CapabilityRequest from workspaces.shared_interfaces import MessageArchitectIF logger = logging.getLogger(__name__) @@ -40,10 +41,39 @@ service_templates = DictView( "routing_key": "capability", "subject": None, "type": None, - } + }, + "archive": { + "service": "archive", + "routing_key": None, + "subject": None, + }, } ) +# archive messages that require additional information +archive_msg_templates = DictView( + { + "restore_cms_complete": { + "application": "capability", + "message": "restore_cms complete", + "delivery_url": None, + "delivery_path": None, + "token": None, + }, + "restore_cms_failed": { + "application": "capability", + "message": "restore_cms failed", + }, + "restore_cms_cancelled": {"application": "capability", "message": "capability cancelled"}, + "update": { + "application": "capability", + "message": "capability updated", + "stage": None, + "start": None, + "stop": None, + }, + } +) # messages that require additional information, currently unneeded # capability_msg_templates = DictView( # { @@ -66,13 +96,13 @@ class CapabilityMessageArchitect(MessageArchitectIF): self.subject = execution or request or version @staticmethod - def get_message_template(msg_type: str) -> dict: + def get_message_template(msg_type: str) -> DictView: # If messages need additional information outside of the basic template, # uncomment this and add to relevant DictView # if msg_type in capability_msg_templates: # return {**service_templates["capability"], **capability_msg_templates[msg_type]} # else: - return service_templates["capability"] + return DictView(service_templates["capability"]) def compose_message(self, msg_type: str) -> DictView: template = self.get_message_template(msg_type) @@ -81,3 +111,67 @@ class CapabilityMessageArchitect(MessageArchitectIF): template["subject"] = self.subject template["type"] = msg_type.replace("_", "-") return DictView(template) + + +class ArchiveMessageArchitect(MessageArchitectIF): + """Builds archive system AMQP messages""" + + def __init__( + self, + request: CapabilityRequest, + routing_key: str = None, + delivery_info: dict = None, + stage_info: dict = None, + ): + """ + Initialize Architect + + :param routing_key: AMQP routing key to use + :param request: The capability request this message concerns + """ + self.routing_key = routing_key if routing_key is not None else self._make_archive_routing_key(request) + self.request = request + self.delivery = delivery_info + self.stage_info = stage_info + + @staticmethod + def _make_archive_routing_key(cr: CapabilityRequest) -> str: + return f"{cr.controller.routing_key_prefix}{cr.capability_name}_{cr.id}" + + @staticmethod + def get_message_template(msg_type: str) -> DictView: + """ + Construct the message template based on type + + :param msg_type: type of message to construct + :return: full message template dictionary for rendering + """ + + if "update" in msg_type: + return DictView({**service_templates["archive"], **archive_msg_templates["update"]}) + else: + return DictView({**service_templates["archive"], **archive_msg_templates[msg_type]}) + + def compose_message(self, msg_type: str) -> DictView: + """ + Render message template for requested type + + :param msg_type: the type of message to send + :return: the rendered message ready for sending + """ + template = self.get_message_template(msg_type) + template = template.copy() + template["subject"] = f"WS capability request #{self.request.id}" + template["routing_key"] = self.routing_key + + if self.delivery is not None: + template["download_url"] = self.delivery["url"] + template["delivery_path"] = self.delivery["delivered_to"] + template["token"] = self.delivery["token"] + + if self.stage_info is not None and "update" in msg_type: + template["stage"] = self.stage_info["stage"] + template["start"] = self.stage_info["start"] + template["stop"] = self.stage_info["stop"] + + return DictView(template) diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index c4107e76c..c9126be3f 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -27,7 +27,7 @@ import pathlib import re import subprocess from abc import abstractmethod -from typing import Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import arrow import chevron @@ -1491,7 +1491,7 @@ class CapabilityVersion(JSONSerializable): capability_request: CapabilityRequest version_number: int parameters: List[ParameterIF] - workflow_metadata: str # JSON-formatted + workflow_metadata: dict[str, Any] # JSON-formatted files: List[CapabilityVersionFile] __tablename__ = "capability_versions" diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 496ee5a33..a42cbb452 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -27,8 +27,15 @@ from messaging.router import Router, on_message from workspaces.capability.enums import CapabilityVersionState from workspaces.capability.helpers import Parameter -from workspaces.capability.message_architect import CapabilityMessageArchitect -from workspaces.capability.schema import CapabilityExecution, CapabilityVersion +from workspaces.capability.message_architect import ( + ArchiveMessageArchitect, + CapabilityMessageArchitect, +) +from workspaces.capability.schema import ( + CapabilityExecution, + CapabilityRequest, + CapabilityVersion, +) from workspaces.capability.services.capability_info import CapabilityInfo from workspaces.capability.services.execution_manager import ExecutionManager from workspaces.capability.services.interfaces import CapabilityServiceIF @@ -52,6 +59,7 @@ class CapabilityService(CapabilityServiceIF): notification_service: NotificationServiceIF, ): self.messenger = MessageSender("capability") + self.archive_messenger = MessageSender("archive") self.router = Router("capability") self.router.register(self) self.execution_manager = ExecutionManager( @@ -138,6 +146,13 @@ class CapabilityService(CapabilityServiceIF): capability_request.sealed = True self.capability_info.save_entity(capability_request) + if capability_request.controller.message_handler == "archive": + # Send failure message to AAT + msg_arch = ArchiveMessageArchitect(capability_request) + self.archive_messenger.send_message( + **msg_arch.compose_message(f"{capability_request.capability_name}_cancelled") + ) + @on_message(type="execution-error") def error_request(self, **message: Dict): logger.info(f"RECEIVED EXECUTION-ERROR: {message}") @@ -205,6 +220,11 @@ class CapabilityService(CapabilityServiceIF): execution.delivery_url = delivery["url"] + "/" if "url" in delivery else None execution.delivery_path = delivery["delivered_to"] if "delivered_to" in delivery else None + cr: CapabilityRequest = execution.capability_request + if cr.controller.message_handler == "archive": + msg_arch = ArchiveMessageArchitect(cr, delivery_info=delivery) + self.archive_messenger.send_message(**msg_arch.compose_message(f"{cr.capability_name}_complete")) + @on_message(type="capability-submitted") def execute_submitted_capability(self, **message: Dict): logger.info(f"Executing capability: {message}") @@ -276,6 +296,13 @@ class CapabilityService(CapabilityServiceIF): :param message: the message we received """ + msg_subject = message["subject"] + if msg_subject["type"] == "CapabilityVersion" and message["type"] == "capability-failed": + capability_request_id = msg_subject["capability_request_id"] + cr = self.capability_info.lookup_capability_request(capability_request_id) + if cr.controller.message_handler == "archive": + msg_arch = ArchiveMessageArchitect(cr) + self.archive_messenger.send_message(**msg_arch.compose_message(f"{cr.capability_name}_failed")) logger.debug(f"Received {message['type']} (no action taken): {message}") @on_message(service="workflow", type="workflow-updated") @@ -285,41 +312,49 @@ class CapabilityService(CapabilityServiceIF): msg_subject = message["subject"] if msg_subject["type"] == "WorkflowRequest": # Figure out what stage we're discussing—that will determine whether we care about the log message - parse_log = message["condor_metadata"]["log"].split("Stage", 1)[1].split("has", 1) + parse_log: list[str] = message["condor_metadata"]["log"].split("Stage", 1)[1].split("has", 1) stage = parse_log[0].strip() logger.debug("Got an update for stage %s", stage) - # If it's the FETCH stage, we care - if stage == "FETCH": - logger.debug("Updating the fetch start or stop time") - - # Since we care, we should look up the execution for this - workflow_request_id = msg_subject["workflow_request_id"] - execution = self.capability_info.lookup_execution_by_workflow_request_id(workflow_request_id) - # if this workflow is not attached to an execution, don't do anything - if execution: - version = execution.version - - # Pull out the metadata, or make a blank dictionary because we're about to add to it - # sqlalchemy does not detect in-place mutations of JSON - # https://docs.sqlalchemy.org/en/14/core/type_basics.html#sqlalchemy.types.JSON - # to get past this we use a deepcopy of version.workflow_metadata to create a "new" JSON obj - metadata = copy.deepcopy(version.workflow_metadata) if version.workflow_metadata else {} - - # Figure out which message "type" this is (Started or Ended) - msg_type = parse_log[1].split("at", 1)[0].strip() - - # retrieve the timestamp - timestamp = message["condor_metadata"]["timestamp"] - # record it - metadata[f"fetch_{'start' if msg_type == 'Started' else 'end'}_time"] = timestamp - - logger.info("Updating FETCH times in metadata %s on capability version %s", metadata, version) - # update the metadata (this is a no-op unless we just created the dictionary) - version.workflow_metadata = metadata - - # save it - self.capability_info.save_version(version) + logger.debug(f"Updating the {stage} start or stop time") + + # Since we care, we should look up the execution for this + workflow_request_id = msg_subject["workflow_request_id"] + execution = self.capability_info.lookup_execution_by_workflow_request_id(workflow_request_id) + # if this workflow is not attached to an execution, don't do anything + if execution: + version = execution.version + + # Pull out the metadata, or make a blank dictionary because we're about to add to it + # sqlalchemy does not detect in-place mutations of JSON + # https://docs.sqlalchemy.org/en/14/core/type_basics.html#sqlalchemy.types.JSON + # to get past this we use a deepcopy of version.workflow_metadata to create a "new" JSON obj + metadata = copy.deepcopy(version.workflow_metadata) if version.workflow_metadata else {} + + # Figure out which message "type" this is (Started or Ended) + msg_type = parse_log[1].split("at", 1)[0].strip() + + # retrieve the timestamp + timestamp = message["condor_metadata"]["timestamp"] + # record it + metadata[f"{stage}_{'start' if msg_type == 'Started' else 'end'}_time"] = timestamp + + logger.info(f"Updating {stage} times in metadata %s on capability version %s", metadata, version) + # update the metadata (this is a no-op unless we just created the dictionary) + version.workflow_metadata = metadata + + # save it + self.capability_info.save_version(version) + if version.capability_request.controller.message_handler == "archive": + stage_info = { + "stage": stage, + "start": metadata.get(f"{stage}_start_time"), + "stop": metadata.get(f"{stage}_end_time"), + } + cr = version.capability_request + # Send message to AAT + msg = ArchiveMessageArchitect(cr, stage_info=stage_info).compose_message("update") + self.archive_messenger.send_message(**msg) class CapabilityLauncher: -- GitLab