import time

import pytest

from channels.amqp_helpers import WorkflowEventChannel, workflow_events
from workspaces.workflow.enum import WorkflowEventType
from workspaces.workflow.schema import WorkflowEvent


@pytest.mark.skip(reason="Test requires AMQP, therefore it's an integration test")
def test_workflow_event_sending():
    """
    Check that a workflow event published via ``workflow_events.send`` can be
    fetched from a queue bound to ``WorkflowEventChannel.EXCHANGE``.

    A server-named exclusive queue is declared on a separate channel and bound
    with the ``#`` routing key. The test fails if no message can be fetched
    from that queue after publishing.
    """
    workflow_events.connect(profile="docker")
    channel = workflow_events.chan.connection.channel()
    try:
        # An empty queue name asks the broker to generate one; read it back
        # from the declare reply.
        queue = channel.queue_declare(queue="", exclusive=True).method.queue
        channel.queue_bind(
            queue=queue, exchange=WorkflowEventChannel.EXCHANGE, routing_key="#"
        )

        workflow_events.send(
            WorkflowEvent(
                "-1",
                "name",
                "-1",
                WorkflowEventType.OTHER,
                "2020-01-01",
                "Nothing to log",
            )
        )

        # The publish happens on a different channel than the fetch, so the
        # message may not be routed yet on the first poll; retry briefly
        # instead of relying on a single basic_get.
        method = body = None
        for _ in range(20):
            method, _header, body = channel.basic_get(queue)
            if method:
                break
            time.sleep(0.1)

        # Previously the result was only printed, so the test could never
        # fail when nothing was delivered.
        assert method is not None, "no workflow event was received from the queue"
        print(body)
    finally:
        # Release the extra channel opened for this test; skip if an error
        # already closed it so the original exception is not masked.
        if channel.is_open:
            channel.close()