diff --git a/schema/versions/4354741bf742_ingestion_failed_notification_template.py b/schema/versions/4354741bf742_ingestion_failed_notification_template.py new file mode 100644 index 0000000000000000000000000000000000000000..0198ff1925a8dbf48a8bcd59b02484371febd2d6 --- /dev/null +++ b/schema/versions/4354741bf742_ingestion_failed_notification_template.py @@ -0,0 +1,38 @@ +"""Ingestion-failed notification template + +Revision ID: 4354741bf742 +Revises: edf4886e1d9c +Create Date: 2021-12-06 15:10:18.670008 + +""" +from alembic import op + +# revision identifiers, used by Alembic. +revision = "4354741bf742" +down_revision = "edf4886e1d9c" +branch_labels = None +depends_on = None + + +def upgrade(): + ingestion_failed_notify_sql = """ + INSERT INTO notification_templates (name, description, template) VALUES ( + 'ingestion_failed_email', + 'An email template for notifying the DA list that an ingestion has failed', + 'To: {{destination_email}} +Subject: NRAO Workspaces - Ingestion failed for request #{{request_id}} + + +Dear DA, + +A {{capability_name}} request #{{request_id}} failed to be ingested into the archive. + +Best regards, +NRAO Workspaces' +) + """ + op.execute(ingestion_failed_notify_sql) + + +def downgrade(): + op.execute("DELETE FROM notification_templates WHERE name = 'ingestion_failed_email'") diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index cedc9aaaad4c4ced4f313e3adac0de5eb6b36c27..d4fc0caa98b46542ce1c029e5ecad82d8d1fccfa 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -187,6 +187,24 @@ class CapabilityService(CapabilityServiceIF): # Send email notification self.notify.notify_ingestion_complete(request) + @on_message(service="workflow", type="workflow-failed") + def on_ingestion_failed(self, **message: Dict[str, str]): + logger.info(f"RECEIVED WORKFLOW-FAILED MESSAGE: {message}") + workflow_request = message.get("subject") + + if ( + workflow_request + and workflow_request.get("workflow_name") + and "ingest" in workflow_request.get("workflow_name") + ): + # Workflow request is an ingestion request + logger.info( + f"INGEST WORKFLOW #{workflow_request['workflow_request_id']} HAS FAILED. SENDING INGESTION-FAILED NOTIFICATION!" + ) + parent_wf_request_id = int(workflow_request["argument"]["parent_wf_request_id"]) + execution = self.capability_info.lookup_execution_by_workflow_request_id(parent_wf_request_id) + self.notify.notify_ingestion_failed(execution.capability_request) + @on_message(type="carta-ready") def on_carta_ready(self, **message: Dict[str, str]): """ diff --git a/shared/workspaces/workspaces/capability/services/execution_manager.py b/shared/workspaces/workspaces/capability/services/execution_manager.py index bca0683ddaae09f96b7ed9282e6fc0151c378204..d7db8d71c3683832b3ec10257b0b1cbfadbeac69 100644 --- a/shared/workspaces/workspaces/capability/services/execution_manager.py +++ b/shared/workspaces/workspaces/capability/services/execution_manager.py @@ -17,11 +17,11 @@ # along with Workspaces. If not, see <https://www.gnu.org/licenses/>. """ This is the execution manager. """ +# pylint: disable=C0301, E0401, W1203 + import logging from typing import Dict, List, Optional -# pylint: disable=C0301, E0401, W1203 - import transaction from messaging.messenger import MessageSender from messaging.router import Router, on_message @@ -204,7 +204,14 @@ class ExecutionManager(ExecutionManagerIF): elif message["subject"]["type"] == "CapabilityRequest": return self.capability_info.lookup_execution(message["subject"]["current_execution"]["id"]) elif message["service"] == "workflow": - # OK, the message is from the workflow layer, so we need to look up using the workflow request ID instead + # OK, the message is from the workflow layer + # Look for a parent_wf_request_id first + if message["subject"].get("argument"): + if parent_workflow_request_id := message["subject"]["argument"].get("parent_wf_request_id"): + # Workflow request has a parent_wf_request_id; use that as the request ID instead of the true ID + logger.info("Trying to find an execution with parent_wf_request_id=%s", parent_workflow_request_id) + return self.capability_info.lookup_execution_by_workflow_request_id(parent_workflow_request_id) + # No parent ID found, so simply use the request's own workflow request ID logger.info( "Trying to find an execution with workflow_request_id=%s", message["subject"]["workflow_request_id"] ) diff --git a/shared/workspaces/workspaces/notification/services/interfaces.py b/shared/workspaces/workspaces/notification/services/interfaces.py index a60c654d5d1cbb434a989d1a5aa9904a6b4bca80..4ce7d750adc1dae2cadc76e96da763a796e40701 100644 --- a/shared/workspaces/workspaces/notification/services/interfaces.py +++ b/shared/workspaces/workspaces/notification/services/interfaces.py @@ -65,3 +65,6 @@ class NotificationServiceIF(ABC): def notify_ingestion_complete(self, request: CapabilityRequest): pass + + def notify_ingestion_failed(self, capability_request): + pass diff --git a/shared/workspaces/workspaces/notification/services/notification_service.py b/shared/workspaces/workspaces/notification/services/notification_service.py index 2b39cfd446d2df4008297b682568a7f10ebd3737..2dd0312d268ff3f668dc76c7a6dce4ff3d547dfe 100644 --- a/shared/workspaces/workspaces/notification/services/notification_service.py +++ b/shared/workspaces/workspaces/notification/services/notification_service.py @@ -108,6 +108,22 @@ class NotificationServiceRESTClient(NotificationServiceIF): }, ) + def notify_ingestion_failed(self, request: CapabilityRequest): + """ + Notify the user that their request has failed to be ingested into the archive + + :param request: Capability request that the notification is regarding + """ + analyst_email = CapoConfig().settings("edu.nrao.workspaces.NotificationSettings").analystEmail + requests.post( + f"{self.url}/notify/ingestion_failed_email/send", + json={ + "destination_email": analyst_email, + "request_id": request.id, + "capability_name": request.capability.name, + }, + ) + def notify_qa_ready(self, request: CapabilityRequest): """ Notify the appropriate parties that a request is ready for QA