Skip to content
Snippets Groups Projects
Commit 877d9969 authored by Daniel Lyons's avatar Daniel Lyons Committed by Daniel Lyons
Browse files

Creating a sketch of the Mealy machine system

parent e90d444f
No related branches found
No related tags found
1 merge request!328Creating a sketch of the Mealy machine system
Pipeline #2175 passed
"""
Prototype of the state machine concept for capability execution.
The idea here is to replace the step sequence with a state machine. The state machine
reacts to certain events by triggering actions and going into another state.
"""
import abc
import json
class State(abc.ABC):
"""
A state that a machine could reside in.
"""
@abc.abstractmethod
def matches(self, other: "State") -> bool:
"""
This is most likely implemented by doing a string-equality test.
:param other: the other state to compare to
:return: true if we and the other state match
"""
pass
class Action(abc.ABC):
"""
An action to take upon performing a transition. We expect to see several implementations
of this interface:
- SendNotification(template, additional_args) that sends a notification with
the event and additional arguments
- StartWorkflow(workflow_name, additional_args) that starts a workflow with the
provided name, the event and additional arguments
"""
@abc.abstractmethod
def execute(self):
pass
class Pattern(abc.ABC):
@abc.abstractmethod
def matches(self, event: dict) -> bool:
"""
This is most likely going to be implemented as a JSON Path expression, using
the jsonpath-python library: https://pypi.org/project/jsonpath-python/
:param event: a JSON object
:return: true if this pattern matches that event object
"""
pass
class TransitionIF(abc.ABC):
"""
A transition between states
"""
def __init__(self, from_state: State, to_state: State, pattern: Pattern, action: Action):
self.from_state, self.to_state = from_state, to_state
self.pattern = pattern
self.action = action
@abc.abstractmethod
def matches(self, state: State, event: dict) -> bool:
"""
True if this transition is applicable in the supplied state and matches the supplied event.
:param state: state to check against
:param event: event to match against
:return: true if everything matches
"""
return self.from_state.matches(state) and self.pattern.matches(event)
@abc.abstractmethod
def take(self) -> State:
"""
Take this transition. Perform the action associated with this transition and then
reveal the new state to the caller.
:return: the new state
"""
self.action.execute()
return self.to_state
class MealyMachine:
"""
I am a state machine for a given capability. I am responsible for handling events
and transitioning to other states.
"""
def __init__(self):
self.transitions = []
self.current_state: State = None
def on_event(self, event: dict):
"""
Process an event and possibly take a transition.
This is more of an example of how this could be done than an efficient way
to do it.
:param event:
:return:
"""
# look through our transitions to see if we have one that matches
# for the state we're currently in
for transition in self.transitions:
# we're going off a "first match wins" algorithm here
# if multiple transitions match, make sure the most specific
# patterns occur earlier in the list of transitions
if transition.matches(self.current_state, event):
# take the transition
self.current_state = transition.take(self)
# do not look for another transition
break
class CapabilityInfoForMachines:
"""
This is a demonstration of the sort of query I expect we'll use to locate executions
that are active and need to be acted on in response to an event of some kind.
"""
def find_requests_matching_transition(self, event: dict) -> list["CapabilityExecution"]:
"""
The concept here is to let the database do the heavy lifting and actually tell us
whether there are any executions that are both in the right state and matching
this event, that we would therefore need to act on.
:param event: the event to check
:return: a list of matching capability executions
"""
return self.session.query("""
SELECT *
FROM transitions t
JOIN machines m ON t.machine_id = m.id
JOIN capabilities c ON c.machine_id = m.id
JOIN capability_requests cr on cr.capability_name = c.name
JOIN capability_executions ce on cr.capability_request_id = ce.capability_request_id
WHERE %(event)s @? t.pattern AND ce.state = t.from_state
""", {"event": json.dumps(event)})
def build_tables(self):
"""
This is just a demonstration method to hold some SQL to demo the tables I have
in mind for this system.
"""
self.session.execute("""
CREATE TABLE machines(id serial primary key);
CREATE TABLE actions(id serial primary key, action_type varchar, action_arguments json);
CREATE TABLE transitions (
id serial primary key,
machine_id integer references(machines),
from_state varchar,
to_state varchar,
pattern jsonpath,
action_id integer references(actions)
);
""")
class CapabilityExecution:
def process(self, event):
self.machine.on_event(event)
class CapabilityServiceMachineMananger:
"""
This is a demonstration of how to handle state transitions without keeping explicit
machines alive during the execution of the program. The idea here is to be more
efficient and more event-driven.
"""
def __init__(self):
self.info = CapabilityInfoForMachines()
def on_event(self, event: dict):
# we get an event, we just try and find matches for it
for execution in self.info.find_requests_matching_transition(event):
# now we have the execution in hand, we can process the event.
execution.process(event)
# another approach here would be to be more explicit about what
# transition matched and prevent the execution from really doing
# any real thinking, something like this (assuming "transition" is
# the matching transition, returned from the SQL alchemy query):
# execution.state = execution.transition.take(event)
# we don't need to hold this execution in memory, so nothing else has
# to be done here
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment