Skip to content
Snippets Groups Projects
Commit 0d2ed104 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add a sketch of a channel API for simple use cases in other projects

parent 344025e4
No related branches found
No related tags found
No related merge requests found
from typing import Callable
from typing import Callable, Any
import pika
from marshmallow import Schema
from pycapo import CapoConfig
from workspaces.json import WorkflowEventSchema
def make_amqp_connection(profile: str) -> pika.BlockingConnection:
......@@ -82,3 +84,41 @@ def receive_amqp_message(
channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
channel.start_consuming()
class Channel:
def __init__(self, schema: Schema):
self.schema = schema
def connect(self):
# FIXME implement this
pass
def close(self):
# FIXME implement this
pass
def send(self, event):
# I don't know how to type Event there for Python but it would be
# nice if we could, somehow.
rendered = self.schema.dump(event)
# FIXME: transmit rendered over the underlying channel
pass
def listen(self, callback: Callable[[Any], None], pattern: str = '#'):
def unwrapping_callback(message):
event = self.schema.load(message)
callback(event)
# FIXME: ask the underlying channel to call us back
# but only for messages matching pattern
pass
def __enter__(self):
self.connect()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# Predefined event channels for ease of use
workflow_events = Channel(WorkflowEventSchema())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment