Skip to content

Instantly share code, notes, and snippets.

Created March 8, 2016 10:11
Show Gist options
  • Select an option

  • Save anonymous/bc82a0bea35a5cb3ce01 to your computer and use it in GitHub Desktop.

Select an option

Save anonymous/bc82a0bea35a5cb3ce01 to your computer and use it in GitHub Desktop.

Revisions

  1. @invalid-email-address Anonymous created this gist Mar 8, 2016.
    29 changes: 29 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,29 @@
    from functools import update_wrapper
    import zmq

    class Socket(zmq.Socket):

    def __init__(self, ctx, type, default_timeout=None):
    zmq.Socket.__init__(self, ctx, type)
    self.default_timeout = default_timeout

    def on_timeout(self):
    return None

    def _timeout_wrapper(f):
    def wrapper(self, *args, **kwargs):
    timeout = kwargs.pop('timeout', self.default_timeout)
    if timeout is not None:
    timeout = int(timeout * 1000)
    poller = zmq.Poller()
    poller.register(self)
    if not poller.poll(timeout):
    return self.on_timeout()
    return f(self, *args, **kwargs)
    return update_wrapper(wrapper, f, ('__name__', '__doc__'))

    for _meth in dir(zmq.Socket):
    if _meth.startswith(('send', 'recv')):
    locals()[_meth] = _timeout_wrapper(getattr(zmq.Socket, _meth))

    del _meth, _timeout_wrapper