Created
August 14, 2019 07:11
-
-
Save pylixm/e6bd4f5456740c12e462eecbc66692fb to your computer and use it in GitHub Desktop.
tailer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding:utf-8 -*- | |
| """ | |
| Tailer | |
| Build Status | |
| Python tail is a simple implementation of GNU tail and head. | |
| It provides 3 main functions that can be performed on any file-like object that supports seek() and tell(). | |
| tail - read lines from the end of a file | |
| head - read lines from the top of a file | |
| follow - read lines as a file grows | |
| docs: https://github.com/six8/pytailer | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import time | |
| if sys.version_info < (3,): | |
| range = xrange | |
| class Tailer(object): | |
| """ | |
| Implements tailing and heading functionality like GNU tail and head | |
| commands. | |
| """ | |
| line_terminators = ('\r\n', '\n', '\r') | |
| def __init__(self, path, read_size=1024, reopen_count=5, end=False): | |
| self.read_size = read_size | |
| self.path = path | |
| self.real_path = None | |
| self.inode = None | |
| self.file = file | |
| self.start_pos = self.file.tell() | |
| self.reopen_count = reopen_count | |
| if end: | |
| self.seek_end() | |
| def open(self, follow=True): | |
| # file is ok or not | |
| if not os.access(self.path, os.F_OK): | |
| raise Exception("File '%s' does not exist" % self.path) | |
| if not os.access(self.path, os.R_OK): | |
| raise Exception("File '%s' not readable" % self.path) | |
| if os.path.isdir(self.path): | |
| raise Exception("File '%s' is a directory" % self.path) | |
| try: | |
| self.real_path = os.path.realpath(self.path) | |
| self.inode = os.stat(self.real_path).st_ino | |
| except OSError as error: | |
| raise Exception(error) | |
| try: | |
| self.file = open(self.real_path) | |
| except IOError as error: | |
| raise Exception(error) | |
| if follow: | |
| self.seek_end() | |
| def reopen(self, delay=1): | |
| self.close() | |
| reopen_count = self.reopen_count | |
| while reopen_count >= 0: | |
| reopen_count -= 1 | |
| try: | |
| self.open() | |
| return True | |
| except Exception: | |
| time.sleep(delay) | |
| return False | |
| def check(self, pos): | |
| try: | |
| stat = os.stat(self.real_path) | |
| # 文件变化 | |
| if self.inode != stat.st_ino: | |
| return True | |
| # 文件切分 | |
| if pos > stat.st_size: | |
| return True | |
| except OSError: | |
| return True | |
| return False | |
| def wait(self, pos, delay=1): | |
| if self.check(pos): | |
| if not self.reopen(): | |
| raise Exception('Unable to reopen file: %s' % self.file) | |
| else: | |
| self.file.seek(pos) | |
| time.sleep(delay) | |
| def splitlines(self, data): | |
| return re.split('|'.join(self.line_terminators), data) | |
| def seek_end(self): | |
| self.seek(0, 2) | |
| def seek(self, pos, whence=0): | |
| """移动文件指针到某处 | |
| :param pos: 偏移量 | |
| :param whence: 0-文件开头,1-当前位置,2-文件末尾 | |
| :return: | |
| """ | |
| self.file.seek(pos, whence) | |
| def read(self, read_size=None): | |
| if read_size: | |
| read_str = self.file.read(read_size) | |
| else: | |
| read_str = self.file.read() | |
| return len(read_str), read_str | |
| def seek_line_forward(self): | |
| """ | |
| Searches forward from the current file position for a line terminator | |
| and seeks to the charachter after it. | |
| """ | |
| pos = start_pos = self.file.tell() | |
| bytes_read, read_str = self.read(self.read_size) | |
| start = 0 | |
| if bytes_read and read_str[0] in self.line_terminators: | |
| # The first charachter is a line terminator, don't count this one | |
| start += 1 | |
| while bytes_read > 0: | |
| # Scan forwards, counting the newlines in this bufferfull | |
| i = start | |
| while i < bytes_read: | |
| print(read_str[i]) | |
| if read_str[i] in self.line_terminators: | |
| self.seek(pos + i + 1) | |
| return self.file.tell() | |
| i += 1 | |
| pos += self.read_size | |
| self.seek(pos) | |
| bytes_read, read_str = self.read(self.read_size) | |
| return None | |
| def seek_line(self): | |
| """ | |
| Searches backwards from the current file position for a line terminator | |
| and seeks to the charachter after it. | |
| """ | |
| pos = end_pos = self.file.tell() | |
| read_size = self.read_size | |
| if pos > read_size: | |
| pos -= read_size | |
| else: | |
| pos = 0 | |
| read_size = end_pos | |
| self.seek(pos) | |
| bytes_read, read_str = self.read(read_size) | |
| if bytes_read and read_str[-1] in self.line_terminators: | |
| # The last charachter is a line terminator, don't count this one | |
| bytes_read -= 1 | |
| if read_str[-2:] == '\r\n' and '\r\n' in self.line_terminators: | |
| # found crlf | |
| bytes_read -= 1 | |
| while bytes_read > 0: | |
| # Scan backward, counting the newlines in this bufferfull | |
| i = bytes_read - 1 | |
| while i >= 0: | |
| if read_str[i] in self.line_terminators: | |
| self.seek(pos + i + 1) | |
| return self.file.tell() | |
| i -= 1 | |
| if pos == 0 or pos - self.read_size < 0: | |
| # Not enought lines in the buffer, send the whole file | |
| self.seek(0) | |
| return None | |
| pos -= self.read_size | |
| self.seek(pos) | |
| bytes_read, read_str = self.read(self.read_size) | |
| return None | |
| def tail(self, lines=10): | |
| """ | |
| Return the last lines of the file. | |
| """ | |
| self.seek_end() | |
| end_pos = self.file.tell() | |
| for i in range(lines): | |
| if not self.seek_line(): | |
| break | |
| data = self.file.read(end_pos - self.file.tell() - 1) | |
| if data: | |
| return self.splitlines(data) | |
| else: | |
| return [] | |
| def head(self, lines=10): | |
| """ | |
| Return the top lines of the file. | |
| """ | |
| self.seek(0) | |
| for i in range(lines): | |
| if not self.seek_line_forward(): | |
| break | |
| end_pos = self.file.tell() | |
| self.seek(0) | |
| data = self.file.read(end_pos - 1) | |
| if data: | |
| return self.splitlines(data) | |
| else: | |
| return [] | |
| def follow(self): | |
| """ | |
| Iterator generator that returns lines as data is added to the file. | |
| Based on: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/157035 | |
| """ | |
| while True: | |
| pos = self.file.tell() | |
| line = self.file.readline() | |
| if line: | |
| # trim line terminators | |
| if line[-1] in self.line_terminators: | |
| line = line[:-1] | |
| if line[-2:] == '\r\n' and '\r\n' in self.line_terminators: | |
| # found crlf | |
| line = line[:-2] | |
| yield line | |
| else: | |
| # self.seek(pos) | |
| # time.sleep(delay) | |
| self.wait(pos) | |
| def __iter__(self): | |
| return self.follow() | |
| def close(self): | |
| self.file.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment