From 4926ddd76c8ca080ea7bef2f967c618c07aa8739 Mon Sep 17 00:00:00 2001 From: Charlotte Hausman <chausman@nrao.edu> Date: Thu, 24 Jun 2021 22:33:03 +0000 Subject: [PATCH] Await QA --- ...3ae77f_add_await_qa_step_to_calibration.py | 39 +++++++++++++++++++ ...7bdf5c2e2_add_calibration_ready_for_qa_.py | 14 +++---- services/workflow/workflow/server.py | 16 ++++++++ .../workspaces/capability/helpers.py | 25 +++++++----- .../capability/services/capability_engine.py | 4 ++ .../capability/services/capability_info.py | 14 +++++-- .../capability/services/capability_service.py | 17 ++++++++ .../capability/services/execution_manager.py | 8 +++- .../capability/services/interfaces.py | 16 ++++++-- .../notification/services/interfaces.py | 3 ++ .../services/notification_service.py | 21 +++++++++- .../workflow/services/workflow_service.py | 23 +++++++++++ 12 files changed, 174 insertions(+), 26 deletions(-) create mode 100644 schema/versions/46dbb53ae77f_add_await_qa_step_to_calibration.py diff --git a/schema/versions/46dbb53ae77f_add_await_qa_step_to_calibration.py b/schema/versions/46dbb53ae77f_add_await_qa_step_to_calibration.py new file mode 100644 index 000000000..5e57172c5 --- /dev/null +++ b/schema/versions/46dbb53ae77f_add_await_qa_step_to_calibration.py @@ -0,0 +1,39 @@ +"""add await qa step to calibration + +Revision ID: 46dbb53ae77f +Revises: 6909d5608422 +Create Date: 2021-06-24 16:06:30.176007 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '46dbb53ae77f' +down_revision = '6909d5608422' +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute( + """ + UPDATE capabilities + SET capability_steps='prepare-and-run-workflow std_calibration +await-workflow +await-qa' + WHERE capability_name='std_calibration' + """ + ) + + +def downgrade(): + op.execute( + """ + UPDATE capabilities + SET capability_steps='prepare-and-run-workflow std_calibration +await-workflow' + WHERE capability_name='std_calibration' + """ + ) diff --git a/schema/versions/7f77bdf5c2e2_add_calibration_ready_for_qa_.py b/schema/versions/7f77bdf5c2e2_add_calibration_ready_for_qa_.py index b1f73fb5e..ca1ad94f0 100644 --- a/schema/versions/7f77bdf5c2e2_add_calibration_ready_for_qa_.py +++ b/schema/versions/7f77bdf5c2e2_add_calibration_ready_for_qa_.py @@ -22,13 +22,13 @@ def upgrade(): VALUES ('ready-for-QA_email', 'An email template for notifying a DA that a capability request is ready for QA.', 'Subject: NRAO Workspaces Request #{{request_id}}: Ready for QA\n\n - Dear DA,\n - A {{capability_name}} request is ready for QA.\n - UI for passing QA is currently unavailable, and by default all requests are QA-passed. Sorry for the inconvenience.\n - Request status can be checked here:\n - {{{status_link}}}\n - Best regards,\n - NRAO Workspaces +Dear DA,\n +A {{capability_name}} request is ready for QA.\n +UI for passing QA is currently unavailable, and by default all requests are QA-passed. Sorry for the inconvenience.\n +Request status can be checked here:\n +{{status_link}}\n +Best regards,\n +NRAO Workspaces ') """ ) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index a0e2d4ed3..681e71bc5 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -228,6 +228,15 @@ class WorkflowRequestRestService: return self.request.workflows.execute(self.request.context) + @view_config(request_method="POST", route_name="announce_qa_ready") + def announce_qa(self): + """ + Stopgap for announcing QA ready + :return: + """ + print(f"Announcing QA ready for workflow {self.request.context}") + + return self.request.workflows.announce_qa_ready(self.request.matchdict["request_id"]) @view_defaults(route_name="workflow_request_files", renderer="json") class WorkflowFilesRestService: @@ -408,6 +417,13 @@ def main(global_config, **settings): "/workflows/{name}/requests/{request_id}/submit", factory=lookup_request, ) + + # yes I know this doesn't match pattern, it's a stopgap. + config.add_route( + "announce_qa_ready", + "/workflows/requests/{request_id}/qa", + factory=lookup_request + ) config.include("pyramid_beaker") config.scan(".") return config.make_wsgi_app() diff --git a/shared/workspaces/workspaces/capability/helpers.py b/shared/workspaces/workspaces/capability/helpers.py index c0ec1a74a..544e8523c 100644 --- a/shared/workspaces/workspaces/capability/helpers.py +++ b/shared/workspaces/workspaces/capability/helpers.py @@ -5,6 +5,8 @@ import logging import re from typing import Iterator, List, Optional +from messaging.router import Router + from workspaces.capability.enums import CapabilityStepType from workspaces.capability.helpers_interfaces import ( CapabilitySequenceIF, @@ -13,7 +15,9 @@ from workspaces.capability.helpers_interfaces import ( ) from workspaces.capability.schema_interfaces import CapabilityExecutionIF from workspaces.capability.services.interfaces import CapabilityEngineIF - +from workspaces.notification.services.notification_service import ( + NotificationServiceRESTClient, +) logger = logging.getLogger(__name__) @@ -94,9 +98,7 @@ class CapabilityStep(CapabilityStepIF): return cls.TYPES[step_type](step_type, step_value, step_args) else: # Step string not well-formatted - raise MalformedCapabilityStep( - f"Capability step {step_string} is malformed." - ) + raise MalformedCapabilityStep(f"Capability step {step_string} is malformed.") def __str__(self): string = f"{self.step_type.name}" @@ -184,14 +186,19 @@ class PrepareAndRunWorkflow(CapabilityStep): workflow_args, type(workflow_args), ) - engine.submit_workflow_request( - execution.id, workflow_name, workflow_args, files - ) + engine.submit_workflow_request(execution.id, workflow_name, workflow_args, files) class AwaitQa(CapabilityStep): - def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF): - pass + @staticmethod + def execute(engine: CapabilityEngineIF, execution: CapabilityExecutionIF): + """ + Execute await QA step, which as of now will just send a message that triggers a notification email + + :param engine: Engine executing the capability + :param execution: Corresponding capability execution + """ + engine.announce_qa_ready(execution.id) class AwaitWorkflow(CapabilityStep): diff --git a/shared/workspaces/workspaces/capability/services/capability_engine.py b/shared/workspaces/workspaces/capability/services/capability_engine.py index b180cf28d..efa36efec 100644 --- a/shared/workspaces/workspaces/capability/services/capability_engine.py +++ b/shared/workspaces/workspaces/capability/services/capability_engine.py @@ -96,3 +96,7 @@ class CapabilityEngine(CapabilityEngineIF): self.capability_info.save_execution(execution) transaction.commit() self.workflow_service.execute(workflow_request) + + def announce_qa_ready(self, execution_id: int): + execution = self.capability_info.lookup_execution(execution_id) + self.workflow_service.announce_qa_ready(execution.current_workflow_request_id) diff --git a/shared/workspaces/workspaces/capability/services/capability_info.py b/shared/workspaces/workspaces/capability/services/capability_info.py index 9943a2070..f32bc090f 100644 --- a/shared/workspaces/workspaces/capability/services/capability_info.py +++ b/shared/workspaces/workspaces/capability/services/capability_info.py @@ -30,7 +30,6 @@ from workspaces.products.schema_interfaces import FutureProductIF # pylint: disable=E0401, R0205, R0903 - Base = declarative_base() logger = logging.getLogger(__name__) @@ -148,7 +147,7 @@ class CapabilityInfo(CapabilityInfoIF): ) self.save_entity(version) - # Reset request state to Ready + # Reset request state to Created request.state = CapabilityRequestState.Created.name self.save_entity(request) @@ -293,11 +292,20 @@ class CapabilityInfo(CapabilityInfoIF): Commit this execution. :param execution: - :return: """ self.session.add(execution) transaction.commit() + def save_request(self, request: CapabilityRequest): + """ + Commit this request. + + :param request: Capability request to commit + """ + logger.info(f"SAVING REQUEST {request.__json__()} w/ CONTEXT MGR") + self.session.add(request) + transaction.commit() + def save_version_file( self, version: CapabilityVersion, filename: str, content: bytes ) -> CapabilityVersionFile: diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 1820de6e2..c308205e1 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -168,3 +168,20 @@ class CapabilityService(CapabilityServiceIF): subject = message["subject"] request = self.capability_info.lookup_capability_request(subject["id"]) self.notify.notify_failed(request) + + @on_message(type="qa_ready") + def notify_qa_ready(self, **message: Dict): + logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}") + + subject = message["subject"] + execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"]) + self.notify.notify_qa_ready(execution.capability_request) + + # sending step complete notification until QA is properly implemented + step_complete_msg = { + "service": "capability", + "routing_key": "capability", + "subject": execution.__json__(), + "type": "step-complete", + } + self.message_router.send_message(**step_complete_msg) diff --git a/shared/workspaces/workspaces/capability/services/execution_manager.py b/shared/workspaces/workspaces/capability/services/execution_manager.py index 653c131ef..9af2134fc 100644 --- a/shared/workspaces/workspaces/capability/services/execution_manager.py +++ b/shared/workspaces/workspaces/capability/services/execution_manager.py @@ -165,6 +165,7 @@ class ExecutionManager: execution.state = ExecutionState.Waiting.name if execution.on_last_step(): + logger.info("ON LAST STEP!!!!!") # Execution is now complete execution = self._complete_execution(execution) else: @@ -183,8 +184,11 @@ class ExecutionManager: execution.current_step += 1 current_step = step_sequence[execution.current_step] - if current_step.step_type == CapabilityStepType.PrepareAndRunWorkflow: - # Enqueue execution that is on a run workflow step + if current_step.step_type in [ + CapabilityStepType.PrepareAndRunWorkflow, + CapabilityStepType.AwaitQA, + ]: + # Enqueue execution that is on a run workflow step or await-QA step # FIXME: Priority needs to be dynamic; perhaps a priority column in execution schema? self.enqueue_execution(execution, ExecutionPriority.Default.value) diff --git a/shared/workspaces/workspaces/capability/services/interfaces.py b/shared/workspaces/workspaces/capability/services/interfaces.py index 299a67610..8e7bde8b1 100644 --- a/shared/workspaces/workspaces/capability/services/interfaces.py +++ b/shared/workspaces/workspaces/capability/services/interfaces.py @@ -7,7 +7,8 @@ from workspaces.capability.helpers_interfaces import ParameterIF from workspaces.capability.schema_interfaces import ( CapabilityExecutionIF, CapabilityIF, - CapabilityRequestIF, CapabilityVersionIF, + CapabilityRequestIF, + CapabilityVersionIF, ) from workspaces.products.schema_interfaces import FutureProductIF from workspaces.system.schema import AbstractFile @@ -52,9 +53,9 @@ class CapabilityInfoIF(ABC): @abstractmethod def create_capability_version( - self, - capability_request_id: int, - parameters: List[ParameterIF] = None, + self, + capability_request_id: int, + parameters: List[ParameterIF] = None, ) -> CapabilityVersionIF: """ Create new capability request and save it in the database @@ -99,6 +100,9 @@ class CapabilityInfoIF(ABC): def save_entity(self, entity): pass + def save_request(self, capability_request): + raise NotImplementedError + class CapabilityQueueIF(ABC): """ @@ -138,6 +142,10 @@ class CapabilityEngineIF(ABC): ): pass + @abstractmethod + def announce_qa_ready(self, execution_id: int): + pass + class CapabilityServiceIF(ABC): """ diff --git a/shared/workspaces/workspaces/notification/services/interfaces.py b/shared/workspaces/workspaces/notification/services/interfaces.py index 521edda39..6af3feded 100644 --- a/shared/workspaces/workspaces/notification/services/interfaces.py +++ b/shared/workspaces/workspaces/notification/services/interfaces.py @@ -39,3 +39,6 @@ class NotificationServiceIF(ABC): def notify_failed(self, request: CapabilityRequestIF): raise NotImplementedError + + def notify_qa_ready(self, request: CapabilityRequestIF): + pass diff --git a/shared/workspaces/workspaces/notification/services/notification_service.py b/shared/workspaces/workspaces/notification/services/notification_service.py index 0c8a1cbc5..024a1a611 100644 --- a/shared/workspaces/workspaces/notification/services/notification_service.py +++ b/shared/workspaces/workspaces/notification/services/notification_service.py @@ -68,6 +68,26 @@ class NotificationServiceRESTClient(NotificationServiceIF): }, ) + def notify_qa_ready(self, request: CapabilityRequestIF): + """ + Notify the appropriate parties that a request is ready for QA + + :param request: Capability request + """ + logger.info("SENDING QA-READY NOTIFICATION!!!") + analyst_email = ( + CapoConfig().settings("edu.nrao.archive.workspaces.NotificationSettings").analystEmail + ) + requests.post( + f"{self.url}/notify/ready-for-QA_email/send", + json={ + "destination_email": analyst_email, + "request_id": request.id, + "capability_name": request.capability.name, + "status_link": self.linkUrl + "/request-status/" + str(request.id), + }, + ) + class NotificationService(NotificationServiceIF): def __init__(self, info: NotificationInfoIF): @@ -83,7 +103,6 @@ class NotificationService(NotificationServiceIF): if template is None: logger.info("No template '" + template_name + "' found.") - # args = {"template": template.content, "data": parameters} file = template.render(parameters) with smtplib.SMTP(smtp_server) as server: diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index c6921c0d0..f84546ddd 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -93,6 +93,17 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): return WorkflowRequest.from_json(result) + def announce_qa_ready(self, workflow_request_id: int): + """ + Announce results are available for QA + THIS IS A WORKAROUND PENDING THE IMPLEMENTATION OF THE QA SYSTEM! + <insert unhappy tears here> + :param workflow_request: completed workflow request + :param workflow_request_id: id of completed workflow + :return: + """ + requests.post(f"{self.url}/workflows/requests/{workflow_request_id}/qa") + class WorkflowService(WorkflowServiceIF): """ @@ -381,6 +392,18 @@ class WorkflowService(WorkflowServiceIF): else: logger.info("No delivery.json found") + def announce_qa_ready(self, wf_request_id): + wf_req = self.info.lookup_workflow_request(wf_request_id) + # TODO: find better way of listing QA applicable workflows. Enum, maybe? + logger.info("ANNOUNCING QA READY!") + qa_ready_msg = { + "service": "capability", + "routing_key": "capability", + "subject": wf_req.__json__(), + "type": "qa_ready", + } + self.message_router.send_message(**qa_ready_msg) + @on_message(service="workflow") def on_workflow_event(self, **message: Dict): """ -- GitLab