diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index a0e2d4ed3f736973dcd5fd4a7d186d3e861b8c52..681e71bc518905ece87d01dd4061e6f111ea7002 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -228,6 +228,15 @@ class WorkflowRequestRestService: return self.request.workflows.execute(self.request.context) + @view_config(request_method="POST", route_name="announce_qa_ready") + def announce_qa(self): + """ + Stopgap for announcing QA ready + :return: + """ + print(f"Announcing QA ready for workflow {self.request.context}") + + return self.request.workflows.announce_qa_ready(self.request.matchdict["request_id"]) @view_defaults(route_name="workflow_request_files", renderer="json") class WorkflowFilesRestService: @@ -408,6 +417,13 @@ def main(global_config, **settings): "/workflows/{name}/requests/{request_id}/submit", factory=lookup_request, ) + + # yes I know this doesn't match pattern, it's a stopgap. + config.add_route( + "announce_qa_ready", + "/workflows/requests/{request_id}/qa", + factory=lookup_request + ) config.include("pyramid_beaker") config.scan(".") return config.make_wsgi_app() diff --git a/shared/workspaces/workspaces/capability/helpers.py b/shared/workspaces/workspaces/capability/helpers.py index afe8ead221dfea2b0912e4c2d08be837d7fd24e5..544e8523ca6a8d0324d7a00d69c38d6b3043d12c 100644 --- a/shared/workspaces/workspaces/capability/helpers.py +++ b/shared/workspaces/workspaces/capability/helpers.py @@ -198,8 +198,7 @@ class AwaitQa(CapabilityStep): :param engine: Engine executing the capability :param execution: Corresponding capability execution """ - NotificationServiceRESTClient().notify_qa_ready(execution.capability_request) - engine.step_complete(execution) + engine.announce_qa_ready(execution.id) class AwaitWorkflow(CapabilityStep): diff --git a/shared/workspaces/workspaces/capability/services/capability_engine.py b/shared/workspaces/workspaces/capability/services/capability_engine.py index 026b701b3fffea7447737c834bc3109ff8722431..efa36efec93b199df4b25b1938162d9c65b6994d 100644 --- a/shared/workspaces/workspaces/capability/services/capability_engine.py +++ b/shared/workspaces/workspaces/capability/services/capability_engine.py @@ -2,7 +2,6 @@ import logging from typing import List import transaction -from messaging.router import Router from workspaces.capability.helpers import CapabilitySequence from workspaces.capability.schema_interfaces import CapabilityExecutionIF @@ -25,12 +24,10 @@ class CapabilityEngine(CapabilityEngineIF): self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, - message_router: Router, ): self.capability_info = capability_info self.workflow_service = workflow_service self.execution = None - self.message_router = message_router def load_engine(self, execution: CapabilityExecutionIF): """ @@ -100,11 +97,6 @@ class CapabilityEngine(CapabilityEngineIF): transaction.commit() self.workflow_service.execute(workflow_request) - def step_complete(self, execution: CapabilityExecutionIF): - step_complete_msg = { - "service": "capability", - "routing_key": "capability", - "subject": execution.__json__(), - "type": "step-complete", - } - self.message_router.send_message(**step_complete_msg) + def announce_qa_ready(self, execution_id: int): + execution = self.capability_info.lookup_execution(execution_id) + self.workflow_service.announce_qa_ready(execution.current_workflow_request_id) diff --git a/shared/workspaces/workspaces/capability/services/capability_queue.py b/shared/workspaces/workspaces/capability/services/capability_queue.py index c7dcc686d49198657281b3a9883141782ecf39a3..f7bf92c52e3f6a5955e7d7c4090705c05c6a033c 100644 --- a/shared/workspaces/workspaces/capability/services/capability_queue.py +++ b/shared/workspaces/workspaces/capability/services/capability_queue.py @@ -5,8 +5,6 @@ from collections import namedtuple from queue import Empty, PriorityQueue, Queue from typing import NamedTuple, Optional -from messaging.router import Router - from workspaces.capability.enums import ExecutionState from workspaces.capability.schema_interfaces import CapabilityExecutionIF from workspaces.capability.services.capability_engine import CapabilityEngine @@ -32,14 +30,11 @@ class CapabilityQueue(CapabilityQueueIF): capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, max_concurrency: int, - message_router: Router, ): self.capability_info = capability_info self.queue = PriorityQueue() self.max_concurrency = max_concurrency - self.engine_list = self.init_engines( - max_concurrency, capability_info, workflow_service, message_router - ) + self.engine_list = self.init_engines(max_concurrency, capability_info, workflow_service) self.queue_manager = threading.Thread(target=self.process_executions, daemon=True) self.queue_manager.start() self.paused = False @@ -50,7 +45,6 @@ class CapabilityQueue(CapabilityQueueIF): num_engines: int, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF, - message_router: Router, ) -> EngineList: """ Initialize available engine queue and list of in-use engines @@ -59,9 +53,7 @@ class CapabilityQueue(CapabilityQueueIF): """ engine_list = cls.EngineList(available=Queue(), in_use=[]) for _ in range(num_engines): - engine_list.available.put( - CapabilityEngine(capability_info, workflow_service, message_router) - ) + engine_list.available.put(CapabilityEngine(capability_info, workflow_service)) return engine_list diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 1820de6e2fe53b1758891b0463d6b3a8cab0943b..c308205e14f0355c1b0e4d709aa8292bd9990994 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -168,3 +168,20 @@ class CapabilityService(CapabilityServiceIF): subject = message["subject"] request = self.capability_info.lookup_capability_request(subject["id"]) self.notify.notify_failed(request) + + @on_message(type="qa_ready") + def notify_qa_ready(self, **message: Dict): + logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}") + + subject = message["subject"] + execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"]) + self.notify.notify_qa_ready(execution.capability_request) + + # sending step complete notification until QA is properly implemented + step_complete_msg = { + "service": "capability", + "routing_key": "capability", + "subject": execution.__json__(), + "type": "step-complete", + } + self.message_router.send_message(**step_complete_msg) diff --git a/shared/workspaces/workspaces/capability/services/execution_manager.py b/shared/workspaces/workspaces/capability/services/execution_manager.py index 2e125dc503d2c653ea19e39d6e2eaac2cf409391..9af2134fc3e8490103888edf97abee556f2fe349 100644 --- a/shared/workspaces/workspaces/capability/services/execution_manager.py +++ b/shared/workspaces/workspaces/capability/services/execution_manager.py @@ -165,6 +165,7 @@ class ExecutionManager: execution.state = ExecutionState.Waiting.name if execution.on_last_step(): + logger.info("ON LAST STEP!!!!!") # Execution is now complete execution = self._complete_execution(execution) else: @@ -231,7 +232,6 @@ class ExecutionManager: self.capability_info, self.workflow_service, execution.capability.max_jobs, - self.message_router ), ) queue.enqueue(execution, priority) diff --git a/shared/workspaces/workspaces/capability/services/interfaces.py b/shared/workspaces/workspaces/capability/services/interfaces.py index 095bc0175b2581b4de852a4e47a4ad593ccbb5f8..8e7bde8b136e2f75f2c3cac65e5c4998c8e69581 100644 --- a/shared/workspaces/workspaces/capability/services/interfaces.py +++ b/shared/workspaces/workspaces/capability/services/interfaces.py @@ -143,13 +143,7 @@ class CapabilityEngineIF(ABC): pass @abstractmethod - def step_complete(self, execution: CapabilityExecutionIF): - """ - Announce that the step is complete. A bit of a kludge, steps should probably - not know they are complete during the time they are being handled by the CapabilityEngine. - - :param execution: the execution which needs to be advanced - """ + def announce_qa_ready(self, execution_id: int): pass diff --git a/shared/workspaces/workspaces/notification/services/interfaces.py b/shared/workspaces/workspaces/notification/services/interfaces.py index 521edda39117cd35a0252d80f78f42aab0022a4c..6af3fededb8eb6b436cd6dbc4a69243719faa856 100644 --- a/shared/workspaces/workspaces/notification/services/interfaces.py +++ b/shared/workspaces/workspaces/notification/services/interfaces.py @@ -39,3 +39,6 @@ class NotificationServiceIF(ABC): def notify_failed(self, request: CapabilityRequestIF): raise NotImplementedError + + def notify_qa_ready(self, request: CapabilityRequestIF): + pass diff --git a/shared/workspaces/workspaces/notification/services/notification_service.py b/shared/workspaces/workspaces/notification/services/notification_service.py index 347f48ce12e0631fa1e08be2e31d6fb398105bfb..024a1a611b6f2b9e079a0df2fdb838ec96dcac7c 100644 --- a/shared/workspaces/workspaces/notification/services/notification_service.py +++ b/shared/workspaces/workspaces/notification/services/notification_service.py @@ -84,6 +84,7 @@ class NotificationServiceRESTClient(NotificationServiceIF): "destination_email": analyst_email, "request_id": request.id, "capability_name": request.capability.name, + "status_link": self.linkUrl + "/request-status/" + str(request.id), }, ) diff --git a/shared/workspaces/workspaces/system/schema.py b/shared/workspaces/workspaces/system/schema.py index 630e62404ff8eb909c1c5e9952a9f66229713a65..7965cc83ec82219649a229a015c451ace41cb59a 100644 --- a/shared/workspaces/workspaces/system/schema.py +++ b/shared/workspaces/workspaces/system/schema.py @@ -66,7 +66,7 @@ def get_engine(): url = capo.jdbcUrl.replace("jdbc:", "").replace( "://", f"://{capo.jdbcUsername}:{capo.jdbcPassword}@" ) - return create_engine(url, echo=True) + return create_engine(url) def get_session_factory(engine): diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index c6921c0d0426c092709ba60cc8a45f749feaa3f0..f84546ddd864de7868d2b35b05f05ac6980edabb 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -93,6 +93,17 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): return WorkflowRequest.from_json(result) + def announce_qa_ready(self, workflow_request_id: int): + """ + Announce results are available for QA + THIS IS A WORKAROUND PENDING THE IMPLEMENTATION OF THE QA SYSTEM! + <insert unhappy tears here> + :param workflow_request: completed workflow request + :param workflow_request_id: id of completed workflow + :return: + """ + requests.post(f"{self.url}/workflows/requests/{workflow_request_id}/qa") + class WorkflowService(WorkflowServiceIF): """ @@ -381,6 +392,18 @@ class WorkflowService(WorkflowServiceIF): else: logger.info("No delivery.json found") + def announce_qa_ready(self, wf_request_id): + wf_req = self.info.lookup_workflow_request(wf_request_id) + # TODO: find better way of listing QA applicable workflows. Enum, maybe? + logger.info("ANNOUNCING QA READY!") + qa_ready_msg = { + "service": "capability", + "routing_key": "capability", + "subject": wf_req.__json__(), + "type": "qa_ready", + } + self.message_router.send_message(**qa_ready_msg) + @on_message(service="workflow") def on_workflow_event(self, **message: Dict): """