Skip to content

Instantly share code, notes, and snippets.

@jesteria
Created May 7, 2019 16:13
Show Gist options
  • Select an option

  • Save jesteria/079df79e0e591209aa6028053d720426 to your computer and use it in GitHub Desktop.

Select an option

Save jesteria/079df79e0e591209aa6028053d720426 to your computer and use it in GitHub Desktop.
functional test for triage storage
import argparse
import random
import re
import string
import sys
from triage.component.catwalk import storage
# SI magnitude prefixes in ascending order (kilo ... zetta); a prefix's
# position, plus one, is the power its unit multiplier is raised to.
SIZE_UNITS = 'kmgtpez'

# An integer, optional spaces, then a unit: either a bare "b", or a magnitude
# prefix optionally followed by the binary marker "i" and then "b"
# (e.g. "kb", "MiB"). Matching is case-insensitive. The "i" marker is only
# accepted after a prefix, so a meaningless unit such as "ib" never matches.
HUMAN_SIZE = re.compile(rf'^(\d+) *((?:[{SIZE_UNITS}]i?)?b)$', re.I)


def human_size_to_bytes(value):
    """Convert a given "human-readable" byte-size value to ``int`` of
    bytes.

    Accepted forms (case-insensitive; surrounding whitespace is ignored):

    * a bare integer, taken as a count of bytes: ``"2048"``
    * an integer followed by ``b``: ``"512b"``
    * an integer followed by a decimal unit (powers of 1000): ``"5kb"``
    * an integer followed by a binary unit (powers of 1024): ``"1 MiB"``

    Raises ``ValueError`` for any other input.

    """
    text = value.strip()

    # isdecimal() rather than isdigit(): the latter is also true for
    # characters such as superscript digits, which int() cannot parse.
    if text.isdecimal():
        return int(text)

    match = HUMAN_SIZE.search(text)
    if match is None:
        raise ValueError(f"unrecognized human-readable size: {value}")

    (number, unit) = match.groups()

    # a lone "b": the number is already a count of bytes
    if len(unit) == 1:
        return int(number)

    # the unit is "<prefix>b" (decimal) or "<prefix>ib" (binary)
    multiplier = 1024 if len(unit) == 3 else 1000
    scale = SIZE_UNITS.index(unit[0].lower()) + 1
    return int(number) * (multiplier ** scale)
def stream_random(length, chunk_size=1000):
    """Generate text of ``length`` in chunks of at most ``chunk_size``.

    Each chunk is a string of characters drawn at random from
    ``string.printable``. Every chunk holds ``chunk_size`` characters,
    except the last, which holds whatever remains. Nothing is yielded
    when ``length`` is zero or negative.

    Raises ``ValueError`` if ``chunk_size`` is less than 1 -- such a
    value could never consume ``length`` and would loop forever. As this
    is a generator, the error surfaces on the first iteration.

    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer: {chunk_size}")

    remaining = length
    while remaining > 0:
        chunk_length = min(chunk_size, remaining)
        remaining -= chunk_length
        yield ''.join(random.choices(string.printable, k=chunk_length))
def progress_bar(char='=', end='\n', file=None, width=80):
    """Draw a text progress bar, advancing one step per iteration.

    The first iteration writes an empty bar, ``[`` + ``width`` spaces +
    ``]``, and moves the cursor back to just after the ``[``. Each
    subsequent iteration writes one ``char``. Once ``width`` characters
    have been written, ``end`` is written and the generator is exhausted.

    Yields the step index, from ``0`` up to ``width - 1``.

    ``file`` is the stream written to; it defaults to ``sys.stdout``,
    looked up when the generator first runs so that any redirection of
    ``sys.stdout`` in effect at that time is honored.

    """
    stream = sys.stdout if file is None else file

    # set up the empty bar
    stream.write("[{}]".format(" " * width))
    stream.flush()
    stream.write("\b" * (width + 1))  # return to start of line, after '['

    for step in range(width):
        yield step

        # the consumer asked for another step: extend the bar
        stream.write(char)
        stream.flush()

    stream.write(end)
def _write_with_progress(fd, length, chunk_size, bar_width):
    """Write ``length`` characters of random text to ``fd`` in chunks,
    drawing a progress bar of ``bar_width`` steps as the data is written.
    """
    bar_iterator = progress_bar(width=bar_width)

    # Draw the empty bar. The default guards every advance of the bar: a
    # bare next() on an exhausted bar would raise StopIteration here.
    next(bar_iterator, None)

    written = 0
    ticks = 0
    for chunk in stream_random(length, chunk_size):
        fd.write(chunk)

        # Derive the bar position from the amount actually written, (the
        # final chunk may be shorter than chunk_size), so that the bar is
        # exactly full once everything has been written.
        written += len(chunk)
        target = written * bar_width // length
        while ticks < target:
            next(bar_iterator, None)
            ticks += 1

    # Nothing was written, (zero length): run the bar out regardless, so
    # that its line terminator is still emitted.
    for _ in bar_iterator:
        pass


def main(path, length, method, assert_type=None, chunk_size=1000, bar_width=80):
    """Write ``length`` characters of test data to the store at ``path``.

    ``path`` is handed to ``storage.Store.factory`` to select the store,
    which is then opened for writing.

    ``method`` selects how the data is written:

    * ``'incremental'``: random text, written in chunks of at most
      ``chunk_size``, with a progress bar ``bar_width`` characters wide
    * ``'full'``: a single write of a constant payload

    If ``assert_type`` is given, it must equal the class name of the
    selected store; otherwise ``AssertionError`` is raised.

    Raises ``ValueError`` for an unrecognized ``method``, before the
    store is touched.

    """
    if method not in ('incremental', 'full'):
        raise ValueError(f'bad method: {method}')

    store = storage.Store.factory(path)

    if assert_type:
        store_type = store.__class__.__name__
        if store_type != assert_type:
            # raised explicitly, rather than via the assert statement, so
            # that the check survives ``python -O``
            raise AssertionError(
                f'expected store type {assert_type}, got {store_type}'
            )

    with store.open('w') as fd:
        if method == 'incremental':
            _write_with_progress(fd, length, chunk_size, bar_width)
        else:
            # Avoid random generation here as it makes it far too slow
            # (and isn't necessary regardless)
            fd.write('i' * length)
if __name__ == '__main__':
    # Command-line entry point: every parsed option is passed to main() as
    # a keyword argument, so each option's destination name must match a
    # parameter of main().
    parser = argparse.ArgumentParser(
        description="functional test for triage storage",
    )
    parser.add_argument('path', help="Store-compatible path, (e.g.: s3://review.dssg.io/test.dat)")
    parser.add_argument('--assert-type', metavar='STORAGE-CLASS',
                        help="assert that the selected Store class matches, (e.g.: S3Store)")
    parser.add_argument('-l', '--length', default='1MiB',
                        help="file size to write (human-readable) (default: 1MiB)",
                        type=human_size_to_bytes)
    # a tuple, rather than a set, so that the choices are always listed in
    # the same order in the usage and error messages
    parser.add_argument('--method', default='incremental', choices=('incremental', 'full'))

    args = parser.parse_args()
    main(**vars(args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment