Skip to content
Snippets Groups Projects
Commit 565134b3 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

WS-807: New ingestion-failed message

parent 470cd431
No related branches found
No related tags found
1 merge request!682WS-807: New ingestion-failed message
Pipeline #3786 passed
"""Ingestion-failed notification template
Revision ID: 4354741bf742
Revises: edf4886e1d9c
Create Date: 2021-12-06 15:10:18.670008
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "4354741bf742"
down_revision = "edf4886e1d9c"
branch_labels = None
depends_on = None
def upgrade():
ingestion_failed_notify_sql = """
INSERT INTO notification_templates (name, description, template) VALUES (
'ingestion_failed_email',
'An email template for notifying the DA list that an ingestion has failed',
'To: {{destination_email}}
Subject: NRAO Workspaces - Ingestion failed for request #{{request_id}}
Dear DA,
A {{capability_name}} request #{{request_id}} failed to be ingested into the archive.
Best regards,
NRAO Workspaces'
)
"""
op.execute(ingestion_failed_notify_sql)
def downgrade():
op.execute("DELETE FROM notification_templates WHERE name = 'ingestion_failed_email'")
......@@ -187,6 +187,24 @@ class CapabilityService(CapabilityServiceIF):
# Send email notification
self.notify.notify_ingestion_complete(request)
@on_message(service="workflow", type="workflow-failed")
def on_ingestion_failed(self, **message: Dict[str, str]):
logger.info(f"RECEIVED WORKFLOW-FAILED MESSAGE: {message}")
workflow_request = message.get("subject")
if (
workflow_request
and workflow_request.get("workflow_name")
and "ingest" in workflow_request.get("workflow_name")
):
# Workflow request is an ingestion request
logger.info(
f"INGEST WORKFLOW #{workflow_request['workflow_request_id']} HAS FAILED. SENDING INGESTION-FAILED NOTIFICATION!"
)
parent_wf_request_id = int(workflow_request["argument"]["parent_wf_request_id"])
execution = self.capability_info.lookup_execution_by_workflow_request_id(parent_wf_request_id)
self.notify.notify_ingestion_failed(execution.capability_request)
@on_message(type="carta-ready")
def on_carta_ready(self, **message: Dict[str, str]):
"""
......
......@@ -17,11 +17,11 @@
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
""" This is the execution manager. """
# pylint: disable=C0301, E0401, W1203
import logging
from typing import Dict, List, Optional
# pylint: disable=C0301, E0401, W1203
import transaction
from messaging.messenger import MessageSender
from messaging.router import Router, on_message
......@@ -204,7 +204,14 @@ class ExecutionManager(ExecutionManagerIF):
elif message["subject"]["type"] == "CapabilityRequest":
return self.capability_info.lookup_execution(message["subject"]["current_execution"]["id"])
elif message["service"] == "workflow":
# OK, the message is from the workflow layer, so we need to look up using the workflow request ID instead
# OK, the message is from the workflow layer
# Look for a parent_wf_request_id first
if message["subject"].get("argument"):
if parent_workflow_request_id := message["subject"]["argument"].get("parent_wf_request_id"):
# Workflow request has a parent_wf_request_id; use that as the request ID instead of the true ID
logger.info("Trying to find an execution with parent_wf_request_id=%s", parent_workflow_request_id)
return self.capability_info.lookup_execution_by_workflow_request_id(parent_workflow_request_id)
# No parent ID found, so simply use the request's own workflow request ID
logger.info(
"Trying to find an execution with workflow_request_id=%s", message["subject"]["workflow_request_id"]
)
......
......@@ -65,3 +65,6 @@ class NotificationServiceIF(ABC):
def notify_ingestion_complete(self, request: CapabilityRequest):
pass
def notify_ingestion_failed(self, capability_request):
pass
......@@ -108,6 +108,22 @@ class NotificationServiceRESTClient(NotificationServiceIF):
},
)
def notify_ingestion_failed(self, request: CapabilityRequest):
"""
Notify the user that their request has failed to be ingested into the archive
:param request: Capability request that the notification is regarding
"""
analyst_email = CapoConfig().settings("edu.nrao.workspaces.NotificationSettings").analystEmail
requests.post(
f"{self.url}/notify/ingestion_failed_email/send",
json={
"destination_email": analyst_email,
"request_id": request.id,
"capability_name": request.capability.name,
},
)
def notify_qa_ready(self, request: CapabilityRequest):
"""
Notify the appropriate parties that a request is ready for QA
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment