Skip to content
Snippets Groups Projects
Commit ecd4df00 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Connections can now be threaded with no errors

parent 2c2ebbbd
No related branches found
No related tags found
No related merge requests found
......@@ -81,10 +81,9 @@ class Channel(Generic[ChannelDef]):
self.chan: BlockingChannel = None
self.config: CapoConfig = None
def connect(self, threaded: bool = False, **kwargs: Union[int, str]):
def connect(self, **kwargs: Union[int, str]):
"""
Initialize connection to AMQP server given a CAPO profile
:param threaded: Specifies if the caller is a separate thread
Keyword arguments for the AMQP connection. These do not need to be specified:
:param: hostname: Hostname to connect to
......@@ -98,7 +97,7 @@ class Channel(Generic[ChannelDef]):
"""
global CONN
if not CONN or threaded is True:
if not CONN:
self.config = CapoConfig(profile=kwargs.get("profile", None)).settings(
"edu.nrao.archive.configuration.AmqpServer"
)
......@@ -115,8 +114,44 @@ class Channel(Generic[ChannelDef]):
),
)
CONN = pika.BlockingConnection(connection_parameters)
self.chan = CONN.channel()
self.chan = CONN.channel()
self.definition.declarations(self.chan)
@classmethod
def threaded_connect(cls, definition: ChannelDefinition[T], **kwargs: Union[int, str]):
"""
Initialize connection to AMQP server given a CAPO profile
:param definition: Channel definition
Keyword arguments for the AMQP connection. These do not need to be specified:
:param: hostname: Hostname to connect to
:param: port: Port to connect to
:param: connection_attempts: Number of connection attempts to try
:param: socket_timeout: Time to wait for a socket to connect
:param: retry_delay: Time to wait between retrying the connection
:param: username: Username to connect to as
:param: password: Password to use when connecting
:param: exchange: Exchange to use when connection
"""
self = cls(definition)
self.config = CapoConfig(profile=kwargs.get("profile", None)).settings(
"edu.nrao.archive.configuration.AmqpServer"
)
connection_parameters = pika.ConnectionParameters(
host=kwargs.get("hostname", self.config.hostname),
port=int(kwargs.get("port", self.config.port)),
connection_attempts=int(kwargs.get("connection_attempts", 5)),
socket_timeout=int(kwargs.get("socket_timeout", 5000)),
retry_delay=int(kwargs.get("retry_delay", 500)),
credentials=pika.PlainCredentials(
username=kwargs.get("username", self.config.username),
password=kwargs.get("password", self.config.password),
),
)
connection = pika.BlockingConnection(connection_parameters)
self.chan = connection.channel()
self.definition.declarations(self.chan)
return self
def close(self):
"""
......@@ -157,7 +192,8 @@ class Channel(Generic[ChannelDef]):
event = self.definition.schema().loads(message)
callback(event)
self.connect(threaded)
if not threaded:
self.connect()
queue = self.chan.queue_declare(queue="", exclusive=True).method.queue
self.chan.queue_bind(
queue=queue, exchange=self.definition.exchange(), routing_key=pattern
......
......@@ -137,6 +137,7 @@ class FutureProduct(FutureProductIF):
class PrepareAndRunWorkflow(CapabilityStep):
def execute(self, engine: CapabilityEngineIF, execution: CapabilityExecutionIF):
print("Executing workflow!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
workflow_name = self.step_value
workflow_args = self.step_args
# FIXME: Add support for files
......
......@@ -206,8 +206,8 @@ class CapabilityService(CapabilityServiceIF):
to update capability executions
:return:
"""
thread_workflow_events = Channel(WorkflowEventChannel())
thread_workflow_events.listen(callback=self.update_execution)
thread_workflow_events = Channel.threaded_connect(WorkflowEventChannel())
thread_workflow_events.listen(callback=self.update_execution, threaded=True)
class CapabilityInfo(CapabilityInfoIF):
......@@ -468,7 +468,7 @@ class WorkflowService(WorkflowServiceIF):
# Start listening for events from the wf_monitor stream
self.listener = threading.Thread(
target=lambda: workflow_events.listen(self.on_workflow_event, threaded=True)
target=lambda: Channel.threaded_connect(WorkflowEventChannel()).listen(self.on_workflow_event, threaded=True)
)
self.listener.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment