diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index c02248d9e97c358bd06b28b96ba9c380d4396ccd..899c61be8279bb3cfbb2dc1f6cd0a7bfa82c4ce2 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -241,7 +241,7 @@ def main(): monitor = WorkflowMonitor(args.log_path) with workflow_events as wfe: for event in monitor.events: - wfe.send(event) + wfe.send(event, WORKFLOW_STATUS_EXCH) # # Probably want to refactor this so that it doesn't create its own connection just for this, # # since creating connections is expensive diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index f0f13d281771d4d67e95f828db92366489e7abb4..c60bb88c27eda31ccc550d17e8029496e8e8600a 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -3,7 +3,7 @@ from pytest_mock import MockerFixture from workspaces.schema import WorkflowEventType from wf_monitor.monitor import WorkflowMonitor -from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message +from channels.amqp_helpers import Channel, workflow_events log_path = 'logs/condor.log' test_monitor = WorkflowMonitor(log_path) @@ -75,23 +75,13 @@ def test_monitor_events(mocker: MockerFixture): """ mock_list = [ 'pika.adapters.blocking_connection.BlockingChannel.basic_publish', - 'channels.amqp_helpers.make_amqp_connection', - 'channels.amqp_helpers.configure_amqp_channel', - 'channels.amqp_helpers.emit_amqp_message', + 'channels.amqp_helpers.Channel.connect', + 'channels.amqp_helpers.Channel.send', ] [mocker.patch(mock) for mock in mock_list] - with make_amqp_connection('nmtest') as connection: - with configure_amqp_channel( - connection=connection, - exchange=WORKFLOW_STATUS_EXCH, - exchange_type='topic' - ) as channel: - for event in WorkflowMonitor(log_path).events: - emit_amqp_message( - channel=channel, - exchange=WORKFLOW_STATUS_EXCH, - routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - msg=event - ) - assert channel.basic_publish.call_count == 8 + channel = workflow_events + for event in WorkflowMonitor(log_path).events: + channel.send(event, WORKFLOW_STATUS_EXCH) + + assert channel.send.call_count == 8 diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index c5b93a0b7340dd3ec91b4062ca240f8df6ee2dd2..e2a08c79182dba5682c60b32cf4e5e3fd9692735 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -90,7 +90,7 @@ class Channel(Generic[ChannelDef]): username=kwargs.get('username', self.config.username), password=kwargs.get('password', self.config.password) ) - ) + ) CONN = pika.BlockingConnection(connection_parameters) self.chan = CONN.channel() self.definition.declarations(self.chan) @@ -102,14 +102,15 @@ class Channel(Generic[ChannelDef]): if self.chan: self.chan.close() - def send(self, event: WorkflowEventSchema): + def send(self, event: WorkflowEventSchema, exchange: str): """ Send a given WorkflowEvent to the exchange :param event: The marshmallow schema of the event to be sent + :param exchange: Name of the exchange the event will be sent to """ - rendered = self.definition.schema.dump(event) + rendered = self.definition.schema().dump(event) routing_key = self.definition.routing_key_for(event) - self.chan.basic_publish(routing_key=routing_key, body=rendered) + self.chan.basic_publish(exchange=exchange, routing_key=routing_key, body=str(rendered).encode('utf-8')) def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False): """ diff --git a/shared/workspaces/src/workspaces/json.py b/shared/workspaces/src/workspaces/json.py index 0244672cbe09d14d19502afda10410a0533b9d76..651e1d6995fc9d922dce9fc89df6eb97ec033a31 100644 --- a/shared/workspaces/src/workspaces/json.py +++ b/shared/workspaces/src/workspaces/json.py @@ -10,7 +10,7 @@ class WorkflowEventSchema(Schema): job_name = fields.String() job_id = fields.String() event_type = fields.Method("get_type", deserialize="load_type") - timestamp = fields.DateTime() + timestamp = fields.String() log = fields.String() retval = fields.Integer(allow_none=True)