Skip to content
Snippets Groups Projects
Commit 73ec3a91 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add ChannelDefinition, refactor Channel to use it

parent c7e86b5b
No related branches found
No related tags found
No related merge requests found
from typing import Callable, Any, Optional, Union
from abc import ABC, abstractmethod
from typing import Callable, Any, Optional, Union, TypeVar, Protocol
import pika
from marshmallow import Schema
from pika.adapters.blocking_connection import BlockingChannel
from pycapo import CapoConfig
from workspaces.json import WorkflowEventSchema
from workspaces.schema import WorkflowEvent
CONNECTION = None
......@@ -11,9 +14,50 @@ CHANNEL = None
CONFIG = None
class Channel:
def __init__(self, schema: Schema):
self.schema = schema
T = TypeVar('T')
class ChannelDefinition(ABC, Protocol[T]):
@abstractmethod
def routing_key_for(self, message: T) -> str:
pass
@abstractmethod
def exchange(self) -> str:
pass
@abstractmethod
def schema(self) -> Schema:
# ideally, the Schema would somehow be parameterized by T as well
pass
@abstractmethod
def declarations(self, chan: BlockingChannel):
pass
class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]):
EXCHANGE = 'workspaces.workflow-service.workflow-status'
def schema(self) -> Schema:
return WorkflowEventSchema()
def declarations(self, chan: BlockingChannel):
chan.exchange_declare(self.EXCHANGE,
exchange_type='topic',
durable=True,
auto_delete=False)
def routing_key_for(self, event: WorkflowEvent) -> str:
return f'{event.job_name}.{event.job_id}.{event.type.name.lower()}'
def exchange(self) -> str:
return self.EXCHANGE
class Channel(Protocol[T]):
def __init__(self, definition: ChannelDefinition[T]):
self.definition = definition
def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection:
"""
......@@ -44,22 +88,16 @@ class Channel:
)
CONNECTION = pika.BlockingConnection(connection_parameters)
CHANNEL = CONNECTION.channel()
CHANNEL.exchange_declare(
# FIXME: CAPO config does not currently support 'exchange' value
exchange=kwargs.get('exchange', CONFIG.exchange),
exchange_type='topic',
durable=True,
auto_delete=False
)
self.definition.declarations(CHANNEL)
def close(self):
if CONNECTION:
CONNECTION.close()
def send(self, event: WorkflowEventSchema):
rendered = self.schema.dump(event)
routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}'
CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered))
rendered = self.definition.schema.dump(event)
routing_key = self.definition.routing_key_for(event)
CHANNEL.basic_publish(routing_key=routing_key, body=rendered)
def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]):
"""
......@@ -71,17 +109,14 @@ class Channel:
:param exchange: AMQP exchange name
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
exchange = kwargs.get('exchange', CONFIG.exchange)
auto_ack = kwargs.get('auto_ack', False)
def unwrapping_callback(message):
event = self.schema.load(message)
event = self.definition.schema.load(message)
callback(event)
# FIXME: ask the underlying channel to call us back
# but only for messages matching pattern
queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue
CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern)
CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern)
CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
CHANNEL.start_consuming()
......@@ -104,4 +139,4 @@ class Channel:
# Predefined event channels for ease of use
workflow_events = Channel(WorkflowEventSchema())
workflow_events = Channel(WorkflowEventChannel())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment