From 57b8277b9f8daac1023b3f4eca7d26ad08cb951a Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Wed, 9 Sep 2020 16:35:38 -0600 Subject: [PATCH] Initial layout for Channel class and its methods filled in --- shared/channels/src/channels/amqp_helpers.py | 169 +++++++++---------- 1 file changed, 76 insertions(+), 93 deletions(-) diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index 4b9cf8482..3b509e32f 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -1,4 +1,4 @@ -from typing import Callable, Any +from typing import Callable, Any, Optional, Union import pika from marshmallow import Schema @@ -6,115 +6,98 @@ from pycapo import CapoConfig from workspaces.json import WorkflowEventSchema -def make_amqp_connection(profile: str) -> pika.BlockingConnection: - """ - Initialize connection to AMQP server given a CAPO profile - :param profile: Capo profile to use - :return: Established connection - """ - capo_cfg = CapoConfig(profile=profile) - - hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') - username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') - password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') - - return pika.BlockingConnection( - pika.ConnectionParameters( - host=hostname, - credentials=pika.PlainCredentials( - username=username, - password=password - ) - ) - ) - - -def configure_amqp_channel( - connection: pika.BlockingConnection, - exchange: str, - exchange_type: str -) -> pika.adapters.blocking_connection.BlockingChannel: - """ - Takes in a connection and exchange declaration parameters and creates a channel to the given - exchange, configured with the given parameters - :param connection: RabbitMQ BlockingConnection - :param exchange: Exchange name - :param exchange_type: Exchange type - :return: Channel for given connection - """ - channel = connection.channel() - channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True) - return channel - - -def emit_amqp_message( - channel: pika.adapters.blocking_connection.BlockingChannel, - exchange: str, - routing_key: str, - msg: bytes -): - """ - Emit AMQP message to a given exchange through a given channel - :param channel: AMQP connection channel - :param exchange: AMQP exchange name - :param routing_key: Routing key - :param msg: Message to be emitted - """ - channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg) - - -def receive_amqp_message( - channel: pika.adapters.blocking_connection.BlockingChannel, - exchange: str, - routing_key: str, - callback: Callable, - auto_ack: bool = False -): - """ - Establishes queue and binds it to a given channel and consumes messages matching the - routing key from given exchange - :param channel: AMQP connection channel - :param exchange: AMQP exchange name - :param routing_key: Routing key - :param callback: Callback function for when a message is consumed - :param auto_ack: If true, consumer automatically acknowledges when it consumes a message - """ - queue_result = channel.queue_declare(queue='', exclusive=True) - queue_name = queue_result.method.queue - channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) - channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack) - channel.start_consuming() +CONNECTION = None +CHANNEL = None +CONFIG = None class Channel: def __init__(self, schema: Schema): self.schema = schema - def connect(self): - # FIXME implement this - pass + def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection: + """ + Initialize connection to AMQP server given a CAPO profile + :return: Established connection + """ + global CONNECTION + global CHANNEL + global CONFIG + + if not CONNECTION: + CONFIG = CapoConfig( + profile=kwargs.get('profile', None) + ).settings('edu.nrao.archive.configuration.AmqpServer') + + connection_parameters = pika.ConnectionParameters( + host=kwargs.get('hostname', CONFIG.hostname), + port=int(kwargs.get('port', CONFIG.port)), + # FIXME: Copied from events. Do these args need to be cast to int? + connection_attempts=kwargs.get('connection_attempts', 5), + socket_timeout=kwargs.get('socket_timeout', 5000), + retry_delay=kwargs.get('retry_delay', 500), + # + credentials=pika.PlainCredentials( + username=kwargs.get('username', CONFIG.username), + password=kwargs.get('password', CONFIG.password) + ) + ) + CONNECTION = pika.BlockingConnection(connection_parameters) + CHANNEL = CONNECTION.channel() + CHANNEL.exchange_declare( + # FIXME: CAPO config does not currently support 'exchange' value + exchange=kwargs.get('exchange', CONFIG.exchange), + exchange_type='topic', + durable=True, + auto_delete=False + ) def close(self): - # FIXME implement this - pass + if CONNECTION: + CONNECTION.close() - def send(self, event): - # I don't know how to type Event there for Python but it would be - # nice if we could, somehow. + def send(self, event: WorkflowEventSchema): rendered = self.schema.dump(event) - # FIXME: transmit rendered over the underlying channel - pass + routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}' + CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered)) + + def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]): + """ + Establishes queue and binds it to a given channel and consumes messages matching the + routing key from given exchange + :param callback: Callback function for when a message is consumed + :param pattern: Pattern to be used as routing key + Optional keyword arguments + :param exchange: AMQP exchange name + :param auto_ack: If true, consumer automatically acknowledges when it consumes a message + """ + exchange = kwargs.get('exchange', CONFIG.exchange) + auto_ack = kwargs.get('auto_ack', False) - def listen(self, callback: Callable[[Any], None], pattern: str = '#'): def unwrapping_callback(message): event = self.schema.load(message) callback(event) # FIXME: ask the underlying channel to call us back # but only for messages matching pattern - pass - def __enter__(self): - self.connect() + queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue + CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern) + CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) + CHANNEL.start_consuming() + + def __enter__(self, **kwargs: Union[int, str]): + """ + Keyword arguments for the AMQP connection. These do not need to be specified: + :param: hostname: Hostname to connect to + :param: port: Port to connect to + :param: connection_attempts: Number of connection attempts to try + :param: socket_timeout: Time to wait for a socket to connect + :param: retry_delay: Time to wait between retrying the connection + :param: username: Username to connect to as + :param: password: Password to use when connecting + :param: exchange: Exchange to use when connection + """ + self.connect(**kwargs) def __exit__(self, exc_type, exc_val, exc_tb): self.close() -- GitLab