diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index bf3fdad02fe9ec6aaaf621c97a56cf42cff9dc3a..c02248d9e97c358bd06b28b96ba9c380d4396ccd 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -13,7 +13,7 @@ from pathlib import Path import pendulum from ._version import ___version___ as VERSION -from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message +from channels.amqp_helpers import workflow_events from workspaces.schema import WorkflowEvent, WorkflowEventType WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' @@ -239,23 +239,26 @@ def main(): # Parse command-line args args = make_arg_parser().parse_args() monitor = WorkflowMonitor(args.log_path) - - # Probably want to refactor this so that it doesn't create its own connection just for this, - # since creating connections is expensive - decorated_connect = log_decorator_factory('Making connection...')(make_amqp_connection) - decorated_config_channel = log_decorator_factory('Configuring channel...')(configure_amqp_channel) - decorated_emit = log_decorator_factory('Emitting message...')(emit_amqp_message) - - with decorated_connect(args.profile) as connection: - with decorated_config_channel( - connection=connection, - exchange=WORKFLOW_STATUS_EXCH, - exchange_type='topic' - ) as channel: - for event in monitor.events: - decorated_emit( - channel=channel, - exchange=WORKFLOW_STATUS_EXCH, - routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - msg=event.json() - ) + with workflow_events as wfe: + for event in monitor.events: + wfe.send(event) + + # # Probably want to refactor this so that it doesn't create its own connection just for this, + # # since creating connections is expensive + # decorated_connect = log_decorator_factory('Making connection...')(make_amqp_connection) + # decorated_config_channel = log_decorator_factory('Configuring channel...')(configure_amqp_channel) + # decorated_emit = log_decorator_factory('Emitting message...')(emit_amqp_message) + # + # with decorated_connect(args.profile) as connection: + # with decorated_config_channel( + # connection=connection, + # exchange=WORKFLOW_STATUS_EXCH, + # exchange_type='topic' + # ) as channel: + # for event in monitor.events: + # decorated_emit( + # channel=channel, + # exchange=WORKFLOW_STATUS_EXCH, + # routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', + # msg=event.json() + # ) diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index 2203c0994ceff0683b940ab9930835a32562b686..c5b93a0b7340dd3ec91b4062ca240f8df6ee2dd2 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -9,6 +9,7 @@ from workspaces.json import WorkflowEventSchema from workspaces.schema import WorkflowEvent T = TypeVar('T', contravariant=True) +CONN = None class ChannelDefinition(Protocol[T]): @@ -53,12 +54,10 @@ ChannelDef = TypeVar('ChannelDef', bound=ChannelDefinition, covariant=True) class Channel(Generic[ChannelDef]): - CONNECTION : pika.BlockingConnection = None - CHANNEL : BlockingChannel = None - CONFIG : CapoConfig = None - def __init__(self, definition: ChannelDefinition[T]): - self.definition = definition + self.definition: ChannelDefinition[T] = definition + self.chan: BlockingChannel = None + self.config: CapoConfig = None def connect(self, **kwargs: Union[int, str]): """ @@ -74,32 +73,34 @@ class Channel(Generic[ChannelDef]): :param: password: Password to use when connecting :param: exchange: Exchange to use when connection """ - if not self.CONNECTION: - self.CONFIG = CapoConfig( + global CONN + + if not CONN: + self.config = CapoConfig( profile=kwargs.get('profile', None) ).settings('edu.nrao.archive.configuration.AmqpServer') connection_parameters = pika.ConnectionParameters( - host=kwargs.get('hostname', self.CONFIG.hostname), - port=int(kwargs.get('port', self.CONFIG.port)), + host=kwargs.get('hostname', self.config.hostname), + port=int(kwargs.get('port', self.config.port)), connection_attempts=int(kwargs.get('connection_attempts', 5)), socket_timeout=int(kwargs.get('socket_timeout', 5000)), retry_delay=int(kwargs.get('retry_delay', 500)), credentials=pika.PlainCredentials( - username=kwargs.get('username', self.CONFIG.username), - password=kwargs.get('password', self.CONFIG.password) + username=kwargs.get('username', self.config.username), + password=kwargs.get('password', self.config.password) ) ) - self.CONNECTION = pika.BlockingConnection(connection_parameters) - self.CHANNEL = self.CONNECTION.channel() - self.definition.declarations(self.CHANNEL) + CONN = pika.BlockingConnection(connection_parameters) + self.chan = CONN.channel() + self.definition.declarations(self.chan) def close(self): """ - Close connection to RabbitMQ server, if it's open + Close channel of connection to RabbitMQ server, if it's open """ - if self.CONNECTION: - self.CONNECTION.close() + if self.chan: + self.chan.close() def send(self, event: WorkflowEventSchema): """ @@ -108,7 +109,7 @@ class Channel(Generic[ChannelDef]): """ rendered = self.definition.schema.dump(event) routing_key = self.definition.routing_key_for(event) - self.CHANNEL.basic_publish(routing_key=routing_key, body=rendered) + self.chan.basic_publish(routing_key=routing_key, body=rendered) def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False): """ @@ -123,10 +124,10 @@ class Channel(Generic[ChannelDef]): event = self.definition.schema.load(message) callback(event) - queue = self.CHANNEL.queue_declare(queue='', exclusive=True).method.queue - self.CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern) - self.CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) - self.CHANNEL.start_consuming() + queue = self.chan.queue_declare(queue='', exclusive=True).method.queue + self.chan.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern) + self.chan.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) + self.chan.start_consuming() def __enter__(self, **kwargs: Union[int, str]): """ @@ -141,10 +142,11 @@ class Channel(Generic[ChannelDef]): :param: exchange: Exchange to use when connection """ self.connect(**kwargs) + return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() # Predefined event channels for ease of use -workflow_events : Channel[WorkflowEvent] = Channel(WorkflowEventChannel()) +workflow_events: Channel[WorkflowEvent] = Channel(WorkflowEventChannel())