diff --git a/services/workflow/src/workflow/event_catcher.py b/services/workflow/src/workflow/event_catcher.py new file mode 100644 index 0000000000000000000000000000000000000000..60d454c59988ce74bb9bbfad7e88da6926d698f2 --- /dev/null +++ b/services/workflow/src/workflow/event_catcher.py @@ -0,0 +1,17 @@ +""" +CLI tool that queries AMQP server for events and does something with them + +CL args: +- event type? +- job name? +- job id? + +- Connect to AMQP server using make_amqp_connection() +- Establish channel and declare workspaces.workflow-service.workflow-status exchange +- Declare queue in channel +- Execute basic_consume on exchange with queue and with binding key '#' to collect all results +- Start consuming +- Finish consuming at some point? +""" + +class EventCatcher: diff --git a/shared/channels/amqp_helpers.py b/shared/channels/amqp_helpers.py new file mode 100644 index 0000000000000000000000000000000000000000..e73e900c21fede0963bd85c8679cff4b48f29c70 --- /dev/null +++ b/shared/channels/amqp_helpers.py @@ -0,0 +1,84 @@ +from typing import Callable + +import pika +from pycapo import CapoConfig + + +def make_amqp_connection(profile: str) -> pika.BlockingConnection: + """ + Initialize connection to AMQP server given a CAPO profile + :param profile: Capo profile to use + :return: Established connection + """ + capo_cfg = CapoConfig(profile=profile) + + hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') + username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') + password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') + + return pika.BlockingConnection( + pika.ConnectionParameters( + host=hostname, + credentials=pika.PlainCredentials( + username=username, + password=password + ) + ) + ) + + +def configure_amqp_channel( + connection: pika.BlockingConnection, + exchange: str, + exchange_type: str +) -> pika.adapters.blocking_connection.BlockingChannel: + """ + Takes in a connection and exchange declaration parameters and creates a channel to the given + exchange, configured with the given parameters + :param connection: RabbitMQ BlockingConnection + :param exchange: Exchange name + :param exchange_type: Exchange type + :return: Channel for given connection + """ + channel = connection.channel() + channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True) + return channel + + +def emit_amqp_message( + channel: pika.adapters.blocking_connection.BlockingChannel, + exchange: str, + routing_key: str, + msg: bytes +): + """ + Emit AMQP message to a given exchange through a given channel + :param channel: AMQP connection channel + :param exchange: AMQP exchange name + :param routing_key: Routing key + :param msg: Message to be emitted + """ + channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg) + + +def receive_amqp_message( + channel: pika.adapters.blocking_connection.BlockingChannel, + exchange: str, + routing_key: str, + callback: Callable, + auto_ack: bool = False +): + """ + Establishes queue and binds it to a given channel and consumes messages matching the + routing key from given exchange + :param channel: AMQP connection channel + :param exchange: AMQP exchange name + :param routing_key: Routing key + :param callback: Callback function for when a message is consumed + :param auto_ack: If true, consumer automatically acknowledges when it consumes a message + """ + queue_result = channel.queue_declare(queue='', exclusive=True) + queue_name = queue_result.method.queue + channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) + channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack) + channel.start_consuming()