diff --git a/apps/cli/utilities/wf_monitor/setup.py b/apps/cli/utilities/wf_monitor/setup.py index e60436e3775ad039460d964fae30ee346ffa9ec0..0cb2deadce2e3407a25b7f43e104afe8b73ab2ce 100644 --- a/apps/cli/utilities/wf_monitor/setup.py +++ b/apps/cli/utilities/wf_monitor/setup.py @@ -10,6 +10,8 @@ README = Path('README.md').read_text() requires = [ 'pika==1.1', 'pendulum==2.1.2', + 'workspaces', + 'amqp_helpers' ] tests_require = [ 'pytest>=5.4,<6.0', diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index bd3a7073a49b7ee146f33ed316909dd50e08360f..71c4b54b79b06de533024ac67444bbd0e2cef02d 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -12,11 +12,11 @@ from pathlib import Path import pika import pendulum -import workspaces from pycapo import CapoConfig from workspaces.schema import WorkflowEvent, WorkflowEventType from ._version import ___version___ as VERSION +from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' @@ -237,44 +237,21 @@ def make_arg_parser() -> argparse.ArgumentParser: return parser -@log_decorator_factory('Establishing connection to the AMQP server...') -def make_amqp_connection(profile: str) -> pika.BlockingConnection: - """ - Initialize connection to AMQP server - :param profile: Capo profile to use - :return: Established connection - """ - capo_cfg = CapoConfig(profile=profile) - - hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') - username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') - password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') - - return pika.BlockingConnection( - pika.ConnectionParameters( - hostname, - credentials=pika.PlainCredentials( - username, - password - ) - ) - ) - - -@log_decorator_factory('Sending event data to the channel...') -def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): - """ - Takes in a JSON-formatted string and sends it off to the workflow status exchange - :param event: JSON string of event metadata - """ - # Send data to exchange - channel = connection.channel() - channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) - return channel.basic_publish( - exchange=WORKFLOW_STATUS_EXCH, - routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - body=workspaces.json.workflow_event.dump(event) - ) +# @log_decorator_factory('Sending event data to the channel...') +# def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): +# """ +# Takes in a JSON-formatted string and sends it off to the workflow status exchange +# :param connection: Connection to the RabbitMQ server +# :param event: JSON string of event metadata +# """ +# # Send data to exchange +# channel = connection.channel() +# channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) +# return channel.basic_publish( +# exchange=WORKFLOW_STATUS_EXCH, +# routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', +# body=workspaces.json.workflow_event.dump(event) +# ) def main(): @@ -283,5 +260,18 @@ def main(): connection = make_amqp_connection(args.profile) monitor = WorkflowMonitor(args.log_path) - for event in monitor.events: - send_event_data(connection, event) + # Probably want to refactor this so that it doesn't create its own connection just for this, + # since creating connections is expensive + with make_amqp_connection(args.profile) as connection: + with configure_amqp_channel( + connection=connection, + exchange=WORKFLOW_STATUS_EXCH, + exchange_type='topic' + ) as channel: + for event in monitor.events: + emit_amqp_message( + channel=channel, + exchange=WORKFLOW_STATUS_EXCH, + routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', + msg=event + ) diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index 2fe39cb990466c8f513be600644001a4ac829754..d1fd22bc955ccada1472d10144b182450494840d 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -1,6 +1,7 @@ import pytest -from wf_monitor.wf_event import EventType -from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection +from workspaces.schema import EventType +from wf_monitor.monitor import WorkflowMonitor +from shared.channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message log_path = 'logs/condor.log' test_monitor = WorkflowMonitor(log_path) diff --git a/shared/channels/README.md b/shared/channels/README.md new file mode 100644 index 0000000000000000000000000000000000000000..ae2729b2b2dbb272c11ae5106f41807f26ef8478 --- /dev/null +++ b/shared/channels/README.md @@ -0,0 +1 @@ +Area for AMQP helper functions. May be expanded in the future to include other AMQP utilities. \ No newline at end of file diff --git a/shared/channels/setup.py b/shared/channels/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..c3acc82c1253f280d52452ed215c0d0c9652e9d0 --- /dev/null +++ b/shared/channels/setup.py @@ -0,0 +1,26 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +from pathlib import Path +from setuptools import setup, find_packages + +VERSION = open('src/amqp_helpers/_version.py').readlines()[-1].split()[-1].strip("\"'") +README = Path('README.md').read_text() + +setup( + name=Path().absolute().name, + version=VERSION, + description='Workspaces AMQP utility area', + long_description=README, + author='NRAO SSA Team', + author_email='dms-ssa@nrao.edu', + url='TBD', + license="GPL", + install_requires=['pika', 'pycapo'], + keywords=[], + packages=['amqp_helpers'], + package_dir={'':'src'}, + classifiers=[ + 'Programming Language :: Python :: 3.8' + ] +) diff --git a/shared/channels/src/amqp_helpers/__init__.py b/shared/channels/src/amqp_helpers/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/shared/channels/src/amqp_helpers/_version.py b/shared/channels/src/amqp_helpers/_version.py new file mode 100644 index 0000000000000000000000000000000000000000..f27d146a3f39885ce269bacf9ab4510254147c8d --- /dev/null +++ b/shared/channels/src/amqp_helpers/_version.py @@ -0,0 +1,2 @@ +""" Version information for this package, don't put anything else here. """ +___version___ = '4.0.0a1.dev1' diff --git a/shared/channels/amqp_helpers.py b/shared/channels/src/amqp_helpers/amqp_helpers.py similarity index 100% rename from shared/channels/amqp_helpers.py rename to shared/channels/src/amqp_helpers/amqp_helpers.py