Skip to content
Snippets Groups Projects
Commit 553f94ca authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Updated wf_monitor stuff to work with refactored AMQP helper functions

parent a4444190
No related branches found
No related tags found
No related merge requests found
...@@ -10,13 +10,11 @@ import tracemalloc ...@@ -10,13 +10,11 @@ import tracemalloc
from typing import List, Union, Dict, Callable, Tuple, Any from typing import List, Union, Dict, Callable, Tuple, Any
from pathlib import Path from pathlib import Path
import pika
import pendulum import pendulum
from pycapo import CapoConfig
from workspaces.schema import WorkflowEvent, WorkflowEventType
from ._version import ___version___ as VERSION from ._version import ___version___ as VERSION
from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message
from workspaces.schema import WorkflowEvent, WorkflowEventType
WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
...@@ -237,41 +235,27 @@ def make_arg_parser() -> argparse.ArgumentParser: ...@@ -237,41 +235,27 @@ def make_arg_parser() -> argparse.ArgumentParser:
return parser return parser
# @log_decorator_factory('Sending event data to the channel...')
# def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
# """
# Takes in a JSON-formatted string and sends it off to the workflow status exchange
# :param connection: Connection to the RabbitMQ server
# :param event: JSON string of event metadata
# """
# # Send data to exchange
# channel = connection.channel()
# channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
# return channel.basic_publish(
# exchange=WORKFLOW_STATUS_EXCH,
# routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
# body=workspaces.json.workflow_event.dump(event)
# )
def main(): def main():
# Parse command-line args # Parse command-line args
args = make_arg_parser().parse_args() args = make_arg_parser().parse_args()
connection = make_amqp_connection(args.profile)
monitor = WorkflowMonitor(args.log_path) monitor = WorkflowMonitor(args.log_path)
# Probably want to refactor this so that it doesn't create its own connection just for this, # Probably want to refactor this so that it doesn't create its own connection just for this,
# since creating connections is expensive # since creating connections is expensive
with make_amqp_connection(args.profile) as connection: decorated_connect = log_decorator_factory('Making connection...')(make_amqp_connection)
with configure_amqp_channel( decorated_config_channel = log_decorator_factory('Configuring channel...')(configure_amqp_channel)
decorated_emit = log_decorator_factory('Emitting message...')(emit_amqp_message)
with decorated_connect(args.profile) as connection:
with decorated_config_channel(
connection=connection, connection=connection,
exchange=WORKFLOW_STATUS_EXCH, exchange=WORKFLOW_STATUS_EXCH,
exchange_type='topic' exchange_type='topic'
) as channel: ) as channel:
for event in monitor.events: for event in monitor.events:
emit_amqp_message( decorated_emit(
channel=channel, channel=channel,
exchange=WORKFLOW_STATUS_EXCH, exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
msg=event msg=event.json()
) )
import pytest import pytest
from workspaces.schema import EventType from pytest_mock import MockerFixture
from workspaces.schema import WorkflowEventType
from wf_monitor.monitor import WorkflowMonitor from wf_monitor.monitor import WorkflowMonitor
from shared.channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message
log_path = 'logs/condor.log' log_path = 'logs/condor.log'
test_monitor = WorkflowMonitor(log_path) test_monitor = WorkflowMonitor(log_path)
WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
def test_read_log(): def test_read_log():
...@@ -28,14 +31,14 @@ def test_read_log_timeout(capsys): ...@@ -28,14 +31,14 @@ def test_read_log_timeout(capsys):
def test_parse_log(): def test_parse_log():
test_event_types = [ test_event_types = [
EventType.SUBMITTED, WorkflowEventType.SUBMITTED,
EventType.EXECUTING, WorkflowEventType.EXECUTING,
EventType.OTHER, WorkflowEventType.OTHER,
EventType.TERMINATED, WorkflowEventType.TERMINATED,
EventType.SUBMITTED, WorkflowEventType.SUBMITTED,
EventType.EXECUTING, WorkflowEventType.EXECUTING,
EventType.OTHER, WorkflowEventType.OTHER,
EventType.TERMINATED WorkflowEventType.TERMINATED
] ]
for e, e_type in zip(test_monitor.events, test_event_types): for e, e_type in zip(test_monitor.events, test_event_types):
assert e.type == e_type assert e.type == e_type
...@@ -47,16 +50,26 @@ def test_parse_log_error(): ...@@ -47,16 +50,26 @@ def test_parse_log_error():
assert val_err.type == ValueError assert val_err.type == ValueError
def test_send_event_data(mocker): def test_monitor_events(mocker: MockerFixture):
mock_list = [ mock_list = [
'pika.adapters.blocking_connection.BlockingChannel.basic_publish', 'pika.adapters.blocking_connection.BlockingChannel.basic_publish',
'wf_monitor.monitor.make_amqp_connection', 'channels.amqp_helpers.make_amqp_connection',
'wf_monitor.monitor.send_event_data', 'channels.amqp_helpers.configure_amqp_channel',
'channels.amqp_helpers.emit_amqp_message',
] ]
[mocker.patch(mock) for mock in mock_list] [mocker.patch(mock) for mock in mock_list]
connection = make_amqp_connection('nmtest') with make_amqp_connection('nmtest') as connection:
with configure_amqp_channel(
for event in WorkflowMonitor(log_path).events: connection=connection,
send_event_data(connection, event) exchange=WORKFLOW_STATUS_EXCH,
assert connection.channel().basic_publish.call_count == 8 exchange_type='topic'
) as channel:
for event in WorkflowMonitor(log_path).events:
emit_amqp_message(
channel=channel,
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
msg=event
)
assert channel.basic_publish.call_count == 8
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment