From 0d2ed104a11185032e25c77fefe234616f0ba7c1 Mon Sep 17 00:00:00 2001 From: Daniel K Lyons <dlyons@nrao.edu> Date: Tue, 8 Sep 2020 17:00:37 -0600 Subject: [PATCH] Add a sketch of a channel API for simple use cases in other projects --- shared/channels/amqp_helpers.py | 42 ++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/shared/channels/amqp_helpers.py b/shared/channels/amqp_helpers.py index e73e900c2..4b9cf8482 100644 --- a/shared/channels/amqp_helpers.py +++ b/shared/channels/amqp_helpers.py @@ -1,7 +1,9 @@ -from typing import Callable +from typing import Callable, Any import pika +from marshmallow import Schema from pycapo import CapoConfig +from workspaces.json import WorkflowEventSchema def make_amqp_connection(profile: str) -> pika.BlockingConnection: @@ -82,3 +84,41 @@ def receive_amqp_message( channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack) channel.start_consuming() + + +class Channel: + def __init__(self, schema: Schema): + self.schema = schema + + def connect(self): + # FIXME implement this + pass + + def close(self): + # FIXME implement this + pass + + def send(self, event): + # I don't know how to type Event there for Python but it would be + # nice if we could, somehow. + rendered = self.schema.dump(event) + # FIXME: transmit rendered over the underlying channel + pass + + def listen(self, callback: Callable[[Any], None], pattern: str = '#'): + def unwrapping_callback(message): + event = self.schema.load(message) + callback(event) + # FIXME: ask the underlying channel to call us back + # but only for messages matching pattern + pass + + def __enter__(self): + self.connect() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + +# Predefined event channels for ease of use +workflow_events = Channel(WorkflowEventSchema()) -- GitLab