Skip to content
Snippets Groups Projects

Workflow parse delivery json

Merged Daniel Lyons requested to merge workflow-parse-delivery-json into main
1 file
+ 35
11
Compare changes
  • Side-by-side
  • Inline
import logging
import requests
from pycapo import CapoConfig
@@ -5,28 +7,50 @@ from workspaces.notification.services.interfaces import NotificationServiceIF, N
from workspaces.capability.services.interfaces import CapabilityRequestIF
logger = logging.getLogger(__name__)
class NotificationServiceRESTClient(NotificationServiceIF):
def __init__(self):
self.url = CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").serviceUrl
self.url = (
CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").serviceUrl
)
self.linkUrl = CapoConfig().settings("edu.nrao.archive.workspaces.UISettings").serviceUrl
def notify_submitted(self, request: CapabilityRequestIF):
# bail out early if we have no user email
if "user_email" not in request.parameters:
logger.info("Not sending notification because no user email supplied")
return
user_email = request.parameters["user_email"]
requests.post(f'{self.url}/notify/submitted_email/send',
json={"destination_email": user_email,
"request_id": request.id,
"capability_name": request.capability.name,
"status_link": self.linkUrl + "/request-status/" + str(request.id)})
requests.post(
f"{self.url}/notify/submitted_email/send",
json={
"destination_email": user_email,
"request_id": request.id,
"capability_name": request.capability.name,
"status_link": self.linkUrl + "/request-status/" + str(request.id),
},
)
def notify_complete(self, request: CapabilityRequestIF):
# bail out early if we have no user email
if "user_email" not in request.parameters:
logger.info("Not sending notification because no user email supplied")
return
user_email = request.parameters["user_email"]
requests.post(f'{self.url}/notify/complete_email/send',
json={"destination_email":user_email,
"request_id": request.id,
"capability_name": request.capability.name})
requests.post(
f"{self.url}/notify/complete_email/send",
json={
"destination_email": user_email,
"request_id": request.id,
"capability_name": request.capability.name,
},
)
class NotificationService(NotificationServiceIF):
def __init__(self, info: NotificationInfoIF):
self.info = info
Loading