Skip to content
Snippets Groups Projects

Message Architect

Merged Charlotte Hausman requested to merge messaging_architect into main
Files
8
"""
Tests for capability.message_architect
"""
from datetime import datetime
from typing import List
import pytest
import pytest as pytest
from immutable_views import DictView
from workspaces.capability.message_architect import CapabilityMessageArchitect
from workspaces.capability.schema import CapabilityRequest, CapabilityExecution, CapabilityVersion
request = CapabilityRequest(
id=-1,
state="Creates",
capability_name="null",
versions=[CapabilityVersion(version_number=1, parameters={})],
ingested=False,
created_at=datetime.now(),
updated_at=datetime.now(),
)
execution = CapabilityExecution(
state="Created",
version=CapabilityVersion(version_number=1, parameters={}),
current_step_idx=0,
steps="steps",
created_at=datetime.now(),
updated_at=datetime.now(),
)
class TestCapabilityMessageArchitect:
@pytest.mark.skip("tests hanging in pipeline")
def test_get_message_template(self, mock_capability_requests: List[CapabilityRequest]):
execution_msg = CapabilityMessageArchitect(execution=execution).get_message_template("execution_failed")
assert execution_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "execution-failed",
}
)
request_msg = CapabilityMessageArchitect(request=mock_capability_requests[0]).get_message_template(
"capability_complete"
)
assert request_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": None,
"type": "capability-complete",
}
)
@pytest.mark.skip("tests hanging in pipeline")
def test_compose_msg(self):
execution_msg = CapabilityMessageArchitect(execution=execution).compose_message("execution_failed")
assert execution_msg == DictView(
{
"service": "capability",
"routing_key": "capability",
"subject": execution,
"type": "execution-failed",
}
)
Loading