Skip to content
Snippets Groups Projects
Commit 57b8277b authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Initial layout for Channel class and its methods filled in

parent d66650c4
No related branches found
No related tags found
No related merge requests found
from typing import Callable, Any
from typing import Callable, Any, Optional, Union
import pika
from marshmallow import Schema
......@@ -6,115 +6,98 @@ from pycapo import CapoConfig
from workspaces.json import WorkflowEventSchema
def make_amqp_connection(profile: str) -> pika.BlockingConnection:
"""
Initialize connection to AMQP server given a CAPO profile
:param profile: Capo profile to use
:return: Established connection
"""
capo_cfg = CapoConfig(profile=profile)
hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
return pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname,
credentials=pika.PlainCredentials(
username=username,
password=password
)
)
)
def configure_amqp_channel(
connection: pika.BlockingConnection,
exchange: str,
exchange_type: str
) -> pika.adapters.blocking_connection.BlockingChannel:
"""
Takes in a connection and exchange declaration parameters and creates a channel to the given
exchange, configured with the given parameters
:param connection: RabbitMQ BlockingConnection
:param exchange: Exchange name
:param exchange_type: Exchange type
:return: Channel for given connection
"""
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
return channel
def emit_amqp_message(
channel: pika.adapters.blocking_connection.BlockingChannel,
exchange: str,
routing_key: str,
msg: bytes
):
"""
Emit AMQP message to a given exchange through a given channel
:param channel: AMQP connection channel
:param exchange: AMQP exchange name
:param routing_key: Routing key
:param msg: Message to be emitted
"""
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg)
def receive_amqp_message(
channel: pika.adapters.blocking_connection.BlockingChannel,
exchange: str,
routing_key: str,
callback: Callable,
auto_ack: bool = False
):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
routing key from given exchange
:param channel: AMQP connection channel
:param exchange: AMQP exchange name
:param routing_key: Routing key
:param callback: Callback function for when a message is consumed
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
queue_result = channel.queue_declare(queue='', exclusive=True)
queue_name = queue_result.method.queue
channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
channel.start_consuming()
CONNECTION = None
CHANNEL = None
CONFIG = None
class Channel:
def __init__(self, schema: Schema):
self.schema = schema
def connect(self):
# FIXME implement this
pass
def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection:
"""
Initialize connection to AMQP server given a CAPO profile
:return: Established connection
"""
global CONNECTION
global CHANNEL
global CONFIG
if not CONNECTION:
CONFIG = CapoConfig(
profile=kwargs.get('profile', None)
).settings('edu.nrao.archive.configuration.AmqpServer')
connection_parameters = pika.ConnectionParameters(
host=kwargs.get('hostname', CONFIG.hostname),
port=int(kwargs.get('port', CONFIG.port)),
# FIXME: Copied from events. Do these args need to be cast to int?
connection_attempts=kwargs.get('connection_attempts', 5),
socket_timeout=kwargs.get('socket_timeout', 5000),
retry_delay=kwargs.get('retry_delay', 500),
#
credentials=pika.PlainCredentials(
username=kwargs.get('username', CONFIG.username),
password=kwargs.get('password', CONFIG.password)
)
)
CONNECTION = pika.BlockingConnection(connection_parameters)
CHANNEL = CONNECTION.channel()
CHANNEL.exchange_declare(
# FIXME: CAPO config does not currently support 'exchange' value
exchange=kwargs.get('exchange', CONFIG.exchange),
exchange_type='topic',
durable=True,
auto_delete=False
)
def close(self):
# FIXME implement this
pass
if CONNECTION:
CONNECTION.close()
def send(self, event):
# I don't know how to type Event there for Python but it would be
# nice if we could, somehow.
def send(self, event: WorkflowEventSchema):
rendered = self.schema.dump(event)
# FIXME: transmit rendered over the underlying channel
pass
routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}'
CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered))
def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
routing key from given exchange
:param callback: Callback function for when a message is consumed
:param pattern: Pattern to be used as routing key
Optional keyword arguments
:param exchange: AMQP exchange name
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
exchange = kwargs.get('exchange', CONFIG.exchange)
auto_ack = kwargs.get('auto_ack', False)
def listen(self, callback: Callable[[Any], None], pattern: str = '#'):
def unwrapping_callback(message):
event = self.schema.load(message)
callback(event)
# FIXME: ask the underlying channel to call us back
# but only for messages matching pattern
pass
def __enter__(self):
self.connect()
queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue
CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern)
CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
CHANNEL.start_consuming()
def __enter__(self, **kwargs: Union[int, str]):
"""
Keyword arguments for the AMQP connection. These do not need to be specified:
:param: hostname: Hostname to connect to
:param: port: Port to connect to
:param: connection_attempts: Number of connection attempts to try
:param: socket_timeout: Time to wait for a socket to connect
:param: retry_delay: Time to wait between retrying the connection
:param: username: Username to connect to as
:param: password: Password to use when connecting
:param: exchange: Exchange to use when connection
"""
self.connect(**kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment