
Messaging System Design and Implementation Plan

This page details the design and implementation plan for the Workspaces Messaging System refactor.

Problem Background

Messaging is a primary feature of the Workspaces system. It uses "events" (objects translated from received AMQP messages) to update state between the Capability and Workflow systems and to trigger auxiliary functionality such as the Notification Service. It also provides a layer of persistence that lets the system survive restarts, since events accumulate in the external AMQP server's message queues whenever the system isn't retrieving them.

The current implementation came about in the frenzied rush to WS-0.1 and was developed primarily to meet the needs the system had at the time, which led to an impromptu, incomplete, and hard-to-scale end result. This rework gives the messaging system the attention it needs to reliably serve all the goals of the system now and going forward.

Proposed Design Solution

Implement a more organized AMQP setup

We will create an AMQP exchange for each primary service of the system (so far, that's just Capability and Workflow). Each exchange will have a queue for internal events that are sent and processed entirely within the subsystem (wf_rest_queue in the diagram below), and a queue for events that need to go to another subsystem (cp_wf_rest_queue in the diagram below). A message can be sent to either queue, or to both, based on custom routing keys.

media/image1.png
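
The layout described above can be modelled in a few lines of plain Python to check which queue a given routing key would reach. This is only a sketch under stated assumptions: the routing keys internal, external, and both and the queues_for helper are hypothetical, exact-match (direct-style) bindings are assumed, and attaching both queues to the Workflow exchange is a guess from the queue names. Only the two queue names come from the text.

```python
# Hypothetical bindings for one service exchange; exact-match routing is assumed.
# Only the queue names wf_rest_queue and cp_wf_rest_queue come from the design.
WORKFLOW_BINDINGS = {
    "wf_rest_queue": {"internal", "both"},      # events handled inside the subsystem
    "cp_wf_rest_queue": {"external", "both"},   # events bound for another subsystem
}


def queues_for(routing_key, bindings=WORKFLOW_BINDINGS):
    """Return the sorted names of the queues whose bindings include routing_key."""
    return sorted(name for name, keys in bindings.items() if routing_key in keys)
```

A message published with an unbound key reaches no queue at all, which is worth keeping in mind when choosing the real routing keys.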

Implement a new object: Router

Inspired by the publish/subscribe design pattern, this object will route received messages where they need to go. It will allow for registering methods as callbacks for specific messages (subscribers). This object will also act as the routing 'gateway' for messages to be sent on to the new Messenger object (described below) for submission to AMQP.
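
As a rough illustration of the Router's two roles (delivering received messages to subscribers and acting as the gateway for outgoing ones), a minimal sketch might look like the following. The method names register, receive, and send, the use of a string key to select subscribers, and the publish(key, message) method expected of the messenger are all assumptions made for this example, not part of the design.

```python
from collections import defaultdict


class Router:
    """Minimal publish/subscribe router (illustrative only)."""

    def __init__(self, messenger=None):
        self._subscribers = defaultdict(list)  # key -> list of callbacks
        self._messenger = messenger            # anything with publish(key, message)

    def register(self, key, callback):
        """Subscribe callback to every received message filed under key."""
        self._subscribers[key].append(callback)

    def receive(self, key, message):
        """Deliver a received message to its subscribers; return how many ran."""
        callbacks = self._subscribers.get(key, [])
        for callback in callbacks:
            callback(message)
        return len(callbacks)

    def send(self, key, message):
        """Gateway for outgoing messages: hand them on to the messenger."""
        if self._messenger is None:
            raise RuntimeError("router has no messenger attached")
        self._messenger.publish(key, message)
```

Returning the number of callbacks from receive makes it easy to notice messages that nobody subscribed to.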

Use of Python decorators to identify subscriber methods

Decorators will make it easy to identify the methods that require messages to execute properly. They will also simplify the code by offloading some of the message parsing.
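
One way this could look is sketched below. The decorator name receives, the module-level SUBSCRIBERS registry, and the choice to key subscribers on (service, status) are hypothetical; the message fields used are the common fields proposed under Message Patterns.

```python
SUBSCRIBERS = {}  # (service, status) -> list of callbacks; hypothetical registry


def receives(service, status):
    """Mark a function as the callback for messages with this service and status."""
    def decorator(func):
        SUBSCRIBERS.setdefault((service, status), []).append(func)
        return func  # the function itself is left unchanged
    return decorator


def dispatch(message):
    """Call every subscriber registered for the message, passing it only the subject."""
    key = (message["service"], message["status"])
    return [callback(message["subject"]) for callback in SUBSCRIBERS.get(key, [])]


@receives("workflow", "Complete")
def on_workflow_complete(subject):
    # The message has already been selected and unpacked by the time this runs.
    return f"workflow execution {subject['id']} finished"
```

The decorated function never inspects the envelope: selection and unpacking happen in dispatch, which is the parsing work the decorator approach is meant to take off the subscriber.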

Implement a new object: Messenger

This object will consume messages from our AMQP queues and send them to its router instance to be routed to their correct destinations. This object will also be able to publish messages back to AMQP.

All interaction with this object will occur via its Router instance. No service should connect to the Messenger object directly.
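
The division of labour can be sketched without a broker by letting two in-memory deques stand in for the AMQP queues and exchange. Everything here is an assumption made for illustration: the method names consume_pending and publish, the (routing_key, body) tuples, and the RecordingRouter stub. A real Messenger would sit on top of kombu instead.

```python
from collections import deque


class Messenger:
    """Moves messages between a broker and a router (in-memory stand-in for AMQP)."""

    def __init__(self, router):
        self.router = router
        self.incoming = deque()  # stands in for the queues being consumed
        self.outgoing = deque()  # stands in for the exchange being published to

    def consume_pending(self):
        """Drain the incoming queue, passing each message to the router."""
        count = 0
        while self.incoming:
            routing_key, body = self.incoming.popleft()
            self.router.receive(routing_key, body)
            count += 1
        return count

    def publish(self, routing_key, body):
        """Send a message back to the broker."""
        self.outgoing.append((routing_key, body))


class RecordingRouter:
    """Hypothetical router stub that only records what it receives."""

    def __init__(self):
        self.received = []

    def receive(self, routing_key, body):
        self.received.append((routing_key, body))
```

Services would only ever hold the router; the messenger stays an implementation detail behind it.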

media/image2.png

Relationship between a subsystem (and any components thereof), its router, and its messenger

media/image3.png

Sequence diagram illustrating the process of sending and receiving messages to/from AMQP

media/image4.png

Relationship between the subsystems, routers, and messengers ("Kombu listener/broadcaster mixin thing")

Replace pika with kombu

Kombu will provide a simpler and more flexible interface between the Workspaces system and the AMQP server, as well as helpful "mixin" classes that provide robust AMQP connection handling and lifetime management. These classes can be inherited from and modified to suit the needs of the system.

Implement Messaging priority

This will allow high-priority messages, such as pause_request or cancel_request, or messages belonging to high-priority project processing, to be received and processed as fast as possible.
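
The intended ordering can be illustrated with a small in-process priority queue. This is a sketch of the behaviour only, not of how the broker would enforce it: the PriorityInbox class, the two priority levels, and the rule that only pause_request and cancel_request count as urgent are assumptions for the example.

```python
import heapq
import itertools

# Hypothetical priority levels; higher numbers are served first.
HIGH, NORMAL = 9, 0
URGENT_TYPES = {"pause_request", "cancel_request"}


class PriorityInbox:
    """Serve higher-priority messages first, keeping arrival order within a level."""

    def __init__(self):
        self._heap = []
        self._arrival = itertools.count()  # tie-breaker that preserves FIFO order

    def put(self, message_type, body=None):
        priority = HIGH if message_type in URGENT_TYPES else NORMAL
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._arrival), message_type, body))

    def get(self):
        _, _, message_type, body = heapq.heappop(self._heap)
        return message_type, body

    def __len__(self):
        return len(self._heap)
```

The arrival counter matters: without it, two messages of equal priority would be compared by type and body, which would reorder them and can fail outright when the bodies are dictionaries.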

Work Required

WS-127

  1. Refactor channels package into messaging package using kombu instead of pika for AMQP connection. WS-128
    • Ensure that kombu connects to AMQP server
    • Ensure kombu correctly generates the two service exchanges (capability and workflow)
    • Ensure kombu correctly generates the queues for each exchange. (Make sure that the queues are accessible for debugging; the current system sometimes locks us out.)
  2. Implement Router object WS-130
    1. Implement callback registering (using decorators)
  3. Implement Messenger object WS-129
  4. Ensure that messages can be sent with the rewritten system from the subsystem through to the AMQP server (using the new Router/Messenger) WS-131
  5. Ensure that messages can be consumed from AMQP with the rewritten system (using the new Router/Messenger) WS-132
    1. Ensure that received messages are passed correctly to the appropriate callback/consumer method
  6. Ensure that functionality dependent on messaging is carried out correctly. WS-133
  7. Ensure that the changes made by callback/consumer methods are persisted to the database correctly. WS-134

Diagram illustrating overall changes to the system:

media/image5.png

Message Patterns

All the messages should have these common fields:

  • service, the service that generated the event: either workflow or capability
  • subject, a compound dictionary with:
    • type, the name of a model object class (e.g. CapabilityRequest, CapabilityExecution, WorkflowRequest)
    • id, the identifier of that model object
    • other properties, whatever they may be for this object (will be generated by calling that instance's json method)
  • type, the type of the event (do we need this?)
  • status, the current status of the subject (e.g. complete, executing, etc.)
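
A message with the common fields listed above could be assembled roughly as follows. The build_message helper and its parameter names are hypothetical, and the event type is left optional because the design has not yet settled whether it is needed.

```python
import json


def build_message(service, subject_type, subject_id, status, event_type=None, **properties):
    """Assemble an event dictionary with the common fields."""
    if service not in ("workflow", "capability"):
        raise ValueError(f"unknown service: {service!r}")
    message = {
        "service": service,
        # Extra properties come first so they can never overwrite type or id.
        "subject": {**properties, "type": subject_type, "id": subject_id},
        "status": status,
    }
    if event_type is not None:  # the design leaves open whether 'type' is needed
        message["type"] = event_type
    return message


# Example: the "execution starting" event, serialised for transport.
encoded = json.dumps(
    build_message("capability", "CapabilityRequest", 23, "Executing",
                  capability_name="test_download"),
    sort_keys=True,
)
```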

Principles:

  1. Code-friendliness: Messages should be easy to pattern-match on and have useful fields for that kind of matching
  2. Human-readable: Messages should be somewhat human-friendly for debuggability
  3. Parsimonious: Do not include useless information in the message
  4. Receiver-friendly: Don't force the receiver to do a database lookup for something you had when you sent the message
  5. Efficient: Don't look up stuff in the database just to include it in the message
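
To make the first principle concrete, here is a minimal sketch of the kind of pattern matching that well-structured messages allow. The matches helper is hypothetical: it treats a pattern as a partial message and ignores any fields the pattern does not mention.

```python
def matches(pattern, message):
    """Return True if every key in pattern is present in message with a matching value.

    Nested dictionaries are matched recursively; extra keys in message are ignored.
    """
    for key, expected in pattern.items():
        if key not in message:
            return False
        actual = message[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual != expected:
            return False
    return True
```

With a helper like this, a subscriber interested in completed capability executions only has to state {"service": "capability", "subject": {"type": "CapabilityExecution"}, "status": "Complete"}, and any additional properties on the subject do not get in the way.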

Known Events

Capability

Event               Example message
execution starting  {service: capability, subject: {type: CapabilityRequest, id: 23, capability_name: test_download, ...}, status: Executing}
step complete       {service: capability, subject: {type: CapabilityExecution, id: 43, request_id: 23, ...}, status: Executing Step AWAIT WORKFLOW test_download} ?
                    {service: capability, subject: {type: CapabilityExecution, id: 43, request_id: 23, ...}, status: Executing, type: Step Complete} ?
execution complete  {service: capability, subject: {type: CapabilityExecution, id: 43, request_id: 23, ...}, status: Complete}
request complete    {service: capability, subject: {type: CapabilityRequest, id: 23, ...}, status: Complete}

Workflow

Event             Example message
request starting  {service: workflow, subject: {type: WorkflowExecution, id: 91, workflow_name: test_download, ...}, status: Executing}
request complete  {service: workflow, subject: {type: WorkflowExecution, id: 91, workflow_name: test_download, ...}, status: Complete}

Questions

  1. How do we encode "step complete" events? What information do we need to include in these?
  2. Do we need event types?
  3. What other information are we going to want to send in these events?
  4. Do we need timestamp-related information in these events?
  5. Does delivery send a message with the destination the files were written to, or is there some other mechanism for passing that information back to the capability execution?
  6. How do we document the system as we add new events to it?