#!/usr/bin/env python import click import functools import signal import uuid from multiprocessing import Pool def run(keys, iterations, retry_failures): from redis import ( StrictRedis, WatchError, ) result = [0, 0] def increment(): """Update counters.""" with client.pipeline(transaction=True) as transaction: transaction.watch(*keys) counts = transaction.mget(keys) try: transaction.multi() updates = {} for key, count in zip(keys, counts): updates[key] = (int(count) if count is not None else 0) + 1 transaction.mset(updates) transaction.execute() except WatchError: result[1] += 1 if retry_failures: increment() else: result[0] += 1 try: client = StrictRedis() for i in xrange(iterations): increment() except KeyboardInterrupt: pass return result @click.command() @click.option('-c', '--concurrency', default=4) @click.option('-n', '--num-keys', default=2) @click.option('-i', '--iterations', default=500) @click.option('--retry-failures', default=False, is_flag=True) def __main__(concurrency, num_keys, iterations, retry_failures): namespace = uuid.uuid1().hex keys = ['{}:{}'.format(namespace, i) for i in xrange(num_keys)] pool = Pool(concurrency) results = [] for _ in xrange(concurrency): results.append(pool.apply_async(run, (keys, iterations, retry_failures))) signal.signal(signal.SIGINT, signal.SIG_IGN) successes = 0 failures = 0 for result in results: data = result.get() successes += data[0] failures += data[1] stdout = click.get_text_stream('stdout') stdout.write( '{} {} {:%}\n'.format( successes, failures, float(successes) / (successes + failures) ) ) if __name__ == '__main__': __main__()