From 8a8bdbe3e0cdefa8bfa51ecc7c4b2e36f950363a Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Tue, 8 Sep 2020 17:08:33 -0600 Subject: [PATCH] Made amqp_helpers directory structure match the other subprojects. Made some small changes to monitor.py and test_monitor.py --- apps/cli/utilities/wf_monitor/setup.py | 2 + .../wf_monitor/src/wf_monitor/monitor.py | 72 ++++++++----------- .../wf_monitor/test/test_wf_monitor.py | 5 +- shared/channels/README.md | 1 + shared/channels/setup.py | 26 +++++++ shared/channels/src/amqp_helpers/__init__.py | 0 shared/channels/src/amqp_helpers/_version.py | 2 + .../{ => src/amqp_helpers}/amqp_helpers.py | 0 8 files changed, 65 insertions(+), 43 deletions(-) create mode 100644 shared/channels/README.md create mode 100644 shared/channels/setup.py create mode 100644 shared/channels/src/amqp_helpers/__init__.py create mode 100644 shared/channels/src/amqp_helpers/_version.py rename shared/channels/{ => src/amqp_helpers}/amqp_helpers.py (100%) diff --git a/apps/cli/utilities/wf_monitor/setup.py b/apps/cli/utilities/wf_monitor/setup.py index e60436e37..0cb2deadc 100644 --- a/apps/cli/utilities/wf_monitor/setup.py +++ b/apps/cli/utilities/wf_monitor/setup.py @@ -10,6 +10,8 @@ README = Path('README.md').read_text() requires = [ 'pika==1.1', 'pendulum==2.1.2', + 'workspaces', + 'amqp_helpers' ] tests_require = [ 'pytest>=5.4,<6.0', diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index bd3a7073a..71c4b54b7 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -12,11 +12,11 @@ from pathlib import Path import pika import pendulum -import workspaces from pycapo import CapoConfig from workspaces.schema import WorkflowEvent, WorkflowEventType from ._version import ___version___ as VERSION +from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' @@ -237,44 +237,21 @@ def make_arg_parser() -> argparse.ArgumentParser: return parser -@log_decorator_factory('Establishing connection to the AMQP server...') -def make_amqp_connection(profile: str) -> pika.BlockingConnection: - """ - Initialize connection to AMQP server - :param profile: Capo profile to use - :return: Established connection - """ - capo_cfg = CapoConfig(profile=profile) - - hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') - username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') - password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') - - return pika.BlockingConnection( - pika.ConnectionParameters( - hostname, - credentials=pika.PlainCredentials( - username, - password - ) - ) - ) - - -@log_decorator_factory('Sending event data to the channel...') -def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): - """ - Takes in a JSON-formatted string and sends it off to the workflow status exchange - :param event: JSON string of event metadata - """ - # Send data to exchange - channel = connection.channel() - channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) - return channel.basic_publish( - exchange=WORKFLOW_STATUS_EXCH, - routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - body=workspaces.json.workflow_event.dump(event) - ) +# @log_decorator_factory('Sending event data to the channel...') +# def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): +# """ +# Takes in a JSON-formatted string and sends it off to the workflow status exchange +# :param connection: Connection to the RabbitMQ server +# :param event: JSON string of event metadata +# """ +# # Send data to exchange +# channel = connection.channel() +# channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) +# return channel.basic_publish( +# exchange=WORKFLOW_STATUS_EXCH, +# routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', +# body=workspaces.json.workflow_event.dump(event) +# ) def main(): @@ -283,5 +260,18 @@ def main(): connection = make_amqp_connection(args.profile) monitor = WorkflowMonitor(args.log_path) - for event in monitor.events: - send_event_data(connection, event) + # Probably want to refactor this so that it doesn't create its own connection just for this, + # since creating connections is expensive + with make_amqp_connection(args.profile) as connection: + with configure_amqp_channel( + connection=connection, + exchange=WORKFLOW_STATUS_EXCH, + exchange_type='topic' + ) as channel: + for event in monitor.events: + emit_amqp_message( + channel=channel, + exchange=WORKFLOW_STATUS_EXCH, + routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', + msg=event + ) diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index 2fe39cb99..d1fd22bc9 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -1,6 +1,7 @@ import pytest -from wf_monitor.wf_event import EventType -from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection +from workspaces.schema import EventType +from wf_monitor.monitor import WorkflowMonitor +from shared.channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message log_path = 'logs/condor.log' test_monitor = WorkflowMonitor(log_path) diff --git a/shared/channels/README.md b/shared/channels/README.md new file mode 100644 index 000000000..ae2729b2b --- /dev/null +++ b/shared/channels/README.md @@ -0,0 +1 @@ +Area for AMQP helper functions. May be expanded in the future to include other AMQP utilities. \ No newline at end of file diff --git a/shared/channels/setup.py b/shared/channels/setup.py new file mode 100644 index 000000000..c3acc82c1 --- /dev/null +++ b/shared/channels/setup.py @@ -0,0 +1,26 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +from pathlib import Path +from setuptools import setup, find_packages + +VERSION = open('src/amqp_helpers/_version.py').readlines()[-1].split()[-1].strip("\"'") +README = Path('README.md').read_text() + +setup( + name=Path().absolute().name, + version=VERSION, + description='Workspaces AMQP utility area', + long_description=README, + author='NRAO SSA Team', + author_email='dms-ssa@nrao.edu', + url='TBD', + license="GPL", + install_requires=['pika', 'pycapo'], + keywords=[], + packages=['amqp_helpers'], + package_dir={'':'src'}, + classifiers=[ + 'Programming Language :: Python :: 3.8' + ] +) diff --git a/shared/channels/src/amqp_helpers/__init__.py b/shared/channels/src/amqp_helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/shared/channels/src/amqp_helpers/_version.py b/shared/channels/src/amqp_helpers/_version.py new file mode 100644 index 000000000..f27d146a3 --- /dev/null +++ b/shared/channels/src/amqp_helpers/_version.py @@ -0,0 +1,2 @@ +""" Version information for this package, don't put anything else here. """ +___version___ = '4.0.0a1.dev1' diff --git a/shared/channels/amqp_helpers.py b/shared/channels/src/amqp_helpers/amqp_helpers.py similarity index 100% rename from shared/channels/amqp_helpers.py rename to shared/channels/src/amqp_helpers/amqp_helpers.py -- GitLab