diff --git a/shared/channels/amqp_helpers.py b/shared/channels/amqp_helpers.py index e73e900c21fede0963bd85c8679cff4b48f29c70..4b9cf84820748c3042bfe66c4975666ffb7334ba 100644 --- a/shared/channels/amqp_helpers.py +++ b/shared/channels/amqp_helpers.py @@ -1,7 +1,9 @@ -from typing import Callable +from typing import Callable, Any import pika +from marshmallow import Schema from pycapo import CapoConfig +from workspaces.json import WorkflowEventSchema def make_amqp_connection(profile: str) -> pika.BlockingConnection: @@ -82,3 +84,41 @@ def receive_amqp_message( channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack) channel.start_consuming() + + +class Channel: + def __init__(self, schema: Schema): + self.schema = schema + + def connect(self): + # FIXME implement this + pass + + def close(self): + # FIXME implement this + pass + + def send(self, event): + # I don't know how to type Event there for Python but it would be + # nice if we could, somehow. + rendered = self.schema.dump(event) + # FIXME: transmit rendered over the underlying channel + pass + + def listen(self, callback: Callable[[Any], None], pattern: str = '#'): + def unwrapping_callback(message): + event = self.schema.load(message) + callback(event) + # FIXME: ask the underlying channel to call us back + # but only for messages matching pattern + pass + + def __enter__(self): + self.connect() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +# Predefined event channels for ease of use +workflow_events = Channel(WorkflowEventSchema())