From 7926ea30ea62a931727e235ea0ab86129d629be8 Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Mon, 14 Sep 2020 10:41:21 -0600 Subject: [PATCH] Updated amqp_helpers to ensure the sending functionality works; updated wf_monitor to use new amqp_helpers setup; updated wf_monitor tests; updated WorkflowEventSchema to accept a string in the timestamp field instead of a datetime (we could probably change this, but this is how it's set up for now) --- .../wf_monitor/src/wf_monitor/monitor.py | 2 +- .../wf_monitor/test/test_wf_monitor.py | 26 ++++++------------- shared/channels/src/channels/amqp_helpers.py | 9 ++++--- shared/workspaces/src/workspaces/json.py | 2 +- 4 files changed, 15 insertions(+), 24 deletions(-) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index c02248d9e..899c61be8 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -241,7 +241,7 @@ def main(): monitor = WorkflowMonitor(args.log_path) with workflow_events as wfe: for event in monitor.events: - wfe.send(event) + wfe.send(event, WORKFLOW_STATUS_EXCH) # # Probably want to refactor this so that it doesn't create its own connection just for this, # # since creating connections is expensive diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index f0f13d281..c60bb88c2 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -3,7 +3,7 @@ from pytest_mock import MockerFixture from workspaces.schema import WorkflowEventType from wf_monitor.monitor import WorkflowMonitor -from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message +from channels.amqp_helpers import Channel, workflow_events log_path = 'logs/condor.log' test_monitor = WorkflowMonitor(log_path) @@ -75,23 +75,13 @@ def test_monitor_events(mocker: MockerFixture): """ mock_list = [ 'pika.adapters.blocking_connection.BlockingChannel.basic_publish', - 'channels.amqp_helpers.make_amqp_connection', - 'channels.amqp_helpers.configure_amqp_channel', - 'channels.amqp_helpers.emit_amqp_message', + 'channels.amqp_helpers.Channel.connect', + 'channels.amqp_helpers.Channel.send', ] [mocker.patch(mock) for mock in mock_list] - with make_amqp_connection('nmtest') as connection: - with configure_amqp_channel( - connection=connection, - exchange=WORKFLOW_STATUS_EXCH, - exchange_type='topic' - ) as channel: - for event in WorkflowMonitor(log_path).events: - emit_amqp_message( - channel=channel, - exchange=WORKFLOW_STATUS_EXCH, - routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - msg=event - ) - assert channel.basic_publish.call_count == 8 + channel = workflow_events + for event in WorkflowMonitor(log_path).events: + channel.send(event, WORKFLOW_STATUS_EXCH) + + assert channel.send.call_count == 8 diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index c5b93a0b7..e2a08c791 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -90,7 +90,7 @@ class Channel(Generic[ChannelDef]): username=kwargs.get('username', self.config.username), password=kwargs.get('password', self.config.password) ) - ) + ) CONN = pika.BlockingConnection(connection_parameters) self.chan = CONN.channel() self.definition.declarations(self.chan) @@ -102,14 +102,15 @@ class Channel(Generic[ChannelDef]): if self.chan: self.chan.close() - def send(self, event: WorkflowEventSchema): + def send(self, event: WorkflowEventSchema, exchange: str): """ Send a given WorkflowEvent to the exchange :param event: The marshmallow schema of the event to be sent + :param exchange: Name of the exchange the event will be sent to """ - rendered = self.definition.schema.dump(event) + rendered = self.definition.schema().dump(event) routing_key = self.definition.routing_key_for(event) - self.chan.basic_publish(routing_key=routing_key, body=rendered) + self.chan.basic_publish(exchange=exchange, routing_key=routing_key, body=str(rendered).encode('utf-8')) def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False): """ diff --git a/shared/workspaces/src/workspaces/json.py b/shared/workspaces/src/workspaces/json.py index 0244672cb..651e1d699 100644 --- a/shared/workspaces/src/workspaces/json.py +++ b/shared/workspaces/src/workspaces/json.py @@ -10,7 +10,7 @@ class WorkflowEventSchema(Schema): job_name = fields.String() job_id = fields.String() event_type = fields.Method("get_type", deserialize="load_type") - timestamp = fields.DateTime() + timestamp = fields.String() log = fields.String() retval = fields.Integer(allow_none=True) -- GitLab