Skip to content
Snippets Groups Projects
Commit d2297066 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Small changes to amqp_helpers.py

parent 73ec3a91
No related branches found
No related tags found
No related merge requests found
......@@ -8,12 +8,6 @@ from pycapo import CapoConfig
from workspaces.json import WorkflowEventSchema
from workspaces.schema import WorkflowEvent
CONNECTION = None
CHANNEL = None
CONFIG = None
T = TypeVar('T')
......@@ -56,69 +50,73 @@ class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]):
class Channel(Protocol[T]):
CONNECTION = None
CHANNEL = None
CONFIG = None
def __init__(self, definition: ChannelDefinition[T]):
self.definition = definition
def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection:
def connect(self, **kwargs: Union[int, str]):
"""
Initialize connection to AMQP server given a CAPO profile
:return: Established connection
"""
global CONNECTION
global CHANNEL
global CONFIG
if not CONNECTION:
CONFIG = CapoConfig(
Keyword arguments for the AMQP connection. These do not need to be specified:
:param: hostname: Hostname to connect to
:param: port: Port to connect to
:param: connection_attempts: Number of connection attempts to try
:param: socket_timeout: Time to wait for a socket to connect
:param: retry_delay: Time to wait between retrying the connection
:param: username: Username to connect to as
:param: password: Password to use when connecting
:param: exchange: Exchange to use when connection
"""
if not self.CONNECTION:
self.CONFIG = CapoConfig(
profile=kwargs.get('profile', None)
).settings('edu.nrao.archive.configuration.AmqpServer')
connection_parameters = pika.ConnectionParameters(
host=kwargs.get('hostname', CONFIG.hostname),
port=int(kwargs.get('port', CONFIG.port)),
# FIXME: Copied from events. Do these args need to be cast to int?
connection_attempts=kwargs.get('connection_attempts', 5),
socket_timeout=kwargs.get('socket_timeout', 5000),
retry_delay=kwargs.get('retry_delay', 500),
#
host=kwargs.get('hostname', self.CONFIG.hostname),
port=int(kwargs.get('port', self.CONFIG.port)),
connection_attempts=int(kwargs.get('connection_attempts', 5)),
socket_timeout=int(kwargs.get('socket_timeout', 5000)),
retry_delay=int(kwargs.get('retry_delay', 500)),
credentials=pika.PlainCredentials(
username=kwargs.get('username', CONFIG.username),
password=kwargs.get('password', CONFIG.password)
username=kwargs.get('username', self.CONFIG.username),
password=kwargs.get('password', self.CONFIG.password)
)
)
CONNECTION = pika.BlockingConnection(connection_parameters)
CHANNEL = CONNECTION.channel()
self.definition.declarations(CHANNEL)
self.CONNECTION = pika.BlockingConnection(connection_parameters)
self.CHANNEL = self.CONNECTION.channel()
self.definition.declarations(self.CHANNEL)
def close(self):
if CONNECTION:
CONNECTION.close()
if self.CONNECTION:
self.CONNECTION.close()
def send(self, event: WorkflowEventSchema):
rendered = self.definition.schema.dump(event)
routing_key = self.definition.routing_key_for(event)
CHANNEL.basic_publish(routing_key=routing_key, body=rendered)
self.CHANNEL.basic_publish(routing_key=routing_key, body=rendered)
def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]):
def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
routing key from given exchange
:param callback: Callback function for when a message is consumed
:param pattern: Pattern to be used as routing key
Optional keyword arguments
:param exchange: AMQP exchange name
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
auto_ack = kwargs.get('auto_ack', False)
def unwrapping_callback(message):
event = self.definition.schema.load(message)
callback(event)
queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue
CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern)
CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
CHANNEL.start_consuming()
queue = self.CHANNEL.queue_declare(queue='', exclusive=True).method.queue
self.CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern)
self.CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
self.CHANNEL.start_consuming()
def __enter__(self, **kwargs: Union[int, str]):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment