Skip to content
Snippets Groups Projects
Commit 04b05b66 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Added event_catcher.py with some notes on design and amqp_helpers.py, which...

Added event_catcher.py with some notes on design and amqp_helpers.py, which contains some helper functions for AMQP-related procedures
parent 2b55cff6
No related branches found
No related tags found
No related merge requests found
"""
CLI tool that queries AMQP server for events and does something with them
CL args:
- event type?
- job name?
- job id?
- Connect to AMQP server using make_amqp_connection()
- Establish channel and declare workspaces.workflow-service.workflow-status exchange
- Declare queue in channel
- Execute basic_consume on exchange with queue and with binding key '#' to collect all results
- Start consuming
- Finish consuming at some point?
"""
class EventCatcher:
from typing import Callable
import pika
from pycapo import CapoConfig
def make_amqp_connection(profile: str) -> pika.BlockingConnection:
"""
Initialize connection to AMQP server given a CAPO profile
:param profile: Capo profile to use
:return: Established connection
"""
capo_cfg = CapoConfig(profile=profile)
hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
return pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname,
credentials=pika.PlainCredentials(
username=username,
password=password
)
)
)
def configure_amqp_channel(
connection: pika.BlockingConnection,
exchange: str,
exchange_type: str
) -> pika.adapters.blocking_connection.BlockingChannel:
"""
Takes in a connection and exchange declaration parameters and creates a channel to the given
exchange, configured with the given parameters
:param connection: RabbitMQ BlockingConnection
:param exchange: Exchange name
:param exchange_type: Exchange type
:return: Channel for given connection
"""
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
return channel
def emit_amqp_message(
channel: pika.adapters.blocking_connection.BlockingChannel,
exchange: str,
routing_key: str,
msg: bytes
):
"""
Emit AMQP message to a given exchange through a given channel
:param channel: AMQP connection channel
:param exchange: AMQP exchange name
:param routing_key: Routing key
:param msg: Message to be emitted
"""
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg)
def receive_amqp_message(
channel: pika.adapters.blocking_connection.BlockingChannel,
exchange: str,
routing_key: str,
callback: Callable,
auto_ack: bool = False
):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
routing key from given exchange
:param channel: AMQP connection channel
:param exchange: AMQP exchange name
:param routing_key: Routing key
:param callback: Callback function for when a message is consumed
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
queue_result = channel.queue_declare(queue='', exclusive=True)
queue_name = queue_result.method.queue
channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
channel.start_consuming()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment