Skip to content
Snippets Groups Projects
Commit 8a8bdbe3 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Made amqp_helpers directory structure match the other subprojects. Made some...

Made amqp_helpers directory structure match the other subprojects. Made some small changes to monitor.py and test_monitor.py
parent fcc8bbea
No related branches found
No related tags found
No related merge requests found
...@@ -10,6 +10,8 @@ README = Path('README.md').read_text() ...@@ -10,6 +10,8 @@ README = Path('README.md').read_text()
requires = [ requires = [
'pika==1.1', 'pika==1.1',
'pendulum==2.1.2', 'pendulum==2.1.2',
'workspaces',
'amqp_helpers'
] ]
tests_require = [ tests_require = [
'pytest>=5.4,<6.0', 'pytest>=5.4,<6.0',
......
...@@ -12,11 +12,11 @@ from pathlib import Path ...@@ -12,11 +12,11 @@ from pathlib import Path
import pika import pika
import pendulum import pendulum
import workspaces
from pycapo import CapoConfig from pycapo import CapoConfig
from workspaces.schema import WorkflowEvent, WorkflowEventType from workspaces.schema import WorkflowEvent, WorkflowEventType
from ._version import ___version___ as VERSION from ._version import ___version___ as VERSION
from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message
WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
...@@ -237,44 +237,21 @@ def make_arg_parser() -> argparse.ArgumentParser: ...@@ -237,44 +237,21 @@ def make_arg_parser() -> argparse.ArgumentParser:
return parser return parser
@log_decorator_factory('Establishing connection to the AMQP server...') # @log_decorator_factory('Sending event data to the channel...')
def make_amqp_connection(profile: str) -> pika.BlockingConnection: # def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
""" # """
Initialize connection to AMQP server # Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param profile: Capo profile to use # :param connection: Connection to the RabbitMQ server
:return: Established connection # :param event: JSON string of event metadata
""" # """
capo_cfg = CapoConfig(profile=profile) # # Send data to exchange
# channel = connection.channel()
hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') # channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') # return channel.basic_publish(
password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') # exchange=WORKFLOW_STATUS_EXCH,
# routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
return pika.BlockingConnection( # body=workspaces.json.workflow_event.dump(event)
pika.ConnectionParameters( # )
hostname,
credentials=pika.PlainCredentials(
username,
password
)
)
)
@log_decorator_factory('Sending event data to the channel...')
def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
"""
Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param event: JSON string of event metadata
"""
# Send data to exchange
channel = connection.channel()
channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
return channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
body=workspaces.json.workflow_event.dump(event)
)
def main(): def main():
...@@ -283,5 +260,18 @@ def main(): ...@@ -283,5 +260,18 @@ def main():
connection = make_amqp_connection(args.profile) connection = make_amqp_connection(args.profile)
monitor = WorkflowMonitor(args.log_path) monitor = WorkflowMonitor(args.log_path)
for event in monitor.events: # Probably want to refactor this so that it doesn't create its own connection just for this,
send_event_data(connection, event) # since creating connections is expensive
with make_amqp_connection(args.profile) as connection:
with configure_amqp_channel(
connection=connection,
exchange=WORKFLOW_STATUS_EXCH,
exchange_type='topic'
) as channel:
for event in monitor.events:
emit_amqp_message(
channel=channel,
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
msg=event
)
import pytest import pytest
from wf_monitor.wf_event import EventType from workspaces.schema import EventType
from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection from wf_monitor.monitor import WorkflowMonitor
from shared.channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message
log_path = 'logs/condor.log' log_path = 'logs/condor.log'
test_monitor = WorkflowMonitor(log_path) test_monitor = WorkflowMonitor(log_path)
......
Area for AMQP helper functions. May be expanded in the future to include other AMQP utilities.
\ No newline at end of file
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pathlib import Path
from setuptools import setup, find_packages
VERSION = open('src/amqp_helpers/_version.py').readlines()[-1].split()[-1].strip("\"'")
README = Path('README.md').read_text()
setup(
name=Path().absolute().name,
version=VERSION,
description='Workspaces AMQP utility area',
long_description=README,
author='NRAO SSA Team',
author_email='dms-ssa@nrao.edu',
url='TBD',
license="GPL",
install_requires=['pika', 'pycapo'],
keywords=[],
packages=['amqp_helpers'],
package_dir={'':'src'},
classifiers=[
'Programming Language :: Python :: 3.8'
]
)
""" Version information for this package, don't put anything else here. """
___version___ = '4.0.0a1.dev1'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment