Created
May 7, 2019 16:13
-
-
Save jesteria/079df79e0e591209aa6028053d720426 to your computer and use it in GitHub Desktop.
functional test for triage storage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import random | |
| import re | |
| import string | |
| import sys | |
| from triage.component.catwalk import storage | |
| SIZE_UNITS = 'kmgtpez' | |
| HUMAN_SIZE = re.compile(rf'^(\d+) *([{SIZE_UNITS}]?i?b)$', re.I) | |
| def human_size_to_bytes(value): | |
| """Convert a given "human-readable" byte-size value to ``int`` of | |
| bytes. | |
| """ | |
| if value.isdigit(): | |
| return int(value) | |
| match = HUMAN_SIZE.search(value) | |
| if match: | |
| (number, unit) = match.groups() | |
| if len(unit) == 1: | |
| return int(number) | |
| prefix = unit[0].lower() | |
| multiplier = 1024 if unit[1].lower() == 'i' else 1000 | |
| try: | |
| scale = SIZE_UNITS.index(prefix) + 1 | |
| except ValueError: | |
| pass | |
| else: | |
| return int(number) * (multiplier ** scale) | |
| raise ValueError(f"unrecognized human-readable size: {value}") | |
| def stream_random(length, chunk_size=1000): | |
| """Generate text of ``length`` in chunks of at most ``chunk_size``.""" | |
| while length > 0: | |
| chunk_length = min(chunk_size, length) | |
| length -= chunk_length | |
| yield ''.join(random.choices(string.printable, k=chunk_length)) | |
| def progress_bar(char='=', end='\n', file=sys.stdout, width=80): | |
| # setup toolbar | |
| file.write("[{}]".format(" " * width)) | |
| file.flush() | |
| file.write("\b" * (width + 1)) # return to start of line, after '[' | |
| for loop in range(width): | |
| yield loop | |
| # update the bar | |
| file.write(char) | |
| file.flush() | |
| file.write(end) | |
| def main(path, length, method, assert_type=None, chunk_size=1000, bar_width=80): | |
| store = storage.Store.factory(path) | |
| if assert_type: | |
| assert store.__class__.__name__ == assert_type | |
| with store.open('w') as fd: | |
| if method == 'incremental': | |
| # progress bar management | |
| progress_chunk = 0 | |
| progress_size = length / bar_width | |
| bar_iterator = progress_bar(width=bar_width) | |
| next(bar_iterator) | |
| for chunk in stream_random(length, chunk_size): | |
| fd.write(chunk) | |
| # update progress bar | |
| progress_chunk += chunk_size | |
| if progress_chunk >= progress_size: | |
| progress_chunk = 0 | |
| next(bar_iterator) | |
| elif method == 'full': | |
| # Avoid random generation here as it makes it far too slow | |
| # (and isn't necessary regardless) | |
| # payload = ''.join(stream_random(length, length)) | |
| payload = 'i' * length | |
| fd.write(payload) | |
| else: | |
| raise ValueError(f'bad method: {method}') | |
| if __name__ == '__main__': | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('path', help="Store-compatbile path, (e.g.: s3://review.dssg.io/test.dat)") | |
| parser.add_argument('--assert-type', metavar='STORAGE-CLASS', | |
| help="assert that the selected Store class matches, (e.g.: S3Store)") | |
| parser.add_argument('-l', '--length', default='1MiB', | |
| help="file size to write (human-readable) (default: 1MiB)", | |
| type=human_size_to_bytes) | |
| parser.add_argument('--method', default='incremental', choices={'incremental', 'full'}) | |
| args = parser.parse_args() | |
| main(**vars(args)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment