Skip to content
Snippets Groups Projects
Commit 7e34d16d authored by Janet Goldstein's avatar Janet Goldstein
Browse files

Merge branch 'WS-651-catch-and-save-RH-CARTA-event' of...

Merge branch 'WS-651-catch-and-save-RH-CARTA-event' of https://gitlab.nrao.edu/ssa/workspaces into WS-651-catch-and-save-RH-CARTA-event
parents 41b5fd92 16613826
No related branches found
No related tags found
1 merge request!505WS-651: catches and saves RH CARTA event
Pipeline #2926 passed
...@@ -5,6 +5,8 @@ import sys ...@@ -5,6 +5,8 @@ import sys
from datetime import datetime from datetime import datetime
from typing import Dict, List from typing import Dict, List
# pylint: disable=C0103, C0301, C0303, E0401, R0205, R0903, R0904, R0913, W1203
from sqlalchemy import desc, text from sqlalchemy import desc, text
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
...@@ -26,8 +28,6 @@ from workspaces.capability.schema_interfaces import ( ...@@ -26,8 +28,6 @@ from workspaces.capability.schema_interfaces import (
from workspaces.capability.services.interfaces import CapabilityInfoIF, QueueReport from workspaces.capability.services.interfaces import CapabilityInfoIF, QueueReport
from workspaces.products.schema_interfaces import FutureProductIF from workspaces.products.schema_interfaces import FutureProductIF
# pylint: disable=E0401, R0205, R0903
Base = declarative_base() Base = declarative_base()
......
...@@ -41,11 +41,15 @@ class CapabilityService(CapabilityServiceIF): ...@@ -41,11 +41,15 @@ class CapabilityService(CapabilityServiceIF):
self.messenger = MessageSender("capability") self.messenger = MessageSender("capability")
self.router = Router("capability") self.router = Router("capability")
self.router.register(self) self.router.register(self)
self.execution_manager = ExecutionManager(capability_info, workflow_service, self.router, self.messenger) self.execution_manager = ExecutionManager(
capability_info, workflow_service, self.router, self.messenger
)
self.capability_info = capability_info self.capability_info = capability_info
self.notify = notification_service self.notify = notification_service
def create_new_version(self, capability_request_id: int, parameters: List[Parameter] = None) -> CapabilityVersionIF: def create_new_version(
self, capability_request_id: int, parameters: List[Parameter] = None
) -> CapabilityVersionIF:
logger.info(f"Creating new version of Capability Request #{capability_request_id}") logger.info(f"Creating new version of Capability Request #{capability_request_id}")
return self.capability_info.create_capability_version(capability_request_id, parameters) return self.capability_info.create_capability_version(capability_request_id, parameters)
...@@ -54,13 +58,15 @@ class CapabilityService(CapabilityServiceIF): ...@@ -54,13 +58,15 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}") logger.info(f"RECEIVED EXECUTION-COMPLETE: {message}")
execution = message["subject"] execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"]) capability_request = self.capability_info.lookup_capability_request(
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Complete.name capability_request.state = CapabilityRequestState.Complete.name
self.capability_info.save_entity(capability_request) self.capability_info.save_entity(capability_request)
capability_complete_msg = CapabilityMessageArchitect(request=capability_request).compose_message( capability_complete_msg = CapabilityMessageArchitect(
"capability_complete" request=capability_request
) ).compose_message("capability_complete")
self.messenger.send_message(**capability_complete_msg) self.messenger.send_message(**capability_complete_msg)
@on_message(type="execution-failed") @on_message(type="execution-failed")
...@@ -68,13 +74,15 @@ class CapabilityService(CapabilityServiceIF): ...@@ -68,13 +74,15 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED EXECUTION-FAILED: {message}") logger.info(f"RECEIVED EXECUTION-FAILED: {message}")
execution = message["subject"] execution = message["subject"]
capability_request = self.capability_info.lookup_capability_request(execution["capability_request_id"]) capability_request = self.capability_info.lookup_capability_request(
execution["capability_request_id"]
)
capability_request.state = CapabilityRequestState.Failed.name capability_request.state = CapabilityRequestState.Failed.name
self.capability_info.save_entity(capability_request) self.capability_info.save_entity(capability_request)
capability_failed_msg = CapabilityMessageArchitect(request=capability_request).compose_message( capability_failed_msg = CapabilityMessageArchitect(
"capability_failed" request=capability_request
) ).compose_message("capability_failed")
self.messenger.send_message(**capability_failed_msg) self.messenger.send_message(**capability_failed_msg)
@on_message(service="workflow", type="delivery") @on_message(service="workflow", type="delivery")
...@@ -131,7 +139,9 @@ class CapabilityService(CapabilityServiceIF): ...@@ -131,7 +139,9 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}") logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}")
subject = message["subject"] subject = message["subject"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"]) execution = self.capability_info.lookup_execution_by_workflow_request_id(
subject["workflow_request_id"]
)
self.notify.notify_qa_ready(execution.capability_request) self.notify.notify_qa_ready(execution.capability_request)
@on_message(service="workflow", type="ingestion-complete") @on_message(service="workflow", type="ingestion-complete")
......
...@@ -13,6 +13,8 @@ from workspaces.capability.schema_interfaces import ( ...@@ -13,6 +13,8 @@ from workspaces.capability.schema_interfaces import (
from workspaces.products.schema_interfaces import FutureProductIF from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.system.schema import AbstractFile from workspaces.system.schema import AbstractFile
# pylint: disable=C0116, R0903
CapabilityName = str CapabilityName = str
...@@ -84,13 +86,11 @@ class CapabilityQueueManagerIF(ABC): ...@@ -84,13 +86,11 @@ class CapabilityQueueManagerIF(ABC):
""" """
Notifies the queue manager that something new is requesting execution. Notifies the queue manager that something new is requesting execution.
""" """
pass
def execution_complete(self): def execution_complete(self):
""" """
Notifies the queue manager that something has finished executing. Notifies the queue manager that something has finished executing.
""" """
pass
def reconfigure(self): def reconfigure(self):
""" """
...@@ -173,10 +173,6 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta): ...@@ -173,10 +173,6 @@ class CapabilityInfoIF(QueueReporterIF, metaclass=ABCMeta):
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(self, workflow_request_id: int) -> CapabilityExecutionIF:
raise NotImplementedError
@abstractmethod @abstractmethod
def save_execution(self, execution: CapabilityExecutionIF): def save_execution(self, execution: CapabilityExecutionIF):
pass pass
...@@ -210,7 +206,5 @@ class CapabilityQueueIF(ABC): ...@@ -210,7 +206,5 @@ class CapabilityQueueIF(ABC):
class CapabilityServiceIF(ABC): class CapabilityServiceIF(ABC):
""" """
The capability service: clients access this to request capability runs The capability service: clients access this to request capability runs.
""" """
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment