From 957e068dbe70458d63010c8e1ddb84736ecf8a50 Mon Sep 17 00:00:00 2001 From: "Janet L. Goldstein" <jgoldste@nrao.edu> Date: Wed, 15 Sep 2021 11:54:44 -0600 Subject: [PATCH] WS-651: catches and saves RH CARTA event * implemented lookup_capability_request_version * implemented on_carta_ready * Black reformatting --- .../test/test_capability_service.py | 49 ++++++++++++++++- .../capability/services/capability_info.py | 14 ++++- .../capability/services/capability_service.py | 52 +++++++++++++++---- .../capability/services/interfaces.py | 16 +++--- 4 files changed, 111 insertions(+), 20 deletions(-) diff --git a/shared/workspaces/test/test_capability_service.py b/shared/workspaces/test/test_capability_service.py index d87e64a69..95d58648e 100644 --- a/shared/workspaces/test/test_capability_service.py +++ b/shared/workspaces/test/test_capability_service.py @@ -1,10 +1,15 @@ +""" Unit tests for Capability Service """ + from unittest.mock import patch +# pylint: disable=C0301, E0401, R0201 + import pytest -from workspaces.capability.schema import CapabilityExecution +from workspaces.capability.schema import CapabilityExecution, CapabilityVersion from workspaces.capability.services.capability_info import CapabilityInfo from workspaces.capability.services.capability_service import CapabilityService +from workspaces.workflow.schema import WorkflowRequest pytest_plugins = ["testing.utils.conftest"] @@ -35,3 +40,45 @@ class TestCapabilityService: assert mock_capability_info.save_entity.call_count == save_entity_old_call_count + 1 (request,) = mock_capability_info.save_entity.call_args.args assert request.ingested is True + + def test_on_carta_ready( + self, + mock_capability_service: CapabilityService, + mock_capability_info: CapabilityInfo, + ): + """ + Are we catching the "carta-ready" message? + + :param mock_capability_service: stand-in for capability service + :param mock_capability_info: stand-in for capability info + :return: + """ + + wf_request_id = -1 + fake_request = WorkflowRequest(workflow_name="carta", workflow_request_id=wf_request_id) + fake_cr_version = CapabilityVersion(capability_request_id=wf_request_id) + carta_url = "decartes_image_carta_url" + fake_carta_ready_msg = { + "service": "capability", + "routing_key": "capability", + "carta_url": carta_url, + "subject": "Your CARTA Session is ready!", + "type": "carta-ready", + "wf_request_id": wf_request_id, + } + + save_entity_old_call_count = mock_capability_info.save_entity.call_count + with patch( + "workspaces.capability.services.capability_info.CapabilityInfo.lookup_capability_request", + return_value=fake_request, + ): + with patch( + "workspaces.capability.services.capability_info.CapabilityInfo.lookup_capability_request_version", + return_value=fake_cr_version, + ) as mock_cr_version: + mock_capability_service.on_carta_ready(**fake_carta_ready_msg) + assert mock_capability_info.save_entity.call_count == save_entity_old_call_count + 1 + mock_cr_version.assert_called() + + (request,) = mock_capability_info.save_entity.call_args.args + assert request.carta_url == carta_url diff --git a/shared/workspaces/workspaces/capability/services/capability_info.py b/shared/workspaces/workspaces/capability/services/capability_info.py index a707e4498..8d60da941 100644 --- a/shared/workspaces/workspaces/capability/services/capability_info.py +++ b/shared/workspaces/workspaces/capability/services/capability_info.py @@ -5,6 +5,8 @@ import sys from datetime import datetime from typing import Dict, List +# pylint: disable=C0103, C0301, C0303, E0401, R0205, R0903, R0904, R0913, W1203 + from sqlalchemy import desc, text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session @@ -26,8 +28,6 @@ from workspaces.capability.schema_interfaces import ( from workspaces.capability.services.interfaces import CapabilityInfoIF, QueueReport from workspaces.products.schema_interfaces import FutureProductIF -# pylint: disable=E0401, R0205, R0903 - Base = declarative_base() @@ -196,6 +196,16 @@ class CapabilityInfo(CapabilityInfoIF): """ return self.session.query(CapabilityRequest).filter_by(id=request_id).first() + def lookup_capability_request_version(self, request_id: int) -> CapabilityVersion: + """ + Finds the capability request version with the provided request ID + + :param request_id: capability request ID of interest + + :return: + """ + return self.session.query(CapabilityVersion).filter_by(capability_request_id=request_id).first() + def lookup_capability_execution(self, execution_id: int) -> CapabilityExecution: """ Finds the capability execution with the provided request ID diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 052be85e5..ad7b0dd10 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -1,6 +1,8 @@ import logging from typing import Dict, List +# pylint: disable=E0401, R0903, W1203 + import transaction from messaging.messenger import MessageSender @@ -39,11 +41,15 @@ class CapabilityService(CapabilityServiceIF): self.messenger = MessageSender("capability") self.router = Router("capability") self.router.register(self) - self.execution_manager = ExecutionManager(capability_info, workflow_service, self.router, self.messenger) + self.execution_manager = ExecutionManager( + capability_info, workflow_service, self.router, self.messenger + ) self.capability_info = capability_info self.notify = notification_service - def create_new_version(self, capability_request_id: int, parameters: List[Parameter] = None) -> CapabilityVersionIF: + def create_new_version( + self, capability_request_id: int, parameters: List[Parameter] = None + ) -> CapabilityVersionIF: logger.info(f"Creating new version of Capability Request #{capability_request_id}") return self.capability_info.create_capability_version(capability_request_id, parameters) @@ -52,13 +58,15 @@ class CapabilityService(CapabilityServiceIF): logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}") execution = message["subject"] - capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"]) + capability_request = self.capability_info.lookup_capability_request( + execution["capability_request_id"] + ) capability_request.state = CapabilityRequestState.Complete.name self.capability_info.save_entity(capability_request) - capability_complete_msg = CapabilityMessageArchitect(request=capability_request).compose_message( - "capability_complete" - ) + capability_complete_msg = CapabilityMessageArchitect( + request=capability_request + ).compose_message("capability_complete") self.messenger.send_message(**capability_complete_msg) @on_message(type="execution-failed") @@ -66,13 +74,15 @@ class CapabilityService(CapabilityServiceIF): logger.info(f"RECEIVED EXECUTION-FAILED: {message}") execution = message["subject"] - capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"]) + capability_request = self.capability_info.lookup_capability_request( + execution["capability_request_id"] + ) capability_request.state = CapabilityRequestState.Failed.name self.capability_info.save_entity(capability_request) - capability_failed_msg = CapabilityMessageArchitect(request=capability_request).compose_message( - "capability_failed" - ) + capability_failed_msg = CapabilityMessageArchitect( + request=capability_request + ).compose_message("capability_failed") self.messenger.send_message(**capability_failed_msg) @on_message(service="workflow", type="delivery") @@ -129,7 +139,9 @@ class CapabilityService(CapabilityServiceIF): logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}") subject = message["subject"] - execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"]) + execution = self.capability_info.lookup_execution_by_workflow_request_id( + subject["workflow_request_id"] + ) self.notify.notify_qa_ready(execution.capability_request) @on_message(service="workflow", type="ingestion-complete") @@ -149,6 +161,24 @@ class CapabilityService(CapabilityServiceIF): request.ingested = True self.capability_info.save_entity(request) + @on_message(type="carta-ready") + def on_carta_ready(self, **message: Dict[str, str]): + """ + Catch the RH-flavored event and save it to the capability request version metadata + + :param message: Ingestion-complete message + :return: + """ + logger.info(f"RECEIVED CARTA READY MESSAGE: {message}") + wf_request_id = int(message["wf_request_id"]) + + request = self.capability_info.lookup_capability_request(wf_request_id) + request_version = self.capability_info.lookup_capability_request_version(wf_request_id) + request.version_number = request_version + request.carta_url = message["carta_url"] + + self.capability_info.save_entity(request) + class CapabilityLauncher: """ diff --git a/shared/workspaces/workspaces/capability/services/interfaces.py b/shared/workspaces/workspaces/capability/services/interfaces.py index 543716ee0..236693fa9 100644 --- a/shared/workspaces/workspaces/capability/services/interfaces.py +++ b/shared/workspaces/workspaces/capability/services/interfaces.py @@ -13,6 +13,8 @@ from workspaces.capability.schema_interfaces import ( from workspaces.products.schema_interfaces import FutureProductIF from workspaces.system.schema import AbstractFile +# pylint: disable=C0116, R0903 + CapabilityName = str @@ -84,13 +86,11 @@ class CapabilityQueueManagerIF(ABC): """ Notifies the queue manager that something new is requesting execution. """ - pass def execution_complete(self): """ Notifies the queue manager that something has finished executing. """ - pass def reconfigure(self): """ @@ -169,12 +169,18 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta): def lookup_capability_request(self, request_id) -> CapabilityRequestIF: raise NotImplementedError + @abstractmethod + def lookup_capability_request_version(self, request_id) -> CapabilityRequestIF: + raise NotImplementedError + @abstractmethod def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: raise NotImplementedError @abstractmethod - def lookup_execution_by_workflow_request_id(self, workflow_request_id: int) -> CapabilityExecutionIF: + def lookup_execution_by_workflow_request_id( + self, workflow_request_id: int + ) -> CapabilityExecutionIF: raise NotImplementedError @abstractmethod @@ -210,7 +216,5 @@ class CapabilityQueueIF(ABC): class CapabilityServiceIF(ABC): """ - The capability service: clients access this to request capability runs + The capability service: clients access this to request capability runs. Just a phantom. """ - - pass -- GitLab