Skip to content

Instantly share code, notes, and snippets.

@pylixm
Created August 14, 2019 07:11
Show Gist options
  • Select an option

  • Save pylixm/e6bd4f5456740c12e462eecbc66692fb to your computer and use it in GitHub Desktop.

Select an option

Save pylixm/e6bd4f5456740c12e462eecbc66692fb to your computer and use it in GitHub Desktop.
tailer.py
# -*- coding:utf-8 -*-
"""
Tailer

Python tail is a simple implementation of GNU tail and head.

It provides 3 main functions that can be performed on any file-like object
that supports seek() and tell():

    tail   - read lines from the end of a file
    head   - read lines from the top of a file
    follow - read lines as a file grows

docs: https://github.com/six8/pytailer
"""
import os
import re
import sys
import time
if sys.version_info < (3,):
    # Python 2 only: rebind ``range`` to the lazy ``xrange`` so loops in this
    # module do not build a full list.
    range = xrange  # noqa: F821 (name exists on Python 2 only)
class Tailer(object):
    """
    Implements tailing and heading functionality like GNU tail and head
    commands, for a file identified by its path.

    While following, the file is re-opened when ``check`` reports that the
    path now refers to a different inode, that the file became shorter than
    the current read offset, or that it cannot be stat'ed.

    NOTE(review): offsets are computed by adding string indices to values
    returned by ``file.tell()``. That presumably only holds for files whose
    characters are one byte each and that are read without newline
    translation -- confirm before using on other content.
    """

    # Recognised line terminators. The two-character terminator is listed
    # first so that the regex built in ``splitlines`` prefers it.
    line_terminators = ('\r\n', '\n', '\r')

    def __init__(self, path, read_size=1024, reopen_count=5, end=False):
        """
        Open ``path`` for reading.

        :param path: path of the file to read
        :param read_size: number of characters read per chunk when scanning
            for line terminators
        :param reopen_count: number of extra attempts ``reopen`` makes after
            the first one fails
        :param end: when true, position the file at its end after opening
        :raises Exception: if the file is missing, unreadable, a directory,
            or cannot be opened (see ``open``)
        """
        self.read_size = read_size
        self.path = path
        self.real_path = None
        self.inode = None
        self.file = None
        self.reopen_count = reopen_count
        # Open at the beginning so that start_pos records the initial offset
        # before any optional seek to the end.
        self.open(follow=False)
        self.start_pos = self.file.tell()
        if end:
            self.seek_end()

    def open(self, follow=True):
        """
        Validate ``self.path``, resolve it, record its inode and open it.

        :param follow: when true, seek to the end of the file after opening
        :raises Exception: if the path does not exist, is not readable, is a
            directory, or if stat/open fails
        """
        # Check that the file is usable before touching it.
        if not os.access(self.path, os.F_OK):
            raise Exception("File '%s' does not exist" % self.path)
        if not os.access(self.path, os.R_OK):
            raise Exception("File '%s' not readable" % self.path)
        if os.path.isdir(self.path):
            raise Exception("File '%s' is a directory" % self.path)
        try:
            self.real_path = os.path.realpath(self.path)
            # Remembered so that check() can detect the file being replaced.
            self.inode = os.stat(self.real_path).st_ino
        except OSError as error:
            raise Exception(error)
        try:
            self.file = open(self.real_path)
        except IOError as error:
            raise Exception(error)
        if follow:
            self.seek_end()

    def reopen(self, delay=1):
        """
        Close the current file and try to open ``self.path`` again.

        Up to ``reopen_count + 1`` attempts are made, sleeping ``delay``
        seconds after each failed attempt.

        :param delay: seconds to sleep after a failed attempt
        :return: True if the file was opened, False otherwise
        """
        self.close()
        for _ in range(self.reopen_count + 1):
            try:
                # Start at the beginning: a replaced or truncated file only
                # holds data that has not been read yet, so seeking to its
                # end would skip lines.
                self.open(follow=False)
                return True
            except Exception:
                time.sleep(delay)
        return False

    def check(self, pos):
        """
        Tell whether the file needs to be re-opened.

        :param pos: current read offset
        :return: True if the path's inode changed, if the file is now
            shorter than ``pos``, or if it cannot be stat'ed; else False
        """
        try:
            stat = os.stat(self.real_path)
        except OSError:
            return True
        # The file was replaced by another one.
        if self.inode != stat.st_ino:
            return True
        # The file was truncated / rotated in place.
        if pos > stat.st_size:
            return True
        return False

    def wait(self, pos, delay=1):
        """
        Called by ``follow`` when no new line is available.

        Re-opens the file if ``check`` says so; otherwise seeks back to
        ``pos`` and sleeps ``delay`` seconds.

        :param pos: offset at which the last read attempt started
        :param delay: seconds to sleep when the file is unchanged
        :raises Exception: if the file has to be re-opened and cannot be
        """
        if self.check(pos):
            if not self.reopen():
                # Report the path; self.file is only a closed file object.
                raise Exception('Unable to reopen file: %s' % self.path)
        else:
            self.file.seek(pos)
            time.sleep(delay)

    def splitlines(self, data):
        """Split ``data`` on any of ``line_terminators``; return a list."""
        return re.split('|'.join(self.line_terminators), data)

    def seek_end(self):
        """Move the file pointer to the end of the file."""
        self.seek(0, 2)

    def seek(self, pos, whence=0):
        """
        Move the file pointer.

        :param pos: offset
        :param whence: 0 - from the start of the file, 1 - from the current
            position, 2 - from the end of the file
        :return: None
        """
        self.file.seek(pos, whence)

    def read(self, read_size=None):
        """
        Read from the file.

        :param read_size: number of characters to read; a falsy value
            (None or 0) reads everything up to the end of the file
        :return: tuple ``(length_of_data, data)``
        """
        if read_size:
            read_str = self.file.read(read_size)
        else:
            read_str = self.file.read()
        return len(read_str), read_str

    def _chunk_size(self):
        """Chunk size used while scanning for terminators.

        At least 2, so scanning always makes progress and a CRLF pair that
        sits right before the cursor is never split by the first backward
        chunk.
        """
        return max(self.read_size, 2)

    def _strip_terminator(self, data):
        """Return ``data`` without one trailing line terminator, if any."""
        # Longest terminator first so '\r\n' is removed as a unit.
        for terminator in sorted(self.line_terminators, key=len, reverse=True):
            if terminator and data.endswith(terminator):
                return data[:-len(terminator)]
        return data

    def _split_block(self, data):
        """Turn a block of raw text into a list of lines without terminators."""
        if not data:
            return []
        return self.splitlines(self._strip_terminator(data))

    def seek_line_forward(self):
        """
        Searches forward from the current file position for a line terminator
        and seeks to the character after it.

        A '\\r' immediately followed by '\\n' is treated as one terminator
        when '\\r\\n' is in ``line_terminators``.

        :return: the new file position, or None if no terminator was found
            (the file is then left positioned where reading stopped)
        """
        chunk_size = self._chunk_size()
        crlf = '\r\n' in self.line_terminators
        while True:
            pos = self.file.tell()
            bytes_read, read_str = self.read(chunk_size)
            if bytes_read <= 0:
                return None
            for i in range(bytes_read):
                if read_str[i] not in self.line_terminators:
                    continue
                next_pos = pos + i + 1
                if crlf and read_str[i] == '\r':
                    if i + 1 < bytes_read:
                        following = read_str[i + 1]
                    else:
                        # The '\r' is the last character of this chunk:
                        # peek at the next character of the file.
                        self.seek(next_pos)
                        following = self.file.read(1)
                    if following == '\n':
                        next_pos += 1
                self.seek(next_pos)
                return self.file.tell()

    def seek_line(self):
        """
        Searches backwards from the current file position for a line
        terminator and seeks to the character after it.

        A terminator located immediately before the starting position is
        not counted, so calling this at the end of a file that ends with a
        terminator finds the start of the last line.

        :return: the new file position, or None if no terminator was found
            (the file is then positioned at offset 0)
        """
        pos = self.file.tell()
        chunk_size = self._chunk_size()
        first_chunk = True
        while pos > 0:
            # Never read before the start of the file: the last chunk may
            # be shorter than chunk_size.
            read_size = min(chunk_size, pos)
            pos -= read_size
            self.seek(pos)
            bytes_read, read_str = self.read(read_size)
            scan_end = bytes_read
            if first_chunk:
                first_chunk = False
                if bytes_read and read_str[-1] in self.line_terminators:
                    # The last character is a line terminator, don't count
                    # this one.
                    scan_end -= 1
                    if (read_str[-2:] == '\r\n'
                            and '\r\n' in self.line_terminators):
                        # Found CRLF: skip both characters.
                        scan_end -= 1
            # Scan this chunk backwards for a terminator.
            for i in range(scan_end - 1, -1, -1):
                if read_str[i] in self.line_terminators:
                    self.seek(pos + i + 1)
                    return self.file.tell()
        # Not enough lines: the whole file belongs to the result.
        self.seek(0)
        return None

    def tail(self, lines=10):
        """
        Return the last lines of the file.

        :param lines: maximum number of lines to return
        :return: list of lines without their terminators ([] if the file is
            empty or ``lines`` is 0)
        """
        self.seek_end()
        end_pos = self.file.tell()
        for _ in range(lines):
            if not self.seek_line():
                break
        # Read up to the end recorded above, then drop the final terminator
        # (if any) so that no empty trailing entry is produced.
        data = self.file.read(end_pos - self.file.tell())
        return self._split_block(data)

    def head(self, lines=10):
        """
        Return the top lines of the file.

        :param lines: maximum number of lines to return
        :return: list of lines without their terminators ([] if the file is
            empty or ``lines`` is 0)
        """
        self.seek(0)
        for _ in range(lines):
            if not self.seek_line_forward():
                break
        end_pos = self.file.tell()
        self.seek(0)
        data = self.file.read(end_pos)
        return self._split_block(data)

    def follow(self):
        """
        Iterator generator that returns lines as data is added to the file.

        Each yielded line has its trailing terminator removed. When no line
        is available, ``wait`` is called, which sleeps or re-opens the file.

        Based on: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/157035
        """
        while True:
            pos = self.file.tell()
            line = self.file.readline()
            if line:
                # Trim the line terminator ('\r\n' is removed as a unit).
                yield self._strip_terminator(line)
            else:
                self.wait(pos)

    def __iter__(self):
        """Iterating over a Tailer is the same as calling ``follow``."""
        return self.follow()

    def close(self):
        """Close the underlying file, if one is open."""
        current = getattr(self, 'file', None)
        if current is not None:
            current.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment