From 73ec3a917931217ff3b85c6fc513e6e6e4e0382b Mon Sep 17 00:00:00 2001
From: Daniel K Lyons <dlyons@nrao.edu>
Date: Thu, 10 Sep 2020 16:48:13 -0600
Subject: [PATCH] Add ChannelDefinition, refactor Channel to use it

---
 shared/channels/src/channels/amqp_helpers.py | 75 ++++++++++++++------
 1 file changed, 55 insertions(+), 20 deletions(-)

diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py
index 3b509e32f..302750ca4 100644
--- a/shared/channels/src/channels/amqp_helpers.py
+++ b/shared/channels/src/channels/amqp_helpers.py
@@ -1,9 +1,12 @@
-from typing import Callable, Any, Optional, Union
+from abc import ABC, abstractmethod
+from typing import Callable, Any, Optional, Union, TypeVar, Protocol
 
 import pika
 from marshmallow import Schema
+from pika.adapters.blocking_connection import BlockingChannel
 from pycapo import CapoConfig
 from workspaces.json import WorkflowEventSchema
+from workspaces.schema import WorkflowEvent
 
 
 CONNECTION = None
@@ -11,9 +14,50 @@ CHANNEL = None
 CONFIG = None
 
 
-class Channel:
-    def __init__(self, schema: Schema):
-        self.schema = schema
+T = TypeVar('T')
+
+
+class ChannelDefinition(ABC, Protocol[T]):
+    @abstractmethod
+    def routing_key_for(self, message: T) -> str:
+        pass
+
+    @abstractmethod
+    def exchange(self) -> str:
+        pass
+
+    @abstractmethod
+    def schema(self) -> Schema:
+        # ideally, the Schema would somehow be parameterized by T as well
+        pass
+
+    @abstractmethod
+    def declarations(self, chan: BlockingChannel):
+        pass
+
+
+class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]):
+    EXCHANGE = 'workspaces.workflow-service.workflow-status'
+
+    def schema(self) -> Schema:
+        return WorkflowEventSchema()
+
+    def declarations(self, chan: BlockingChannel):
+        chan.exchange_declare(self.EXCHANGE,
+                              exchange_type='topic',
+                              durable=True,
+                              auto_delete=False)
+
+    def routing_key_for(self, event: WorkflowEvent) -> str:
+        return f'{event.job_name}.{event.job_id}.{event.type.name.lower()}'
+
+    def exchange(self) -> str:
+        return self.EXCHANGE
+
+
+class Channel(Protocol[T]):
+    def __init__(self, definition: ChannelDefinition[T]):
+        self.definition = definition
 
     def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection:
         """
@@ -44,22 +88,16 @@ class Channel:
             )
             CONNECTION = pika.BlockingConnection(connection_parameters)
             CHANNEL = CONNECTION.channel()
-            CHANNEL.exchange_declare(
-                # FIXME: CAPO config does not currently support 'exchange' value
-                exchange=kwargs.get('exchange', CONFIG.exchange),
-                exchange_type='topic',
-                durable=True,
-                auto_delete=False
-            )
+            self.definition.declarations(CHANNEL)
 
     def close(self):
         if CONNECTION:
             CONNECTION.close()
 
     def send(self, event: WorkflowEventSchema):
-        rendered = self.schema.dump(event)
-        routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}'
-        CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered))
+        rendered = self.definition.schema.dump(event)
+        routing_key = self.definition.routing_key_for(event)
+        CHANNEL.basic_publish(routing_key=routing_key, body=rendered)
 
     def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]):
         """
@@ -71,17 +109,14 @@ class Channel:
         :param exchange: AMQP exchange name
         :param auto_ack: If true, consumer automatically acknowledges when it consumes a message
         """
-        exchange = kwargs.get('exchange', CONFIG.exchange)
         auto_ack = kwargs.get('auto_ack', False)
 
         def unwrapping_callback(message):
-            event = self.schema.load(message)
+            event = self.definition.schema.load(message)
             callback(event)
-        # FIXME: ask the underlying channel to call us back
-        # but only for messages matching pattern
 
         queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue
-        CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern)
+        CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern)
         CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
         CHANNEL.start_consuming()
 
@@ -104,4 +139,4 @@ class Channel:
 
 
 # Predefined event channels for ease of use
-workflow_events = Channel(WorkflowEventSchema())
+workflow_events = Channel(WorkflowEventChannel())
-- 
GitLab