From 2ab6315cfc185e624333713e0f86259c2269c3b9 Mon Sep 17 00:00:00 2001 From: Daniel Lyons <dlyons@nrao.edu> Date: Fri, 9 Apr 2021 19:09:55 +0000 Subject: [PATCH] Workflow parse delivery json --- shared/messaging/messaging/router.py | 4 +- .../services/notification_service.py | 46 ++++++++++++++----- .../workflow/services/workflow_service.py | 35 +++++++++++++- 3 files changed, 70 insertions(+), 15 deletions(-) diff --git a/shared/messaging/messaging/router.py b/shared/messaging/messaging/router.py index a0d4f1065..bfc7105fd 100644 --- a/shared/messaging/messaging/router.py +++ b/shared/messaging/messaging/router.py @@ -86,14 +86,14 @@ class Router(MessageReceiverIF): :param message: Received message """ if self.service == "capability": - logger.info(f"receiving message: {message}") + logger.debug(f"receiving message: {message}") message = json.loads(message) for pattern, callbacks in self.callbacks.items(): # Convert hashable str version of pattern back to dict dict_pattern = json.loads(pattern) - logger.info( + logger.debug( f"\nRouter.receive_message > pattern: {dict_pattern}, message: {message['type']}, matches: {self.matches(pattern, message)}\n" ) if self.matches(dict_pattern, message): diff --git a/shared/workspaces/workspaces/notification/services/notification_service.py b/shared/workspaces/workspaces/notification/services/notification_service.py index 9236b6a55..ba3f12ccf 100644 --- a/shared/workspaces/workspaces/notification/services/notification_service.py +++ b/shared/workspaces/workspaces/notification/services/notification_service.py @@ -1,3 +1,5 @@ +import logging + import requests from pycapo import CapoConfig @@ -5,28 +7,50 @@ from workspaces.notification.services.interfaces import NotificationServiceIF, N from workspaces.capability.services.interfaces import CapabilityRequestIF +logger = logging.getLogger(__name__) + + class NotificationServiceRESTClient(NotificationServiceIF): def __init__(self): - self.url = CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").serviceUrl + self.url = ( + CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").serviceUrl + ) self.linkUrl = CapoConfig().settings("edu.nrao.archive.workspaces.UISettings").serviceUrl def notify_submitted(self, request: CapabilityRequestIF): + # bail out early if we have no user email + if "user_email" not in request.parameters: + logger.info("Not sending notification because no user email supplied") + return + user_email = request.parameters["user_email"] - requests.post(f'{self.url}/notify/submitted_email/send', - json={"destination_email": user_email, - "request_id": request.id, - "capability_name": request.capability.name, - "status_link": self.linkUrl + "/request-status/" + str(request.id)}) + requests.post( + f"{self.url}/notify/submitted_email/send", + json={ + "destination_email": user_email, + "request_id": request.id, + "capability_name": request.capability.name, + "status_link": self.linkUrl + "/request-status/" + str(request.id), + }, + ) def notify_complete(self, request: CapabilityRequestIF): + # bail out early if we have no user email + if "user_email" not in request.parameters: + logger.info("Not sending notification because no user email supplied") + return + user_email = request.parameters["user_email"] - requests.post(f'{self.url}/notify/complete_email/send', - json={"destination_email":user_email, - "request_id": request.id, - "capability_name": request.capability.name}) + requests.post( + f"{self.url}/notify/complete_email/send", + json={ + "destination_email": user_email, + "request_id": request.id, + "capability_name": request.capability.name, + }, + ) class NotificationService(NotificationServiceIF): def __init__(self, info: NotificationInfoIF): self.info = info - diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index e1d8e8bb5..e7e991daa 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -1,5 +1,7 @@ +import json import logging import os +import pathlib import stat import subprocess from pathlib import Path @@ -72,7 +74,9 @@ class WorkflowService(WorkflowServiceIF): self.message_router = Router("workflow") self.message_router.register(self) self.info = info - self.useCondor = CapoConfig().getboolean("edu.nrao.archive.workspaces.ProcessingSettings.useCondor") + self.useCondor = CapoConfig().getboolean( + "edu.nrao.archive.workspaces.ProcessingSettings.useCondor" + ) def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile): self.info.save_file(request=request, filename=file.filename, content=file.content) @@ -201,9 +205,36 @@ class WorkflowService(WorkflowServiceIF): return Path(logfile_name.strip()) return Path(f"{jobfile_name.stem}.log") + @on_message(service="workflow", type="workflow-complete") + def propagate_delivery(self, **message: Dict): + # look up the workflow request to get the path + request = self.info.lookup_workflow_request(message["subject"]["workflow_request_id"]) + + # find the path to the delivery.json file + delivery_file = pathlib.Path(request.results_dir) / "delivery.json" + + # does it exist? if so, we need to send out a new event with its contents + if delivery_file.exists(): + logger.info("Found delivery.json file, sending delivery notification") + with open(delivery_file, "r") as delivery_fo: + delivery_info = json.load(delivery_fo) + + # construct the message + message = { + "subject": message["subject"], + "service": "workflow", + "type": "delivery", + "delivery": delivery_info, + } + + # send the message + self.message_router.send_message(**message) + else: + logger.info("No delivery.json found") + @on_message(service="workflow") def on_workflow_event(self, **message: Dict): - # logger.info(f"RECEIVED WORKFLOW MESSAGE: {message}") + logger.info(f"RECEIVED WORKFLOW MESSAGE: {message}") msg_subject = message["subject"] if msg_subject["type"] == "WorkflowRequest": request_id = msg_subject["workflow_request_id"] -- GitLab