Skip to content

Instantly share code, notes, and snippets.

@prostoalex
Forked from temoto/helpers_data.py
Created February 2, 2014 06:35
Show Gist options
  • Select an option

  • Save prostoalex/8763862 to your computer and use it in GitHub Desktop.

Select an option

Save prostoalex/8763862 to your computer and use it in GitHub Desktop.

Revisions

  1. @temoto temoto revised this gist Nov 20, 2013. 1 changed file with 63 additions and 0 deletions.
    63 changes: 63 additions & 0 deletions helpers_data.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,63 @@
    def namedlist(typename, field_names):
    """Returns a new subclass of list with named fields.
    >>> Point = namedlist('Point', ('x', 'y'))
    >>> Point.__doc__ # docstring for the new class
    'Point(x, y)'
    >>> p = Point(11, y=22) # instantiate with positional args or keywords
    >>> p[0] + p[1] # indexable like a plain list
    33
    >>> x, y = p # unpack like a regular list
    >>> x, y
    (11, 22)
    >>> p.x + p.y # fields also accessable by name
    33
    >>> d = p._asdict() # convert to a dictionary
    >>> d['x']
    11
    >>> Point(**d) # convert from a dictionary
    Point(x=11, y=22)
    >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
    Point(x=100, y=22)
    """
    fields_len = len(field_names)
    fields_text = repr(tuple(field_names)).replace("'", "")[1:-1] # tuple repr without parens or quotes

    class ResultType(list):
    __slots__ = ()
    _fields = field_names

    def _fixed_length_error(*args, **kwargs):
    raise TypeError(u"Named list has fixed length")
    append = _fixed_length_error
    insert = _fixed_length_error
    pop = _fixed_length_error
    remove = _fixed_length_error

    def sort(self):
    raise TypeError(u"Sorting named list in place would corrupt field accessors. Use sorted(x)")

    def _replace(self, **kwargs):
    values = map(kwargs.pop, field_names, self)
    if kwargs:
    raise TypeError(u"Unexpected field names: {s!r}".format(kwargs.keys()))

    if len(values) != fields_len:
    raise TypeError(u"Expected {e} arguments, got {n}".format(e=fields_len, n=len(values)))

    return ResultType(*values)

    def __repr__(self):
    items_repr=", ".join("{name}={value!r}".format(name=name, value=value)
    for name, value in zip(field_names, self))
    return "{typename}({items})".format(typename=typename, items=items_repr)

    ResultType.__init__ = eval("lambda self, {fields}: self.__setitem__(slice(None, None, None), [{fields}])".format(fields=fields_text))
    ResultType.__name__ = typename

    for i, name in enumerate(field_names):
    fget = eval("lambda self: self[{0:d}]".format(i))
    fset = eval("lambda self, value: self.__setitem__({0:d}, value)".format(i))
    setattr(ResultType, name, property(fget, fset))

    return ResultType
  2. @temoto temoto revised this gist Sep 9, 2013. 1 changed file with 370 additions and 0 deletions.
    370 changes: 370 additions & 0 deletions helpers_db_psycopg2.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,370 @@
    """PostgreSQL DB helpers.
    Uses two pools for PostgreSQL DB connections: one pool for connections in autocommit mode used by execute(),
    one pool for connections in transaction mode.
    Interface::
    * autocommit() -> context manager, returns psycopg2.Cursor (in autocommit mode)
    * execute(statement, params=None, repeat=True) -> psycopg2.Cursor
    * transaction() -> context manager, returns psycopg2.Cursor inside explicit transaction
    """
    import collections
    import contextlib
    import eventlet
    import eventlet.db_pool
    import logbook
    # import logging
    import psycopg2
    import psycopg2.extensions
    import psycopg2.extras
    import psycopg2.pool
    import random
    import time

    try:
    import sqlalchemy
    from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect
    _sa_class = sqlalchemy.sql.ClauseElement
    _sa_dialect = PGDialect()
    except ImportError:
    _sa_class = None
    _sa_dialect = None


    # Select logbook or logging here
    log = logbook.Logger('db')
    # log = logging.getLogger('db')


class Error(Exception):
    """Base exception for this DB helper module."""
    pass


    class DictCursor(psycopg2.extras.DictCursor):

    def execute(self, statement, params=None, record_type=None,
    _sa_class=_sa_class, _sa_dialect=_sa_dialect):
    """Psycopg2.Cursor.execute wrapped with query time logging.
    Returns self, so you can chain it with fetch* methods, etc.
    """
    if _sa_class is not None and isinstance(statement, _sa_class):
    compiled = statement.compile(dialect=_sa_dialect)
    statement, params = compiled.string, compiled.params

    self.connection.notices[:] = []
    error = None
    start = time.time()
    try:
    super(DictCursor, self).execute(statement, params)
    except psycopg2.Error as error:
    pass
    total = round(time.time() - start, 3)

    for notice in self.connection.notices:
    log.notice(notice.strip().decode('utf-8', 'replace'))
    if notice == "WARNING: there is already a transaction in progress\n":
    raise Error(u"Nested BEGIN inside transaction. Aborting possibly broken code.")

    sql = (self.mogrify(statement, params)
    if not statement.lower().startswith("insert")
    else statement).decode('utf-8', 'replace')
    sql_id = id(sql)
    log.info(u"Query [{time:.3f}] id={id} {sql}".format(
    time=total, id=sql_id, sql=sql))

    if error is not None:
    raise error

    return self

    def executemany(self, statement, parameters):
    return super(DictCursor, self).executemany(statement, parameters)

    def callproc(self, procname, parameters):
    return super(DictCursor, self).callproc(procname, parameters)

    def scalar(self):
    row = self.fetchone()
    if row is None:
    return None
    return row[0]


    class NamedTupleCursor(psycopg2.extras.NamedTupleCursor):

    EmptyRecord = namedtuple("Record", ())

    def execute(self, statement, params=None, record_type=None,
    _sa_class=_sa_class, _sa_dialect=_sa_dialect):
    """Psycopg2.Cursor.execute wrapped with query time logging.
    Returns cursor object, so you can chain it with fetch* methods, etc.
    """
    if _sa_class is not None and isinstance(statement, _sa_class):
    compiled = statement.compile(dialect=_sa_dialect)
    statement, params = compiled.string, compiled.params

    self.connection.notices[:] = []
    error = None
    start = time.time()
    try:
    super(NamedTupleCursor, self).execute(statement, params)
    except psycopg2.Error as error:
    pass
    total = round(time.time() - start, 3)

    for notice in self.connection.notices:
    log.notice(notice.strip().decode('utf-8', 'replace'))
    if notice == "WARNING: there is already a transaction in progress\n":
    raise DbError(u"Nested BEGIN inside transaction. Aborting possibly broken code.")

    sql = (self.mogrify(statement, params)
    if not statement.lower().startswith("insert")
    else statement).decode('utf-8', 'replace')
    sql_id = id(sql)
    log.info(u"Query [{time:.3f}] id={id} {sql}".format(
    time=total, id=sql_id, sql=sql))

    if error is not None:
    raise error

    self.Record = record_type
    return self

    def executemany(self, statement, parameters):
    return super(NamedTupleCursor, self).executemany(statement, parameters)

    def callproc(self, procname, parameters):
    return super(NamedTupleCursor, self).callproc(procname, parameters)

    def scalar(self):
    row = self.fetchone()
    if row is None:
    return None
    return row[0]

    def _make_nt(self, _namedtuple=namedtuple):
    if not self.description:
    return NamedTupleCursor.EmptyRecord
    columns = [d[0] if d[0] != "?column?" else "column" + str(i)
    for i, d in enumerate(self.description, 1)]
    return _namedtuple("Record", columns)


    # Select default cursor class here
    default_cursor_class = DictCursor
    # default_cursor_class = NamedTupleCursor


    class ReadCursor(object):

    """Read-only cursor-like object.
    """
    rowcount = property(lambda self: self._rowcount)

    def __init__(self, rows, rowcount):
    self._rows = rows
    self._rowcount = rowcount

    def fetchone(self):
    if self._rows is None:
    raise psycopg2.ProgrammingError("no results to fetch")
    if self._rowcount == 0:
    return None
    return self._rows[0]

    def fetchmany(self, size=None):
    if self._rows is None:
    raise psycopg2.ProgrammingError("no results to fetch")
    if size is None:
    return self._rows
    return self._rows[:size]

    def fetchall(self):
    if self._rows is None:
    raise psycopg2.ProgrammingError("no results to fetch")
    return self._rows

    def __iter__(self):
    if self._rows is None:
    raise psycopg2.ProgrammingError("no results to fetch")
    return iter(self._rows)

    def scalar(self):
    if self._rows is None:
    raise psycopg2.ProgrammingError("no results to fetch")
    if self._rowcount == 0:
    return None
    return self._rows[0][0]


    class Connection(psycopg2.extensions.connection):

    def commit(self):
    start = time.time()
    super(Connection, self).commit()
    total = time.time() - start
    log.info(u"Commit [{time:.3f}]".format(time=total))

    def rollback(self):
    start = time.time()
    super(Connection, self).rollback()
    total = time.time() - start
    log.info(u"Rollback [{time:.3f}]".format(time=total))

    def cursor(self, _klass=default_cursor_class, *args, **kwargs):
    return super(Connection, self).cursor(
    *args, cursor_factory=_klass, **kwargs)


    class EventletConnectionPool(eventlet.db_pool.RawConnectionPool):

    def connect(self, db_module, timeout, *args, **kwargs):
    try:
    connection = super(EventletConnectionPool, self).connect(db_module, timeout, *args, **kwargs)
    except psycopg2.OperationalError:
    raise
    # Note: makes round-trip to DB. Only required for new connections.
    connection.autocommit = True
    return connection

    @contextlib.contextmanager
    def item(self):
    close = True
    conn = self.get()
    # Note: makes round-trip to DB. Only required for new connections.
    conn.autocommit = True
    try:
    yield conn
    # no error
    close = False
    finally:
    if close:
    conn._base.close()
    self.put(conn)


    class ThreadConnectionPool(psycopg2.pool.ThreadedConnectionPool):

    @contextlib.contextmanager
    def item(self):
    close = True
    conn = self.getconn()
    # Note: makes round-trip to DB. Only required for new connections.
    conn.autocommit = True
    try:
    yield conn
    # no error
    close = False
    finally:
    self.putconn(conn, close=close or conn.closed)


    def is_connection_error(e):
    """Exception object -> True | False
    """
    if not isinstance(e, psycopg2.DatabaseError):
    return False
    error_str = str(e)
    MSG1 = "socket not open"
    MSG2 = "server closed the connection unexpectedly"
    MSG3 = "could not connect to server"
    return MSG1 in error_str or MSG2 in error_str or MSG3 in error_str


# TODO: override this if necessary
def get_connection_pool(group):
    """Return the connection pool serving the given `group` of DB servers.

    NOTE(review): relies on a POSTGRESQL_DSN name defined elsewhere in the
    project -- confirm it is in scope before using this template. The
    `group` argument is unused here; it matters only for the commented-out
    per-group variant below.
    """
    # the most straightforward threaded pool built-in psycopg2
    return ThreadConnectionPool(
        minconn=0, maxconn=10,
        dsn=POSTGRESQL_DSN, connection_factory=Connection)

    # eventlet pool
    # return EventletConnectionPool(
    #     psycopg2, min_size=0, max_size=10, max_idle=10, max_age=60,
    #     dsn=POSTGRESQL_DSN, connection_factory=Connection)

    # pre initialized pools for different groups of database servers
    # return group_map[group]


    @contextlib.contextmanager
    def autocommit(group='default', connection_pool=None):
    """Context manager.
    Executes block with new cursor from pooled connection in autocommit mode. Returns cursor.
    At the end of the block, the connection is returned to pool.
    >>> with autocommit() as cursor:
    ... cursor.execute("select 1")
    ... cursor.execute("select 2")
    Use it when you do several selects and don't want to waste time for final ROLLBACK.
    """
    pool = connection_pool or get_connection_pool(group)

    with pool.item() as connection:
    cursor = connection.cursor()
    yield cursor


    def execute(statement, params=None, group='default', connection_pool=None, repeat=True, record_type=None):
    """Shortcut for
    1. get connection from pool, create new cursor
    2. cursor.execute(statement, params)
    3. cursor.fetchall() (if possible)
    4. return connection to pool
    Returns read-only cursor with rows.
    On disconnect, if `repeat is True` attempts reconnect and repeats function call one more time.
    If second attempt fails, raises exception.
    """
    pool = connection_pool or get_connection_pool(group)

    with pool.item() as connection:
    try:
    cursor = connection.cursor()
    cursor.execute(statement, params, record_type=record_type)

    rows = None
    rowcount = cursor.rowcount
    try:
    rows = cursor.fetchall()
    except psycopg2.ProgrammingError as e:
    if str(e) != "no results to fetch":
    raise
    return ReadCursor(rows, rowcount)
    except psycopg2.DatabaseError as e:
    if repeat and is_connection_error(e):
    log.warning(u"execute() DB disconnect, repeating query.")
    else:
    raise

    # Connection lost, repeat.
    return execute(statement, params, repeat=False)


    def transaction(group='default', connection_pool=None):
    """Context manager.
    Executes block with new cursor from pooled connection in transaction. Returns cursor.
    At the end of the block, the connection is returned to pool.
    Transaction is commited "on success".
    >>> with transaction() as cursor:
    ... rows = cursor.execute(...).fetchall()
    ... process(rows)
    ... cursor.execute(...)
    Always use it instead of manual BEGIN/ROLLBACK-s.
    """
    pool = connection_pool or get_connection_pool(group)

    with pool.item() as connection:
    cursor = connection.cursor()
    cursor.execute("begin")
    try:
    yield cursor
    except Exception:
    cursor.execute("rollback")
    raise
    else:
    cursor.execute("commit")
  3. @temoto temoto revised this gist Aug 26, 2013. 1 changed file with 27 additions and 0 deletions.
    27 changes: 27 additions & 0 deletions helpers_datetime.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,27 @@
    def datetime_to_tuple(dt, precision=None):
    """datetime, precision -> tuple(year, month, day, hour, minute, second, microsecond)[:precision]
    Reverse operation is `datetime(*tuple)`.
    """
    full = (dt.year, dt.month, dt.day, dt.hour,
    dt.minute, dt.second, dt.microsecond)
    return full[:precision] if precision else full


    def datetime_to_unix(dt, _epoch_ord=datetime.date(1970, 1, 1).toordinal()):
    """UTC datetime -> UNIX timestamp
    Invariant: `datetime.utcfromtimestamp(datetime_to_unix(dt)) == dt`
    """
    days = dt.date().toordinal() - _epoch_ord
    hours = days * 24 + dt.hour
    minutes = hours * 60 + dt.minute
    seconds = minutes * 60 + dt.second
    return seconds + dt.microsecond / 1e6


    def str_to_date(s, format="%Y-%m-%d", _parse=datetime.datetime.strptime):
    """ '2012-11-13' -> date(2012, 11, 13)
    """
    dt = _parse(s, format)
    return dt.date()
  4. @temoto temoto revised this gist Aug 12, 2013. 1 changed file with 8 additions and 0 deletions.
    8 changes: 8 additions & 0 deletions helpers_encoding.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,8 @@
    def str_utf8(x):
    """
    Returns the byte string representation of obj.
    Like unicode(x).encode('utf-8') except it works for bytes.
    """
    if isinstance(x, str):
    return x
    return unicode(x).encode('utf-8')
  5. @temoto temoto revised this gist Aug 5, 2013. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions helpers_retry.py
    Original file line number Diff line number Diff line change
    @@ -32,18 +32,18 @@ def retry(tries, exceptions=(Exception,), delay=0):
    def wrapper(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
    n = tries # need to declare local variable to modify it
    n = tries # copy to local variable for modification
    while n > 0:
    n -= 1
    try:
    return func(*args, **kwargs)
    except exceptions as e:
    if n == 0:
    raise
    # logbook
    # log.error(u'retry: {f} {e}', f=repr_func(func), e=e)
    # logging
    log.error(u'retry: %s %s', repr_func(func), e)
    time.sleep(delay)
    if n == 0:
    raise
    return wrapped
    return wrapper
  6. @temoto temoto revised this gist Aug 5, 2013. 1 changed file with 10 additions and 1 deletion.
    11 changes: 10 additions & 1 deletion helpers_retry.py
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,13 @@
    import functools
    # import logbook
    import logging
    import time


    # log = logbook.Logger(__name__)
    log = logging.getLogger(__name__)


    def repr_func(f):
    """Attempt to get the most useful string representation of callable.
    """
    @@ -32,7 +38,10 @@ def wrapped(*args, **kwargs):
    try:
    return func(*args, **kwargs)
    except exceptions as e:
    log.error(u'retry: {f} {e}', f=repr_func(func), e=e)
    # logbook
    # log.error(u'retry: {f} {e}', f=repr_func(func), e=e)
    # logging
    log.error(u'retry: %s %s', repr_func(func), e)
    time.sleep(delay)
    if n == 0:
    raise
  7. @temoto temoto revised this gist Aug 5, 2013. 2 changed files with 45 additions and 0 deletions.
    5 changes: 5 additions & 0 deletions helpers_gzip.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,8 @@
    from cStringIO import StringIO
    from gzip import GzipFile
    import zlib


    def gzip_string(s, level=6):
    """Compress string using gzip.
    Default compression level is 6.
    40 changes: 40 additions & 0 deletions helpers_retry.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    import functools
    import time


    def repr_func(f):
    """Attempt to get the most useful string representation of callable.
    """
    name = getattr(f, 'func_name', '<unknown>')
    func_code = getattr(f, 'func_code', None)
    if func_code is not None:
    return u'{name}() @ {fc.co_filename}:{fc.co_firstlineno}'.format(
    name=name,
    fc=func_code)

    return repr(f)


    def retry(tries, exceptions=(Exception,), delay=0):
    """
    Decorator for retrying a function if exception occurs
    tries -- num tries
    exceptions -- exceptions to catch
    delay -- wait between retries
    """
    def wrapper(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
    n = tries # need to declare local variable to modify it
    while n > 0:
    n -= 1
    try:
    return func(*args, **kwargs)
    except exceptions as e:
    log.error(u'retry: {f} {e}', f=repr_func(func), e=e)
    time.sleep(delay)
    if n == 0:
    raise
    return wrapped
    return wrapper
  8. @temoto temoto created this gist Aug 5, 2013.
    17 changes: 17 additions & 0 deletions helpers_gzip.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,17 @@
    def gzip_string(s, level=6):
    """Compress string using gzip.
    Default compression level is 6.
    """
    zbuf = StringIO()
    zfile = GzipFile(mode='wb', compresslevel=level, fileobj=zbuf)
    zfile.write(s)
    zfile.close()
    return zbuf.getvalue()


    def gunzip_string(s):
    """Decompress string using gzip.
    See http://stackoverflow.com/questions/2695152/in-python-how-do-i-decode-gzip-encoding/2695466#2695466
    """
    return zlib.decompress(s, 16 + zlib.MAX_WBITS)