Skip to content
Snippets Groups Projects
Commit 64a8929a authored by Janet Goldstein's avatar Janet Goldstein
Browse files

Merge branch 'WS-651-catch-and-save-RH-CARTA-event' of...

Merge branch 'WS-651-catch-and-save-RH-CARTA-event' of https://gitlab.nrao.edu/ssa/workspaces into WS-651-catch-and-save-RH-CARTA-event
parents 50c27377 957e068d
No related branches found
No related tags found
1 merge request!505WS-651: catches and saves RH CARTA event
Pipeline #2909 canceled
""" Unit tests for Capability Service """ """ Unit tests for Capability Service """
from unittest.mock import patch
from unittest.mock import patch
# pylint: disable=C0301, E0401, R0201 # pylint: disable=C0301, E0401, R0201
import pytest import pytest
from workspaces.capability.schema import CapabilityExecution from workspaces.capability.schema import CapabilityExecution, CapabilityVersion
from workspaces.capability.services.capability_info import CapabilityInfo from workspaces.capability.services.capability_info import CapabilityInfo
from workspaces.capability.services.capability_service import CapabilityService from workspaces.capability.services.capability_service import CapabilityService
from workspaces.workflow.schema import WorkflowRequest
pytest_plugins = ["testing.utils.conftest"] pytest_plugins = ["testing.utils.conftest"]
......
...@@ -196,6 +196,16 @@ class CapabilityInfo(CapabilityInfoIF): ...@@ -196,6 +196,16 @@ class CapabilityInfo(CapabilityInfoIF):
""" """
return self.session.query(CapabilityRequest).filter_by(id=request_id).first() return self.session.query(CapabilityRequest).filter_by(id=request_id).first()
def lookup_capability_request_version(self, request_id: int) -> CapabilityVersion:
"""
Finds the capability request version with the provided request ID
:param request_id: capability request ID of interest
:return:
"""
return self.session.query(CapabilityVersion).filter_by(capability_request_id=request_id).first()
def lookup_capability_execution(self, execution_id: int) -> CapabilityExecution: def lookup_capability_execution(self, execution_id: int) -> CapabilityExecution:
""" """
Finds the capability execution with the provided request ID Finds the capability execution with the provided request ID
......
...@@ -41,15 +41,11 @@ class CapabilityService(CapabilityServiceIF): ...@@ -41,15 +41,11 @@ class CapabilityService(CapabilityServiceIF):
self.messenger = MessageSender("capability") self.messenger = MessageSender("capability")
self.router = Router("capability") self.router = Router("capability")
self.router.register(self) self.router.register(self)
self.execution_manager = ExecutionManager( self.execution_manager = ExecutionManager(capability_info, workflow_service, self.router, self.messenger)
capability_info, workflow_service, self.router, self.messenger
)
self.capability_info = capability_info self.capability_info = capability_info
self.notify = notification_service self.notify = notification_service
def create_new_version( def create_new_version(self, capability_request_id: int, parameters: List[Parameter] = None) -> CapabilityVersionIF:
self, capability_request_id: int, parameters: List[Parameter] = None
) -> CapabilityVersionIF:
logger.info(f"Creating new version of Capability Request #{capability_request_id}") logger.info(f"Creating new version of Capability Request #{capability_request_id}")
return self.capability_info.create_capability_version(capability_request_id, parameters) return self.capability_info.create_capability_version(capability_request_id, parameters)
...@@ -58,15 +54,13 @@ class CapabilityService(CapabilityServiceIF): ...@@ -58,15 +54,13 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}") logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}")
execution = message["subject"] execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request( capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"])
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Complete.name capability_request.state = CapabilityRequestState.Complete.name
self.capability_info.save_entity(capability_request) self.capability_info.save_entity(capability_request)
capability_complete_msg = CapabilityMessageArchitect( capability_complete_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
request=capability_request "capability_complete"
).compose_message("capability_complete") )
self.messenger.send_message(**capability_complete_msg) self.messenger.send_message(**capability_complete_msg)
@on_message(type="execution-failed") @on_message(type="execution-failed")
...@@ -74,15 +68,13 @@ class CapabilityService(CapabilityServiceIF): ...@@ -74,15 +68,13 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-FAILED: {message}") logger.info(f"RECEIVED EXECUTION-FAILED: {message}")
execution = message["subject"] execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request( capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"])
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Failed.name capability_request.state = CapabilityRequestState.Failed.name
self.capability_info.save_entity(capability_request) self.capability_info.save_entity(capability_request)
capability_failed_msg = CapabilityMessageArchitect( capability_failed_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
request=capability_request "capability_failed"
).compose_message("capability_failed") )
self.messenger.send_message(**capability_failed_msg) self.messenger.send_message(**capability_failed_msg)
@on_message(service="workflow", type="delivery") @on_message(service="workflow", type="delivery")
...@@ -139,9 +131,7 @@ class CapabilityService(CapabilityServiceIF): ...@@ -139,9 +131,7 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}") logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}")
subject = message["subject"] subject = message["subject"]
execution = self.capability_info.lookup_execution_by_workflow_request_id( execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"])
subject["workflow_request_id"]
)
self.notify.notify_qa_ready(execution.capability_request) self.notify.notify_qa_ready(execution.capability_request)
@on_message(service="workflow", type="ingestion-complete") @on_message(service="workflow", type="ingestion-complete")
......
...@@ -170,6 +170,10 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta): ...@@ -170,6 +170,10 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta):
raise NotImplementedError raise NotImplementedError
@abstractmethod
def lookup_capability_request_version(self, request_id) -> CapabilityRequestIF:
raise NotImplementedError
@abstractmethod @abstractmethod
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError raise NotImplementedError
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment