Skip to content
Snippets Groups Projects
Commit 6a50207e authored by Janet Goldstein's avatar Janet Goldstein
Browse files

WS-828: remove unused statemachine.py

parent 6790485b
No related branches found
No related tags found
1 merge request!694WS-828: remove unused statemachine.py
Pipeline #3830 passed
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
"""
Prototype of the state machine concept for capability execution.
The idea here is to replace the step sequence with a state machine. The state machine
reacts to certain events by triggering actions and going into another state.
"""
import abc
import json
from typing import Dict, List, Optional
from workspaces.capability.schema import State, Transition
class State(abc.ABC):
"""
A state that a machine could reside in.
A state has a suite of transitions to other states. When an event comes in, we match against it; if we find a
matching pattern, we perform that transition to another state.
"""
def __init__(self, transitions: List[Transition]):
# We have a bit of a chicken-and-egg problem here, in that the State needs Transitions to be initialized but
# the Transition needs States to be initialized. Going from prototype to production here will mean breaking
# this cycle, possibly by introducing a builder of some kind, but for now we can just pretend that they are
# built successfully somehow.
self.transitions = transitions
def on_event(self, event: Dict) -> Optional[State]:
# Locate the first matching transition
matching_transition = None
for transition in self.transitions:
if transition.matches(event):
matching_transition = transition
break
# take this transition
if matching_transition is not None:
return matching_transition.take()
class Action(abc.ABC):
"""
An action to take upon performing a transition. We expect to see several implementations
of this interface:
- SendNotification(template, additional_args) that sends a notification with
the event and additional arguments
- StartWorkflow(workflow_name, additional_args) that starts a workflow with the
provided name, the event and additional arguments
"""
@abc.abstractmethod
def execute(self):
pass
class Pattern(abc.ABC):
@abc.abstractmethod
def matches(self, event: Dict) -> bool:
"""
This is most likely going to be implemented as a JSON Path expression, using
the jsonpath-python library: https://pypi.org/project/jsonpath-python/
:param event: a JSON object
:return: true if this pattern matches that event object
"""
pass
@abc.abstractmethod
def matches(self, event: Dict) -> bool:
"""
True if this transition is applicable in the supplied state and matches the supplied event.
:param state: state to check against
:param event: event to match against
:return: true if everything matches
"""
return self.pattern.matches(event)
@abc.abstractmethod
def take(self) -> State:
"""
Take this transition. Perform the action associated with this transition and then
reveal the new state to the caller.
:return: the new state
"""
self.action.execute()
return self.to_state
class MealyMachine:
"""
I am a state machine for a given capability. I am responsible for handling events
and transitioning to other states.
"""
def __init__(self):
self.transitions = []
self.current_state: State = None
def on_event(self, event: Dict):
"""
Process an event and possibly take a transition.
This is more of an example of how this could be done than an efficient way
to do it.
:param event:
:return:
"""
# look through our transitions to see if we have one that matches
# for the state we're currently in
for transition in self.transitions:
# we're going off a "first match wins" algorithm here
# if multiple transitions match, make sure the most specific
# patterns occur earlier in the list of transitions
if transition.matches(self.current_state, event):
# take the transition
self.current_state = transition.take(self)
# do not look for another transition
break
class CapabilityInfoForMachines:
"""
This is a demonstration of the sort of query I expect we'll use to locate executions
that are active and need to be acted on in response to an event of some kind.
"""
def find_requests_matching_transition(self, event: Dict) -> List["CapabilityExecution"]:
"""
The concept here is to let the database do the heavy lifting and actually tell us
whether there are any executions that are both in the right state and matching
this event, that we would therefore need to act on.
:param event: the event to check
:return: a list of matching capability executions
"""
return self.session.query(
"""
SELECT *
FROM transitions t
JOIN machines m ON t.machine_id = m.id
JOIN capabilities c ON c.machine_id = m.id
JOIN capability_requests cr on cr.capability_name = c.name
JOIN capability_executions ce on cr.capability_request_id = ce.capability_request_id
WHERE %(event)s @? t.pattern AND ce.state = t.from_state
""",
{"event": json.dumps(event)},
)
def build_tables(self):
"""
This is just a demonstration method to hold some SQL to demo the tables I have
in mind for this system.
"""
self.session.execute(
"""
CREATE TABLE machines(id serial primary key);
CREATE TABLE actions(id serial primary key, action_type varchar, action_arguments json);
CREATE TABLE transitions (
id serial primary key,
machine_id integer references machines(id),
from_state varchar,
to_state varchar,
pattern jsonpath,
action_id integer references actions(id)
);
"""
)
class CapabilityExecution:
machine: MealyMachine = None
def process(self, event):
self.machine.on_event(event)
class CapabilityServiceMachineMananger:
"""
This is a demonstration of how to handle state transitions without keeping explicit
machines alive during the execution of the program. The idea here is to be more
efficient and more event-driven.
"""
def __init__(self):
self.info = CapabilityInfoForMachines()
def on_event(self, event: Dict):
# we get an event, we just try and find matches for it
for execution in self.info.find_requests_matching_transition(event):
# now we have the execution in hand, we can process the event.
execution.process(event)
# another approach here would be to be more explicit about what
# transition matched and prevent the execution from really doing
# any real thinking, something like this (assuming "transition" is
# the matching transition, returned from the SQL alchemy query):
# execution.state = execution.transition.take(event)
# we don't need to hold this execution in memory, so nothing else has
# to be done here
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment