from abc import ABC, abstractmethod from typing import Callable, Any, Optional, Union, TypeVar, Protocol, Generic import pika from marshmallow import Schema from pika.adapters.blocking_connection import BlockingChannel from pycapo import CapoConfig from workspaces.json import WorkflowEventSchema from workspaces.schema import WorkflowEvent T = TypeVar('T', contravariant=True) CONN = None class ChannelDefinition(Protocol[T]): @abstractmethod def routing_key_for(self, message: T) -> str: pass @abstractmethod def exchange(self) -> str: pass @abstractmethod def schema(self) -> Schema: # ideally, the Schema would somehow be parameterized by T as well pass @abstractmethod def declarations(self, chan: BlockingChannel): pass class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]): EXCHANGE = 'workspaces.workflow-service.workflow-status' def schema(self) -> Schema: return WorkflowEventSchema() def declarations(self, chan: BlockingChannel): chan.exchange_declare(self.EXCHANGE, exchange_type='topic', durable=True, auto_delete=False) def routing_key_for(self, event: WorkflowEvent) -> str: return f'{event.job_name}.{event.job_id}.{event.type.name.lower()}' def exchange(self) -> str: return self.EXCHANGE ChannelDef = TypeVar('ChannelDef', bound=ChannelDefinition, covariant=True) class Channel(Generic[ChannelDef]): def __init__(self, definition: ChannelDefinition[T]): self.definition: ChannelDefinition[T] = definition self.chan: BlockingChannel = None self.config: CapoConfig = None def connect(self, **kwargs: Union[int, str]): """ Initialize connection to AMQP server given a CAPO profile Keyword arguments for the AMQP connection. These do not need to be specified: :param: hostname: Hostname to connect to :param: port: Port to connect to :param: connection_attempts: Number of connection attempts to try :param: socket_timeout: Time to wait for a socket to connect :param: retry_delay: Time to wait between retrying the connection :param: username: Username to connect to as :param: password: Password to use when connecting :param: exchange: Exchange to use when connection """ global CONN if not CONN: self.config = CapoConfig( profile=kwargs.get('profile', None) ).settings('edu.nrao.archive.configuration.AmqpServer') connection_parameters = pika.ConnectionParameters( host=kwargs.get('hostname', self.config.hostname), port=int(kwargs.get('port', self.config.port)), connection_attempts=int(kwargs.get('connection_attempts', 5)), socket_timeout=int(kwargs.get('socket_timeout', 5000)), retry_delay=int(kwargs.get('retry_delay', 500)), credentials=pika.PlainCredentials( username=kwargs.get('username', self.config.username), password=kwargs.get('password', self.config.password) ) ) CONN = pika.BlockingConnection(connection_parameters) self.chan = CONN.channel() self.definition.declarations(self.chan) def close(self): """ Close channel of connection to RabbitMQ server, if it's open """ if self.chan: self.chan.close() def send(self, event: WorkflowEventSchema, exchange: str): """ Send a given WorkflowEvent to the exchange :param event: The marshmallow schema of the event to be sent :param exchange: Name of the exchange the event will be sent to """ rendered = self.definition.schema().dump(event) routing_key = self.definition.routing_key_for(event) self.chan.basic_publish(exchange=exchange, routing_key=routing_key, body=str(rendered).encode('utf-8')) def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False): """ Establishes queue and binds it to a given channel and consumes messages matching the routing key from given exchange :param callback: Callback function for when a message is consumed :param pattern: Pattern to be used as routing key Optional keyword arguments :param auto_ack: If true, consumer automatically acknowledges when it consumes a message """ def unwrapping_callback(message): event = self.definition.schema.load(message) callback(event) queue = self.chan.queue_declare(queue='', exclusive=True).method.queue self.chan.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern) self.chan.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) self.chan.start_consuming() def __enter__(self, **kwargs: Union[int, str]): """ Keyword arguments for the AMQP connection. These do not need to be specified: :param: hostname: Hostname to connect to :param: port: Port to connect to :param: connection_attempts: Number of connection attempts to try :param: socket_timeout: Time to wait for a socket to connect :param: retry_delay: Time to wait between retrying the connection :param: username: Username to connect to as :param: password: Password to use when connecting :param: exchange: Exchange to use when connection """ self.connect(**kwargs) return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() # Predefined event channels for ease of use workflow_events: Channel[WorkflowEvent] = Channel(WorkflowEventChannel())