Last active
December 30, 2015 18:29
-
-
Save blackwithwhite666/7867877 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *.pyc |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import unittest | |
| from tempfile import TemporaryFile | |
| from util import StringBuffer | |
class TestStringBuffer(unittest.TestCase):
    """Exercise ``StringBuffer.read`` against a file-backed chunk iterator."""

    def test_read(self):
        """Reads return exact byte counts across chunk boundaries, then b''."""
        with TemporaryFile() as fh:
            fh.write(b'test' * 8)
            fh.flush()
            fh.seek(0)

            def iterator():
                # 6-byte chunks do not line up with the 4-byte 'test'
                # pattern, so reads below have to split and merge chunks.
                while True:
                    s = fh.read(6)
                    if not s:
                        return
                    yield s

            # The buffer must be used while the file is still open, because
            # the generator reads from ``fh`` lazily.
            buf = StringBuffer(iterator())
            self.assertEqual(b'test', buf.read(4))
            self.assertEqual(b'test' * 6, buf.read(4 * 6))
            # The file is opened in binary mode, so every expectation is a
            # bytes literal; text literals never compare equal on Python 3.
            self.assertEqual(b'te', buf.read(2))
            self.assertEqual(b'st', buf.read(2))
            # Source exhausted and buffer drained: an empty result.
            self.assertEqual(b'', buf.read(4))
# Run the test suite only when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import deque | |
class StringBuffer(object):
    """Size-limited read buffer over an iterable of byte chunks.

    Chunks are pulled from the source lazily, only when a ``read`` asks for
    more bytes than are currently buffered.
    """

    def __init__(self, iterator, max_size=100*1024*1024):
        """Create a buffer that pulls chunks from *iterator*.

        :param iterator: iterator (or any iterable) yielding byte strings.
        :param max_size: number of buffered bytes at which feeding the
            buffer raises ``IOError``.
        """
        # iter() returns an iterator unchanged and turns a plain iterable
        # (e.g. a list of chunks) into something next() can consume.
        self.iterator = iter(iterator)
        self._max_size = max_size
        self._buf = deque()
        self._size = 0
        self._exhausted = False

    def read(self, nbytes):
        """Return up to *nbytes* bytes, pulling chunks from the source as needed.

        Fewer bytes (possibly ``b''``) are returned only when the source is
        exhausted before *nbytes* bytes are available.

        :raises ValueError: if *nbytes* is negative.
        :raises IOError: if the buffered data reaches ``max_size``.
        """
        if nbytes < 0:
            # A negative count would be subtracted from the size counter in
            # _consume and leave it out of sync with the buffered data.
            raise ValueError('nbytes must be non-negative')
        # Keep fetching until enough is buffered or the source runs dry.
        while self._size < nbytes and self._exhaust():
            pass
        return self._consume(nbytes)

    def clear(self):
        """Discard all buffered data; the source iterator is left untouched."""
        self._buf.clear()
        self._size = 0

    # internal

    def _feed(self, chunk):
        """Append *chunk* to the buffer and enforce the size limit.

        The chunk is appended before the limit is checked.

        :raises IOError: if the buffered size reaches ``max_size``.
        """
        self._buf.append(chunk)
        self._size += len(chunk)
        if self._size >= self._max_size:
            raise IOError('Maximum buffer size reached')

    def _exhaust(self):
        """Pull one chunk from the source, if any is left.

        :returns: ``True`` while the source may still have data, ``False``
            once it has been exhausted.
        """
        if not self._exhausted:
            try:
                chunk = next(self.iterator)
            except StopIteration:
                self._exhausted = True
            else:
                self._feed(chunk)
        return not self._exhausted

    def _consume(self, loc):
        """Remove and return the first *loc* buffered bytes.

        *loc* is capped at the number of bytes currently buffered.
        """
        loc = min(loc, self._size)
        if loc == 0:
            return b''
        # After merging, the leftmost deque entry holds exactly ``loc`` bytes.
        self._merge_prefix(loc)
        self._size -= loc
        return self._buf.popleft()

    def _merge_prefix(self, size):
        """Make the first entry of ``self._buf`` hold up to *size* bytes.

        Leading chunks are joined, and a chunk is split when necessary, so
        that the leftmost entry is a single byte string of exactly *size*
        bytes, or of everything buffered if less than *size* is available.

        Starting from chunks ``abc, de, fghi, j``:

        * merging 5 gives ``abcde, fghi, j``
        * then merging 7 gives ``abcdefg, hi, j``
        * then merging 3 gives ``abc, defg, hi, j``
        * then merging 100 gives ``abcdefghij``
        """
        # Fast path: a single chunk that already fits needs no work.
        if len(self._buf) == 1 and len(self._buf[0]) <= size:
            return
        prefix = []
        remaining = size
        while self._buf and remaining > 0:
            chunk = self._buf.popleft()
            if len(chunk) > remaining:
                # Split the chunk and put the unread tail back at the front.
                self._buf.appendleft(chunk[remaining:])
                chunk = chunk[:remaining]
            prefix.append(chunk)
            remaining -= len(chunk)
        if prefix:
            self._buf.appendleft(b''.join(prefix))
        if not self._buf:
            # Guarantee there is always a first entry for the caller to pop.
            self._buf.appendleft(b'')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment