From 553f94cae00c5d12fcec68b96321681f006205fa Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Wed, 9 Sep 2020 13:37:14 -0600 Subject: [PATCH] Updated wf_monitor stuff to work with refactored AMQP helper functions --- .../wf_monitor/src/wf_monitor/monitor.py | 34 ++++--------- .../wf_monitor/test/test_wf_monitor.py | 49 ++++++++++++------- 2 files changed, 40 insertions(+), 43 deletions(-) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index 71c4b54b7..bf3fdad02 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -10,13 +10,11 @@ import tracemalloc from typing import List, Union, Dict, Callable, Tuple, Any from pathlib import Path -import pika import pendulum -from pycapo import CapoConfig -from workspaces.schema import WorkflowEvent, WorkflowEventType from ._version import ___version___ as VERSION from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message +from workspaces.schema import WorkflowEvent, WorkflowEventType WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' @@ -237,41 +235,27 @@ def make_arg_parser() -> argparse.ArgumentParser: return parser -# @log_decorator_factory('Sending event data to the channel...') -# def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): -# """ -# Takes in a JSON-formatted string and sends it off to the workflow status exchange -# :param connection: Connection to the RabbitMQ server -# :param event: JSON string of event metadata -# """ -# # Send data to exchange -# channel = connection.channel() -# channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) -# return channel.basic_publish( -# exchange=WORKFLOW_STATUS_EXCH, -# routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', -# body=workspaces.json.workflow_event.dump(event) -# ) - - def main(): # Parse command-line args args = make_arg_parser().parse_args() - connection = make_amqp_connection(args.profile) monitor = WorkflowMonitor(args.log_path) # Probably want to refactor this so that it doesn't create its own connection just for this, # since creating connections is expensive - with make_amqp_connection(args.profile) as connection: - with configure_amqp_channel( + decorated_connect = log_decorator_factory('Making connection...')(make_amqp_connection) + decorated_config_channel = log_decorator_factory('Configuring channel...')(configure_amqp_channel) + decorated_emit = log_decorator_factory('Emitting message...')(emit_amqp_message) + + with decorated_connect(args.profile) as connection: + with decorated_config_channel( connection=connection, exchange=WORKFLOW_STATUS_EXCH, exchange_type='topic' ) as channel: for event in monitor.events: - emit_amqp_message( + decorated_emit( channel=channel, exchange=WORKFLOW_STATUS_EXCH, routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - msg=event + msg=event.json() ) diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index d1fd22bc9..7f0cf4e95 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -1,10 +1,13 @@ import pytest -from workspaces.schema import EventType +from pytest_mock import MockerFixture + +from workspaces.schema import WorkflowEventType from wf_monitor.monitor import WorkflowMonitor -from shared.channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message +from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message log_path = 'logs/condor.log' test_monitor = WorkflowMonitor(log_path) +WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' def test_read_log(): @@ -28,14 +31,14 @@ def test_read_log_timeout(capsys): def test_parse_log(): test_event_types = [ - EventType.SUBMITTED, - EventType.EXECUTING, - EventType.OTHER, - EventType.TERMINATED, - EventType.SUBMITTED, - EventType.EXECUTING, - EventType.OTHER, - EventType.TERMINATED + WorkflowEventType.SUBMITTED, + WorkflowEventType.EXECUTING, + WorkflowEventType.OTHER, + WorkflowEventType.TERMINATED, + WorkflowEventType.SUBMITTED, + WorkflowEventType.EXECUTING, + WorkflowEventType.OTHER, + WorkflowEventType.TERMINATED ] for e, e_type in zip(test_monitor.events, test_event_types): assert e.type == e_type @@ -47,16 +50,26 @@ def test_parse_log_error(): assert val_err.type == ValueError -def test_send_event_data(mocker): +def test_monitor_events(mocker: MockerFixture): mock_list = [ 'pika.adapters.blocking_connection.BlockingChannel.basic_publish', - 'wf_monitor.monitor.make_amqp_connection', - 'wf_monitor.monitor.send_event_data', + 'channels.amqp_helpers.make_amqp_connection', + 'channels.amqp_helpers.configure_amqp_channel', + 'channels.amqp_helpers.emit_amqp_message', ] [mocker.patch(mock) for mock in mock_list] - connection = make_amqp_connection('nmtest') - - for event in WorkflowMonitor(log_path).events: - send_event_data(connection, event) - assert connection.channel().basic_publish.call_count == 8 + with make_amqp_connection('nmtest') as connection: + with configure_amqp_channel( + connection=connection, + exchange=WORKFLOW_STATUS_EXCH, + exchange_type='topic' + ) as channel: + for event in WorkflowMonitor(log_path).events: + emit_amqp_message( + channel=channel, + exchange=WORKFLOW_STATUS_EXCH, + routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', + msg=event + ) + assert channel.basic_publish.call_count == 8 -- GitLab