Created
May 28, 2015 20:56
-
-
Save aristus/656c0172b9671ae53d5c to your computer and use it in GitHub Desktop.
Revisions
-
aristus created this gist
May 28, 2015. There are no files selected for viewing
#!/usr/bin/env python2.7
"""Manual benchmark for MemSQL inserts and (spatial) reads.

Note: this script is for manual benchmarking!

    ./benchmark init-spatial
    time ./benchmark write-spatial
    ./benchmark read-spatial

The code sticks to syntax that is valid on Python 2.7 (the interpreter named
in the shebang) and also parses on Python 3.
"""

import math
import random
import sys
from multiprocessing import Pool
from time import sleep, time

try:
    from memsql.common import connection_pool
except ImportError:
    # The generate-files commands never touch the database, so a missing
    # client library is only reported once init() is actually called.
    connection_pool = None

AGGREGATORS = ['127.0.0.1']
WRITE_THREADS = 8
READ_THREADS = 16
NUM_RECORDS = int(200 * (10**6))
CHUNK_SIZE = 10000  ## when doing inserts "live" into memsql
DISK_CHUNK_SIZE = 100000  ## when writing files to disk
LOAD_DATA_BATCH_SIZE = 1000000  ## number of records per file on disk
NUM_LOAD_DATA_FILES = NUM_RECORDS // LOAD_DATA_BATCH_SIZE
NUM_LOCATIONS_READ = 1  ## for non-spatial reads test
POLYGON_MIN = 9000  #6500 ## for spatial reads test
# Kept integral because it is used as a random.randrange() bound.
POLYGON_MAX = int(round(POLYGON_MIN * 1.2))
POLY_POINTS = 12
READ_ITERATIONS = 1
# NOTE(review): presumably the Earth's equatorial radius in metres -- confirm.
SPHERE_RADIUS = 6378137.0

USAGE = ("usage: benchmark COMMAND [OUTPUT_DIR]\n"
         "commands: init, init-spatial, write, write-spatial, "
         "generate-files OUTPUT_DIR, generate-files-spatial OUTPUT_DIR, "
         "read, read-spatial, read-spatial-approx")

# Process-wide state. It is assigned by init() / main() in the parent process
# and read by the worker functions, so workers depend on inheriting the
# parent's module globals (true for fork-based multiprocessing).
pool = None
db = None
FILE_PATH = None


def cross(x, y):
    """Return the cross product of two vectors.

    For 2-element inputs the scalar ``x0*y1 - y0*x1`` is returned; otherwise
    the inputs are treated as 3-vectors and a 3-tuple is returned.
    """
    if len(x) == 2:
        assert len(y) == 2
        return x[0] * y[1] - y[0] * x[1]
    return (x[1] * y[2] - y[1] * x[2],
            x[2] * y[0] - y[2] * x[0],
            x[0] * y[1] - y[0] * x[1])


def dot(x, y):
    """Return the dot product of two equal-length sequences."""
    return sum(xi * yi for xi, yi in zip(x, y))


def norm2(x):
    """Return the squared Euclidean length of ``x``."""
    return dot(x, x)


def norm(x):
    """Return the Euclidean length of ``x``."""
    return math.sqrt(norm2(x))


def normalize(x):
    """Return ``x`` scaled to unit length, as a tuple."""
    length = norm(x)  # computed once instead of once per component
    return tuple(a / length for a in x)


def random_vector():
    """Return a random unit 3-vector.

    Candidates are drawn from the cube [-1, 1)^3 and rejected unless their
    squared length lies in [0.1, 1.0]; the accepted one is normalised.
    """
    d2 = 0
    while d2 < 0.1 or d2 > 1.0:
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        z = random.random() * 2 - 1
        d2 = x * x + y * y + z * z
    d = math.sqrt(d2)
    return (x / d, y / d, z / d)


def mul(v, a):
    """Return the 3-vector ``v`` multiplied by the scalar ``a``."""
    (x, y, z) = v
    return (x * a, y * a, z * a)


def lonlat(v, fmt="%.8f %.8f"):
    """Format the direction of 3-vector ``v`` as longitude/latitude degrees.

    ``fmt`` receives ``(lon, lat)`` in that order.
    """
    (x, y, z) = v
    lat = math.atan2(z, math.sqrt(x * x + y * y)) * (180.0 / math.pi)
    lon = math.atan2(y, x) * (180.0 / math.pi)
    return fmt % (lon, lat)


def ortho(v):
    """Return a vector perpendicular to the 3-vector ``v``.

    The component of ``v`` with the smallest magnitude is zeroed out and the
    other two are swapped with one sign flipped.
    """
    (x, y, z) = v
    ax = abs(x)
    ay = abs(y)
    az = abs(z)
    if ax < ay:
        if ax < az:
            return (0, -z, y)
        return (-y, x, 0)
    if ay < az:
        return (-z, 0, x)
    return (-y, x, 0)


def random_point(delim=',', quote="'"):
    """Return one row for ``terrain_points`` as a delimited string.

    Fields: a random ``point(lon lat)`` wrapped in ``quote``, elevation,
    ent_id and the current Unix time.
    """
    # NOTE(review): elevation can be negative while the tables created by
    # this script declare it `int unsigned` -- confirm this is intended.
    return delim.join((
        lonlat(random_vector(), quote + "point(%.8f %.8f)" + quote),
        str(random.randint(-10000, 100000)),
        str(random.randint(1, 100000000)),
        str(int(time())),
    ))


def random_row(delim=','):
    """Return one row for ``terrain_points_int`` as a delimited string.

    Fields: integer location, elevation, ent_id and the current Unix time.
    """
    return delim.join((
        str(random.randint(1, 10000000)),
        str(random.randint(-10000, 100000)),
        str(random.randint(1, 100000000)),
        str(int(time())),
    ))


def combine(zero, i, j, a):
    """Return the unit vector along ``zero + i*sin(a) + j*cos(a)``."""
    s = math.sin(a)
    c = math.cos(a)
    return normalize(tuple(zero[k] + i[k] * s + j[k] * c for k in range(3)))


def random_polygon(size):
    """Return a random closed ``POLYGON((...))`` string with POLY_POINTS corners.

    The corners lie on a circle around a random centre on the unit sphere;
    ``size / SPHERE_RADIUS`` is the angle (radians) between the centre and
    each corner. The first corner is repeated at the end to close the ring.
    """
    angles = sorted((random.random() * 2 - 1) * math.pi
                    for _ in range(POLY_POINTS))
    # 1 - 2*sin^2(t/2) == cos(t); float division keeps integer sizes exact.
    height = 1 - 2 * (math.sin(0.5 * (size / SPHERE_RADIUS))) ** 2
    radius = math.sqrt(1 - height ** 2)
    center = random_vector()
    o = normalize(ortho(center))
    i = mul(o, radius)
    j = mul(normalize(cross(o, center)), radius)
    zero = mul(center, height)
    points = [lonlat(combine(zero, i, j, a)) for a in angles]
    return "POLYGON((%s, %s))" % (', '.join(points), points[0])


def measure(db, name, query, args):
    """Run ``query % a`` for every ``a`` in ``args`` and time the batch.

    With a single argument that one query is timed. With several, the first
    query is executed once untimed and then the whole list is timed.
    Roughly 5% of calls print the average latency and row count.

    Returns False when ``args`` is empty or no rows came back, else True.
    """
    queries = [query % a for a in args]
    if not queries:
        return False
    a = time()
    rows = len(db.query(queries[0]))
    #assert len(db.query("show warnings")) == 0
    b = time()
    if len(queries) > 1:
        rows = 0
        a = time()
        for q in queries:
            rows += len(db.query(q))
            #assert len(db.query("show warnings")) == 0
        b = time()
    if rows == 0:
        return False
    if random.random() <= 0.05:
        print("%s: %.2f ms, rows %.0f, count %d" % (
            name,
            float((b - a) * 1000 / len(queries)),
            float(rows) / len(queries),
            len(queries)))
        sys.stdout.flush()
    return True


def _print_progress(total_chunks, interval_chunks, elapsed):
    """Print a row total and rows/sec, extrapolated to all WRITE_THREADS."""
    scale = WRITE_THREADS * CHUNK_SIZE
    rate = int(interval_chunks * scale / max(elapsed, 1e-9))
    print("%d total,  %d per sec" % (total_chunks * scale, rate))


def _insert_worker(a, statement, make_row):
    """Insert ``rows`` generated rows in CHUNK_SIZE batches, reporting progress.

    ``a`` is a ``(worker_id, rows)`` pair; ``statement`` contains one ``%s``
    that receives the ``),(``-joined rows produced by ``make_row``.
    """
    sleep(random.random())  # random sub-second delay before starting
    worker_id, rows = a
    agg = AGGREGATORS[worker_id % len(AGGREGATORS)]
    start = last_time = time()
    conn = pool.connect(agg, '3306', 'root', '', 'db_select_perf')
    chunks = rows // CHUNK_SIZE
    last_done = 0
    for i in range(chunks):
        conn.query(statement % "),(".join(
            make_row() for _ in range(CHUNK_SIZE)))
        if i % 100 == 0:
            done = i + 1
            now = time()
            # Rate covers only the chunks written since the previous report.
            _print_progress(done, done - last_done, now - last_time)
            last_done, last_time = done, now
    if chunks:
        _print_progress(chunks, chunks, time() - start)


def worker(a):
    """Pool task: bulk-insert random spatial rows into ``terrain_points``."""
    _insert_worker(
        a,
        "INSERT IGNORE INTO db_select_perf.terrain_points VALUES (%s)",
        random_point)


def worker_nonspatial(a):
    """Pool task: bulk-insert random rows into ``terrain_points_int``."""
    _insert_worker(
        a,
        "INSERT INTO db_select_perf.terrain_points_int VALUES (%s)",
        random_row)


def _write_rows_file(path, num_rows, make_row):
    """Write ``num_rows`` generated lines to ``path`` in DISK_CHUNK_SIZE batches."""
    with open(path, 'w') as out:
        for _ in range(num_rows // DISK_CHUNK_SIZE):
            out.write('\n'.join(
                make_row() for _ in range(DISK_CHUNK_SIZE)) + '\n')


def worker_print(a):
    """Pool task: write one tab-separated ``terrain_points_int`` data file.

    ``a`` is ``(thread_id, num_rows)``; the file is created under FILE_PATH.
    """
    thread_id, num_rows = a
    _write_rows_file(
        FILE_PATH + ('/terrain_points_int-%04d.tsv' % thread_id),
        num_rows,
        lambda: random_row('\t'))


def worker_print_spatial(a):
    """Pool task: write one tab-separated ``terrain_points`` data file.

    ``a`` is ``(thread_id, num_rows)``; the file is created under FILE_PATH.
    """
    thread_id, num_rows = a
    _write_rows_file(
        FILE_PATH + ('/terrain_points-%04d.tsv' % thread_id),
        num_rows,
        lambda: random_point('\t', ''))


def _spatial_read_loop(a, query):
    """Query host ``a`` forever with random polygons substituted into ``query``."""
    sleep(random.random())  # random sub-second delay before starting
    conn = pool.connect(a, '3306', 'root', '', 'db_select_perf')
    while True:
        size = random.randrange(POLYGON_MIN, POLYGON_MAX)
        rsize = size * 1.0e-7 * math.pi / 3 * SPHERE_RADIUS
        args = [random_polygon(rsize) for _ in range(READ_ITERATIONS)]
        measure(conn, "", query, args)


def select_worker(a):
    """Pool task: endless ``geography_intersects`` reads against host ``a``."""
    _spatial_read_loop(
        a,
        "SELECT * FROM db_select_perf.terrain_points with (index=location, resolution=6) WHERE geography_intersects(location, '%s')")


def select_worker_approx(a):
    """Pool task: endless ``approx_geography_intersects`` reads against host ``a``."""
    _spatial_read_loop(
        a,
        "SELECT * FROM db_select_perf.terrain_points with (index=location, resolution=6) WHERE approx_geography_intersects(location, '%s')")


def select_worker2(a):
    """Pool task: endless ``location in (...)`` reads against host ``a``."""
    sleep(random.random())  # random sub-second delay before starting
    conn = pool.connect(a, '3306', 'root', '', 'db_select_perf')
    while True:
        args = [','.join(str(random.randint(1, 10000000))
                         for _ in range(NUM_LOCATIONS_READ))
                for _ in range(READ_ITERATIONS)]
        measure(conn, "", "SELECT * FROM db_select_perf.terrain_points_int WHERE location in (%s)", args)


def init():
    """Create the global connection pool and a connection to 127.0.0.1.

    Also reseeds ``random`` from the current time. Returns the connection.

    Raises:
        RuntimeError: if the ``memsql`` client library could not be imported.
    """
    global db, pool
    if connection_pool is None:
        raise RuntimeError(
            "the 'memsql' package is required for database commands")
    pool = connection_pool.ConnectionPool()
    db = pool.connect('127.0.0.1', '3306', 'root', '', '')
    #db = pool.connect('127.0.0.1', '3306', 'root', '', '', {'ssl': '../certs/ca-cert.pem'})
    random.seed(time())
    return db


def _recreate_table(name, ddl):
    """Ensure ``db_select_perf`` exists, then drop and re-create table ``name``."""
    conn = init()
    conn.query("create database if not exists db_select_perf")
    conn.query("use db_select_perf")
    conn.query("flush connection pools")
    conn.query("drop table if exists " + name)
    conn.query(ddl)


def main(argv=None):
    """Dispatch on ``argv[1]``; return a process exit status.

    Returns 1 (after printing a message) when the command is missing or
    unknown, or when a generate-files command lacks its output directory.
    The read commands start workers that loop forever.
    """
    global FILE_PATH
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        print("No command given.")
        print(USAGE)
        return 1
    cmd = argv[1]

    write_tasks = [NUM_RECORDS // WRITE_THREADS] * WRITE_THREADS
    file_tasks = [LOAD_DATA_BATCH_SIZE] * NUM_LOAD_DATA_FILES
    read_hosts = AGGREGATORS * (READ_THREADS // len(AGGREGATORS))

    if cmd == 'init-spatial':
        _recreate_table("terrain_points", """CREATE TABLE terrain_points (
            location geographypoint DEFAULT 'Point(0 0)',
            elevation int unsigned NOT NULL,
            ent_id int unsigned NOT NULL,
            time_sec int unsigned NOT NULL,
            shard key (location, ent_id, time_sec)
        );""")
    elif cmd == 'init':
        _recreate_table("terrain_points_int", """CREATE TABLE terrain_points_int (
            location bigint unsigned NOT NULL,
            elevation int unsigned NOT NULL,
            ent_id int unsigned NOT NULL,
            time_sec int unsigned NOT NULL,
            shard key (location, ent_id, time_sec)
        );""")
    elif cmd == 'write-spatial':
        init()
        Pool(processes=WRITE_THREADS).map(worker, enumerate(write_tasks))
    elif cmd == 'write':
        init()
        Pool(processes=WRITE_THREADS).map(
            worker_nonspatial, enumerate(write_tasks))
    ## dump data files to disk
    elif cmd in ('generate-files', 'generate-files-spatial'):
        if len(argv) < 3:
            print("%s requires an output directory." % cmd)
            print(USAGE)
            return 1
        FILE_PATH = argv[2]
        task = worker_print if cmd == 'generate-files' else worker_print_spatial
        Pool(processes=WRITE_THREADS).map(task, enumerate(file_tasks))
    elif cmd == 'read':
        init()
        Pool(processes=READ_THREADS).map(select_worker2, read_hosts)
    elif cmd == 'read-spatial':
        init()
        Pool(processes=READ_THREADS).map(select_worker, read_hosts)
    elif cmd == 'read-spatial-approx':
        init()
        Pool(processes=READ_THREADS).map(select_worker_approx, read_hosts)
    else:
        print("Unknown command: %s" % cmd)
        print(USAGE)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())