Skip to content
Snippets Groups Projects
Commit cb53de56 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

Message Architect

parent d2a7e499
No related branches found
No related tags found
1 merge request!488Message Architect
Pipeline #2855 passed
"""
Tests for capability.message_architect
"""
from datetime import datetime
from typing import List
import pytest
import pytest as pytest
from immutable_views import DictView
from workspaces.capability.message_architect import CapabilityMessageArchitect
from workspaces.capability.schema import CapabilityRequest, CapabilityExecution, CapabilityVersion
request = CapabilityRequest(
id=-1,
state="Creates",
capability_name="null",
versions=[CapabilityVersion(version_number=1, parameters={})],
ingested=False,
created_at=datetime.now(),
updated_at=datetime.now(),
)
execution = CapabilityExecution(
state="Created",
version=CapabilityVersion(version_number=1, parameters={}),
current_step_idx=0,
steps="steps",
created_at=datetime.now(),
updated_at=datetime.now(),
)
class TestCapabilityMessageArchitect:
@pytest.mark.skip("tests hanging in pipeline")
def test_get_message_template(self, mock_capability_requests: List[CapabilityRequest]):
execution_msg = CapabilityMessageArchitect(execution=execution).get_message_template("execution_failed")
assert execution_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "execution-failed",
}
)
request_msg = CapabilityMessageArchitect(request=mock_capability_requests[0]).get_message_template(
"capability_complete"
)
assert request_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "capability-complete",
}
)
@pytest.mark.skip("tests hanging in pipeline")
def test_compose_msg(self):
execution_msg = CapabilityMessageArchitect(execution=execution).compose_message("execution_failed")
assert execution_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": execution,
"type": "execution-failed",
}
)
"""
Tests for workflow.message_architect
"""
from datetime import datetime
import pytest
from immutable_views import DictView
from workspaces.workflow.message_architect import WorkflowMessageArchitect
from workspaces.workflow.schema import WorkflowRequest
r1 = WorkflowRequest(
workflow_request_id=-1,
workflow_name="null",
argument={"parameters": "-g"},
state="Ready",
results_dir="",
created_at=datetime(1996, 9, 10),
updated_at=datetime(1996, 9, 10),
)
class TestCapabilityMessageArchitect:
@pytest.mark.skip("tests hanging in pipeline")
def test_get_message_template(self):
execution_msg = WorkflowMessageArchitect(request=r1).get_message_template("workflow_failed")
assert execution_msg == DictView(
{
"service": "workflow",
"routing_key": "workflow",
"subject": None,
"type": "workflow-failed",
}
)
request_msg = WorkflowMessageArchitect(previous_info=r1).get_message_template("ingestion_complete")
assert request_msg == DictView(
{
"service": "workflow",
"routing_key": "workflow",
"subject": None,
"type": "ingestion-complete",
}
)
@pytest.mark.skip("tests hanging in pipeline")
def test_compose_msg(self):
execution_msg = WorkflowMessageArchitect(request=r1).compose_message("workflow_failed")
assert execution_msg == DictView(
{
"service": "workflow",
"routing_key": "workflow",
"subject": r1,
"type": "workflow-failed",
}
)
import logging
from immutable_views import DictView
from workspaces.capability.schema_interfaces import CapabilityExecutionIF, CapabilityRequestIF
from workspaces.shared_interfaces import MessageArchitectIF
logger = logging.getLogger(__name__)
"""
Class which creates an immutable dictionary of amqp message templates for easy access
"""
# DictView(Dict[msg_type, msg_content]
capability_msg_templates = DictView(
{
"capability_submitted": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "capability-submitted",
},
"capability_complete": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "capability-complete",
},
"capability_failed": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "capability-failed",
},
"execution_failed": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "execution-failed",
},
"execution_complete": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "execution-complete",
},
"qa_pass": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "qa-pass",
},
"qa_fail": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "qa-fail",
},
}
)
class CapabilityMessageArchitect(MessageArchitectIF):
def __init__(self, execution: CapabilityExecutionIF = None, request: CapabilityRequestIF = None):
self.subject = execution or request
@staticmethod
def get_message_template(msg_type: str) -> DictView:
return capability_msg_templates[msg_type]
def compose_message(self, msg_type: str) -> DictView:
template = self.get_message_template(msg_type)
template = template.copy()
template["subject"] = self.subject
return DictView(template)
...@@ -8,10 +8,10 @@ from messaging.router import Router, on_message ...@@ -8,10 +8,10 @@ from messaging.router import Router, on_message
from workspaces.capability.enums import CapabilityRequestState from workspaces.capability.enums import CapabilityRequestState
from workspaces.capability.helpers import Parameter from workspaces.capability.helpers import Parameter
from workspaces.capability.message_architect import CapabilityMessageArchitect
from workspaces.capability.schema import CapabilityRequest from workspaces.capability.schema import CapabilityRequest
from workspaces.capability.schema_interfaces import ( from workspaces.capability.schema_interfaces import (
CapabilityExecutionIF, CapabilityExecutionIF,
CapabilityRequestIF,
CapabilityVersionIF, CapabilityVersionIF,
) )
from workspaces.capability.services.execution_manager import ExecutionManager from workspaces.capability.services.execution_manager import ExecutionManager
...@@ -20,7 +20,6 @@ from workspaces.capability.services.interfaces import ( ...@@ -20,7 +20,6 @@ from workspaces.capability.services.interfaces import (
CapabilityServiceIF, CapabilityServiceIF,
) )
from workspaces.notification.services.interfaces import NotificationServiceIF from workspaces.notification.services.interfaces import NotificationServiceIF
from workspaces.products.schema import FutureProduct
from workspaces.workflow.services.interfaces import WorkflowServiceIF from workspaces.workflow.services.interfaces import WorkflowServiceIF
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -57,12 +56,9 @@ class CapabilityService(CapabilityServiceIF): ...@@ -57,12 +56,9 @@ class CapabilityService(CapabilityServiceIF):
capability_request.state = CapabilityRequestState.Complete.name capability_request.state = CapabilityRequestState.Complete.name
self.capability_info.save_entity(capability_request) self.capability_info.save_entity(capability_request)
capability_complete_msg = { capability_complete_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
"service": "capability", "capability_complete"
"routing_key": "capability", )
"subject": execution,
"type": "capability-complete",
}
self.messenger.send_message(**capability_complete_msg) self.messenger.send_message(**capability_complete_msg)
@on_message(type="execution-failed") @on_message(type="execution-failed")
...@@ -74,12 +70,9 @@ class CapabilityService(CapabilityServiceIF): ...@@ -74,12 +70,9 @@ class CapabilityService(CapabilityServiceIF):
capability_request.state = CapabilityRequestState.Failed.name capability_request.state = CapabilityRequestState.Failed.name
self.capability_info.save_entity(capability_request) self.capability_info.save_entity(capability_request)
capability_failed_msg = { capability_failed_msg = CapabilityMessageArchitect(request=capability_request).compose_message(
"service": "capability", "capability_failed"
"routing_key": "capability", )
"subject": capability_request,
"type": "capability-failed",
}
self.messenger.send_message(**capability_failed_msg) self.messenger.send_message(**capability_failed_msg)
@on_message(service="workflow", type="delivery") @on_message(service="workflow", type="delivery")
...@@ -120,7 +113,7 @@ class CapabilityService(CapabilityServiceIF): ...@@ -120,7 +113,7 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED CAPABILITY-COMPLETE: {message}") logger.info(f"RECEIVED CAPABILITY-COMPLETE: {message}")
subject = message["subject"] subject = message["subject"]
request = self.capability_info.lookup_capability_request(subject["capability_request_id"]) request = self.capability_info.lookup_capability_request(subject["id"])
self.notify.notify_complete(request) self.notify.notify_complete(request)
@on_message(type="capability-failed") @on_message(type="capability-failed")
...@@ -139,15 +132,6 @@ class CapabilityService(CapabilityServiceIF): ...@@ -139,15 +132,6 @@ class CapabilityService(CapabilityServiceIF):
execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"]) execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"])
self.notify.notify_qa_ready(execution.capability_request) self.notify.notify_qa_ready(execution.capability_request)
# sending step complete notification until QA is properly implemented
step_complete_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "step-complete",
}
self.messenger.send_message(**step_complete_msg)
@on_message(service="workflow", type="ingestion-complete") @on_message(service="workflow", type="ingestion-complete")
def on_ingestion_complete(self, **message: Dict[str, str]): def on_ingestion_complete(self, **message: Dict[str, str]):
""" """
...@@ -198,12 +182,9 @@ class CapabilityLauncher: ...@@ -198,12 +182,9 @@ class CapabilityLauncher:
transaction.commit() transaction.commit()
# now pass it back to the CapabilityService to do the next thing # now pass it back to the CapabilityService to do the next thing
capability_submitted_msg = { capability_submitted_msg = CapabilityMessageArchitect(execution=execution).compose_message(
"service": "capability", "capability_submitted"
"routing_key": "capability", )
"subject": execution.__json__(),
"type": "capability-submitted",
}
self.message_router.send_message(**capability_submitted_msg) self.message_router.send_message(**capability_submitted_msg)
return execution return execution
...@@ -13,6 +13,7 @@ from workspaces.capability.enums import ( ...@@ -13,6 +13,7 @@ from workspaces.capability.enums import (
ExecutionState, ExecutionState,
) )
from workspaces.capability.helpers import CapabilitySequence from workspaces.capability.helpers import CapabilitySequence
from workspaces.capability.message_architect import CapabilityMessageArchitect
from workspaces.capability.schema_interfaces import CapabilityExecutionIF from workspaces.capability.schema_interfaces import CapabilityExecutionIF
from workspaces.capability.services.interfaces import ( from workspaces.capability.services.interfaces import (
CapabilityInfoIF, CapabilityInfoIF,
...@@ -135,12 +136,7 @@ class ExecutionManager(ExecutionManagerIF): ...@@ -135,12 +136,7 @@ class ExecutionManager(ExecutionManagerIF):
if execution: if execution:
execution.state = ExecutionState.Failed.name execution.state = ExecutionState.Failed.name
execution_failed_msg = { execution_failed_msg = CapabilityMessageArchitect(execution=execution).compose_message("execution_failed")
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "execution-failed",
}
self.messenger.send_message(**execution_failed_msg) self.messenger.send_message(**execution_failed_msg)
self.capability_info.save_execution(execution) self.capability_info.save_execution(execution)
else: else:
...@@ -259,12 +255,7 @@ class ExecutionManager(ExecutionManagerIF): ...@@ -259,12 +255,7 @@ class ExecutionManager(ExecutionManagerIF):
""" """
execution.state = ExecutionState.Complete.name execution.state = ExecutionState.Complete.name
execution_complete_msg = { execution_complete_msg = CapabilityMessageArchitect(execution=execution).compose_message("execution_complete")
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "execution-complete",
}
self.messenger.send_message(**execution_complete_msg) self.messenger.send_message(**execution_complete_msg)
return execution return execution
......
from abc import ABC, abstractmethod
from immutable_views import DictView
class MessageArchitectIF(ABC):
"""
Generic methods to build AMQP Messages
"""
@staticmethod
@abstractmethod
def get_message_template(msg_type: str) -> DictView:
"""
Retrieves a DictView of the template for the message type provided
:param msg_type: type of message to send
:return: template DictView
"""
raise NotImplementedError
@abstractmethod
def compose_message(self, msg_type: str) -> DictView:
"""
Uses a templated message to construct a real AMQP message and returns it
:param msg_type: type of message to send
:return: message DictView
"""
raise NotImplementedError
import logging
from immutable_views import DictView
from workspaces.shared_interfaces import MessageArchitectIF
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
logger = logging.getLogger(__name__)
"""
Class which create an immutable dictionary of amqp message templates for easy access
"""
workflow_msg_templates = DictView(
{
"workflow_failed": {
"service": "workflow",
"routing_key": "workflow",
"subject": None,
"type": "workflow-failed",
},
"qa_ready": {
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "qa_ready",
},
"delivery": {
"subject": None,
"service": "workflow",
"type": "delivery",
"delivery": None,
},
"ingestion_complete": {
"service": "workflow",
"routing_key": "workflow",
"subject": None,
"type": "ingestion-complete",
},
}
)
archive_msg_templates = DictView(
{
"carta_ready": {
"service": "archive",
"routing_key": None,
"application": "workflow",
"carta_url": None,
"subject": None,
"message": "carta ready",
},
"carta_failed": {
"service": "archive",
"routing_key": None,
"application": "workflow",
"carta_url": None,
"subject": None,
"message": "carta failed",
},
"seci_failed": {
"service": "archive",
"routing_key": None,
"application": "workflow",
"subject": None,
"message": "seci failed",
"status": "failed",
},
"seci_complete": {
"service": "archive",
"routing_key": None,
"application": "workflow",
"subject": None,
"message": "seci complete",
"status": "complete",
},
}
)
class WorkflowMessageArchitect(MessageArchitectIF):
def __init__(
self,
request: WorkflowRequestIF = None,
previous_info=None,
delivery_info=None,
):
self.subject = request or previous_info
self.delivery = delivery_info
@staticmethod
def get_message_template(msg_type: str) -> DictView:
return workflow_msg_templates[msg_type]
def compose_message(self, msg_type: str) -> DictView:
template = self.get_message_template(msg_type)
template = template.copy()
template["subject"] = self.subject
if self.delivery is not None:
template["delivery"] = self.delivery
return DictView(template)
class ArchiveMessageArchitect(MessageArchitectIF):
def __init__(self, routing_key: str, request: WorkflowRequestIF, carta_url: str = None):
self.key = routing_key
self.subject = request
self.carta_url = carta_url
@staticmethod
def get_message_template(msg_type: str) -> DictView:
return archive_msg_templates[msg_type]
def compose_message(self, msg_type: str) -> DictView:
template = self.get_message_template(msg_type)
template = template.copy()
template["subject"] = self.subject
template["routing_key"] = self.key
if self.carta_url is not None:
template["carta_url"] = self.carta_url
return DictView(template)
...@@ -18,6 +18,7 @@ from pycapo import CapoConfig ...@@ -18,6 +18,7 @@ from pycapo import CapoConfig
from requests import Response from requests import Response
from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState
from workspaces.workflow.message_architect import ArchiveMessageArchitect, WorkflowMessageArchitect
from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
...@@ -184,16 +185,11 @@ class WorkflowService(WorkflowServiceIF): ...@@ -184,16 +185,11 @@ class WorkflowService(WorkflowServiceIF):
:param carta_url: JSON blob with CARTA URL :param carta_url: JSON blob with CARTA URL
""" """
logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!") logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!")
workflow_request = self.info.lookup_workflow_request(workflow_request_id) wf_request = self.info.lookup_workflow_request(workflow_request_id)
routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}" routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}"
carta_url_msg = { carta_url_msg = ArchiveMessageArchitect(
"service": "archive", routing_key=routing_key, request=wf_request, carta_url=carta_url
"routing_key": routing_key, ).compose_message("carta_ready")
"application": "workflow",
"carta_url": carta_url,
"subject": workflow_request.__json__(),
"message": "carta ready",
}
self.archive_messenger.send_message(**carta_url_msg) self.archive_messenger.send_message(**carta_url_msg)
def execute(self, request: WorkflowRequest): def execute(self, request: WorkflowRequest):
...@@ -339,12 +335,7 @@ class WorkflowService(WorkflowServiceIF): ...@@ -339,12 +335,7 @@ class WorkflowService(WorkflowServiceIF):
else: else:
logger.error(wf_json.decode()) logger.error(wf_json.decode())
logger.info("SENDING WORKFLOW FAIL MESSAGE!") logger.info("SENDING WORKFLOW FAIL MESSAGE!")
failed_msg = { failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed")
"service": "workflow",
"routing_key": "workflow",
"subject": wf_request,
"type": "workflow-failed",
}
self.messenger.send_message(**failed_msg) self.messenger.send_message(**failed_msg)
return wf_request return wf_request
...@@ -465,14 +456,9 @@ class WorkflowService(WorkflowServiceIF): ...@@ -465,14 +456,9 @@ class WorkflowService(WorkflowServiceIF):
] ]
def announce_qa_ready(self, wf_request_id): def announce_qa_ready(self, wf_request_id):
wf_req = self.info.lookup_workflow_request(wf_request_id) wf_request = self.info.lookup_workflow_request(wf_request_id)
logger.info("ANNOUNCING QA READY!") logger.info("ANNOUNCING QA READY!")
qa_ready_msg = { qa_ready_msg = WorkflowMessageArchitect(request=wf_request).compose_message("qa_ready")
"service": "capability",
"routing_key": "capability",
"subject": wf_req.__json__(),
"type": "qa_ready",
}
self.messenger.send_message(**qa_ready_msg) self.messenger.send_message(**qa_ready_msg)
...@@ -495,10 +481,10 @@ class WorkflowMessageHandler: ...@@ -495,10 +481,10 @@ class WorkflowMessageHandler:
""" """
logger.info("RECEIVED WORKFLOW COMPLETE MESSAGE. SENDING DELIVERY MESSAGE.") logger.info("RECEIVED WORKFLOW COMPLETE MESSAGE. SENDING DELIVERY MESSAGE.")
# look up the workflow request to get the path # look up the workflow request to get the path
request = self.info.lookup_workflow_request(message["subject"]["workflow_request_id"]) wf_request = self.info.lookup_workflow_request(message["subject"]["workflow_request_id"])
# find the path to the delivery.json file # find the path to the delivery.json file
delivery_file = pathlib.Path(request.results_dir) / "delivery.json" delivery_file = pathlib.Path(wf_request.results_dir) / "delivery.json"
# does it exist? if so, we need to send out a new event with its contents # does it exist? if so, we need to send out a new event with its contents
if delivery_file.exists(): if delivery_file.exists():
...@@ -507,12 +493,9 @@ class WorkflowMessageHandler: ...@@ -507,12 +493,9 @@ class WorkflowMessageHandler:
delivery_info = json.load(delivery_fo) delivery_info = json.load(delivery_fo)
# construct the message # construct the message
message = { message = WorkflowMessageArchitect(
"subject": message["subject"], previous_info=message["subject"], delivery_info=delivery_info
"service": "workflow", ).compose_message("delivery")
"type": "delivery",
"delivery": delivery_info,
}
# send the message # send the message
self.messenger.send_message(**message) self.messenger.send_message(**message)
...@@ -528,18 +511,15 @@ class WorkflowMessageHandler: ...@@ -528,18 +511,15 @@ class WorkflowMessageHandler:
""" """
subject = message["subject"] subject = message["subject"]
wf_req_id = subject["workflow_request_id"] wf_req_id = subject["workflow_request_id"]
request = self.info.lookup_workflow_request(wf_req_id) wf_request = self.info.lookup_workflow_request(wf_req_id)
if "ingest" in request.workflow_name: if "ingest" in wf_request.workflow_name:
logger.info("SENDING INGESTION COMPLETE MESSAGE!") logger.info("SENDING INGESTION COMPLETE MESSAGE!")
subject["execution_wf_id"] = request.argument["parent_wf_request_id"] subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"]
ingestion_complete_msg = { ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message(
"subject": subject, "ingestion_complete"
"service": "workflow", )
"routing_key": "workflow",
"type": "ingestion-complete",
}
self.messenger.send_message(**ingestion_complete_msg) self.messenger.send_message(**ingestion_complete_msg)
@on_message(service="workflow") @on_message(service="workflow")
...@@ -605,49 +585,34 @@ class WorkflowMessageHandler: ...@@ -605,49 +585,34 @@ class WorkflowMessageHandler:
def send_archive_failed_event(self, **message: Dict): def send_archive_failed_event(self, **message: Dict):
subject = message["subject"] subject = message["subject"]
wf_id = subject["workflow_request_id"] wf_id = subject["workflow_request_id"]
request = self.info.lookup_workflow_request(wf_id) wf_request = self.info.lookup_workflow_request(wf_id)
if request.workflow_name == ArchiveWorkflows.CARTA.value and request.argument["need_data"] is True: if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["need_data"] is True:
logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!") logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!")
routing_key = f"ws-workflow.carta-instance-ready.{wf_id}" routing_key = f"ws-workflow.carta-instance-ready.{wf_id}"
carta_url_msg = { carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"service": "archive", "carta_failed"
"routing_key": routing_key, )
"application": "workflow",
"carta_url": None,
"subject": request.__json__(),
"message": "carta failed",
}
self.archive_messenger.send_message(**carta_url_msg) self.archive_messenger.send_message(**carta_url_msg)
if request.workflow_name == ArchiveWorkflows.SECI.value: if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!") logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!")
routing_key = f"ws-workflow.seci.{wf_id}" routing_key = f"ws-workflow.seci.{wf_id}"
seci_msg = { seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"service": "archive", "seci_failed"
"routing_key": routing_key, )
"application": "workflow",
"subject": request.__json__(),
"message": "seci failed",
"status": "failed",
}
self.archive_messenger.send_message(**seci_msg) self.archive_messenger.send_message(**seci_msg)
def send_archive_complete_event(self, **message: Dict): def send_archive_complete_event(self, **message: Dict):
subject = message["subject"] subject = message["subject"]
wf_id = subject["workflow_request_id"] wf_id = subject["workflow_request_id"]
request = self.info.lookup_workflow_request(wf_id) wf_request = self.info.lookup_workflow_request(wf_id)
if request.workflow_name == ArchiveWorkflows.SECI.value: if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!") logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!")
routing_key = f"ws-workflow.seci.{wf_id}" routing_key = f"ws-workflow.seci.{wf_id}"
seci_msg = { seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
"service": "archive", "seci_complete"
"routing_key": routing_key, )
"application": "workflow",
"subject": request.__json__(),
"message": "seci complete",
"status": "complete",
}
self.archive_messenger.send_message(**seci_msg) self.archive_messenger.send_message(**seci_msg)
def clean_remote_workflow(self, request: WorkflowRequestIF): def clean_remote_workflow(self, request: WorkflowRequestIF):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment