#!/usr/bin/env python
# -*- coding: utf-8 -*-
|
"""Sort incoming file using external merge sort.""" |
|
from __future__ import absolute_import |
|
|
|
import os |
|
import io |
|
import sys |
|
import argparse |
|
import shutil |
|
import struct |
|
import tempfile |
|
import multiprocessing |
|
import logging |
|
import unittest |
|
import random |
|
from heapq import merge |
|
from itertools import islice, chain |
|
from contextlib import contextmanager |
|
from collections import deque |
|
from functools import wraps, partial |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
# True if we are running on Python 3. |
|
PY3 = sys.version_info[0] == 3 |
|
|
|
if PY3: |
|
xrange = range |
|
|
|
|
|
#: Default buffer size to use. In bytes. |
|
DEFAULT_BUFFER_SIZE = 2 ** 16 |
|
|
|
|
|
_tempdir = None |
|
|
|
def gettempdir():
    """Return the temporary directory for this run, creating it lazily."""
    global _tempdir
    if _tempdir is None:
        # First caller creates a fresh directory; everyone else reuses it.
        _tempdir = tempfile.mkdtemp()
    return _tempdir
|
|
|
|
|
def settempdir(path):
    """Point this run's temporary directory at the given path.

    Used as a pool initializer so worker processes share the
    parent's temporary directory instead of creating their own.
    """
    global _tempdir
    _tempdir = path
|
|
|
|
|
def cleanuptempdir():
    """Remove the temporary directory and everything inside it."""
    global _tempdir
    if _tempdir is None:
        return
    try:
        shutil.rmtree(_tempdir)
    except Exception:
        # Best effort: a failed cleanup must never crash the caller.
        logger.error("Can't cleanup!")
    _tempdir = None
|
|
|
|
|
def isplit(n, iterable):
    """Split given iterator into chunks of at most ``n`` items.

    Chunks are lazy ``chain`` objects sharing one underlying
    iterator, so each chunk must be fully consumed before the next
    one is requested.

    :param n: maximum chunk length
    :param iterable: source of items
    :returns: generator of iterators, each yielding up to ``n`` items
    """
    it = iter(iterable)
    while True:
        piece = islice(it, n)
        try:
            # Pull the first element eagerly so exhaustion is detected
            # before a (possibly empty) chunk is handed out.
            head = next(piece)
        except StopIteration:
            # PEP 479: a StopIteration escaping a generator becomes
            # RuntimeError on Python 3.7+, so return explicitly.
            return
        yield chain((head,), piece)
|
|
|
|
|
def log_exception(func):
    """Decorator that logs any exception raised by ``func`` and re-raises."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Record the full traceback before letting the error propagate;
            # useful for failures inside pool worker processes.
            logger.exception(exc)
            raise

    return wrapper
|
|
|
|
|
class Stream(object):
    """Buffered random access to a file.

    Iterating yields ``(size, buffer)`` pairs, where ``buffer`` is a
    single reusable bytearray. Can act as a context manager.

    :param name: path to file or file descriptor

    """

    def __init__(self, name, buffer_size=DEFAULT_BUFFER_SIZE):
        self.name = name
        self.buffer_size = buffer_size
        # One zero-filled buffer, reused for every read.
        self._buffer = bytearray(self.buffer_size)
        raw = io.FileIO(self.name, mode='r+')
        buffered = io.BufferedRandom(raw, buffer_size=self.buffer_size)
        self._handler = raw
        self._buffered_handler = buffered
        # Flush/close order matters: buffered layer first, raw file second.
        self._streams = (buffered, raw)

    def __enter__(self):
        return self

    def __exit__(self, tp, value, tb):
        self.flush()
        self.close()

    def write(self, buf):
        """Write given data into the opened file."""
        self._buffered_handler.write(buf)

    def flush(self):
        """Push buffered data down to the operating system."""
        for handle in self._streams:
            handle.flush()

    def close(self):
        """Close the underlying file objects."""
        for handle in self._streams:
            handle.close()

    def __iter__(self):
        """Iterate over the file's data from the start. Reenterable.

        :returns: generator of ``(size, buffer)`` tuples. The buffer
            is reused, so consume its contents before advancing the
            generator or they will be overwritten. The file position
            is restored when iteration finishes.
        """
        handle = self._buffered_handler
        scratch = self._buffer
        saved_pos = handle.tell()
        handle.seek(0)
        try:
            size = handle.readinto(scratch)
            while size:
                yield (size, scratch)
                size = handle.readinto(scratch)
        finally:
            handle.seek(saved_pos)

    @classmethod
    def from_temp(cls, **kwargs):
        """Create a stream backed by a fresh temporary file.

        :returns: tuple of (stream, path to the temporary file)
        """
        fd, name = tempfile.mkstemp(dir=gettempdir())
        return (cls(fd, **kwargs), name)
|
|
|
|
|
class Marshaller(object):
    """Marshal and unmarshal 32-bit unsigned integers to/from a stream.

    Iterator returns tuples of int (one tuple per buffer) to reduce
    the number of iterations. Can act as context manager.

    :param stream: stream from which we should get data
    :type stream: :py:class:`.Stream`
    :param byteorder: byte order of incoming data ('big' or 'little');
        defaults to the host byte order

    """

    def __init__(self, stream, byteorder=None):
        # Fall back to the host byte order when none is given.
        byteorder = byteorder or sys.byteorder
        order = b'>' if byteorder == 'big' else b'<'
        fmt = b'I'  # one "atom" = a 32-bit unsigned integer
        self.stream = stream
        self._atom_fmt = order + fmt
        self._atom_size = struct.calcsize(self._atom_fmt)
        # The stream buffer must hold a whole number of atoms,
        # otherwise the chunked (un)packing below would mis-align.
        assert not self.stream.buffer_size % self._atom_size
        self._atoms_count = self.stream.buffer_size // self._atom_size
        # Pre-compiled struct covering one full buffer of atoms.
        self._struct = struct.Struct(order + fmt * self._atoms_count)
        # Pending integers not yet written to the stream.
        self._queue = []
        # Reusable scratch buffer for full-sized chunks.
        self._buffer = bytearray(b'\x00' * self._atoms_count * self._atom_size)

    def feed(self, chunk):
        """Add given items into write queue.

        Flushes automatically once at least one full buffer of atoms
        is queued.
        """
        self._queue.extend(chunk)
        if len(self._queue) >= self._atoms_count:
            self.flush()

    def flush(self):
        """Flush buffered items into underlying stream."""
        queue = self._queue
        if not queue:
            return
        buf = None
        atom_fmt = self._atom_fmt
        atom_size = self._atom_size
        pack_into = self._struct.pack_into
        for i in xrange(0, len(queue), self._atoms_count):
            elements = queue[i:i+self._atoms_count]
            if len(elements) != self._atoms_count:
                # Trailing partial chunk: pack atom by atom into a
                # right-sized one-off buffer.
                buf = bytearray(b'\x00' * atom_size * len(elements))
                for j, element in enumerate(elements):
                    struct.pack_into(
                        atom_fmt, buf, j * atom_size, element)
            else:
                # Full chunk: single pre-compiled pack into the
                # reusable scratch buffer.
                pack_into(self._buffer, 0, *elements)
                buf = self._buffer
            self.stream.write(buf)
        self._queue = []

    def close(self):
        """Flush and close underlying stream."""
        self.stream.flush()
        self.stream.close()

    def __enter__(self):
        return self

    def __exit__(self, tp, value, tb):
        self.flush()
        self.close()

    def __iter__(self):
        # Yield one tuple of ints per buffer read from the stream.
        atom_fmt = self._atom_fmt
        atom_size = self._atom_size
        buffer_size = self.stream.buffer_size
        unpack_from = self._struct.unpack_from
        for size, buf in self.stream:
            if size != buffer_size:
                # handle unaligned chunk (usually the file's tail);
                # its size must still be a multiple of the atom size
                if size % atom_size:
                    raise ValueError("Unaligned atom!")
                yield tuple(
                    struct.unpack(
                        atom_fmt, buf[offset:offset+atom_size])[0]
                    for offset in xrange(0, size, atom_size))
            else:
                # handle aligned (full) chunk with the fast
                # pre-compiled struct
                yield unpack_from(buf)

    @classmethod
    def from_name(cls, name, **kwargs):
        """Create marshaller that can access given file.

        :param name: path to file
        :type name: string

        """
        stream = Stream(name)
        return cls(stream, **kwargs)

    @classmethod
    def from_temp(cls, **kwargs):
        """Create marshaller from temporary file.

        :returns: tuple of (marshaller, path to the temporary file)
        """
        stream, name = Stream.from_temp()
        return (cls(stream, **kwargs), name)
|
|
|
|
|
@log_exception
def create_bucket(chunk):
    """Create sorted bucket from unsorted chunk.

    Runs inside pool workers, hence the exception-logging decorator.

    :param chunk: list of ints
    :returns: path to sorted bucket

    """
    marshaller, path = Marshaller.from_temp()
    with marshaller:
        marshaller.feed(sorted(chunk))
    return path
|
|
|
|
|
class Splitter(object):
    """Split incoming data into sorted buckets.

    Every chunk read from the marshaller is sorted and written to its
    own temporary file by a pool worker.

    :param pool: pool into which we should run tasks
    :type pool: :py:class:`multiprocessing.Pool`
    :param marshaller: iterator from which we
        should get data to process
    :type marshaller: :py:class:`.Marshaller`

    """

    def __init__(self, pool, marshaller):
        self.pool = pool
        self.marshaller = marshaller

    def __iter__(self):
        """Split the marshalled data into buckets.

        :returns: generator of paths, one per created bucket

        """
        with self.marshaller as source:
            for bucket_path in self.pool.imap(create_bucket, source):
                yield bucket_path
|
|
|
|
|
@log_exception
def merge_buckets(names, chunk_size=1024):
    """Merge sorted buckets together into one sorted file.

    The source files are removed after merging.

    :param names: list of paths to files we should merge
    :param chunk_size: how many integers to buffer per feed call
    :returns: path to merged file

    """
    sources = []
    try:
        # Open sources one at a time inside the try so that a failure
        # opening the Nth file still closes (and unlinks) the earlier
        # ones instead of leaking them.
        for name in names:
            sources.append(Marshaller.from_name(name))
        dest, dest_name = Marshaller.from_temp()
        with dest:
            merged = merge(*[
                chain.from_iterable(stream) for stream in sources])
            for chunk in isplit(chunk_size, merged):
                dest.feed(list(chunk))
    finally:
        for stream in sources:
            stream.close()
        for name in names:
            os.unlink(name)
    return dest_name
|
|
|
|
|
class Merger(object):
    """Multiway merger. Merge multiple buckets into one.

    :param pool: pool into which we should run tasks
    :type pool: :py:class:`multiprocessing.Pool`
    :param splitter: iterator returning paths to each buckets
    :type splitter: :py:class:`.Splitter`
    :param ways: how many buckets we can merge at once

    """

    def __init__(self, pool, splitter, ways=None):
        self.pool = pool
        self.splitter = splitter
        self.ways = ways or 16

    def merge(self):
        """Repeatedly merge buckets ``ways`` at a time until one remains.

        :returns: path to the fully merged file, or ``None`` when the
            splitter produced no buckets at all
        """
        pending = deque((self.splitter, ))
        while pending:
            # Group the current level's bucket paths into batches and
            # merge each batch in a pool worker.
            batches = [tuple(group)
                       for group in isplit(self.ways, pending.popleft())]
            merged = list(self.pool.imap(merge_buckets, batches))
            if not merged:
                return None
            if len(merged) == 1:
                return merged[0]
            # More than one result: queue them for another merge round.
            pending.append(merged)
|
|
|
|
|
@contextmanager
def pool_as_context(processes=None):
    """Create a worker pool and properly destroy it on exit.

    :param processes: number of workers; defaults to the CPU count
    """
    workers = processes or multiprocessing.cpu_count()
    # The initializer propagates the parent's temporary directory so
    # workers write their buckets into the same place.
    pool = multiprocessing.Pool(
        workers, settempdir, (gettempdir(), ))
    try:
        yield pool
    finally:
        pool.close()
        pool.join()
|
|
|
|
|
def external_sort(source, dest, processes=None, byteorder=None, ways=None):
    """Sort given `source` using external merge sort into `dest`.

    :param source: path to file which should be sorted
    :param dest: path to resulting file
    :param processes: how many processes to use
    :param byteorder: big or little endian
    :param ways: n-ways in merge

    """
    with pool_as_context(processes=processes) as pool:
        reader = Marshaller.from_name(source, byteorder=byteorder)
        merger = Merger(pool, Splitter(pool, reader), ways=ways)
        path = merger.merge()
        if path is None:
            # Empty input yields no buckets: the result equals the source.
            shutil.copy(source, dest)
        else:
            # Keep the source's permission bits on the result file.
            shutil.copymode(source, path)
            shutil.move(path, dest)
|
|
|
|
|
def existed_file(path):
    """Argparse type: ensure path is an existing, readable, aligned file.

    :param path: candidate input path
    :returns: the path unchanged
    :raises argparse.ArgumentTypeError: when the path is unreadable,
        not a regular file, or its size is not a multiple of 4
    """
    if not os.access(path, os.R_OK):
        raise argparse.ArgumentTypeError(
            "{0!r} is not readable".format(path))
    if not os.path.isfile(path):
        raise argparse.ArgumentTypeError(
            "{0!r} not a file".format(path))
    # The file stores 32-bit unsigned integers, so its size must be
    # a whole number of 4-byte atoms.
    if os.path.getsize(path) % 4:
        raise argparse.ArgumentTypeError(
            "size of {0!r} is not a multiple of 4".format(path))
    return path
|
|
|
|
|
def unexisted_file(path):
    """Argparse type: ensure the given path does not exist yet.

    The parent directory, when one is named, must already exist so
    the result file can actually be created there.
    """
    if path and os.path.exists(path):
        raise argparse.ArgumentTypeError(
            "{0!r} exists".format(path))
    parent = os.path.dirname(path)
    if parent and not os.path.exists(parent):
        raise argparse.ArgumentTypeError(
            "{0!r} not exists".format(parent))
    return path
|
|
|
|
|
def is_sorted(path, byteorder=None):
    """Check that the integers in the given file are in ascending order."""
    with Marshaller.from_name(path,
                              byteorder=byteorder) as marshaller:
        previous = 0
        for value in chain.from_iterable(marshaller):
            if value < previous:
                return False
            previous = value
    return True
|
|
|
|
|
def dispatch(parser, args):
    """Execute proper action for the parsed arguments.

    :param parser: argument parser, used to report usage errors
    :param args: parsed command line arguments
    :returns: process exit code (0 on success, 1 on failed validation)
    """
    if args.validate:
        # check is given file sorted or not
        if not is_sorted(args.input,
                         byteorder=args.byteorder):
            return 1
        return 0
    if not args.output:
        parser.error("output not specified!")
    external_sort(args.input, args.output,
                  byteorder=args.byteorder)
    return 0
|
|
|
|
|
def main():
    """Script entrypoint: parse arguments, configure logging, dispatch."""
    parser = argparse.ArgumentParser(
        description="""
Sort given file using n-way merge sort.
File should consist of 32-bit unsigned integers.
""".strip())
    parser.add_argument("-v", "--verbose",
                        help="increase output verbosity",
                        action="store_true")
    parser.add_argument("input",
                        help="file to work with",
                        type=existed_file)
    parser.add_argument("output",
                        help="path to result",
                        type=unexisted_file,
                        nargs='?')
    parser.add_argument("--validate",
                        help="check if given file is sorted",
                        action="store_true")
    parser.add_argument("-b", "--byteorder",
                        help="treat integers stored as little-endian or big-endian,"
                        " for output will be used system-wide byte order",
                        choices=("big", "little"),
                        default=None)
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(message)s")
    try:
        sys.exit(dispatch(parser, args))
    except KeyboardInterrupt:
        # Ctrl-C is not an error in quiet mode; re-raise when verbose
        # so the traceback is visible.
        if args.verbose:
            raise
        sys.exit(1)
    except Exception as exc:
        if args.verbose:
            raise
        logger.exception(exc)
        parser.error("Unhandled exception caught!")
    finally:
        # Always remove this run's temporary directory, even on error.
        cleanuptempdir()
|
|
|
|
|
class BaseTest(unittest.TestCase):
    """Shared fixture: temp-dir cleanup plus a file-creation helper."""

    def setUp(self):
        # Remove the run's temporary directory after every test.
        self.addCleanup(cleanuptempdir)

    def create_file(self, content):
        """Write ``content`` into a fresh temporary file, return its path."""
        stream, path = Stream.from_temp()
        with stream:
            stream.write(content)
        return path
|
|
|
|
|
class UtilityTest(BaseTest):
    """Tests for the module-level helper functions."""

    def test_isplit(self):
        chunks = [list(piece) for piece in
                  isplit(4, [i for i in xrange(10)])]
        self.assertEqual(
            [[0, 1, 2, 3],
             [4, 5, 6, 7],
             [8, 9]],
            chunks)
        self.assertEqual([], list(isplit(4, [])))
        self.assertEqual(
            [[0]],
            [list(piece) for piece in isplit(2, [0])])

    def test_create_bucket(self):
        source = [random.randint(0, 100) for i in xrange(100)]
        path = create_bucket(source)
        with Marshaller.from_name(path) as marshaller:
            result = list(chain.from_iterable(marshaller))
        self.assertEqual(sorted(source), result)

    def test_merge_buckets(self):
        sources = [sorted(random.randint(0, 100)
                          for i in xrange(10))
                   for i in xrange(6)]
        merged_path = merge_buckets(
            [create_bucket(source) for source in sources])
        with Marshaller.from_name(merged_path) as marshaller:
            self.assertEqual(
                list(merge(*sources)),
                list(chain.from_iterable(marshaller)))
|
|
|
|
|
class StreamTest(BaseTest):
    """Tests for :py:class:`Stream`."""

    def test_iteration(self):
        payload = b'foobar'
        path = self.create_file(payload)
        with Stream(path) as stream:
            chunks = list(stream)
            self.assertEqual(1, len(chunks))
            size, buf = chunks[0]
            # Only the first ``size`` bytes of the buffer are valid.
            self.assertEqual(payload, buf[0:size])

    def test_empty_file(self):
        path = self.create_file(b'')
        with Stream(path) as stream:
            self.assertEqual([], list(stream))
|
|
|
|
|
class MarshallerTest(BaseTest):
    """Tests for :py:class:`Marshaller`."""

    def test_iteration(self):
        expected = [100, 200, 300]
        path = self.create_file(struct.pack('>III', *expected))
        with Marshaller.from_name(path, byteorder='big') as marshaller:
            self.assertEqual(
                expected, list(chain.from_iterable(marshaller)))

    def test_unaligned_file(self):
        # Drop the last byte so the file size is not atom-aligned.
        content = struct.pack('>I', 100)[:-1]
        path = self.create_file(content)
        with self.assertRaises(ValueError):
            with Marshaller.from_name(path, byteorder='big') as marshaller:
                next(iter(marshaller))

    def test_feed(self):
        values = [100, 200, 300]
        marshaller, path = Marshaller.from_temp(byteorder='big')
        with marshaller:
            marshaller.feed(values)
        with open(path, 'rb') as handle:
            self.assertEqual(struct.pack('>III', *values), handle.read())
|
|
|
|
|
if __name__ == "__main__":
    # Required on platforms that spawn (rather than fork) worker
    # processes, e.g. frozen Windows executables.
    multiprocessing.freeze_support()
    main()
|
|