Skip to content
Snippets Groups Projects
Commit 7926ea30 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Updated amqp_helpers to ensure the sending functionality works; updated

wf_monitor to use new amqp_helpers setup; updated wf_monitor tests;
updated WorkflowEventSchema to accept a string in the timestamp field
instead of a datetime (we could probably change this, but this is how
it's set up for now)
parent cc135adb
No related branches found
No related tags found
No related merge requests found
......@@ -241,7 +241,7 @@ def main():
monitor = WorkflowMonitor(args.log_path)
with workflow_events as wfe:
for event in monitor.events:
wfe.send(event)
wfe.send(event, WORKFLOW_STATUS_EXCH)
# # Probably want to refactor this so that it doesn't create its own connection just for this,
# # since creating connections is expensive
......
......@@ -3,7 +3,7 @@ from pytest_mock import MockerFixture
from workspaces.schema import WorkflowEventType
from wf_monitor.monitor import WorkflowMonitor
from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message
from channels.amqp_helpers import Channel, workflow_events
log_path = 'logs/condor.log'
test_monitor = WorkflowMonitor(log_path)
......@@ -75,23 +75,13 @@ def test_monitor_events(mocker: MockerFixture):
"""
mock_list = [
'pika.adapters.blocking_connection.BlockingChannel.basic_publish',
'channels.amqp_helpers.make_amqp_connection',
'channels.amqp_helpers.configure_amqp_channel',
'channels.amqp_helpers.emit_amqp_message',
'channels.amqp_helpers.Channel.connect',
'channels.amqp_helpers.Channel.send',
]
[mocker.patch(mock) for mock in mock_list]
with make_amqp_connection('nmtest') as connection:
with configure_amqp_channel(
connection=connection,
exchange=WORKFLOW_STATUS_EXCH,
exchange_type='topic'
) as channel:
for event in WorkflowMonitor(log_path).events:
emit_amqp_message(
channel=channel,
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
msg=event
)
assert channel.basic_publish.call_count == 8
channel = workflow_events
for event in WorkflowMonitor(log_path).events:
channel.send(event, WORKFLOW_STATUS_EXCH)
assert channel.send.call_count == 8
......@@ -90,7 +90,7 @@ class Channel(Generic[ChannelDef]):
username=kwargs.get('username', self.config.username),
password=kwargs.get('password', self.config.password)
)
)
)
CONN = pika.BlockingConnection(connection_parameters)
self.chan = CONN.channel()
self.definition.declarations(self.chan)
......@@ -102,14 +102,15 @@ class Channel(Generic[ChannelDef]):
if self.chan:
self.chan.close()
def send(self, event: WorkflowEventSchema):
def send(self, event: WorkflowEventSchema, exchange: str):
"""
Send a given WorkflowEvent to the exchange
:param event: The marshmallow schema of the event to be sent
:param exchange: Name of the exchange the event will be sent to
"""
rendered = self.definition.schema.dump(event)
rendered = self.definition.schema().dump(event)
routing_key = self.definition.routing_key_for(event)
self.chan.basic_publish(routing_key=routing_key, body=rendered)
self.chan.basic_publish(exchange=exchange, routing_key=routing_key, body=str(rendered).encode('utf-8'))
def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False):
"""
......
......@@ -10,7 +10,7 @@ class WorkflowEventSchema(Schema):
job_name = fields.String()
job_id = fields.String()
event_type = fields.Method("get_type", deserialize="load_type")
timestamp = fields.DateTime()
timestamp = fields.String()
log = fields.String()
retval = fields.Integer(allow_none=True)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment