Skip to content
Snippets Groups Projects
Commit 00cca147 authored by Janet Goldstein's avatar Janet Goldstein
Browse files

WS-651: catches and saves RH CARTA event

* implemented lookup_capability_request_version
* implemented on_carta_ready
* Black reformatting
parent e1073567
No related branches found
No related tags found
1 merge request!505WS-651: catches and saves RH CARTA event
This commit is part of merge request !505. Comments created here will be created in the context of that merge request.
""" Unit tests for Capability Service """
from unittest.mock import patch
# pylint: disable=C0301, E0401, R0201
import pytest
from workspaces.capability.schema import CapabilityExecution
from workspaces.capability.schema import CapabilityExecution, CapabilityVersion
from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.capability_service import CapabilityService
from workspaces.workflow.schema import WorkflowRequest
pytest_plugins = ["testing.utils.conftest"]
......@@ -35,3 +40,45 @@ class TestCapabilityService:
assert mock_capability_info.save_entity.call_count == save_entity_old_call_count + 1
(request,) = mock_capability_info.save_entity.call_args.args
assert request.ingested is True
def test_on_carta_ready(
self,
mock_capability_service: CapabilityService,
mock_capability_info: CapabilityInfo,
):
"""
Are we catching the "carta-ready" message?
:param mock_capability_service: stand-in for capability service
:param mock_capability_info: stand-in for capability info
:return:
"""
wf_request_id = -1
fake_request = WorkflowRequest(workflow_name="carta", workflow_request_id=wf_request_id)
fake_cr_version = CapabilityVersion(capability_request_id=wf_request_id)
carta_url = "decartes_image_carta_url"
fake_carta_ready_msg = {
"service": "capability",
"routing_key": "capability",
"carta_url": carta_url,
"subject": "Your CARTA Session is ready!",
"type": "carta-ready",
"wf_request_id": wf_request_id,
}
save_entity_old_call_count = mock_capability_info.save_entity.call_count
with patch(
"workspaces.capability.services.capability_info.CapabilityInfo.lookup_capability_request",
return_value=fake_request,
):
with patch(
"workspaces.capability.services.capability_info.CapabilityInfo.lookup_capability_request_version",
return_value=fake_cr_version,
) as mock_cr_version:
mock_capability_service.on_carta_ready(**fake_carta_ready_msg)
assert mock_capability_info.save_entity.call_count == save_entity_old_call_count + 1
mock_cr_version.assert_called()
(request,) = mock_capability_info.save_entity.call_args.args
assert request.carta_url == carta_url
......@@ -5,6 +5,8 @@ import sys
from datetime import datetime
from typing import Dict, List
# pylint: disable=C0103, C0301, C0303, E0401, R0205, R0903, R0904, R0913, W1203
from sqlalchemy import desc, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
......@@ -26,8 +28,6 @@ from workspaces.capability.schema_interfaces import (
from workspaces.capability.services.interfaces import CapabilityInfoIF, QueueReport
from workspaces.products.schema_interfaces import FutureProductIF
# pylint: disable=E0401, R0205, R0903
Base = declarative_base()
......@@ -196,6 +196,16 @@ class CapabilityInfo(CapabilityInfoIF):
"""
return self.session.query(CapabilityRequest).filter_by(id=request_id).first()
def lookup_capability_request_version(self, request_id: int) -> CapabilityVersion:
"""
Finds the capability request version with the provided request ID
:param request_id: capability request ID of interest
:return:
"""
return self.session.query(CapabilityVersion).filter_by(capability_request_id=request_id).first()
def lookup_capability_execution(self, execution_id: int) -> CapabilityExecution:
"""
Finds the capability execution with the provided request ID
......
import logging
from typing import Dict, List
# pylint: disable=E0401, R0903, W1203
import transaction
from messaging.messenger import MessageSender
......@@ -39,11 +41,15 @@ class CapabilityService(CapabilityServiceIF):
self.messenger = MessageSender("capability")
self.router = Router("capability")
self.router.register(self)
self.execution_manager = ExecutionManager(capability_info, workflow_service, self.router, self.messenger)
self.execution_manager = ExecutionManager(
capability_info, workflow_service, self.router, self.messenger
)
self.capability_info = capability_info
self.notify = notification_service
def create_new_version(self, capability_request_id: int, parameters: List[Parameter] = None) -> CapabilityVersionIF:
def create_new_version(
self, capability_request_id: int, parameters: List[Parameter] = None
) -> CapabilityVersionIF:
logger.info(f"Creating new version of Capability Request #{capability_request_id}")
return self.capability_info.create_capability_version(capability_request_id, parameters)
......@@ -52,13 +58,15 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}")
execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"])
capability_request = self.capability_info.lookup_capability_request(
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Complete.name
self.capability_info.save_entity(capability_request)
capability_complete_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
"capability_complete"
)
capability_complete_msg = CapabilityMessageArchitect(
request=capability_request
).compose_message("capability_complete")
self.messenger.send_message(**capability_complete_msg)
@on_message(type="execution-failed")
......@@ -66,13 +74,15 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-FAILED: {message}")
execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"])
capability_request = self.capability_info.lookup_capability_request(
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Failed.name
self.capability_info.save_entity(capability_request)
capability_failed_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
"capability_failed"
)
capability_failed_msg = CapabilityMessageArchitect(
request=capability_request
).compose_message("capability_failed")
self.messenger.send_message(**capability_failed_msg)
@on_message(service="workflow", type="delivery")
......@@ -129,7 +139,9 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}")
subject = message["subject"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"])
execution = self.capability_info.lookup_execution_by_workflow_request_id(
subject["workflow_request_id"]
)
self.notify.notify_qa_ready(execution.capability_request)
@on_message(service="workflow", type="ingestion-complete")
......@@ -149,6 +161,24 @@ class CapabilityService(CapabilityServiceIF):
request.ingested = True
self.capability_info.save_entity(request)
@on_message(type="carta-ready")
def on_carta_ready(self, **message: Dict[str, str]):
"""
Catch the RH-flavored event and save it to the capability request version metadata
:param message: Ingestion-complete message
:return:
"""
logger.info(f"RECEIVED CARTA READY MESSAGE: {message}")
wf_request_id = int(message["wf_request_id"])
request = self.capability_info.lookup_capability_request(wf_request_id)
request_version = self.capability_info.lookup_capability_request_version(wf_request_id)
request.version_number = request_version
request.carta_url = message["carta_url"]
self.capability_info.save_entity(request)
class CapabilityLauncher:
"""
......
......@@ -13,6 +13,8 @@ from workspaces.capability.schema_interfaces import (
from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.system.schema import AbstractFile
# pylint: disable=C0116, R0903
CapabilityName = str
......@@ -84,13 +86,11 @@ class CapabilityQueueManagerIF(ABC):
"""
Notifies the queue manager that something new is requesting execution.
"""
pass
def execution_complete(self):
"""
Notifies the queue manager that something has finished executing.
"""
pass
def reconfigure(self):
"""
......@@ -169,12 +169,18 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta):
def lookup_capability_request(self, request_id) -> CapabilityRequestIF:
raise NotImplementedError
@abstractmethod
def lookup_capability_request_version(self, request_id) -> CapabilityRequestIF:
raise NotImplementedError
@abstractmethod
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(self, workflow_request_id: int) -> CapabilityExecutionIF:
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod
......@@ -210,7 +216,5 @@ class CapabilityQueueIF(ABC):
class CapabilityServiceIF(ABC):
"""
The capability service: clients access this to request capability runs
The capability service: clients access this to request capability runs. Just a phantom.
"""
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment