#!/usr/bin/env python3
# SPDX-License-Identifier: 0BSD or CC0-1.0 or MIT-0 or Unlicense
# Copyright (c) 2023, Ryan Castellucci, No Rights Reserved

import io, sys
import datetime
import argparse
import requests
import operator
import struct

from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
from fnmatch import fnmatch

structFileHeader = "<4s2B4HL2L2H"
sizeFileHeader = struct.calcsize(structFileHeader)
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11

BLOCK_SIZE = 1<<18

__debug = False
def debug(*args, **kwarg):
    if __debug:
        if 'file' not in kwarg:
            kwarg['file'] = sys.stderr
        print(*args, **kwarg)

class HttpError(IOError):
    def __init__(self, response, message=None):
        self.response = response
        if message is None:
            self.message = f'http status {response.status_code}'
        else:
            self.message = message
        super().__init__(self.message)
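
# A minimal sketch (not used by the tool itself) of the byte-range pattern
# _HttpIO is built on; the URL below is a placeholder. A server that honors
# Range headers answers 206 Partial Content with only the requested bytes,
# which is what lets a zip file be read piecemeal instead of downloaded whole.
def _example_range_request(url='https://example.com/archive.zip'):
    # ask for the first 100 bytes; range ends are inclusive
    r = requests.get(url, headers={'Range': 'bytes=0-99'})
    if r.status_code == 206:
        return r.content  # at most 100 bytes
    # a plain 200 means the server ignored the Range header and sent everything
    raise HttpError(r, 'byte ranges not supported by server')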
# not a full implementation, just enough to use with BufferedReader
class _HttpIO(io.RawIOBase):
    def __init__(self, url, *, session=None, parent=None):
        if not session:
            session = parent._session if parent else requests.Session()
        self._session = session
        self._parent = parent
        self._url, self._off = url, 0
        self._pos, self._total = 0, 0
        self._gets, self._heads = 0, 0
        self._response, self._iter, self._next = None, None, None
        if not parent:
            # We assume the zip file doesn't change while this tool is in use
            r = self._head()
            # default to '' so a missing Accept-Ranges header is rejected
            # instead of raising a TypeError
            if 'bytes' not in r.headers.get('Accept-Ranges', ''):
                raise HttpError(r, 'byte ranges not supported by server')
            self._len = int(r.headers.get('Content-Length'))

    def _head(self):
        self._heads += 1
        if self._parent: self._parent._heads += 1
        r = self._session.head(self._url, allow_redirects=True)
        if r.status_code != 200:
            raise HttpError(r)
        # Update URL if there was a redirect
        if r.url != self._url: self._url = r.url
        return r

    def _get(self, headers=None, *, stream=False):
        self._gets += 1
        if self._parent: self._parent._gets += 1
        r = self._session.get(self._url, headers=headers, stream=stream)
        if r.status_code not in (200, 206):
            raise HttpError(r)
        return r

    def _advance(self, n):
        self._pos += n
        self._total += n
        if self._parent: self._parent._total += n

    def slice(self, offset, length=None):
        hio = self.__class__(self._url, session=self._session, parent=self)
        hio._off = offset
        hio._len = length if length is not None else self._len - offset
        hio.seek(0)
        return hio

    def _stream_close(self):
        debug('_stream_close')
        if self._response: self._response.close()
        self._response, self._iter, self._next = None, None, None

    def _stream_read(self, size=-1):
        debug(f'_stream_read avail={len(self._next) if self._next else 0}, requested={size}')
        assert self._iter is not None
        if self._next is None:
            # this implies that the response iterator was exhausted on its first
            # call, which should not happen, but better safe than sorry
            self._stream_close()
            size = BLOCK_SIZE if size < 0 else min(size, BLOCK_SIZE)
            return self.read(size)
        elif size >= 0 and size < len(self._next):
            # reading less than the available amount of data isn't fully handled
            chunk = self._next[:size]
            self._stream_close()
        else:
            chunk = self._next
            # the response iterator only gets marked as exhausted when it is
            # asked to generate a chunk but has no data available - read ahead
            # one chunk since the reader may stop after the last one it expects
            self._next = next(self._iter, None)
            if self._next is None: self._stream_close()

        self._advance(len(chunk))
        return chunk

    def read(self, size=-1):
        return self._read(size)

    def _read(self, size=-1, *, stream=False, chunk_size=BLOCK_SIZE):
        if self._iter:
            return self._stream_read(size)
        elif size == 0:
            # any empty byte string will do
            return b''
        elif self._off == 0 and self._pos == 0 and (size < 0 or size >= self._len):
            # entire file from the beginning, no range header needed
            end = None
        elif size < 0:
            # rest of the file (range ends are inclusive, hence the -1)
            end = self._off + self._len - 1
        else:
            # requested range or rest of the file, whichever is less
            end = min(self._off + self._len, self._pos + size) - 1

        debug('read', size, self._pos - self._off,
              None if end is None else end - self._off, self._len)
        headers = {}
        if end is not None:
            headers['Range'] = f'bytes={self._pos}-{end}'

        # reading multiple sequential chunks without streaming would require a
        # request to the server for each chunk, involving potentially slow
        # network round trips
        actually_stream = bool(stream and (size < 0 or size >= chunk_size))
        r = self._get(headers, stream=actually_stream)

        if actually_stream:
            # set up response streaming
            debug(f'_stream_init chunk_size={chunk_size}')
            self._response = r
            self._iter = r.iter_content(chunk_size)
            self._next = next(self._iter, None)
            # return the first chunk
            return self._stream_read(size)

        n = int(r.headers.get('Content-Length'))
        self._advance(n)
        return r.content

    def readall(self):
        return self.read(-1)

    def readinto(self, b):
        b = (memoryview(b) if not isinstance(b, memoryview) else b).cast('B')
        n = len(b)
        data = self.read(n)
        n = len(data)
        b[:n] = data
        return n

    def seek(self, pos, whence=0):
        if whence != 0 or pos != self._pos - self._off:
            debug('seek', pos, whence, self._pos - self._off)
        if whence == 0:
            newpos = self._off + pos
        elif whence == 1:
            newpos = self._pos + pos
        elif whence == 2:
            newpos = self._off + self._len + pos
        else:
            raise ValueError('invalid whence')
        if self._pos != newpos and self._iter:
            self._stream_close()
        self._pos = newpos
        # return the position relative to the slice offset, matching tell()
        return newpos - self._off

    def __len__(self):
        return self._len

    def tell(self):
        return self._pos - self._off

    def writable(self):
        return False

    def seekable(self):
        return True

    def readable(self):
        return True

    def __getattr__(self, name):
        if name in ('truncate', 'fileno', 'write'):
            raise OSError(f'{name} not supported')
        raise AttributeError(name)

    closed = property(lambda self: False)
    url = property(operator.attrgetter('_url'))
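
# A minimal sketch (not used by the tool itself) of why the tail of the file
# is worth caching: a zip archive's end of central directory record sits in
# the last few bytes, and ZipFile begins by scanning backwards for its
# 'PK\x05\x06' signature. Over an _HttpIO this costs one small ranged GET.
def _example_find_eocd(hio):
    tail_len = min(len(hio), 1024)
    hio.seek(-tail_len, 2)  # seek relative to the end of the file
    tail = hio.read(tail_len)
    return tail.rfind(b'PK\x05\x06')  # offset of the EOCD signature, or -1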
# helper class that automatically bypasses the buffer for bulk reads, and tries
# to cache the end of central directory record from the end of a zip file
class _BufferedHttpIO(io.BufferedReader):
    def __init__(self, httpio, buffer_size=1024, block_size=BLOCK_SIZE):
        assert isinstance(httpio, _HttpIO)
        super().__init__(httpio, buffer_size)
        self._buffer_size = buffer_size
        self._block_size = block_size
        self._unbuffered = False
        self._tail = None

    def read(self, size=-1):
        if self.raw._len >= self._buffer_size:
            pos = self.tell()
            from_end = self.raw._len - pos
            if from_end <= self._buffer_size:
                # ZipFile does several small reads and seeks near the end of
                # the file, so it's useful to cache the last buffer worth of
                # data, since seek() resets the read buffer
                if not self._tail:
                    debug('caching tail')
                    self.seek(-self._buffer_size, 2)
                    self._tail = super().read(self._buffer_size)
                    n = len(self._tail)
                    self.raw._advance(n)
                else:
                    n = len(self._tail)
                start = n - from_end
                end = n if size < 0 else min(n, start+size)
                chunk = self._tail[start:end]
                debug('cached tail read', len(chunk))
                self.seek(pos + len(chunk))
                return chunk

            # for larger reads, io.BufferedReader just gets in the way, so
            # bypass it
            if size < 0 or size > self._buffer_size:
                if self._unbuffered:
                    return self.raw._read(size, stream=True, chunk_size=self._block_size)
                    #return self.raw.read(min(size, self._block_size))
                else:
                    self._unbuffered = True
                    debug('enter unbuffered mode')
                    if size < 0: size = self._buffer_size
                    # return the buffer contents
                    return self.read1(size)
            elif self._unbuffered:
                debug('exit unbuffered mode')
                # synchronize the file position
                self.seek(self.raw.tell())
                self._unbuffered = False

        # normal buffered read
        return super().read(size)

class ZipHttp(ZipFile):
    def __init__(self, httpio, *args, **kwargs):
        assert isinstance(httpio, _HttpIO)
        self._args, self._kwargs = args, kwargs
        bkw = {}
        if 'buffer_size' in kwargs:
            bkw['buffer_size'] = kwargs.pop('buffer_size')
        if 'block_size' in kwargs:
            bkw['block_size'] = kwargs.pop('block_size')
        reader = _BufferedHttpIO(httpio, **bkw)
        super().__init__(reader, *args, **kwargs)
        self._httpio, self._reader = httpio, reader

    def sub(self, zi):
        assert zi.compress_type == ZIP_STORED
        saved_position = self._reader.tell()
        # get the zip member header
        self._reader.seek(zi.header_offset)
        fheader = self._reader.read(sizeFileHeader)
        fheader = struct.unpack(structFileHeader, fheader)
        self._reader.seek(saved_position)
        # find the start of the actual data
        skip = fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH]
        data_offset = zi.header_offset + sizeFileHeader + skip
        httpio = self.httpio.slice(data_offset, zi.compress_size)
        return self.__class__(httpio, *self._args, **self._kwargs)

    def infolist_nested(self, max_depth=1, parents=None, *, pattern=None):
        parents = parents or ()
        for zi in self.infolist():
            yield self, zi, parents
            # recursion limit
            if max_depth < 0: continue
            # only try to recurse into STORED .zip files
            if zi.compress_type != ZIP_STORED: continue
            if zi.filename[-4:].lower() != '.zip': continue
            # if a pattern was provided, don't recurse unless it matches
            if pattern is not None and not fnmatch(zi.filename, pattern): continue
            with self.sub(zi) as z2:
                yield from z2.infolist_nested(max_depth - 1, parents + (zi.filename,))

    def open_nested(self, name):
        # try to glob the filename
        if any(c in name for c in '[]?*'):
            # if there's no colon in the name, pat1 and sep will be empty strings
            pat1, sep, pat2 = name.rpartition(':')
            for z2, zi, parents in self.infolist_nested(pattern=pat1):
                if not parents:
                    if sep or not fnmatch(zi.filename, pat2): continue
                else:
                    if not pat1 or not fnmatch(':'.join(parents), pat1): continue
                    if not fnmatch(zi.filename, pat2): continue
                return z2.open(zi.filename)

        try:
            zi = self.getinfo(name)
            return self.open(name)
        except KeyError as e:
            oname, sep, name = name.rpartition(':')
            if sep and oname[-4:].lower() != '.zip':
                raise e
            try:
                zi = self.getinfo(oname)
            except KeyError:
                raise e
            if zi.compress_type != ZIP_STORED:
                raise ValueError('Nested zip file uses a method other than STORE!')
            z2 = self.sub(zi)
            return z2.open(name)

    httpio = property(operator.attrgetter('_httpio'))
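
# A minimal sketch (not used by the CLI below) of driving ZipHttp as a
# library; the URL and member names are placeholders. Nested members are
# addressed with a colon-separated path such as 'inner.zip:docs/readme.txt',
# and glob patterns are accepted on either side of the colon.
def _example_library_use(url='https://example.com/archive.zip'):
    z = ZipHttp(_HttpIO(url))
    names = [zi.filename for _, zi, _ in z.infolist_nested()]
    with z.open_nested('inner.zip:docs/readme.txt') as f:
        data = f.read()
    return names, data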
def list_zipinfo(zi, parents=None):
    parents = parents or ()
    ts = datetime.datetime(*zi.date_time).strftime('%Y-%m-%d %H:%M:%S')
    name = ':'.join(parents + (zi.filename,))
    method_id = zi.compress_type
    if method_id == ZIP_STORED:
        method = 'S'
    elif method_id == ZIP_DEFLATED:
        method = 'D'
    elif method_id == ZIP_BZIP2:
        method = 'B'
    elif method_id == ZIP_LZMA:
        method = 'L'
    else:
        method = '?'
    print(f'{ts} {zi.file_size:13d} {zi.compress_size:13d} {method} {name}')

parser = argparse.ArgumentParser(description='Operate on zip files over HTTP.')
parser.add_argument('url', metavar='URL', type=str,
                    help='URL of a zip file')
parser.add_argument('filename', metavar='FILENAME', type=str, nargs='?',
                    help='filename within the zip file')

args = parser.parse_args()

z = ZipHttp(_HttpIO(args.url))

if args.filename:
    f = z.open_nested(args.filename)
    while True:
        chunk = f.read1(-1)
        if len(chunk) == 0: break
        sys.stdout.buffer.write(chunk)
    sys.stdout.buffer.flush()
else:
    # if no filename given, list the contents
    for _, zi, parents in z.infolist_nested():
        list_zipinfo(zi, parents)

print(f'heads={z.httpio._heads} gets={z.httpio._gets} bytes_read={z.httpio._total}',
      file=sys.stderr)
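
# Example invocations, assuming this script is saved as ziphttp.py (the URL
# is a placeholder):
#   ./ziphttp.py https://example.com/archive.zip                    # list contents
#   ./ziphttp.py https://example.com/archive.zip docs/readme.txt    # write one member to stdout
#   ./ziphttp.py https://example.com/archive.zip 'inner.zip:*.txt'  # glob within a nested zip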