diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index 3b509e32f310600520ba49a14bb1bf057d1dbf40..302750ca4b2e639c03fd51b2ea29fac9a956076c 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -1,9 +1,12 @@ -from typing import Callable, Any, Optional, Union +from abc import ABC, abstractmethod +from typing import Callable, Any, Optional, Union, TypeVar, Protocol import pika from marshmallow import Schema +from pika.adapters.blocking_connection import BlockingChannel from pycapo import CapoConfig from workspaces.json import WorkflowEventSchema +from workspaces.schema import WorkflowEvent CONNECTION = None @@ -11,9 +14,50 @@ CHANNEL = None CONFIG = None -class Channel: - def __init__(self, schema: Schema): - self.schema = schema +T = TypeVar('T') + + +class ChannelDefinition(ABC, Protocol[T]): + @abstractmethod + def routing_key_for(self, message: T) -> str: + pass + + @abstractmethod + def exchange(self) -> str: + pass + + @abstractmethod + def schema(self) -> Schema: + # ideally, the Schema would somehow be parameterized by T as well + pass + + @abstractmethod + def declarations(self, chan: BlockingChannel): + pass + + +class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]): + EXCHANGE = 'workspaces.workflow-service.workflow-status' + + def schema(self) -> Schema: + return WorkflowEventSchema() + + def declarations(self, chan: BlockingChannel): + chan.exchange_declare(self.EXCHANGE, + exchange_type='topic', + durable=True, + auto_delete=False) + + def routing_key_for(self, event: WorkflowEvent) -> str: + return f'{event.job_name}.{event.job_id}.{event.type.name.lower()}' + + def exchange(self) -> str: + return self.EXCHANGE + + +class Channel(Protocol[T]): + def __init__(self, definition: ChannelDefinition[T]): + self.definition = definition def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection: """ @@ -44,22 +88,16 @@ class Channel: ) CONNECTION = pika.BlockingConnection(connection_parameters) CHANNEL = CONNECTION.channel() - CHANNEL.exchange_declare( - # FIXME: CAPO config does not currently support 'exchange' value - exchange=kwargs.get('exchange', CONFIG.exchange), - exchange_type='topic', - durable=True, - auto_delete=False - ) + self.definition.declarations(CHANNEL) def close(self): if CONNECTION: CONNECTION.close() def send(self, event: WorkflowEventSchema): - rendered = self.schema.dump(event) - routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}' - CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered)) + rendered = self.definition.schema.dump(event) + routing_key = self.definition.routing_key_for(event) + CHANNEL.basic_publish(routing_key=routing_key, body=rendered) def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]): """ @@ -71,17 +109,14 @@ class Channel: :param exchange: AMQP exchange name :param auto_ack: If true, consumer automatically acknowledges when it consumes a message """ - exchange = kwargs.get('exchange', CONFIG.exchange) auto_ack = kwargs.get('auto_ack', False) def unwrapping_callback(message): - event = self.schema.load(message) + event = self.definition.schema.load(message) callback(event) - # FIXME: ask the underlying channel to call us back - # but only for messages matching pattern queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue - CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern) + CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern) CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) CHANNEL.start_consuming() @@ -104,4 +139,4 @@ class Channel: # Predefined event channels for ease of use -workflow_events = Channel(WorkflowEventSchema()) +workflow_events = Channel(WorkflowEventChannel())