diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py
index 4b9cf84820748c3042bfe66c4975666ffb7334ba..3b509e32f310600520ba49a14bb1bf057d1dbf40 100644
--- a/shared/channels/src/channels/amqp_helpers.py
+++ b/shared/channels/src/channels/amqp_helpers.py
@@ -1,4 +1,4 @@
-from typing import Callable, Any
+from typing import Callable, Any, Optional, Union
 
 import pika
 from marshmallow import Schema
@@ -6,115 +6,98 @@ from pycapo import CapoConfig
 from workspaces.json import WorkflowEventSchema
 
 
-def make_amqp_connection(profile: str) -> pika.BlockingConnection:
-    """
-    Initialize connection to AMQP server given a CAPO profile
-    :param profile: Capo profile to use
-    :return: Established connection
-    """
-    capo_cfg = CapoConfig(profile=profile)
-
-    hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
-    username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
-    password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
-
-    return pika.BlockingConnection(
-        pika.ConnectionParameters(
-            host=hostname,
-            credentials=pika.PlainCredentials(
-                username=username,
-                password=password
-            )
-        )
-    )
-
-
-def configure_amqp_channel(
-        connection: pika.BlockingConnection,
-        exchange: str,
-        exchange_type: str
-) -> pika.adapters.blocking_connection.BlockingChannel:
-    """
-    Takes in a connection and exchange declaration parameters and creates a channel to the given
-    exchange, configured with the given parameters
-    :param connection: RabbitMQ BlockingConnection
-    :param exchange: Exchange name
-    :param exchange_type: Exchange type
-    :return: Channel for given connection
-    """
-    channel = connection.channel()
-    channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
-    return channel
-
-
-def emit_amqp_message(
-        channel: pika.adapters.blocking_connection.BlockingChannel,
-        exchange: str,
-        routing_key: str,
-        msg: bytes
-):
-    """
-    Emit AMQP message to a given exchange through a given channel
-    :param channel: AMQP connection channel
-    :param exchange: AMQP exchange name
-    :param routing_key: Routing key
-    :param msg: Message to be emitted
-    """
-    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg)
-
-
-def receive_amqp_message(
-        channel: pika.adapters.blocking_connection.BlockingChannel,
-        exchange: str,
-        routing_key: str,
-        callback: Callable,
-        auto_ack: bool = False
-):
-    """
-    Establishes queue and binds it to a given channel and consumes messages matching the
-    routing key from given exchange
-    :param channel: AMQP connection channel
-    :param exchange: AMQP exchange name
-    :param routing_key: Routing key
-    :param callback: Callback function for when a message is consumed
-    :param auto_ack: If true, consumer automatically acknowledges when it consumes a message
-    """
-    queue_result = channel.queue_declare(queue='', exclusive=True)
-    queue_name = queue_result.method.queue
-    channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
-    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
-    channel.start_consuming()
+CONNECTION = None
+CHANNEL = None
+CONFIG = None
 
 
 class Channel:
     def __init__(self, schema: Schema):
         self.schema = schema
 
-    def connect(self):
-        # FIXME implement this
-        pass
+    def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection:
+        """
+        Initialize connection to AMQP server given a CAPO profile
+        :return: Established connection
+        """
+        global CONNECTION
+        global CHANNEL
+        global CONFIG
+
+        if not CONNECTION:
+            CONFIG = CapoConfig(
+                profile=kwargs.get('profile', None)
+            ).settings('edu.nrao.archive.configuration.AmqpServer')
+
+            connection_parameters = pika.ConnectionParameters(
+                host=kwargs.get('hostname', CONFIG.hostname),
+                port=int(kwargs.get('port', CONFIG.port)),
+                # FIXME: Copied from events. Do these args need to be cast to int?
+                connection_attempts=kwargs.get('connection_attempts', 5),
+                socket_timeout=kwargs.get('socket_timeout', 5000),
+                retry_delay=kwargs.get('retry_delay', 500),
+                #
+                credentials=pika.PlainCredentials(
+                    username=kwargs.get('username', CONFIG.username),
+                    password=kwargs.get('password', CONFIG.password)
+                )
+            )
+            CONNECTION = pika.BlockingConnection(connection_parameters)
+            CHANNEL = CONNECTION.channel()
+            CHANNEL.exchange_declare(
+                # FIXME: CAPO config does not currently support 'exchange' value
+                exchange=kwargs.get('exchange', CONFIG.exchange),
+                exchange_type='topic',
+                durable=True,
+                auto_delete=False
+            )
 
     def close(self):
-        # FIXME implement this
-        pass
+        if CONNECTION:
+            CONNECTION.close()
 
-    def send(self, event):
-        # I don't know how to type Event there for Python but it would be
-        # nice if we could, somehow.
+    def send(self, event: WorkflowEventSchema):
         rendered = self.schema.dump(event)
-        # FIXME: transmit rendered over the underlying channel
-        pass
+        routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}'
+        CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered))
+
+    def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]):
+        """
+        Establishes queue and binds it to a given channel and consumes messages matching the
+        routing key from given exchange
+        :param callback: Callback function for when a message is consumed
+        :param pattern: Pattern to be used as routing key
+        Optional keyword arguments
+        :param exchange: AMQP exchange name
+        :param auto_ack: If true, consumer automatically acknowledges when it consumes a message
+        """
+        exchange = kwargs.get('exchange', CONFIG.exchange)
+        auto_ack = kwargs.get('auto_ack', False)
 
-    def listen(self, callback: Callable[[Any], None], pattern: str = '#'):
         def unwrapping_callback(message):
             event = self.schema.load(message)
             callback(event)
         # FIXME: ask the underlying channel to call us back
         # but only for messages matching pattern
-        pass
 
-    def __enter__(self):
-        self.connect()
+        queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue
+        CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern)
+        CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
+        CHANNEL.start_consuming()
+
+    def __enter__(self, **kwargs: Union[int, str]):
+        """
+        Keyword arguments for the AMQP connection. These do not need to be specified:
+        :param: hostname: Hostname to connect to
+        :param: port: Port to connect to
+        :param: connection_attempts: Number of connection attempts to try
+        :param: socket_timeout: Time to wait for a socket to connect
+        :param: retry_delay: Time to wait between retrying the connection
+        :param: username: Username to connect to as
+        :param: password: Password to use when connecting
+        :param: exchange: Exchange to use when connection
+        """
+        self.connect(**kwargs)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()