Skip to content
Snippets Groups Projects
Commit cc135adb authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Refactored amqp_helpers.py to use a global connection object (I was

originally going to use a singleton pattern but ran into issues with
that)
parent 5a510249
No related branches found
No related tags found
No related merge requests found
...@@ -13,7 +13,7 @@ from pathlib import Path ...@@ -13,7 +13,7 @@ from pathlib import Path
import pendulum import pendulum
from ._version import ___version___ as VERSION from ._version import ___version___ as VERSION
from channels.amqp_helpers import make_amqp_connection, configure_amqp_channel, emit_amqp_message from channels.amqp_helpers import workflow_events
from workspaces.schema import WorkflowEvent, WorkflowEventType from workspaces.schema import WorkflowEvent, WorkflowEventType
WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
...@@ -239,23 +239,26 @@ def main(): ...@@ -239,23 +239,26 @@ def main():
# Parse command-line args # Parse command-line args
args = make_arg_parser().parse_args() args = make_arg_parser().parse_args()
monitor = WorkflowMonitor(args.log_path) monitor = WorkflowMonitor(args.log_path)
with workflow_events as wfe:
# Probably want to refactor this so that it doesn't create its own connection just for this, for event in monitor.events:
# since creating connections is expensive wfe.send(event)
decorated_connect = log_decorator_factory('Making connection...')(make_amqp_connection)
decorated_config_channel = log_decorator_factory('Configuring channel...')(configure_amqp_channel) # # Probably want to refactor this so that it doesn't create its own connection just for this,
decorated_emit = log_decorator_factory('Emitting message...')(emit_amqp_message) # # since creating connections is expensive
# decorated_connect = log_decorator_factory('Making connection...')(make_amqp_connection)
with decorated_connect(args.profile) as connection: # decorated_config_channel = log_decorator_factory('Configuring channel...')(configure_amqp_channel)
with decorated_config_channel( # decorated_emit = log_decorator_factory('Emitting message...')(emit_amqp_message)
connection=connection, #
exchange=WORKFLOW_STATUS_EXCH, # with decorated_connect(args.profile) as connection:
exchange_type='topic' # with decorated_config_channel(
) as channel: # connection=connection,
for event in monitor.events: # exchange=WORKFLOW_STATUS_EXCH,
decorated_emit( # exchange_type='topic'
channel=channel, # ) as channel:
exchange=WORKFLOW_STATUS_EXCH, # for event in monitor.events:
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', # decorated_emit(
msg=event.json() # channel=channel,
) # exchange=WORKFLOW_STATUS_EXCH,
# routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
# msg=event.json()
# )
...@@ -9,6 +9,7 @@ from workspaces.json import WorkflowEventSchema ...@@ -9,6 +9,7 @@ from workspaces.json import WorkflowEventSchema
from workspaces.schema import WorkflowEvent from workspaces.schema import WorkflowEvent
T = TypeVar('T', contravariant=True) T = TypeVar('T', contravariant=True)
CONN = None
class ChannelDefinition(Protocol[T]): class ChannelDefinition(Protocol[T]):
...@@ -53,12 +54,10 @@ ChannelDef = TypeVar('ChannelDef', bound=ChannelDefinition, covariant=True) ...@@ -53,12 +54,10 @@ ChannelDef = TypeVar('ChannelDef', bound=ChannelDefinition, covariant=True)
class Channel(Generic[ChannelDef]): class Channel(Generic[ChannelDef]):
CONNECTION : pika.BlockingConnection = None
CHANNEL : BlockingChannel = None
CONFIG : CapoConfig = None
def __init__(self, definition: ChannelDefinition[T]): def __init__(self, definition: ChannelDefinition[T]):
self.definition = definition self.definition: ChannelDefinition[T] = definition
self.chan: BlockingChannel = None
self.config: CapoConfig = None
def connect(self, **kwargs: Union[int, str]): def connect(self, **kwargs: Union[int, str]):
""" """
...@@ -74,32 +73,34 @@ class Channel(Generic[ChannelDef]): ...@@ -74,32 +73,34 @@ class Channel(Generic[ChannelDef]):
:param: password: Password to use when connecting :param: password: Password to use when connecting
:param: exchange: Exchange to use when connection :param: exchange: Exchange to use when connection
""" """
if not self.CONNECTION: global CONN
self.CONFIG = CapoConfig(
if not CONN:
self.config = CapoConfig(
profile=kwargs.get('profile', None) profile=kwargs.get('profile', None)
).settings('edu.nrao.archive.configuration.AmqpServer') ).settings('edu.nrao.archive.configuration.AmqpServer')
connection_parameters = pika.ConnectionParameters( connection_parameters = pika.ConnectionParameters(
host=kwargs.get('hostname', self.CONFIG.hostname), host=kwargs.get('hostname', self.config.hostname),
port=int(kwargs.get('port', self.CONFIG.port)), port=int(kwargs.get('port', self.config.port)),
connection_attempts=int(kwargs.get('connection_attempts', 5)), connection_attempts=int(kwargs.get('connection_attempts', 5)),
socket_timeout=int(kwargs.get('socket_timeout', 5000)), socket_timeout=int(kwargs.get('socket_timeout', 5000)),
retry_delay=int(kwargs.get('retry_delay', 500)), retry_delay=int(kwargs.get('retry_delay', 500)),
credentials=pika.PlainCredentials( credentials=pika.PlainCredentials(
username=kwargs.get('username', self.CONFIG.username), username=kwargs.get('username', self.config.username),
password=kwargs.get('password', self.CONFIG.password) password=kwargs.get('password', self.config.password)
) )
) )
self.CONNECTION = pika.BlockingConnection(connection_parameters) CONN = pika.BlockingConnection(connection_parameters)
self.CHANNEL = self.CONNECTION.channel() self.chan = CONN.channel()
self.definition.declarations(self.CHANNEL) self.definition.declarations(self.chan)
def close(self): def close(self):
""" """
Close connection to RabbitMQ server, if it's open Close channel of connection to RabbitMQ server, if it's open
""" """
if self.CONNECTION: if self.chan:
self.CONNECTION.close() self.chan.close()
def send(self, event: WorkflowEventSchema): def send(self, event: WorkflowEventSchema):
""" """
...@@ -108,7 +109,7 @@ class Channel(Generic[ChannelDef]): ...@@ -108,7 +109,7 @@ class Channel(Generic[ChannelDef]):
""" """
rendered = self.definition.schema.dump(event) rendered = self.definition.schema.dump(event)
routing_key = self.definition.routing_key_for(event) routing_key = self.definition.routing_key_for(event)
self.CHANNEL.basic_publish(routing_key=routing_key, body=rendered) self.chan.basic_publish(routing_key=routing_key, body=rendered)
def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False): def listen(self, callback: Optional[Callable], pattern: str = '#', auto_ack: bool = False):
""" """
...@@ -123,10 +124,10 @@ class Channel(Generic[ChannelDef]): ...@@ -123,10 +124,10 @@ class Channel(Generic[ChannelDef]):
event = self.definition.schema.load(message) event = self.definition.schema.load(message)
callback(event) callback(event)
queue = self.CHANNEL.queue_declare(queue='', exclusive=True).method.queue queue = self.chan.queue_declare(queue='', exclusive=True).method.queue
self.CHANNEL.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern) self.chan.queue_bind(queue=queue, exchange=self.definition.exchange(), routing_key=pattern)
self.CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) self.chan.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
self.CHANNEL.start_consuming() self.chan.start_consuming()
def __enter__(self, **kwargs: Union[int, str]): def __enter__(self, **kwargs: Union[int, str]):
""" """
...@@ -141,10 +142,11 @@ class Channel(Generic[ChannelDef]): ...@@ -141,10 +142,11 @@ class Channel(Generic[ChannelDef]):
:param: exchange: Exchange to use when connection :param: exchange: Exchange to use when connection
""" """
self.connect(**kwargs) self.connect(**kwargs)
return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
self.close() self.close()
# Predefined event channels for ease of use # Predefined event channels for ease of use
workflow_events : Channel[WorkflowEvent] = Channel(WorkflowEventChannel()) workflow_events: Channel[WorkflowEvent] = Channel(WorkflowEventChannel())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment