Skip to content

Instantly share code, notes, and snippets.

@blackwithwhite666
Last active January 4, 2016 15:09
Show Gist options
  • Select an option

  • Save blackwithwhite666/8639323 to your computer and use it in GitHub Desktop.

Select an option

Save blackwithwhite666/8639323 to your computer and use it in GitHub Desktop.
external and parallel merge sort
.DS_Store
*.pyc
*$py.class
*~
.*.sw[po]
dist/
*.egg-info
*.egg
*.egg/
doc/__build/*
build/
.build/
pip-log.txt
.directory
erl_crash.dump
*.db
Documentation/
.tox/
.ropeproject/
.project
.pydevproject
.idea/
*.bin
.coverage

sort.py

Sort given file using n-way external merge sort.

Sorting

dd if=/dev/urandom of=unsorted.bin bs=128M count=1
./sort.py unsorted.bin sorted.bin

The return code is 0 if no errors occurred.

Checking result

./sort.py --validate sorted.bin
echo $?

The return code is 0 if the file is correctly sorted in ascending order.

Run tests

Install dependencies first:

pip install nose coverage

Run tests after it:

nosetests -v --with-coverage --cover-package=sort sort.py
# encoding: utf-8
"""Speed-up merge using network sort.
Currently NOT used.
"""
cimport cython

# Raw CPython refcounting API: iterators are stored as PyObject* inside a
# C struct, so their lifetime must be managed manually.
cdef extern from "Python.h":
    ctypedef struct PyObject
    void Py_INCREF(PyObject* o)
    void Py_DECREF(PyObject* o)

# One entry of the merge "heap": the head value of an iterator, plus the
# iterator itself held as a borrowed-looking raw pointer (INCREF'd on store).
cdef struct pair_s:
    unsigned int value
    PyObject* iterator
ctypedef pair_s pair
@cython.profile(False)
cdef inline void sort2(pair *h, int l=0, int r=1):
    # Compare-exchange: after this call h[l].value <= h[r].value.
    cdef unsigned int i = (&h[l]).value
    cdef unsigned int j = (&h[r]).value
    if j < i:
        # swap whole pairs so each value stays attached to its iterator
        h[l], h[r] = h[r], h[l]
@cython.profile(False)
cdef inline void sort3(pair *h, int l=0):
    # 3-comparator sorting network over h[l..l+2].
    sort2(h, l, l + 1)
    sort2(h, l + 1, l + 2)
    sort2(h, l, l + 1)
@cython.profile(False)
cdef inline void sort4(pair *h, int l=0):
    # 5-comparator sorting network over h[l..l+3].
    sort2(h, l, l + 1)
    sort2(h, l + 2, l + 3)
    sort2(h, l, l + 2)
    sort2(h, l + 1, l + 3)
    sort2(h, l + 1, l + 2)
@cython.profile(False)
cdef inline void sort5(pair *h):
    # 9-comparator sorting network over h[0..4].
    sort2(h, 0, 1)
    sort2(h, 2, 3)
    sort2(h, 1, 3)
    sort2(h, 2, 4)
    sort2(h, 0, 2)
    sort2(h, 1, 4)
    sort2(h, 1, 2)
    sort2(h, 3, 4)
    sort2(h, 2, 3)
@cython.profile(False)
cdef inline void sort6(pair *h):
    # Sort h[0..5]: sort each half of 3, exchange across the halves,
    # then finish with a 4-element network on the middle elements.
    sort3(h)
    sort3(h, 3)
    sort2(h, 0, 3)
    sort2(h, 2, 5)
    sort4(h, 1)
@cython.profile(False)
cdef void sortn(pair *h, int l):
    # Dispatch to the fixed-size sorting network for l elements (2..6).
    # For l outside that range this is a no-op.
    if l == 6:
        sort6(h)
    elif l == 5:
        sort5(h)
    elif l == 4:
        sort4(h)
    elif l == 3:
        sort3(h)
    elif l == 2:
        sort2(h)
def merge(iterables):
    """Merge multiple sorted inputs into a single sorted output."""
    cdef int i
    cdef unsigned int v
    cdef pair *p
    cdef pair[6] h
    cdef object iterator
    cdef int size = len(iterables)
    # the sorting networks above only handle up to 6 inputs
    assert size <= 6
    # initialize dynamic array
    for i in range(size):
        p = &h[i]
        iterator = iter(iterables[i])
        # NOTE(review): an empty input raises StopIteration right here and
        # it escapes the generator — presumably callers guarantee non-empty
        # iterables; confirm before reusing this code.
        p.value = next(iterator)
        p.iterator = <PyObject *>iterator
        # keep the iterator alive while its raw pointer sits in the array
        Py_INCREF(p.iterator)
    # using sorting network
    while 2 <= size <= 6:
        sortn(h, size)
        try:
            while True:
                # after sortn() h[0] holds the smallest pending value
                p = &h[0]
                yield p.value
                iterator = <object>p.iterator
                p.value = next(iterator)
                sortn(h, size)
        except StopIteration:
            # h[0]'s iterator is exhausted: release it and compact the array
            size -= 1
            Py_DECREF(h[0].iterator)
            for i in range(size):
                h[i] = h[i + 1]
    if size > 0:
        # fast case when only a single iterator remains
        p = &h[0]
        yield p.value
        iterator = <object>p.iterator
        try:
            while True:
                yield next(iterator)
        except StopIteration:
            pass
        # balance the Py_INCREF done during initialization
        Py_DECREF(p.iterator)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Sort incoming file using external merge sort."""
from __future__ import absolute_import
import os
import io
import sys
import argparse
import shutil
import struct
import tempfile
import multiprocessing
import logging
import unittest
import random
from heapq import merge
from itertools import islice, chain
from contextlib import contextmanager
from collections import deque
from functools import wraps, partial
# Module-level logger for error reporting throughout the script.
logger = logging.getLogger(__name__)

# True if we are running on Python 3.
PY3 = sys.version_info[0] == 3
if PY3:
    # alias so the rest of the module can use xrange on both 2 and 3
    xrange = range

#: Default buffer size to use. In bytes.
DEFAULT_BUFFER_SIZE = 2 ** 16

# Lazily created, run-wide temporary directory; see gettempdir()/settempdir().
_tempdir = None
def gettempdir():
    """Return the temporary directory for this run, creating it lazily."""
    global _tempdir
    if _tempdir is not None:
        return _tempdir
    _tempdir = tempfile.mkdtemp()
    return _tempdir
def settempdir(path):
    """Point this module at an externally chosen temporary directory."""
    global _tempdir
    _tempdir = path
def cleanuptempdir():
    """Remove the run's temporary directory and its contents, if created."""
    global _tempdir
    if _tempdir is None:
        return
    try:
        shutil.rmtree(_tempdir)
    except Exception:
        # best effort: a failed removal is logged, never raised
        logger.error("Can't cleanup!")
    _tempdir = None
def isplit(n, iterable):
    """Yield successive chunks of at most *n* items from *iterable*.

    Each chunk is a lazy iterator sharing the underlying source, so the
    chunks must be consumed in order.

    :param n: maximum chunk length
    :param iterable: source of items
    :returns: generator yielding chunk iterators
    """
    it = iter(iterable)
    while True:
        piece = islice(it, n)
        try:
            # pull one element to detect exhaustion without losing it
            first = next(piece)
        except StopIteration:
            # BUG FIX (PEP 479): letting StopIteration escape a generator
            # raises RuntimeError on Python 3.7+, so terminate explicitly.
            return
        yield chain((first,), piece)
def log_exception(func):
    """Decorator: log any exception raised by *func* and re-raise it."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # record the traceback, then let the caller see the error too
            logger.exception(exc)
            raise
    return wrapper
class Stream(object):
    """Open given file and provide buffered access to it.

    Iterator return buffer and data size. Can act as context
    manager.

    :param name: path to file or file descriptor
    """

    def __init__(self, name, buffer_size=DEFAULT_BUFFER_SIZE):
        self.name = name
        self.buffer_size = buffer_size
        # allocate buffer once, filled by null byte.
        self._buffer = bytearray(b'\x00' * self.buffer_size)
        # raw handle; 'r+' allows both reading and writing
        self._handler = io.FileIO(self.name, mode='r+')
        # use buffered reads and writes
        self._buffered_handler = io.BufferedRandom(
            self._handler, buffer_size=self.buffer_size)
        # order matters: the buffered layer must be flushed/closed first
        self._streams = (
            self._buffered_handler,
            self._handler,
        )

    def __enter__(self):
        return self

    def __exit__(self, tp, value, tb):
        # flush pending writes before closing both layers
        self.flush()
        self.close()

    def write(self, buf):
        """Write given data into opened file."""
        self._buffered_handler.write(buf)

    def flush(self):
        """Flush data from buffers."""
        for stream in self._streams:
            stream.flush()

    def close(self):
        """Close opened stream."""
        for stream in self._streams:
            stream.close()

    def __iter__(self):
        """Iterate over file's data. Reenterable.

        :returns: generator that return current buffer
            and size of data in this buffer. You must
            use this data until next generator's loop
            or data in buffer will be overrided.
        """
        stream = self._buffered_handler
        buf = self._buffer
        # remember the current position so iterating does not move it
        pos = stream.tell()
        stream.seek(0)
        try:
            while True:
                # readinto() reuses the preallocated buffer: no per-chunk
                # allocation, but the buffer is overwritten on each loop
                size = stream.readinto(buf)
                if not size:
                    break
                yield (size, buf)
        finally:
            # restore the position even if the consumer stops early
            stream.seek(pos)

    @classmethod
    def from_temp(cls, **kwargs):
        """Create stream from temporary file."""
        fd, name = tempfile.mkstemp(dir=gettempdir())
        # the Stream takes ownership of the already-open descriptor
        return (cls(fd, **kwargs), name)
class Marshaller(object):
    """Marshal and unmarshal data into and from the given stream.

    Iterator return tuple of int to reduce iteration number.
    Can act as context manager.

    :param stream: stream from which we should get data
    :type stream: :py:class:`.Stream`
    :param byteorder: byte order of incoming data
    """

    def __init__(self, stream, byteorder=None):
        # default to the machine's native byte order
        byteorder = byteorder or sys.byteorder
        order = b'>' if byteorder == 'big' else b'<'
        fmt = b'I'  # one "atom" = unsigned 32-bit integer
        self.stream = stream
        self._atom_fmt = order + fmt
        self._atom_size = struct.calcsize(self._atom_fmt)
        # the stream's buffer must hold a whole number of atoms
        assert not self.stream.buffer_size % self._atom_size
        self._atoms_count = self.stream.buffer_size // self._atom_size
        # precompiled struct packing/unpacking one full buffer per call
        self._struct = struct.Struct(order + fmt * self._atoms_count)
        self._queue = []
        self._buffer = bytearray(b'\x00' * self._atoms_count * self._atom_size)

    def feed(self, chunk):
        """Add given items into write queue."""
        self._queue.extend(chunk)
        # write out once at least one full buffer of atoms is queued
        if len(self._queue) >= self._atoms_count:
            self.flush()

    def flush(self):
        """Flush buffered items into underlying stream."""
        queue = self._queue
        if not queue:
            return
        buf = None
        atom_fmt = self._atom_fmt
        atom_size = self._atom_size
        pack_into = self._struct.pack_into
        for i in xrange(0, len(queue), self._atoms_count):
            elements = queue[i:i+self._atoms_count]
            if len(elements) != self._atoms_count:
                # trailing partial chunk: pack element by element into a
                # right-sized temporary buffer
                buf = bytearray(b'\x00' * atom_size * len(elements))
                for j, element in enumerate(elements):
                    struct.pack_into(
                        atom_fmt, buf, j * atom_size, element)
            else:
                # full chunk: pack the whole reusable buffer in one call
                pack_into(self._buffer, 0, *elements)
                buf = self._buffer
            self.stream.write(buf)
        self._queue = []

    def close(self):
        """Close underlying stream."""
        self.stream.flush()
        self.stream.close()

    def __enter__(self):
        return self

    def __exit__(self, tp, value, tb):
        # push any queued items out before closing
        self.flush()
        self.close()

    def __iter__(self):
        # Yield one tuple of ints per buffer read from the stream.
        atom_fmt = self._atom_fmt
        atom_size = self._atom_size
        buffer_size = self.stream.buffer_size
        unpack_from = self._struct.unpack_from
        for size, buf in self.stream:
            if size != buffer_size:
                # handle unaligned chunk
                if size % atom_size:
                    raise ValueError("Unaligned atom!")
                yield tuple(
                    struct.unpack(
                        atom_fmt, buf[offset:offset+atom_size])[0]
                    for offset in xrange(0, size, atom_size))
            else:
                # handle aligned chunk
                yield unpack_from(buf)

    @classmethod
    def from_name(cls, name, **kwargs):
        """Create marshaller that can access given file.

        :param name: path to file
        :type name: string
        """
        stream = Stream(name)
        return cls(stream, **kwargs)

    @classmethod
    def from_temp(cls, **kwargs):
        """Create marshaller from temporary file."""
        stream, name = Stream.from_temp()
        return (cls(stream, **kwargs), name)
@log_exception
def create_bucket(chunk):
    """Sort *chunk* and persist it as a new bucket file.

    :param chunk: list of ints
    :returns: path to the sorted bucket file
    """
    marshaller, path = Marshaller.from_temp()
    with marshaller:
        marshaller.feed(sorted(chunk))
    return path
class Splitter(object):
    """Split incoming data into sorted buckets.

    Every chunk produced by the marshaller is written to its own
    temporary file by a pool worker.

    :param pool: pool on which bucket-creation tasks are executed
    :type pool: :py:class:`multiprocessing.Pool`
    :param marshaller: iterator producing the data to process
    :type marshaller: :py:class:`.Marshaller`
    """

    def __init__(self, pool, marshaller):
        self.pool = pool
        self.marshaller = marshaller

    def __iter__(self):
        """Yield the path of each bucket created from the input.

        :returns: generator of bucket file paths
        """
        with self.marshaller as source:
            for bucket_path in self.pool.imap(create_bucket, source):
                yield bucket_path
@log_exception
def merge_buckets(names, chunk_size=1024):
    """Merge several sorted bucket files into a single sorted file.

    Source files are closed and unlinked afterwards, even on error.

    :param names: list of paths to files we should merge
    :returns: path to merged file
    """
    readers = [Marshaller.from_name(name) for name in names]
    try:
        writer, result_name = Marshaller.from_temp()
        merged = merge(*(chain.from_iterable(reader) for reader in readers))
        with writer:
            for piece in isplit(chunk_size, merged):
                writer.feed(list(piece))
    finally:
        # release and remove every input bucket regardless of outcome
        for reader in readers:
            reader.close()
        for name in names:
            os.unlink(name)
    return result_name
class Merger(object):
    """Multiway merger. Merge multiple buckets into one.

    :param pool: pool into which we should run tasks
    :type pool: :py:class:`multiprocessing.Pool`
    :param splitter: iterator returning paths to each buckets
    :type splitter: :py:class:`.Splitter`
    :param ways: how many buckets we can merge at once
    """

    def __init__(self, pool, splitter, ways=None):
        self.pool = pool
        self.splitter = splitter
        self.ways = ways or 16

    def merge(self):
        """Merge buckets `ways` at a time until a single file remains.

        :returns: path to the fully merged file, or ``None`` when the
            splitter produced no buckets at all (empty input).
        """
        ways = self.ways
        stack = deque((self.splitter, ))
        while stack:
            # group the current level's bucket paths into batches of `ways`
            chunks = [tuple(names) for names in
                      isplit(ways, stack.popleft())]
            # merge each batch in parallel; every task yields one new path
            names = list(self.pool.imap(merge_buckets, chunks))
            if len(names) == 1:
                return names[0]
            elif names:
                # more than one bucket remains: schedule another round
                stack.append(names)
            else:
                return None
@contextmanager
def pool_as_context(processes=None):
    """Yield a worker pool, guaranteeing it is closed and joined."""
    worker_count = processes or multiprocessing.cpu_count()
    # each worker runs settempdir() so children share the parent's temp dir
    pool = multiprocessing.Pool(
        worker_count, settempdir, (gettempdir(), ))
    try:
        yield pool
    finally:
        pool.close()
        pool.join()
def external_sort(source, dest, processes=None, byteorder=None, ways=None):
    """Sort given `source` using external merge sort into `dest`.

    :param source: path to file which should be sorted
    :param dest: path to resulting file
    :param processes: how many processes to use
    :param byteorder: big or little endian
    :param ways: n-ways in merge
    """
    with pool_as_context(processes=processes) as pool:
        marshaller = Marshaller.from_name(
            source, byteorder=byteorder)
        splitter = Splitter(pool, marshaller)
        merger = Merger(pool, splitter, ways=ways)
        path = merger.merge()
        if path is None:
            # empty input: nothing was merged, copy the source verbatim
            shutil.copy(source, dest)
        else:
            # preserve the source file's permission bits on the result
            shutil.copymode(source, path)
            shutil.move(path, dest)
def existed_file(path):
    """Validate *path* as sort input (argparse ``type`` callback).

    :param path: candidate path from the command line
    :returns: the path unchanged when it is a readable regular file
        whose size is a multiple of 4 bytes (whole 32-bit integers)
    :raises argparse.ArgumentTypeError: when any check fails
    """
    if not os.access(path, os.R_OK):
        raise argparse.ArgumentTypeError(
            "{0!r} is not readable".format(path))
    if not os.path.isfile(path):
        raise argparse.ArgumentTypeError(
            "{0!r} not a file".format(path))
    if os.path.getsize(path) % 4:
        # FIX: message previously repeated the word "size"
        raise argparse.ArgumentTypeError(
            "size of {0!r} is not a multiple of 4".format(path))
    return path
def unexisted_file(path):
    """Validate that *path* does not exist but its parent directory does.

    Used as an argparse ``type`` callback for the output path.
    """
    if path and os.path.exists(path):
        raise argparse.ArgumentTypeError(
            "{0!r} exists".format(path))
    parent = os.path.dirname(path)
    if parent and not os.path.exists(parent):
        raise argparse.ArgumentTypeError(
            "{0!r} not exists".format(parent))
    return path
def is_sorted(path, byteorder=None):
    """Return True when the integers in *path* are in ascending order."""
    with Marshaller.from_name(path,
                              byteorder=byteorder) as marshaller:
        previous = 0
        for value in chain.from_iterable(marshaller):
            if value < previous:
                return False
            previous = value
        return True
def dispatch(parser, args):
    """Execute the action selected by the parsed arguments.

    :param parser: the argument parser, used to report usage errors
    :param args: parsed command-line namespace
    :returns: process exit code (0 on success, 1 on failed validation)
    """
    if args.validate:
        # check is given file sorted or not
        if not is_sorted(args.input,
                         byteorder=args.byteorder):
            return 1
        return 0
    if not args.output:
        # FIX: message previously misspelled "ouput"
        parser.error("output not specified!")
    external_sort(args.input, args.output,
                  byteorder=args.byteorder)
    return 0
def main():
    """Script entrypoint: parse arguments, run sort/validate, set exit code."""
    parser = argparse.ArgumentParser(
        description="""
Sort given file using n-way merge sort.
File should consist of 32-bit unsigned integers.
""".strip())  # FIX: help text previously read "consists" / "unsinged"
    parser.add_argument("-v", "--verbose",
                        help="increase output verbosity",
                        action="store_true")
    parser.add_argument("input",
                        help="file to work with",
                        type=existed_file)
    parser.add_argument("output",
                        help="path to result",
                        type=unexisted_file,
                        nargs='?')
    parser.add_argument("--validate",
                        help="check if given file is sorted",
                        action="store_true")
    parser.add_argument("-b", "--byteorder",
                        help="treat integers stored as little-endian or big-endian,"
                        " for output will be used system-wide byte order",
                        choices=("big", "little"),
                        default=None)
    args = parser.parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(message)s")
    try:
        # dispatch() returns the process exit code
        sys.exit(dispatch(parser, args))
    except KeyboardInterrupt:
        # quiet exit on Ctrl-C unless the user asked for verbosity
        if args.verbose:
            raise
        sys.exit(1)
    except Exception as exc:
        if args.verbose:
            raise
        logger.exception(exc)
        parser.error("Unhandled exception caught!")
    finally:
        # always remove this run's temporary directory
        cleanuptempdir()
class BaseTest(unittest.TestCase):
    """Shared fixture: temp-dir cleanup plus a helper to create files."""

    def setUp(self):
        # remove the run's temporary directory after every test
        self.addCleanup(cleanuptempdir)

    def create_file(self, content):
        """Write *content* into a fresh temporary file and return its path."""
        stream, path = Stream.from_temp()
        with stream:
            stream.write(content)
        return path
class UtilityTest(BaseTest):
    """Tests for the module-level helper functions."""

    def test_isplit(self):
        # ten items in chunks of 4: two full chunks plus a remainder
        self.assertEqual(
            [[0, 1, 2, 3],
             [4, 5, 6, 7],
             [8, 9]],
            [list(chunk) for chunk in
             isplit(4, [i for i in xrange(10)])]
        )
        # empty input yields no chunks at all
        self.assertEqual([], list(isplit(4, [])))
        # a short input yields a single short chunk
        self.assertEqual(
            [[0]],
            [list(chunk) for chunk in
             isplit(2, [0])]
        )

    def test_create_bucket(self):
        # a bucket file must contain the chunk's values in sorted order
        source = [random.randint(0, 100) for i in xrange(100)]
        path = create_bucket(source)
        with Marshaller.from_name(path) as marshaller:
            result = list(chain.from_iterable(marshaller))
        source.sort()
        self.assertEqual(source, result)

    def test_merge_buckets(self):
        # merging bucket files must equal heapq.merge over the sources
        sources = [sorted([random.randint(0, 100)
                           for i in xrange(10)])
                   for i in xrange(6)]
        path = merge_buckets([
            create_bucket(source) for source in sources])
        with Marshaller.from_name(path) as marshaller:
            self.assertEqual(
                list(merge(*sources)),
                list(chain.from_iterable(marshaller))
            )
class StreamTest(BaseTest):
    """Tests for buffered Stream iteration."""

    def test_iteration(self):
        content = b'foobar'
        path = self.create_file(content)
        with Stream(path) as stream:
            chunks = list(stream)
        # a small file fits into a single buffer
        self.assertEqual(1, len(chunks))
        size, buf = chunks[0]
        # only the first `size` bytes of the reused buffer are meaningful
        self.assertEqual(content, buf[0:size])

    def test_empty_file(self):
        # iterating an empty file yields no chunks
        path = self.create_file(b'')
        with Stream(path) as stream:
            self.assertEqual(0, len(list(stream)))
class MarshallerTest(BaseTest):
    """Tests for Marshaller pack/unpack round-trips."""

    def test_iteration(self):
        # big-endian packed integers must unpack back to the same values
        values = [100, 200, 300]
        content = struct.pack('>III', *values)
        path = self.create_file(content)
        with Marshaller.from_name(path, byteorder='big') as marshaller:
            result = list(chain.from_iterable(marshaller))
        self.assertEqual(values, result)

    def test_unaligned_file(self):
        # a file whose size is not a multiple of 4 must be rejected
        value = 100
        content = struct.pack('>I', value)
        content = content[:len(content)-1]
        path = self.create_file(content)
        with self.assertRaises(ValueError):
            with Marshaller.from_name(path, byteorder='big') as marshaller:
                next(iter(marshaller))

    def test_feed(self):
        # values fed to the marshaller end up packed in the backing file
        values = [100, 200, 300]
        content = struct.pack('>III', *values)
        marshaller, path = Marshaller.from_temp(byteorder='big')
        with marshaller:
            marshaller.feed(values)
        with open(path, 'rb') as stream:
            self.assertEqual(content, stream.read())
if __name__ == "__main__":
    # needed for frozen Windows executables that use multiprocessing
    multiprocessing.freeze_support()
    main()
#!/bin/bash
# End-to-end test driver: generate files of various sizes, sort them
# with sort.py, then validate the result and its size.

SOURCE="unsorted.bin"
DEST="sorted.bin"

# Remove input/output files left over from a previous run.
cleanup() {
    rm -rf $SOURCE $DEST
}

# Create a random file of size $1, sort it, and validate the output.
# Prints " OK" / " FAILED" (and " ERROR" when sorting itself fails).
try_sort() {
    cleanup
    echo -n "Sorting file with size $1:"
    if [ "$1" == "0" ]
    then
        # dd cannot produce a zero-byte file; just touch it
        touch $SOURCE
    else
        dd if=/dev/urandom of=$SOURCE bs=$1 count=1 2> /dev/null
    fi
    python sort.py $SOURCE $DEST 2> /dev/null
    RETVAL=$?
    [ $RETVAL -ne 0 ] && echo " ERROR"
    if [ -f $DEST ]
    then
        source_size=$(stat -c%s $SOURCE)
        dest_size=$(stat -c%s $DEST)
        python sort.py --validate $DEST
        RETVAL=$?
        # the output must validate as sorted AND keep the original size
        [[ $RETVAL -eq 0 && $source_size -eq $dest_size ]] && echo " OK" || echo " FAILED"
    fi
}

echo "* Valid files"
try_sort 0
try_sort 4
try_sort 8
try_sort 16
try_sort 40
try_sort 1K
try_sort 1M
try_sort 16M

# sizes that are not multiples of 4 must be rejected by existed_file()
echo "* Invalid files"
try_sort 1
try_sort 3
try_sort 9
try_sort 101
try_sort 1025
try_sort 1506

echo "Cleanup..."
cleanup
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment