Skip to content

Instantly share code, notes, and snippets.

@dbarnett
Created February 3, 2012 15:10
Show Gist options
  • Select an option

  • Save dbarnett/1730610 to your computer and use it in GitHub Desktop.

Select an option

Save dbarnett/1730610 to your computer and use it in GitHub Desktop.

Revisions

  1. @invalid-email-address Anonymous created this gist Feb 3, 2012.
    145 changes: 145 additions & 0 deletions jsonalchemy.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,145 @@
    import simplejson
    import sqlalchemy
    from sqlalchemy import String
    from sqlalchemy.ext.mutable import Mutable

    class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
    """Represents an immutable structure as a json-encoded string."""

    impl = String

    def process_bind_param(self, value, dialect):
    if value is not None:
    value = simplejson.dumps(value)
    return value

    def process_result_value(self, value, dialect):
    if value is not None:
    value = simplejson.loads(value)
    return value

    class MutationObj(Mutable):
    @classmethod
    def coerce(cls, key, value):
    if isinstance(value, dict) and not isinstance(value, MutationDict):
    return MutationDict.coerce(key, value)
    if isinstance(value, list) and not isinstance(value, MutationList):
    return MutationList.coerce(key, value)
    return value

    @classmethod
    def _listen_on_attribute(cls, attribute, coerce, parent_cls):
    key = attribute.key
    if parent_cls is not attribute.class_:
    return

    # rely on "propagate" here
    parent_cls = attribute.class_

    def load(state, *args):
    val = state.dict.get(key, None)
    if coerce:
    val = cls.coerce(key, val)
    state.dict[key] = val
    if isinstance(val, cls):
    val._parents[state.obj()] = key

    def set(target, value, oldvalue, initiator):
    if not isinstance(value, cls):
    value = cls.coerce(key, value)
    if isinstance(value, cls):
    value._parents[target.obj()] = key
    if isinstance(oldvalue, cls):
    oldvalue._parents.pop(target.obj(), None)
    return value

    def pickle(state, state_dict):
    val = state.dict.get(key, None)
    if isinstance(val, cls):
    if 'ext.mutable.values' not in state_dict:
    state_dict['ext.mutable.values'] = []
    state_dict['ext.mutable.values'].append(val)

    def unpickle(state, state_dict):
    if 'ext.mutable.values' in state_dict:
    for val in state_dict['ext.mutable.values']:
    val._parents[state.obj()] = key

    sqlalchemy.event.listen(parent_cls, 'load', load, raw=True, propagate=True)
    sqlalchemy.event.listen(parent_cls, 'refresh', load, raw=True, propagate=True)
    sqlalchemy.event.listen(attribute, 'set', set, raw=True, retval=True, propagate=True)
    sqlalchemy.event.listen(parent_cls, 'pickle', pickle, raw=True, propagate=True)
    sqlalchemy.event.listen(parent_cls, 'unpickle', unpickle, raw=True, propagate=True)

    class MutationDict(MutationObj, dict):
    @classmethod
    def coerce(cls, key, value):
    """Convert plain dictionary to MutationDict"""
    self = MutationDict((k,MutationObj.coerce(key,v)) for (k,v) in value.items())
    self._key = key
    return self

    def __setitem__(self, key, value):
    dict.__setitem__(self, key, MutationObj.coerce(self._key, value))
    self.changed()

    def __delitem__(self, key):
    dict.__delitem__(self, key)
    self.changed()

    class MutationList(MutationObj, list):
    @classmethod
    def coerce(cls, key, value):
    """Convert plain list to MutationList"""
    self = MutationList((MutationObj.coerce(key, v) for v in value))
    self._key = key
    return self

    def __setitem__(self, idx, value):
    list.__setitem__(self, idx, MutationObj.coerce(self._key, value))
    self.changed()

    def __setslice__(self, start, stop, values):
    list.__setslice__(self, start, stop, (MutationObj.coerce(self._key, v) for v in values))
    self.changed()

    def __delitem__(self, idx):
    list.__delitem__(self, idx)
    self.changed()

    def __delslice__(self, start, stop):
    list.__delslice__(self, start, stop)
    self.changed()

    def append(self, value):
    list.append(self, MutationObj.coerce(self._key, value))
    self.changed()

    def insert(self, idx, value):
    list.insert(self, idx, MutationObj.coerce(self._key, value))
    self.changed()

    def extend(self, values):
    list.extend(self, (MutationObj.coerce(self._key, v) for v in values))
    self.changed()

    def pop(self, *args, **kw):
    value = list.pop(self, *args, **kw)
    self.changed()
    return value

    def remove(self, value):
    list.remove(self, value)
    self.changed()

    def JSONAlchemy(sqltype):
    """A type to encode/decode JSON on the fly
    sqltype is the string type for the underlying DB column.
    You can use it like:
    Column(JSONAlchemy(Text(600)))
    """
    class _JSONEncodedObj(JSONEncodedObj):
    impl = sqltype
    return MutationObj.as_mutable(_JSONEncodedObj)