Messaging System Design and Implementation Plan
===============================================

This page details the design and implementation plan for the Workspaces
Messaging System refactor.

.. contents::
   :local:

Problem Background
------------------

Messaging is a primary feature of the Workspaces system. It uses
"events" (objects translated from received AMQP messages) to update
state between the Capability and Workflow systems and to set off
auxiliary functionality such as the Notification Service. It also
provides a layer of persistence that lets the system survive restarts:
events stack up in the external AMQP server's message queues while the
system isn't retrieving them.

The current implementation came about in the frenzied rush to WS-0.1 and
was developed primarily to meet the immediate needs of the system, which
led to an impromptu, incomplete, and hard-to-scale result. This rework
gives the messaging system the attention it needs to reliably serve the
goals of Workspaces now and going forward.

Proposed Design Solution
------------------------

Implement more organized AMQP setup
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

We will create an AMQP exchange for each primary service of the system
(so far, just Capability and Workflow). Each exchange will have a queue
for internal events that are sent and processed only within the
subsystem (``wf_rest_queue`` in the diagram below) and a queue for events
that need to go to another subsystem (``cp_wf_rest_queue`` in the diagram
below). Custom routing keys determine whether a message goes to one
queue, the other, or both.

.. image:: media/image1.png
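
To illustrate how routing keys can direct a message to one queue or to
both, the sketch below implements AMQP topic-exchange matching in plain
Python (``*`` matches exactly one dot-separated word, ``#`` matches zero
or more). It assumes topic exchanges and a hypothetical
``<service>.<audience>.<event>`` routing-key scheme whose audience word is
``internal``, ``capability``, or ``all``; neither the scheme nor the
binding patterns come from this plan. In the running system the AMQP
server does this matching, not application code.

.. code-block:: python

    """Illustrative AMQP topic matching for the two queues in the diagram."""

    from typing import Dict, List


    def topic_matches(pattern: str, routing_key: str) -> bool:
        """Return True if a topic binding pattern matches a routing key.

        '*' matches exactly one dot-separated word; '#' matches zero or more.
        """
        return _match(pattern.split("."), routing_key.split("."))


    def _match(pattern: List[str], words: List[str]) -> bool:
        if not pattern:
            # Pattern exhausted: it matches only if the key is exhausted too.
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # Let '#' consume zero words, or consume one word and try again.
            return _match(rest, words) or (bool(words) and _match(pattern, words[1:]))
        if not words:
            return False
        if head == "*" or head == words[0]:
            return _match(rest, words[1:])
        return False


    # Hypothetical bindings: a queue may be bound to its exchange more than once.
    WORKFLOW_BINDINGS: Dict[str, List[str]] = {
        "wf_rest_queue": ["workflow.internal.#", "workflow.all.#"],
        "cp_wf_rest_queue": ["workflow.capability.#", "workflow.all.#"],
    }


    def queues_for(routing_key: str) -> List[str]:
        """Return the sorted names of the queues that would receive this key."""
        return sorted(
            queue
            for queue, patterns in WORKFLOW_BINDINGS.items()
            if any(topic_matches(p, routing_key) for p in patterns)
        )


    print(queues_for("workflow.all.request_complete"))
    # ['cp_wf_rest_queue', 'wf_rest_queue']

Binding each queue to a shared ``all`` pattern is one way to get the
"both" case with a single publish; an alternative is to publish the
message twice with different keys.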

Implement a new object: Router
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Inspired by the publish/subscribe design pattern, this object will route
received messages where they need to go. It will allow methods to be
registered as callbacks (subscribers) for specific messages. This object
will also act as the routing 'gateway' for messages to be sent on to the
new Messenger object (described below) for submission to AMQP.
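
A minimal sketch of the Router's two roles (dispatching received messages
to registered subscribers, and acting as the gateway for outgoing
messages), assuming messages are dictionaries with the common fields
described under Message Patterns. The ``register``/``dispatch``/``send``
method names, the field-equality matching, and the messenger's
``publish(routing_key, message)`` method are all hypothetical.

.. code-block:: python

    from typing import Any, Callable, Dict, List, Protocol, Tuple

    Message = Dict[str, Any]
    Callback = Callable[[Message], None]


    class Publisher(Protocol):
        """Anything with this publish() method; stands in for the Messenger."""

        def publish(self, routing_key: str, message: Message) -> None: ...


    def _lookup(message: Message, dotted: str) -> Any:
        """Fetch a possibly nested field such as 'subject.type'; None if absent."""
        value: Any = message
        for part in dotted.split("."):
            if not isinstance(value, dict) or part not in value:
                return None
            value = value[part]
        return value


    class Router:
        """Routes received messages to subscribers and outgoing ones to a messenger."""

        def __init__(self, messenger: Publisher) -> None:
            self._messenger = messenger
            self._subscribers: List[Tuple[Dict[str, Any], Callback]] = []

        def register(self, callback: Callback, **criteria: Any) -> None:
            """Subscribe callback to messages whose fields equal every criterion.

            A double underscore denotes nesting: subject__type="CapabilityRequest".
            """
            fields = {key.replace("__", "."): value for key, value in criteria.items()}
            self._subscribers.append((fields, callback))

        def dispatch(self, message: Message) -> int:
            """Deliver a received message to each matching subscriber; return how many."""
            delivered = 0
            for fields, callback in self._subscribers:
                if all(_lookup(message, name) == value for name, value in fields.items()):
                    callback(message)
                    delivered += 1
            return delivered

        def send(self, routing_key: str, message: Message) -> None:
            """Gateway for outgoing messages: hand them to the messenger for AMQP."""
            self._messenger.publish(routing_key, message)


    class RecordingMessenger:
        """Test double that records what would have been published."""

        def __init__(self) -> None:
            self.published: List[Tuple[str, Message]] = []

        def publish(self, routing_key: str, message: Message) -> None:
            self.published.append((routing_key, message))


    received: List[Message] = []
    router = Router(RecordingMessenger())
    router.register(received.append, service="capability",
                    subject__type="CapabilityRequest", status="Complete")
    router.dispatch({"service": "capability",
                     "subject": {"type": "CapabilityRequest", "id": 23},
                     "status": "Complete"})  # delivered to 1 subscriber

Because the router only depends on something with a ``publish`` method,
a recording double can replace the real messenger in unit tests.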

Use of Python decorators to identify subscriber methods
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Decorators will make it easy to identify methods that require messages
to execute properly. They will also simplify the code by offloading some
of the message parsing.
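
One way the decorators could work, sketched under assumptions: a
hypothetical ``subscribes_to`` decorator records matching criteria on the
method and parses the raw JSON body before the method runs, so the
subscriber receives only the ``subject`` and ``status`` it cares about. A
hypothetical ``find_subscribers`` helper then discovers decorated methods
on an object, which is how a router could identify them without a
separate registration list.

.. code-block:: python

    import functools
    import inspect
    import json
    from typing import Any, Callable, Dict, List, Tuple

    _CRITERIA_ATTR = "_subscription_criteria"


    def subscribes_to(**criteria: Any) -> Callable:
        """Mark a method as a subscriber and offload JSON parsing from it."""

        def decorator(method: Callable) -> Callable:
            @functools.wraps(method)
            def wrapper(self, raw_body: str) -> Any:
                event = json.loads(raw_body)
                # The subscriber sees only the subject and the event's status.
                return method(self, event["subject"], event["status"])

            setattr(wrapper, _CRITERIA_ATTR, criteria)
            return wrapper

        return decorator


    def find_subscribers(obj: Any) -> List[Tuple[Dict[str, Any], Callable]]:
        """Return (criteria, bound method) pairs for every decorated method on obj."""
        found = []
        for _, member in inspect.getmembers(obj, inspect.ismethod):
            criteria = getattr(member, _CRITERIA_ATTR, None)
            if criteria is not None:
                found.append((criteria, member))
        return found


    class WorkflowService:  # hypothetical subscriber class
        @subscribes_to(service="capability", status="Complete")
        def on_capability_complete(self, subject, status):
            return f"{subject['type']} {subject['id']} is {status}"


    (criteria, handler), = find_subscribers(WorkflowService())
    print(handler('{"service": "capability", "status": "Complete", '
                  '"subject": {"type": "CapabilityRequest", "id": 23}}'))
    # CapabilityRequest 23 is Complete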

Implement a new object: Messenger
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This object will consume messages from our AMQP queues and send them to
its router instance to be routed to their correct destinations. This
object will also be able to publish messages back to AMQP.

All interaction with this object will occur via its Router instance. No
service should interact with the Messenger object directly.

.. figure:: media/image2.png

   Relationship between a subsystem (and any components thereof), its
   router, and its messenger

.. figure:: media/image3.png

   Sequence diagram illustrating the process of sending and receiving
   messages to/from AMQP

.. figure:: media/image4.png

   Relationship between the subsystems, routers, and messengers ("Kombu
   listener/broadcaster mixin thing")
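
The sketch below isolates the Messenger's responsibilities: decode
consumed message bodies and hand them to its router, and encode outgoing
messages for publishing. To keep it runnable without a broker, the AMQP
connection is replaced by an injected ``transport`` callable and
consumption is modeled as a loop over already-received bodies; the class
shape, the ``attach``/``publish``/``consume`` names, and JSON encoding are
assumptions. In the planned design, kombu's mixin classes (for example
``kombu.mixins.ConsumerMixin``) would supply the real connection handling.

.. code-block:: python

    import json
    from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

    Message = Dict[str, Any]


    class Messenger:
        """Moves messages between AMQP and a router; services never call it directly."""

        def __init__(self, transport: Callable[[str, str], None]) -> None:
            # transport(routing_key, body) stands in for a real AMQP publish.
            self._transport = transport
            self._on_message: Optional[Callable[[Message], Any]] = None

        def attach(self, on_message: Callable[[Message], Any]) -> None:
            """Called by the owning router so consumed messages reach it."""
            self._on_message = on_message

        def publish(self, routing_key: str, message: Message) -> None:
            """Encode an outgoing message and hand it to the transport."""
            self._transport(routing_key, json.dumps(message))

        def consume(self, bodies: Iterable[str]) -> int:
            """Decode each received body and pass it to the router; return the count."""
            if self._on_message is None:
                raise RuntimeError("no router attached")
            count = 0
            for body in bodies:
                self._on_message(json.loads(body))
                count += 1
            return count


    # In-memory stand-in for the broker: a list of (routing_key, body) pairs.
    outbox: List[Tuple[str, str]] = []
    messenger = Messenger(lambda key, body: outbox.append((key, body)))
    delivered: List[Message] = []
    messenger.attach(delivered.append)  # normally the Router's dispatch method
    messenger.publish("workflow.all.request_complete",
                      {"service": "workflow", "status": "Complete"})
    messenger.consume(body for _, body in outbox)  # loop the message back
    print(delivered)
    # [{'service': 'workflow', 'status': 'Complete'}]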

Replace pika with kombu
~~~~~~~~~~~~~~~~~~~~~~~

Kombu will provide a simpler and more flexible interface between the
Workspaces system and the AMQP server, as well as helpful "mixin" classes
that provide robust AMQP connection handling and lifetime management.
These classes can be subclassed and adapted to the needs of the system.

Implement Messaging priority
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This will allow high-priority messages (such as ``pause_request`` or
``cancel_request``, or messages for high-priority project processing) to
be received and processed ahead of lower-priority traffic, as quickly as
possible.
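
To show the intended ordering, here is a broker-independent sketch:
messages are served highest priority first, and in arrival order among
equal priorities. The ``PriorityInbox`` class and the priority values are
hypothetical. If the AMQP server is RabbitMQ, the corresponding mechanism
is a queue declared with the ``x-max-priority`` argument plus messages
published with a ``priority`` property, so the broker rather than
application code would do this ordering.

.. code-block:: python

    import heapq
    import itertools
    from typing import Any, List, Tuple


    class PriorityInbox:
        """Serve the highest-priority message first; FIFO among equal priorities."""

        def __init__(self) -> None:
            self._heap: List[Tuple[int, int, Any]] = []
            self._arrivals = itertools.count()  # tie-breaker preserving arrival order

        def push(self, message: Any, priority: int = 0) -> None:
            # heapq is a min-heap, so store the negated priority.
            heapq.heappush(self._heap, (-priority, next(self._arrivals), message))

        def pop(self) -> Any:
            return heapq.heappop(self._heap)[2]

        def __len__(self) -> int:
            return len(self._heap)


    inbox = PriorityInbox()
    inbox.push("execution starting")
    inbox.push("pause_request", priority=9)
    inbox.push("step complete")
    inbox.push("cancel_request", priority=9)
    print([inbox.pop() for _ in range(len(inbox))])
    # ['pause_request', 'cancel_request', 'execution starting', 'step complete']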

Work Required
-------------

`WS-127 <https://open-jira.nrao.edu/browse/WS-127>`__

1. Refactor channels package into messaging package using kombu instead
   of pika for AMQP connection.
   `WS-128 <https://open-jira.nrao.edu/browse/WS-128>`__

   -  Ensure that kombu connects to AMQP server

   -  Ensure kombu correctly generates the two service exchanges
      (capability and workflow)

   -  Ensure kombu correctly generates the queues for each exchange.
      (Make sure that the queues are accessible for debugging; the
      current system sometimes locks us out.)

2. Implement Router object
   `WS-130 <https://open-jira.nrao.edu/browse/WS-130>`__

   1. Implement callback registering (using decorators)

3. Implement Messenger object
   `WS-129 <https://open-jira.nrao.edu/browse/WS-129>`__

4. Ensure that messages can be sent with the rewritten system from the
   subsystem through to the AMQP server (using the new Router/Messenger)
   `WS-131 <https://open-jira.nrao.edu/browse/WS-131>`__

5. Ensure that messages can be consumed from AMQP with the rewritten
   system (using the new Router/Messenger)
   `WS-132 <https://open-jira.nrao.edu/browse/WS-132>`__

   1. Ensure that received messages are passed correctly to the
      appropriate callback/consumer method

6. Ensure that functionality dependent on messaging is carried out
   correctly. `WS-133 <https://open-jira.nrao.edu/browse/WS-133>`__

7. Ensure that the changes made by callback/consumer methods are
   persisted to the database correctly.
   `WS-134 <https://open-jira.nrao.edu/browse/WS-134>`__

Diagram illustrating overall changes to the system:

.. image:: media/image5.png

Message Patterns
----------------

All the messages should have these common fields:

-  **service**, the service that generated the event: either ``workflow``
   or ``capability``

-  **subject**, a compound dictionary with:

   -  **type**, the name of a model object class (e.g. ``CapabilityRequest``,
      ``CapabilityExecution``, ``WorkflowRequest``)

   -  **id**, the identifier of that model object

   -  **other properties**, whatever they may be for this object (will
      be generated by calling that instance's ``json`` method)

-  **type**, the type of the event (do we need this?)

-  **status**, the current status of the subject (e.g. ``complete``,
   ``executing``, etc.)
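
A sketch of building an event with these common fields from a model
object already in hand, so the sender does no database lookup. It assumes
the model object exposes ``id`` and a ``json()`` method returning its
other properties as a dictionary; the ``make_event`` helper and the
stand-in ``CapabilityRequest`` dataclass are hypothetical, not the real
Workspaces model code.

.. code-block:: python

    import json
    from dataclasses import asdict, dataclass
    from typing import Any, Dict, Optional

    SERVICES = {"capability", "workflow"}


    def make_event(service: str, subject: Any, status: str,
                   event_type: Optional[str] = None) -> Dict[str, Any]:
        """Assemble an event dict with the common fields."""
        if service not in SERVICES:
            raise ValueError(f"unknown service: {service!r}")
        # Other properties come from the object's json(); type and id are set
        # last so they always reflect the model class and its identifier.
        subject_fields = {**subject.json(),
                          "type": type(subject).__name__, "id": subject.id}
        event: Dict[str, Any] = {"service": service, "subject": subject_fields,
                                 "status": status}
        if event_type is not None:  # the optional top-level "type" field
            event["type"] = event_type
        return event


    @dataclass
    class CapabilityRequest:
        """Hypothetical stand-in for the real model class."""

        id: int
        capability_name: str

        def json(self) -> Dict[str, Any]:
            return asdict(self)


    event = make_event("capability", CapabilityRequest(23, "test_download"),
                       "Executing")
    print(json.dumps(event, sort_keys=True))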

Principles
~~~~~~~~~~

1. **Code-friendliness:** Messages should be easy to pattern-match on
   and have useful fields for that kind of matching

2. **Human-readable:** Messages should be somewhat human-friendly for
   debuggability

3. **Parsimonious:** Do not include useless information in the message

4. **Receiver-friendly:** Don't force the receiver to do a database
   lookup for something you had when you sent the message

5. **Efficient:** Don't look up stuff in the database just to include it
   in the message
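
As an illustration of the code-friendliness principle, Python 3.10+
structural pattern matching can select events by their common fields
while ignoring any extra subject properties. The ``describe`` function
and its return strings are hypothetical; the example messages mirror
those in the Known Events tables.

.. code-block:: python

    from typing import Any, Dict


    def describe(event: Dict[str, Any]) -> str:
        """Classify an event by its common fields (requires Python 3.10+)."""
        match event:
            # Mapping patterns ignore keys they do not mention.
            case {"service": "capability",
                  "subject": {"type": "CapabilityRequest", "id": request_id},
                  "status": "Complete"}:
                return f"capability request {request_id} complete"
            case {"service": "workflow",
                  "subject": {"type": "WorkflowExecution", "id": execution_id},
                  "status": status}:
                return f"workflow execution {execution_id}: {status}"
            case _:
                return "unhandled"


    print(describe({"service": "workflow",
                    "subject": {"type": "WorkflowExecution", "id": 91,
                                "workflow_name": "test_download"},
                    "status": "Executing"}))
    # workflow execution 91: Executing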

Known Events
~~~~~~~~~~~~

Capability
^^^^^^^^^^

+--------------------+------------------------------------------------+
| **event**          | **example message**                            |
+====================+================================================+
| execution starting | {service: capability, subject: {type:          |
|                    | CapabilityRequest, id: 23, capability_name:    |
|                    | test_download, ...}, status: Executing}        |
+--------------------+------------------------------------------------+
| step complete      | {service: capability, subject: {type:          |
|                    | CapabilityExecution, id: 43, request_id: 23,   |
|                    | ...}, status: Executing Step AWAIT WORKFLOW    |
|                    | test_download} ?                               |
|                    |                                                |
|                    | {service: capability, subject: {type:          |
|                    | CapabilityExecution, id: 43, request_id: 23,   |
|                    | ...}, status: Executing, type: Step Complete}  |
|                    | ?                                              |
+--------------------+------------------------------------------------+
| execution complete | {service: capability, subject: {type:          |
|                    | CapabilityExecution, id: 43, request_id: 23,   |
|                    | ...}, status: Complete}                        |
+--------------------+------------------------------------------------+
| request complete   | {service: capability, subject: {type:          |
|                    | CapabilityRequest, id: 23, ...}, status:       |
|                    | Complete}                                      |
+--------------------+------------------------------------------------+

Workflow
^^^^^^^^

+------------------+--------------------------------------------------+
| **event**        | **example message**                              |
+==================+==================================================+
| request starting | {service: workflow, subject: {type:              |
|                  | WorkflowExecution, id: 91, workflow_name:        |
|                  | test_download, ...}, status: Executing}          |
+------------------+--------------------------------------------------+
| request complete | {service: workflow, subject: {type:              |
|                  | WorkflowExecution, id: 91, workflow_name:        |
|                  | test_download, ...}, status: Complete}           |
+------------------+--------------------------------------------------+

Questions
~~~~~~~~~

1. How do we encode "step complete" events? What information do we need
   to include in these?

2. Do we need event types?

3. What other information are we going to want to send in these events?

4. Do we need timestamp-related information in these events?

5. Does delivery send a message with the destination the files were
   written to, or is there some other mechanism for passing that
   information back to the capability execution?

6. How do we document the system as we add new events to it?