Skip to content

Instantly share code, notes, and snippets.

@jxskiss
Last active May 23, 2022 07:55
Show Gist options
  • Select an option

  • Save jxskiss/01816eec9a2b64bae341f4d07f58646e to your computer and use it in GitHub Desktop.

Select an option

Save jxskiss/01816eec9a2b64bae341f4d07f58646e to your computer and use it in GitHub Desktop.

Revisions

  1. jxskiss revised this gist Aug 28, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion example.py
    Original file line number Diff line number Diff line change
    @@ -83,7 +83,7 @@ def on_enter_verified(self, event):
    # ...


    class Item(models.Model):
    class Item(ItemMachineMixin, models.Model):
    status = models.CharField(max_length=16, default=ItemStatus.NEW)
    # many other fields

  2. jxskiss revised this gist Aug 26, 2017. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions example.py
    Original file line number Diff line number Diff line change
    @@ -74,18 +74,18 @@ class ItemMachineMixin(object):
    # raise

    def check_review_ready(self):
    pass
    pass

    def on_enter_verified(self, event):
    pass
    def on_enter_verified(self, event):
    pass

    # other conditions and callbacks
    # other conditions and callbacks
    # ...


    class Item(models.Model):
    status = models.CharField(max_length=16, default=ItemStatus.NEW)
    # many other fields
    status = models.CharField(max_length=16, default=ItemStatus.NEW)
    # many other fields


    # using the state machine some where
  3. jxskiss revised this gist Aug 26, 2017. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions fysom_singleton.py
    Original file line number Diff line number Diff line change
    @@ -111,9 +111,9 @@ def _add(trans):
    src = [src]

    ev = {'source': set(src), 'dest': trans['dest']}
    trans = trans.get('conditions')
    if trans:
    ev['conditions'] = trans
    conditions = trans.get('conditions')
    if conditions:
    ev['conditions'] = conditions

    self._map[event] = ev

  4. jxskiss revised this gist Aug 26, 2017. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions example.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,5 @@
    from django.db import models
    from fysom_singleton import *


    class ItemStatus(object):
  5. jxskiss created this gist Aug 26, 2017.
    97 changes: 97 additions & 0 deletions example.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,97 @@
    from django.db import models


    class ItemStatus(object):
    NEW = 'new'
    NEED_INFO = 'need_info'
    REVIEWING = 'reviewing'
    REDOING = 'redoing'
    CONFLICT = 'conflict'
    VERIFIED = 'verified'
    DELETED = 'deleted'

    SM_STATES = [
    NEW, NEED_INFO, REVIEWING, REDOING, CONFLICT, VERIFIED, DELETED
    ]

    SM_TRANSITIONS = [
    # trigger, source, destination
    ['sm_prepare_new', NEW, NEED_INFO],
    {
    'trigger': 'sm_commit_review',
    'source': NEED_INFO,
    'dest': REVIEWING,
    'conditions': [
    {'true': 'check_review_ready'},
    ],
    },
    {
    'trigger': 'sm_done_verified',
    'source': [REVIEWING, REDOING],
    'dest': VERIFIED,
    'conditions': [
    {'true': 'check_required_fields'},
    {'true': 'check_barcodes_valid'},
    {'true': 'check_no_conflict', 'else': CONFLICT},
    ],
    },
    ['sm_mark_conflict', [REVIEWING, REDOING], CONFLICT],
    ['sm_revert_verified', [VERIFIED, CONFLICT], REDOING],
    ['sm_require_info', [REVIEWING, REDOING], NEED_INFO],
    {
    'trigger': 'sm_mark_deleted',
    'source': [
    NEW, NEED_INFO, REVIEWING, REDOING, CONFLICT, VERIFIED
    ],
    'dest': DELETED
    },
    ['sm_revert_deleted', DELETED, REDOING],
    {
    'trigger': 'sm_update',
    'source': [NEW, NEED_INFO, REVIEWING, REDOING, VERIFIED],
    'dest': '=',
    }
    ]



    class ItemMachineMixin(object):

    sm = StateMachine(
    state_field='status',
    states=ItemStatus.SM_STATES,
    transitions=ItemStatus.SM_TRANSITIONS,
    )

    # def __getattribute__(self, item):
    # try:
    # return super(ItemMachineMixin, self).__getattribute__(item)
    # except AttributeError:
    # # proxy transition calling to state machine
    # if item.startswith('sm_'):
    # return partial(getattr(self.sm, item), self)
    # raise

    def check_review_ready(self):
    pass

    def on_enter_verified(self, event):
    pass

    # other conditions and callbacks
    # ...


    class Item(models.Model):
    status = models.CharField(max_length=16, default=ItemStatus.NEW)
    # many other fields


    # using the state machine some where
    obj = Item()
    obj.sm.sm_prepare_new(obj)

    # the `obj.sm.sm_prepaare_new(obj)` looks ugly
    # by overriding the `__getattribute__`` method of ItemMachineMixin class
    # demonstrated above, it can be used like this:
    obj.sm_prepare_new()
    279 changes: 279 additions & 0 deletions fysom_singleton.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,279 @@
    # -*- coding:utf-8 -*-
    #
    # fysom - pYthOn Finite State Machine - this is a port of Jake
    # Gordon's javascript-state-machine to python
    # https://github.com/jakesgordon/javascript-state-machine
    #
    # Modified from "fysom" by wsh <jxskiss@126.com>
    #

    """
    Modified from https://github.com/mriehl/fysom
    Main difference with the original fysom:
    - the StateMachine class in this module is targeted to be used as a
    singleton, the object being processed is passed to related methods
    as an argument
    - customized for django model integration
    - this module has less feature support than the original fysom:
    * no initial state and final state support
    * no callbacks support
    - this module implements conditions and conditional transitions support
    NOTE: this module is not heavily tested, especially for edge cases.
    """


    class StateMachineError(Exception):
    pass


    class TransitionInvalidError(StateMachineError):
    pass


    class TransitionCanceledError(StateMachineError):
    pass


    class StateMachine(object):
    """
    Conditions and callbacks execution order:
    conditions
    on_before_<event>
    on_exit_<state>
    <<STATE CHANGE>>
    on_change_state
    on_enter_<state>
    on_after_<event>
    When dest state is same with source state:
    conditions
    on_before_<event>
    on_reenter_<state>
    on_after_<event>
    """

    WILDCARD = '*'
    SAME_DEST = '='

    def __init__(self, state_field, states, transitions):
    self.state_field = state_field
    # events registry =>
    # {
    # event_name: {
    # "source": set(),
    # "dest": dest,
    # "conditions": [
    # {"true/false": "condition_method"},
    # {"true/false": "condition_method", "dest": conditional_dest},
    # ],
    # },
    # }
    self._map = {}
    self.events = []
    self.states = states

    cfg = {
    'transitions': transitions,
    }
    self._build_machine(cfg)

    def is_state(self, obj, state):
    return getattr(obj, self.state_field) == state

    def can(self, obj, event):
    return (
    event in self._map and
    ((getattr(obj, self.state_field) in self._map[event]['source']) or
    self.WILDCARD in self._map[event]['source'])
    )

    def cannot(self, obj, event):
    return not self.can(obj, event)

    def _build_machine(self, cfg):
    transitions = cfg.get('transitions', [])

    def _add(trans):
    event = trans['trigger']
    if event in self._map:
    raise StateMachineError(
    'Improperly configured transitions, event %s '
    'already registered' % event)

    src = trans['source']
    if src == self.WILDCARD:
    src = [self.WILDCARD]
    elif self._is_base_string(src):
    src = [src]

    ev = {'source': set(src), 'dest': trans['dest']}
    trans = trans.get('conditions')
    if trans:
    ev['conditions'] = trans

    self._map[event] = ev

    # Construct all transition handlers
    for trans in transitions:
    if isinstance(trans, list):
    trans = {'trigger': trans[0],
    'source': trans[1], 'dest': trans[2]}
    _add(trans)
    event = trans['trigger']
    self.events.append(event)
    setattr(self, event, self._build_event(event))

    def _build_event(self, event):
    """
    For every event in the state machine, prepares the event handler.
    """
    def fn(obj, *args, **kwargs):
    current_state = getattr(obj, self.state_field)
    # Check if this event can be triggered in the current state.
    if not self.can(obj, event):
    raise TransitionInvalidError(
    'event %s inappropriate in current state %s'
    % (event, current_state))

    # On event occurrence, source will always be the current state.
    src = current_state
    # dest may change during checking conditions
    dst = self._map[event]['dest']
    if dst == self.SAME_DEST:
    dst = src

    # Check transition conditions first.
    for c in self._map[event].get('conditions', ()):
    target = 'true' if 'true' in c else 'false'
    _c_r = self._check_condition(obj, target, c[target])
    if not _c_r:
    if 'else' in c:
    dst = c['else']
    break
    else:
    raise TransitionCanceledError(
    'Cannot trigger event {0} because the {1} '
    'condition returns False'.format(
    event, c[target])
    )

    # Prepares the object with all the meta data to be passed to
    # callbacks.
    e = self._event_obj()
    e.fsm, e.obj, e.event, e.src, e.dst = self, obj, event, src, dst
    e.args, e.kwargs = args, kwargs
    # used to share object saving status between callbacks
    e.obj_has_saved = False

    # Try to trigger the before event, unless it gets canceled.
    if self._before_event(obj, e) is False:
    raise TransitionCanceledError(
    'Cannot trigger event {0} because the on_before_{0} '
    'handler returns False'.format(event)
    )

    # Wraps the activities that must constitute a single successful
    # transaction.
    if src != dst:
    def _trans():
    delattr(obj, '_sm_transition')
    setattr(obj, self.state_field, dst)
    self._change_state(obj, e)
    self._enter_state(obj, e)
    self._after_event(obj, e)

    obj._sm_transition = _trans

    # Hook to perform asynchronous transition
    if self._exit_state(obj, e) is not False:
    obj._sm_transition()
    else:
    self._reenter_state(obj, e)
    self._after_event(obj, e)

    fn.__name__ = str(event)
    fn.__doc__ = (
    "Event handler for an {event} event. This event can be "
    "fired if the machine is in {states} states.".format(
    event=event, states=self._map[event].keys()))

    return fn

    @staticmethod
    def _check_condition(obj, target, func):
    if not hasattr(obj, func):
    raise AttributeError("obj %s hasn't method %s" % (obj, func))
    r = getattr(obj, func)()
    if target == 'true':
    return r is True
    else: # false
    return r is False

    @staticmethod
    def _before_event(obj, ev):
    fn_name = 'on_before_' + ev.event
    if hasattr(obj, fn_name):
    return getattr(obj, fn_name)(ev)

    @staticmethod
    def _after_event(obj, ev):
    fn_name = 'on_after_' + ev.event
    if hasattr(obj, fn_name):
    return getattr(obj, fn_name)(ev)

    @staticmethod
    def _enter_state(obj, ev):
    fn_name = 'on_enter_' + ev.dst
    if hasattr(obj, fn_name):
    return getattr(obj, fn_name)(ev)

    @staticmethod
    def _exit_state(obj, ev):
    fn_name = 'on_exit_' + ev.src
    if hasattr(obj, fn_name):
    return getattr(obj, fn_name)(ev)

    @staticmethod
    def _reenter_state(obj, ev):
    fn_name = 'on_reenter_' + ev.dst
    if hasattr(obj, fn_name):
    return getattr(obj, fn_name)(ev)

    @staticmethod
    def _change_state(obj, ev):
    fn_name = 'on_change_state'
    if hasattr(obj, fn_name):
    return getattr(obj, fn_name)(ev)

    def trigger(self, obj, event, *args, **kwargs):
    """
    Triggers the given event.
    The event can be triggered by calling the event handler directly,
    for ex: fsm.eat(), but this method will come in handy if the event is
    determined dynamically and you have the event name to trigger as
    a string.
    """
    if not hasattr(self, event):
    raise StateMachineError(
    "There isn't any event registered as %s" % event)
    return getattr(self, event)(obj, event, *args, **kwargs)

    @staticmethod
    def _is_base_string(obj):
    """
    Returns if the object is an instance of basestring.
    """
    return isinstance(obj, str)

    class _event_obj(object):
    """
    Event object.
    Attributes:
    fsm, obj, event, src, dst, args, kwargs, obj_has_saved
    """
    pass