Skip to content
Snippets Groups Projects
Commit 2ab6315c authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Workflow parse delivery json

parent 58545185
No related branches found
No related tags found
1 merge request!181Workflow parse delivery json
Pipeline #1233 passed
......@@ -86,14 +86,14 @@ class Router(MessageReceiverIF):
:param message: Received message
"""
if self.service == "capability":
logger.info(f"receiving message: {message}")
logger.debug(f"receiving message: {message}")
message = json.loads(message)
for pattern, callbacks in self.callbacks.items():
# Convert hashable str version of pattern back to dict
dict_pattern = json.loads(pattern)
logger.info(
logger.debug(
f"\nRouter.receive_message > pattern: {dict_pattern}, message: {message['type']}, matches: {self.matches(pattern, message)}\n"
)
if self.matches(dict_pattern, message):
......
import logging
import requests
from pycapo import CapoConfig
......@@ -5,28 +7,50 @@ from workspaces.notification.services.interfaces import NotificationServiceIF, N
from workspaces.capability.services.interfaces import CapabilityRequestIF
logger = logging.getLogger(__name__)
class NotificationServiceRESTClient(NotificationServiceIF):
def __init__(self):
self.url = CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").serviceUrl
self.url = (
CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").serviceUrl
)
self.linkUrl = CapoConfig().settings("edu.nrao.archive.workspaces.UISettings").serviceUrl
def notify_submitted(self, request: CapabilityRequestIF):
# bail out early if we have no user email
if "user_email" not in request.parameters:
logger.info("Not sending notification because no user email supplied")
return
user_email = request.parameters["user_email"]
requests.post(f'{self.url}/notify/submitted_email/send',
json={"destination_email": user_email,
"request_id": request.id,
"capability_name": request.capability.name,
"status_link": self.linkUrl + "/request-status/" + str(request.id)})
requests.post(
f"{self.url}/notify/submitted_email/send",
json={
"destination_email": user_email,
"request_id": request.id,
"capability_name": request.capability.name,
"status_link": self.linkUrl + "/request-status/" + str(request.id),
},
)
def notify_complete(self, request: CapabilityRequestIF):
# bail out early if we have no user email
if "user_email" not in request.parameters:
logger.info("Not sending notification because no user email supplied")
return
user_email = request.parameters["user_email"]
requests.post(f'{self.url}/notify/complete_email/send',
json={"destination_email":user_email,
"request_id": request.id,
"capability_name": request.capability.name})
requests.post(
f"{self.url}/notify/complete_email/send",
json={
"destination_email": user_email,
"request_id": request.id,
"capability_name": request.capability.name,
},
)
class NotificationService(NotificationServiceIF):
def __init__(self, info: NotificationInfoIF):
self.info = info
import json
import logging
import os
import pathlib
import stat
import subprocess
from pathlib import Path
......@@ -72,7 +74,9 @@ class WorkflowService(WorkflowServiceIF):
self.message_router = Router("workflow")
self.message_router.register(self)
self.info = info
self.useCondor = CapoConfig().getboolean("edu.nrao.archive.workspaces.ProcessingSettings.useCondor")
self.useCondor = CapoConfig().getboolean(
"edu.nrao.archive.workspaces.ProcessingSettings.useCondor"
)
def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile):
self.info.save_file(request=request, filename=file.filename, content=file.content)
......@@ -201,9 +205,36 @@ class WorkflowService(WorkflowServiceIF):
return Path(logfile_name.strip())
return Path(f"{jobfile_name.stem}.log")
@on_message(service="workflow", type="workflow-complete")
def propagate_delivery(self, **message: Dict):
# look up the workflow request to get the path
request = self.info.lookup_workflow_request(message["subject"]["workflow_request_id"])
# find the path to the delivery.json file
delivery_file = pathlib.Path(request.results_dir) / "delivery.json"
# does it exist? if so, we need to send out a new event with its contents
if delivery_file.exists():
logger.info("Found delivery.json file, sending delivery notification")
with open(delivery_file, "r") as delivery_fo:
delivery_info = json.load(delivery_fo)
# construct the message
message = {
"subject": message["subject"],
"service": "workflow",
"type": "delivery",
"delivery": delivery_info,
}
# send the message
self.message_router.send_message(**message)
else:
logger.info("No delivery.json found")
@on_message(service="workflow")
def on_workflow_event(self, **message: Dict):
# logger.info(f"RECEIVED WORKFLOW MESSAGE: {message}")
logger.info(f"RECEIVED WORKFLOW MESSAGE: {message}")
msg_subject = message["subject"]
if msg_subject["type"] == "WorkflowRequest":
request_id = msg_subject["workflow_request_id"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment