Skip to content
Snippets Groups Projects

Message Architect

Merged Charlotte Hausman requested to merge messaging_architect into main
Files
2
"""
Tests for capability.message_architect
"""
from datetime import datetime
from typing import List
import pytest as pytest
import pytest
from immutable_views import DictView
from workspaces.capability.message_architect import CapabilityMessageArchitect
from workspaces.capability.schema import CapabilityRequest, CapabilityExecution
from workspaces.capability.schema import CapabilityRequest, CapabilityExecution, CapabilityVersion
pytest_plugins = ["testing.utils.conftest"]
request = CapabilityRequest(
id=-1,
state="Creates",
capability_name="null",
versions=[CapabilityVersion(version_number=1, parameters={})],
ingested=False,
created_at=datetime.now(),
updated_at=datetime.now(),
)
execution = CapabilityExecution(
state="Created",
version=CapabilityVersion(version_number=1, parameters={}),
current_step_idx=0,
steps="steps",
created_at=datetime.now(),
updated_at=datetime.now(),
)
class TestCapabilityMessageArchitect:
@pytest.mark.usefixtures("mock_capability_requests", "mock_capability_execution")
def test_get_message_template(
self, mock_capability_requests: List[CapabilityRequest], mock_capability_execution: CapabilityExecution
):
execution_msg = CapabilityMessageArchitect(execution=mock_capability_execution).get_message_template(
"execution_failed"
)
@pytest.mark.usefixtures("mock_capability_requests")
def test_get_message_template(self, mock_capability_requests: List[CapabilityRequest]):
execution_msg = CapabilityMessageArchitect(execution=execution).get_message_template("execution_failed")
assert execution_msg == DictView(
{
"service": "capability",
@@ -42,17 +56,13 @@ class TestCapabilityMessageArchitect:
}
)
def test_compose_msg(
self, mock_capability_requests: List[CapabilityRequest], mock_capability_execution: CapabilityExecution
):
execution_msg = CapabilityMessageArchitect(execution=mock_capability_execution).compose_message(
"execution_failed"
)
def test_compose_msg(self):
execution_msg = CapabilityMessageArchitect(execution=execution).compose_message("execution_failed")
assert execution_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": mock_capability_execution,
"subject": execution,
"type": "execution-failed",
}
)
Loading