Skip to content
Snippets Groups Projects
Commit 957e068d authored by Janet Goldstein's avatar Janet Goldstein
Browse files

WS-651: catches and saves RH CARTA event

* implemented lookup_capability_request_version
* implemented on_carta_ready
* Black reformatting
parent 61c76041
No related branches found
No related tags found
1 merge request!505WS-651: catches and saves RH CARTA event
Pipeline #2897 failed
""" Unit tests for Capability Service """
from unittest.mock import patch
# pylint: disable=C0301, E0401, R0201
import pytest
from workspaces.capability.schema import CapabilityExecution
from workspaces.capability.schema import CapabilityExecution, CapabilityVersion
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.capability_service import CapabilityService
from workspaces.workflow.schema import WorkflowRequest
pytest_plugins = ["testing.utils.conftest"]
......@@ -35,3 +40,45 @@ class TestCapabilityService:
assert mock_capability_info.save_entity.call_count == save_entity_old_call_count + 1
(request,) = mock_capability_info.save_entity.call_args.args
assert request.ingested is True
def test_on_carta_ready(
self,
mock_capability_service: CapabilityService,
mock_capability_info: CapabilityInfo,
):
"""
Are we catching the "carta-ready" message?
:param mock_capability_service: stand-in for capability service
:param mock_capability_info: stand-in for capability info
:return:
"""
wf_request_id = -1
fake_request = WorkflowRequest(workflow_name="carta", workflow_request_id=wf_request_id)
fake_cr_version = CapabilityVersion(capability_request_id=wf_request_id)
carta_url = "decartes_image_carta_url"
fake_carta_ready_msg = {
"service": "capability",
"routing_key": "capability",
"carta_url": carta_url,
"subject": "Your CARTA Session is ready!",
"type": "carta-ready",
"wf_request_id": wf_request_id,
}
save_entity_old_call_count = mock_capability_info.save_entity.call_count
with patch(
"workspaces.capability.services.capability_info.CapabilityInfo.lookup_capability_request",
return_value=fake_request,
):
with patch(
"workspaces.capability.services.capability_info.CapabilityInfo.lookup_capability_request_version",
return_value=fake_cr_version,
) as mock_cr_version:
mock_capability_service.on_carta_ready(**fake_carta_ready_msg)
assert mock_capability_info.save_entity.call_count == save_entity_old_call_count + 1
mock_cr_version.assert_called()
(request,) = mock_capability_info.save_entity.call_args.args
assert request.carta_url == carta_url
......@@ -5,6 +5,8 @@ import sys
from datetime import datetime
from typing import Dict, List
# pylint: disable=C0103, C0301, C0303, E0401, R0205, R0903, R0904, R0913, W1203
from sqlalchemy import desc, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
......@@ -26,8 +28,6 @@ from workspaces.capability.schema_interfaces import (
from workspaces.capability.services.interfaces import CapabilityInfoIF, QueueReport
from workspaces.products.schema_interfaces import FutureProductIF
# pylint: disable=E0401, R0205, R0903
Base = declarative_base()
......@@ -196,6 +196,16 @@ class CapabilityInfo(CapabilityInfoIF):
"""
return self.session.query(CapabilityRequest).filter_by(id=request_id).first()
def lookup_capability_request_version(self, request_id: int) -> CapabilityVersion:
"""
Finds the capability request version with the provided request ID
:param request_id: capability request ID of interest
:return:
"""
return self.session.query(CapabilityVersion).filter_by(capability_request_id=request_id).first()
def lookup_capability_execution(self, execution_id: int) -> CapabilityExecution:
"""
Finds the capability execution with the provided request ID
......
import logging
from typing import Dict, List
# pylint: disable=E0401, R0903, W1203
import transaction
from messaging.messenger import MessageSender
......@@ -39,11 +41,15 @@ class CapabilityService(CapabilityServiceIF):
self.messenger = MessageSender("capability")
self.router = Router("capability")
self.router.register(self)
self.execution_manager = ExecutionManager(capability_info, workflow_service, self.router, self.messenger)
self.execution_manager = ExecutionManager(
capability_info, workflow_service, self.router, self.messenger
)
self.capability_info = capability_info
self.notify = notification_service
def create_new_version(self, capability_request_id: int, parameters: List[Parameter] = None) -> CapabilityVersionIF:
def create_new_version(
self, capability_request_id: int, parameters: List[Parameter] = None
) -> CapabilityVersionIF:
logger.info(f"Creating new version of Capability Request #{capability_request_id}")
return self.capability_info.create_capability_version(capability_request_id, parameters)
......@@ -52,13 +58,15 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}")
execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"])
capability_request = self.capability_info.lookup_capability_request(
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Complete.name
self.capability_info.save_entity(capability_request)
capability_complete_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
"capability_complete"
)
capability_complete_msg = CapabilityMessageArchitect(
request=capability_request
).compose_message("capability_complete")
self.messenger.send_message(**capability_complete_msg)
@on_message(type="execution-failed")
......@@ -66,13 +74,15 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-FAILED: {message}")
execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"])
capability_request = self.capability_info.lookup_capability_request(
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Failed.name
self.capability_info.save_entity(capability_request)
capability_failed_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
"capability_failed"
)
capability_failed_msg = CapabilityMessageArchitect(
request=capability_request
).compose_message("capability_failed")
self.messenger.send_message(**capability_failed_msg)
@on_message(service="workflow", type="delivery")
......@@ -129,7 +139,9 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}")
subject = message["subject"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"])
execution = self.capability_info.lookup_execution_by_workflow_request_id(
subject["workflow_request_id"]
)
self.notify.notify_qa_ready(execution.capability_request)
@on_message(service="workflow", type="ingestion-complete")
......@@ -149,6 +161,24 @@ class CapabilityService(CapabilityServiceIF):
request.ingested = True
self.capability_info.save_entity(request)
@on_message(type="carta-ready")
def on_carta_ready(self, **message: Dict[str, str]):
"""
Catch the RH-flavored event and save it to the capability request version metadata
:param message: Ingestion-complete message
:return:
"""
logger.info(f"RECEIVED CARTA READY MESSAGE: {message}")
wf_request_id = int(message["wf_request_id"])
request = self.capability_info.lookup_capability_request(wf_request_id)
request_version = self.capability_info.lookup_capability_request_version(wf_request_id)
request.version_number = request_version
request.carta_url = message["carta_url"]
self.capability_info.save_entity(request)
class CapabilityLauncher:
"""
......
......@@ -13,6 +13,8 @@ from workspaces.capability.schema_interfaces import (
from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.system.schema import AbstractFile
# pylint: disable=C0116, R0903
CapabilityName = str
......@@ -84,13 +86,11 @@ class CapabilityQueueManagerIF(ABC):
"""
Notifies the queue manager that something new is requesting execution.
"""
pass
def execution_complete(self):
"""
Notifies the queue manager that something has finished executing.
"""
pass
def reconfigure(self):
"""
......@@ -169,12 +169,18 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta):
def lookup_capability_request(self, request_id) -> CapabilityRequestIF:
raise NotImplementedError
@abstractmethod
def lookup_capability_request_version(self, request_id) -> CapabilityRequestIF:
raise NotImplementedError
@abstractmethod
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(self, workflow_request_id: int) -> CapabilityExecutionIF:
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
......@@ -210,7 +216,5 @@ class CapabilityQueueIF(ABC):
class CapabilityServiceIF(ABC):
"""
The capability service: clients access this to request capability runs
The capability service: clients access this to request capability runs. Just a phantom.
"""
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment