Skip to content
Snippets Groups Projects
Commit 8622dcf8 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add terrible, awful, horrible, no good very bad unit test

parent bc1b0b4e
No related branches found
No related tags found
No related merge requests found
from channels.amqp_helpers import workflow_events
from channels.amqp_helpers import workflow_events, CONN, WorkflowEventChannel
from workspaces.schema import WorkflowEvent, WorkflowEventType
def show_log(log):
print(log)
def test_workflow_event():
def test_workflow_event_sending():
workflow_events.connect(profile='local')
channel = workflow_events.chan.connection.channel()
queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(queue=queue, exchange=WorkflowEventChannel.EXCHANGE, routing_key='#')
workflow_events.send(WorkflowEvent('name', 'id', WorkflowEventType.OTHER, '2020-01-01', 'Nothing to log'))
#workflow_events.listen(show_log, auto_ack=True) # need to make this synchronous somehow
method, header, body = channel.basic_get(queue)
if method:
print(body)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment