Skip to content

Instantly share code, notes, and snippets.

@blackwithwhite666
Last active December 30, 2015 18:29
Show Gist options
  • Select an option

  • Save blackwithwhite666/7867877 to your computer and use it in GitHub Desktop.

Select an option

Save blackwithwhite666/7867877 to your computer and use it in GitHub Desktop.
import unittest
from tempfile import TemporaryFile
from util import StringBuffer
class TestStringBuffer(unittest.TestCase):
    """Tests for StringBuffer.read using a file-backed chunk generator."""

    def test_read(self):
        """Sized reads return exact byte slices, then empty bytes at the end."""
        with TemporaryFile() as fh:
            fh.write(b'test' * 8)
            fh.flush()
            fh.seek(0)

            def iterator():
                # 6-byte chunks do not line up with the 4-byte pattern, so
                # the reads below have to span chunk boundaries.
                while True:
                    s = fh.read(6)
                    if not s:
                        return
                    yield s

            buf = StringBuffer(iterator())
            self.assertEqual(b'test', buf.read(4))
            self.assertEqual(b'test' * 6, buf.read(4 * 6))
            # The file is opened in binary mode, so the buffer hands back
            # bytes; compare against bytes literals (str != bytes on Python 3).
            self.assertEqual(b'te', buf.read(2))
            self.assertEqual(b'st', buf.read(2))
            self.assertEqual(b'', buf.read(4))
# Run the unittest test runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
from collections import deque
class StringBuffer(object):
    """Serve sized reads from an iterable of byte chunks.

    Chunks pulled from the source are queued in a deque and merged or split
    on demand, so ``read(n)`` returns exactly ``n`` bytes whenever that many
    are still available and fewer only once the source has run dry.
    """

    def __init__(self, iterator, max_size=100*1024*1024):
        """Create a buffer over *iterator*.

        Args:
            iterator: Iterator or iterable yielding ``bytes`` chunks.
            max_size: Buffered byte count at which feeding the buffer
                raises ``IOError``.
        """
        # iter() returns real iterators unchanged and lets plain iterables
        # (lists, tuples, ...) be consumed with next() as well.
        self.iterator = iter(iterator)
        self._max_size = max_size
        self._buf = deque()
        self._size = 0
        self._exhausted = False

    def read(self, nbytes):
        """Return up to *nbytes* bytes, pulling chunks from the source as needed.

        Args:
            nbytes: Non-negative number of bytes wanted.

        Returns:
            Exactly *nbytes* bytes if available; otherwise whatever is left,
            which is ``b''`` once the source is exhausted and drained.

        Raises:
            ValueError: If *nbytes* is negative.
            IOError: If the buffered data reaches ``max_size`` while filling.
        """
        if nbytes < 0:
            # A negative count would otherwise corrupt the size bookkeeping
            # in _consume (the size would grow instead of shrink).
            raise ValueError('nbytes must be non-negative')
        # Keep pulling chunks until enough is buffered or the source ends.
        while self._size < nbytes and self._exhaust():
            pass
        return self._consume(nbytes)

    def clear(self):
        """Drop all buffered data; the source iterator is left untouched."""
        self._buf.clear()
        self._size = 0

    # internal

    def _feed(self, chunk):
        """Append *chunk* to the buffer.

        Raises:
            IOError: If the buffered size is at or above ``max_size`` after
                the chunk has been appended (the chunk stays buffered).
        """
        self._buf.append(chunk)
        self._size += len(chunk)
        if self._size >= self._max_size:
            raise IOError('Maximum buffer size reached')

    def _exhaust(self):
        """Pull one chunk from the source, if any remain.

        Returns:
            True while the source has not been found exhausted, False after.
        """
        if not self._exhausted:
            try:
                chunk = next(self.iterator)
            except StopIteration:
                self._exhausted = True
            else:
                self._feed(chunk)
        return not self._exhausted

    def _consume(self, loc):
        """Remove and return the first *loc* buffered bytes (capped at size)."""
        loc = min(loc, self._size)
        if loc <= 0:
            return b''
        # After merging, the leftmost deque entry is exactly `loc` bytes long.
        self._merge_prefix(loc)
        self._size -= loc
        return self._buf.popleft()

    def _merge_prefix(self, size):
        """Make the first deque entry a single chunk of up to *size* bytes.

        Leading chunks are joined, and the last one taken is split if
        necessary, so that ``self._buf[0]`` holds the first *size* bytes of
        the buffered data (or all of it when less is buffered).

        Example with ``self._buf == deque([b'abc', b'de', b'fghi', b'j'])``::

            size=5    ->  deque([b'abcde', b'fghi', b'j'])
            size=7    ->  deque([b'abcdefg', b'hi', b'j'])
            size=3    ->  deque([b'abc', b'defg', b'hi', b'j'])
            size=100  ->  deque([b'abcdefghij'])

        An empty deque ends up holding a single ``b''`` entry.
        """
        # Nothing to do when the only chunk already fits within `size`.
        if len(self._buf) == 1 and len(self._buf[0]) <= size:
            return
        prefix = []
        remaining = size
        while self._buf and remaining > 0:
            chunk = self._buf.popleft()
            if len(chunk) > remaining:
                # Split: push the tail back, keep only the needed head.
                self._buf.appendleft(chunk[remaining:])
                chunk = chunk[:remaining]
            prefix.append(chunk)
            remaining -= len(chunk)
        if prefix:
            self._buf.appendleft(b''.join(prefix))
        if not self._buf:
            # Guarantee the caller always has an entry to pop.
            self._buf.appendleft(b'')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment