From 5a0d19ce6ecbcea442a7e52ed951ecd3462b20df Mon Sep 17 00:00:00 2001 From: chausman <chausman@nrao.edu> Date: Thu, 24 Jun 2021 16:05:33 -0600 Subject: [PATCH] WS-267: Await QA --- services/workflow/workflow/server.py | 16 +++++++++++++ .../workspaces/capability/helpers.py | 3 +-- .../capability/services/capability_engine.py | 14 +++-------- .../capability/services/capability_queue.py | 12 ++-------- .../capability/services/capability_service.py | 17 ++++++++++++++ .../capability/services/execution_manager.py | 2 +- .../capability/services/interfaces.py | 8 +------ .../notification/services/interfaces.py | 3 +++ .../services/notification_service.py | 1 + shared/workspaces/workspaces/system/schema.py | 2 +- .../workflow/services/workflow_service.py | 23 +++++++++++++++++++ 11 files changed, 69 insertions(+), 32 deletions(-) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index a0e2d4ed3..681e71bc5 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -228,6 +228,15 @@ class WorkflowRequestRestService: return self.request.workflows.execute(self.request.context) + @view_config(request_method="POST", route_name="announce_qa_ready") + def announce_qa(self): + """ + Stopgap for announcing QA ready + :return: + """ + print(f"Announcing QA ready for workflow {self.request.context}") + + return self.request.workflows.announce_qa_ready(self.request.matchdict["request_id"]) @view_defaults(route_name="workflow_request_files", renderer="json") class WorkflowFilesRestService: @@ -408,6 +417,13 @@ def main(global_config, **settings): "/workflows/{name}/requests/{request_id}/submit", factory=lookup_request, ) + + # yes I know this doesn't match pattern, it's a stopgap. + config.add_route( + "announce_qa_ready", + "/workflows/requests/{request_id}/qa", + factory=lookup_request + ) config.include("pyramid_beaker") config.scan(".") return config.make_wsgi_app() diff --git a/shared/workspaces/workspaces/capability/helpers.py b/shared/workspaces/workspaces/capability/helpers.py index afe8ead22..544e8523c 100644 --- a/shared/workspaces/workspaces/capability/helpers.py +++ b/shared/workspaces/workspaces/capability/helpers.py @@ -198,8 +198,7 @@ class AwaitQa(CapabilityStep): :param engine: Engine executing the capability :param execution: Corresponding capability execution """ - NotificationServiceRESTClient().notify_qa_ready(execution.capability_request) - engine.step_complete(execution) + engine.announce_qa_ready(execution.id) class AwaitWorkflow(CapabilityStep): diff --git a/shared/workspaces/workspaces/capability/services/capability_engine.py b/shared/workspaces/workspaces/capability/services/capability_engine.py index 026b701b3..efa36efec 100644 --- a/shared/workspaces/workspaces/capability/services/capability_engine.py +++ b/shared/workspaces/workspaces/capability/services/capability_engine.py @@ -2,7 +2,6 @@ import logging from typing import List import transaction -from messaging.router import Router from workspaces.capability.helpers import CapabilitySequence from workspaces.capability.schema_interfaces import CapabilityExecutionIF @@ -25,12 +24,10 @@ class CapabilityEngine(CapabilityEngineIF): self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, - message_router: Router, ): self.capability_info = capability_info self.workflow_service = workflow_service self.execution = None - self.message_router = message_router def load_engine(self, execution: CapabilityExecutionIF): """ @@ -100,11 +97,6 @@ class CapabilityEngine(CapabilityEngineIF): transaction.commit() self.workflow_service.execute(workflow_request) - def step_complete(self, execution: CapabilityExecutionIF): - step_complete_msg = { - "service": "capability", - "routing_key": "capability", - "subject": execution.__json__(), - "type": "step-complete", - } - self.message_router.send_message(**step_complete_msg) + def announce_qa_ready(self, execution_id: int): + execution = self.capability_info.lookup_execution(execution_id) + self.workflow_service.announce_qa_ready(execution.current_workflow_request_id) diff --git a/shared/workspaces/workspaces/capability/services/capability_queue.py b/shared/workspaces/workspaces/capability/services/capability_queue.py index c7dcc686d..f7bf92c52 100644 --- a/shared/workspaces/workspaces/capability/services/capability_queue.py +++ b/shared/workspaces/workspaces/capability/services/capability_queue.py @@ -5,8 +5,6 @@ from collections import namedtuple from queue import Empty, PriorityQueue, Queue from typing import NamedTuple, Optional -from messaging.router import Router - from workspaces.capability.enums import ExecutionState from workspaces.capability.schema_interfaces import CapabilityExecutionIF from workspaces.capability.services.capability_engine import CapabilityEngine @@ -32,14 +30,11 @@ class CapabilityQueue(CapabilityQueueIF): capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, max_concurrency: int, - message_router: Router, ): self.capability_info = capability_info self.queue = PriorityQueue() self.max_concurrency = max_concurrency - self.engine_list = self.init_engines( - max_concurrency, capability_info, workflow_service, message_router - ) + self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service) self.queue_manager = threading.Thread(target=self.process_executions, daemon=True) self.queue_manager.start() self.paused = False @@ -50,7 +45,6 @@ class CapabilityQueue(CapabilityQueueIF): num_engines: int, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, - message_router: Router, ) -> EngineList: """ Initialize available engine queue and list of in-use engines @@ -59,9 +53,7 @@ class CapabilityQueue(CapabilityQueueIF): """ engine_list = cls.EngineList(available=Queue(), in_use=[]) for _ in range(num_engines): - engine_list.available.put( - CapabilityEngine(capability_info, workflow_service, message_router) - ) + engine_list.available.put(CapabilityEngine(capability_info, workflow_service)) return engine_list diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 1820de6e2..c308205e1 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -168,3 +168,20 @@ class CapabilityService(CapabilityServiceIF): subject = message["subject"] request = self.capability_info.lookup_capability_request(subject["id"]) self.notify.notify_failed(request) + + @on_message(type="qa_ready") + def notify_qa_ready(self, **message: Dict): + logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}") + + subject = message["subject"] + execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"]) + self.notify.notify_qa_ready(execution.capability_request) + + # sending step complete notification until QA is properly implemented + step_complete_msg = { + "service": "capability", + "routing_key": "capability", + "subject": execution.__json__(), + "type": "step-complete", + } + self.message_router.send_message(**step_complete_msg) diff --git a/shared/workspaces/workspaces/capability/services/execution_manager.py b/shared/workspaces/workspaces/capability/services/execution_manager.py index 2e125dc50..9af2134fc 100644 --- a/shared/workspaces/workspaces/capability/services/execution_manager.py +++ b/shared/workspaces/workspaces/capability/services/execution_manager.py @@ -165,6 +165,7 @@ class ExecutionManager: execution.state = ExecutionState.Waiting.name if execution.on_last_step(): + logger.info("ON LAST STEP!!!!!") # Execution is now complete execution = self._complete_execution(execution) else: @@ -231,7 +232,6 @@ class ExecutionManager: self.capability_info, self.workflow_service, execution.capability.max_jobs, - self.message_router ), ) queue.enqueue(execution, priority) diff --git a/shared/workspaces/workspaces/capability/services/interfaces.py b/shared/workspaces/workspaces/capability/services/interfaces.py index 095bc0175..8e7bde8b1 100644 --- a/shared/workspaces/workspaces/capability/services/interfaces.py +++ b/shared/workspaces/workspaces/capability/services/interfaces.py @@ -143,13 +143,7 @@ class CapabilityEngineIF(ABC): pass @abstractmethod - def step_complete(self, execution: CapabilityExecutionIF): - """ - Announce that the step is complete. A bit of a kludge, steps should probably - not know they are complete during the time they are being handled by the CapabilityEngine. - - :param execution: the execution which needs to be advanced - """ + def announce_qa_ready(self, execution_id: int): pass diff --git a/shared/workspaces/workspaces/notification/services/interfaces.py b/shared/workspaces/workspaces/notification/services/interfaces.py index 521edda39..6af3feded 100644 --- a/shared/workspaces/workspaces/notification/services/interfaces.py +++ b/shared/workspaces/workspaces/notification/services/interfaces.py @@ -39,3 +39,6 @@ class NotificationServiceIF(ABC): def notify_failed(self, request: CapabilityRequestIF): raise NotImplementedError + + def notify_qa_ready(self, request: CapabilityRequestIF): + pass diff --git a/shared/workspaces/workspaces/notification/services/notification_service.py b/shared/workspaces/workspaces/notification/services/notification_service.py index 347f48ce1..024a1a611 100644 --- a/shared/workspaces/workspaces/notification/services/notification_service.py +++ b/shared/workspaces/workspaces/notification/services/notification_service.py @@ -84,6 +84,7 @@ class NotificationServiceRESTClient(NotificationServiceIF): "destination_email": analyst_email, "request_id": request.id, "capability_name": request.capability.name, + "status_link": self.linkUrl + "/request-status/" + str(request.id), }, ) diff --git a/shared/workspaces/workspaces/system/schema.py b/shared/workspaces/workspaces/system/schema.py index 630e62404..7965cc83e 100644 --- a/shared/workspaces/workspaces/system/schema.py +++ b/shared/workspaces/workspaces/system/schema.py @@ -66,7 +66,7 @@ def get_engine(): url = capo.jdbcUrl.replace("jdbc:", "").replace( "://", f"://{capo.jdbcUsername}:{capo.jdbcPassword}@" ) - return create_engine(url, echo=True) + return create_engine(url) def get_session_factory(engine): diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index c6921c0d0..f84546ddd 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -93,6 +93,17 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): return WorkflowRequest.from_json(result) + def announce_qa_ready(self, workflow_request_id: int): + """ + Announce results are available for QA + THIS IS A WORKAROUND PENDING THE IMPLEMENTATION OF THE QA SYSTEM! + <insert unhappy tears here> + :param workflow_request: completed workflow request + :param workflow_request_id: id of completed workflow + :return: + """ + requests.post(f"{self.url}/workflows/requests/{workflow_request_id}/qa") + class WorkflowService(WorkflowServiceIF): """ @@ -381,6 +392,18 @@ class WorkflowService(WorkflowServiceIF): else: logger.info("No delivery.json found") + def announce_qa_ready(self, wf_request_id): + wf_req = self.info.lookup_workflow_request(wf_request_id) + # TODO: find better way of listing QA applicable workflows. Enum, maybe? + logger.info("ANNOUNCING QA READY!") + qa_ready_msg = { + "service": "capability", + "routing_key": "capability", + "subject": wf_req.__json__(), + "type": "qa_ready", + } + self.message_router.send_message(**qa_ready_msg) + @on_message(service="workflow") def on_workflow_event(self, **message: Dict): """ -- GitLab