#!/usr/bin/env python # zmqc: a small but powerful command-line interface to ZMQ. ## Usage: # zmqc [-0] (-r | -w) (-b | -c) SOCK_TYPE [-o SOCK_OPT=VALUE...] address [address ...] ## Examples: # zmqc -rc SUB 'tcp://127.0.0.1:5000' # # Subscribe to 'tcp://127.0.0.1:5000', reading messages from it and printing # them to the console. This will subscribe to all messages by default. # # ls | zmqc -wb PUSH 'tcp://*:4000' # # Send the name of every file in the current directory as a message from a # PUSH socket bound to port 4000 on all interfaces. Don't forget to quote the # address to avoid glob expansion. # # zmqc -rc PULL 'tcp://127.0.0.1:5202' | tee $TTY | zmqc -wc PUSH 'tcp://127.0.0.1:5404' # # Read messages coming from a PUSH socket bound to port 5202 (note that we're # connecting with a PULL socket), echo them to the active console, and # forward them to a PULL socket bound to port 5404 (so we're connecting with # a PUSH). # # zmqc -n 10 -0rb PULL 'tcp://*:4123' | xargs -0 grep 'pattern' # # Bind to a PULL socket on port 4123, receive 10 messages from the socket # (with each message representing a filename), and grep the files for # `'pattern'`. The `-0` option means messages will be NULL-delimited rather # than separated by newlines, so that filenames with spaces in them are not # considered two separate arguments by xargs. ## License: # This is free and unencumbered software released into the public domain. # # Anyone is free to copy, modify, publish, use, compile, sell, or # distribute this software, either in source code form or as a compiled # binary, for any purpose, commercial or non-commercial, and by any # means. # # In jurisdictions that recognize copyright laws, the author or authors # of this software dedicate any and all copyright interest in the # software to the public domain. We make this dedication for the benefit # of the public at large and to the detriment of our heirs and # successors. We intend this dedication to be an overt act of # relinquishment in perpetuity of all present and future rights to this # software under copyright law. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. # IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR # OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, # ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR # OTHER DEALINGS IN THE SOFTWARE. # For more information, please refer to import argparse import array import errno import itertools import re import sys import zmq __version__ = '0.0.1' class ParserError(Exception): """An exception which occurred when parsing command-line arguments.""" pass parser = argparse.ArgumentParser( prog='zmqc', version=__version__, usage= "%(prog)s [-h] [-v] [-0] (-r | -w) (-b | -c)\n " "SOCK_TYPE [-o SOCK_OPT=VALUE...]\n " "address [address ...]", description="zmqc is a small but powerful command-line interface to ZMQ. " "It allows you to create a socket of a given type, bind or connect it to " "multiple addresses, set options on it, and receive or send messages over " "it using standard I/O, in the shell or in scripts.", epilog="This is free and unencumbered software released into the public " "domain. For more information, please refer to .", ) parser.add_argument('-0', dest='delimiter', action='store_const', const='\x00', default='\n', help="Separate messages on input/output should be " "delimited by NULL characters (instead of newlines). Use " "this if your messages may contain newlines, and you want " "to avoid ambiguous message borders.") parser.add_argument('-n', metavar='NUM', dest='number', type=int, default=None, help="Receive/send only NUM messages. By default, zmqc " "lives forever in 'write' mode, or until the end of input " "in 'read' mode.") mode_group = parser.add_argument_group(title='Mode') mode = mode_group.add_mutually_exclusive_group(required=True) mode.add_argument('-r', '--read', dest='mode', action='store_const', const='r', help="Read messages from the socket onto stdout.") mode.add_argument('-w', '--write', dest='mode', action='store_const', const='w', help="Write messages from stdin to the socket.") behavior_group = parser.add_argument_group(title='Behavior') behavior = behavior_group.add_mutually_exclusive_group(required=True) behavior.add_argument('-b', '--bind', dest='behavior', action='store_const', const='bind', help="Bind to the specified address(es).") behavior.add_argument('-c', '--connect', dest='behavior', action='store_const', const='connect', help="Connect to the specified address(es).") sock_params = parser.add_argument_group(title='Socket parameters') sock_type = sock_params.add_argument('sock_type', metavar='SOCK_TYPE', choices=('PUSH', 'PULL', 'PUB', 'SUB', 'PAIR'), type=str.upper, help="Which type of socket to create. Must be one of 'PUSH', 'PULL', " "'PUB', 'SUB' or 'PAIR'. See `man zmq_socket` for an explanation of the " "different types. 'REQ', 'REP', 'DEALER' and 'ROUTER' sockets are " "currently unsupported. --read mode is unsupported for PUB sockets, and " "--write mode is unsupported for SUB sockets.") sock_opts = sock_params.add_argument('-o', '--option', metavar='SOCK_OPT=VALUE', dest='sock_opts', action='append', default=[], help="Socket option names and values to set on the created socket. " "Consult `man zmq_setsockopt` for a comprehensive list of options. Note " "that you can safely omit the 'ZMQ_' prefix from the option name. If the " "created socket is of type 'SUB', and no 'SUBSCRIBE' options are given, " "the socket will automatically be subscribed to everything.") addresses = sock_params.add_argument('addresses', nargs='+', metavar='address', help="One or more addresses to bind/connect to. Must be in full ZMQ " "format (e.g. 'tcp://:')") def read_until_delimiter(stream, delimiter): """ Read from a stream until a given delimiter or EOF, or raise EOFError. >>> io = StringIO("abcXdefgXfoo") >>> read_until_delimiter(io, "X") "abc" >>> read_until_delimiter(io, "X") "defg" >>> read_until_delimiter(io, "X") "foo" >>> read_until_delimiter(io, "X") Traceback (most recent call last): ... EOFError """ output = array.array('c') c = stream.read(1) while c and c != delimiter: output.append(c) c = stream.read(1) if not (c or output): raise EOFError return output.tostring() def get_sockopts(sock_opts): """ Turn a list of 'OPT=VALUE' into a list of (opt_code, value). Work on byte string options: >>> get_sockopts(['SUBSCRIBE=', 'SUBSCRIBE=abc']) [(6, ''), (6, 'abc')] Automatically convert integer options to integers: >>> zmqc.get_sockopts(['LINGER=0', 'LINGER=-1', 'LINGER=50']) [(17, 0), (17, -1), (17, 50)] Spew on invalid input: >>> zmqc.get_sockopts(['LINGER=foo']) Traceback (most recent call last): ... zmqc.ParserError: Invalid value for option LINGER: 'foo' >>> zmqc.get_sockopts(['NONEXISTENTOPTION=blah']) Traceback (most recent call last): ... zmqc.ParserError: Unrecognised socket option: 'NONEXISTENTOPTION' """ option_coerce = { int: set(zmq.core.constants.int_sockopts).union( zmq.core.constants.int64_sockopts), str: set(zmq.core.constants.bytes_sockopts) } options = [] for option in sock_opts: match = re.match(r'^([A-Z_]+)\=(.*)$', option) if not match: raise ParserError("Invalid option spec: %r" % match) opt_name = match.group(1) if opt_name.startswith('ZMQ_'): opt_name = opt_name[4:] try: opt_code = getattr(zmq.core.constants, opt_name.upper()) except AttributeError: raise ParserError("Unrecognised socket option: %r" % ( match.group(1),)) opt_value = match.group(2) for converter, opt_codes in option_coerce.iteritems(): if opt_code in opt_codes: try: opt_value = converter(opt_value) except (TypeError, ValueError): raise ParserError("Invalid value for option %s: %r" % ( opt_name, opt_value)) break options.append((opt_code, opt_value)) return options def main(): args = parser.parse_args() context = zmq.Context.instance() sock = context.socket(getattr(zmq, args.sock_type)) # Bind or connect to the provided addresses. for address in args.addresses: getattr(sock, args.behavior)(address) # Set any specified socket options. try: sock_opts = get_sockopts(args.sock_opts) except ParserError, exc: parser.error(str(exc)) else: for opt_code, opt_value in sock_opts: sock.setsockopt(opt_code, opt_value) # If we have a 'SUB' socket that's not explicitly subscribed to # anything, subscribe it to everything. if (sock.socket_type == zmq.SUB and not any(opt_code == zmq.SUBSCRIBE for (opt_code, _) in sock_opts)): sock.setsockopt(zmq.SUBSCRIBE, '') # Live forever if no `-n` argument was given, otherwise die after a fixed # number of messages. if args.number is None: iterator = itertools.repeat(None) else: iterator = itertools.repeat(None, args.number) try: if args.mode == 'r': read_loop(iterator, sock, args.delimiter, sys.stdout) elif args.mode == 'w': write_loop(iterator, sock, args.delimiter, sys.stdin) finally: sock.close() def read_loop(iterator, sock, delimiter, output): """Continously get messages from the socket and print them on output.""" for _ in iterator: try: message = sock.recv() output.write(message + delimiter) output.flush() except KeyboardInterrupt: return except IOError, exc: if exc.errno == errno.EPIPE: return raise def write_loop(iterator, sock, delimiter, input): """Continously get messages from input and send them through the socket.""" for _ in iterator: try: message = read_until_delimiter(input, delimiter) sock.send(message) except (KeyboardInterrupt, EOFError): return if __name__ == '__main__': main()