Last active
May 27, 2024 17:03
-
-
Save NicolasT/4519146 to your computer and use it in GitHub Desktop.
Revisions
-
NicolasT revised this gist
Jan 13, 2013 . 1 changed file with 0 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1 +0,0 @@ -
NicolasT revised this gist
Jan 13, 2013 . 2 changed files with 140 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,120 @@ ''' Demonstration of using `splice` with non-blocking IO Lots of code is similar to 'splice.py', take a look at that module for more documentation. ''' import os import os.path import errno import fcntl import socket import select import subprocess import splice def set_nonblock(fd): #pylint: disable-msg=C0103 '''Set a file descriptor in non-blocking mode''' flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) flags |= os.O_NONBLOCK fcntl.fcntl(fd, fcntl.F_SETFL, flags) def main(host, port, path): #pylint: disable-msg=R0914 '''Server implementation''' # Set up server socket # ==================== sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen(1) # Wait for client # =============== conn, addr = sock.accept() print 'Connection from:', addr # Launch subprocess argv = ['python', 'slowcat.py', path] proc = subprocess.Popen(argv, close_fds=True, stdout=subprocess.PIPE) # Set up source and sink FDs # ========================== pipe_fd = proc.stdout.fileno() set_nonblock(pipe_fd) conn_fd = conn.fileno() #pylint: disable-msg=E1101 set_nonblock(conn_fd) print 'Will splice data from FD', pipe_fd, 'to', conn_fd # Blah blah # ========= transferred = 0 chunksize = 32 * 1024 * 1024 flags = \ splice.SPLICE_F_MOVE | splice.SPLICE_F_MORE | splice.SPLICE_F_NONBLOCK # Run transfer # ============ # The whole read/write-set and select code below is extremely bare-bone, # this is not how you should implement a 'serious' event-loop. # You shouldn't implement your own event-loop anyway most likely, there are # tons of good ones (using different approaches) out there. read_set = [pipe_fd] write_set = [conn_fd] while True: # Wait until (most likely) the subprocess pipe is readable, and the # output socket is writable. readable_set, writable_set, _ = select.select(read_set, write_set, []) # This is terrible. Don't do this. Seriously. if pipe_fd in readable_set: read_set = [] if conn_fd in writable_set: write_set = [] if read_set or write_set: # At least one of the FDs we need isn't ready continue # Jay, both file descriptors might be usable! # Reset for the next iteration... read_set = [pipe_fd] write_set = [conn_fd] try: # Splice! done = splice.splice(pipe_fd, None, conn_fd, None, chunksize, flags) except IOError, exc: if exc.errno in [errno.EAGAIN, errno.EWOULDBLOCK]: # Oops, looks like one of the FDs blocks again. Retry! continue raise if done == 0: break transferred += done print 'Bytes transferred:', transferred # Clean up # ======== conn.close() sock.close() proc.wait() if __name__ == '__main__': main('', 9009, os.path.abspath(__file__)) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,20 @@ ''' A lazy cat which sleeps in-between lines ''' import sys import time SLEEP_TIME = 0.05 def main(fd): #pylint: disable-msg=W0621,C0103 '''Main, what else''' for line in fd: print line, sys.stdout.flush() time.sleep(SLEEP_TIME) if __name__ == '__main__': with open(sys.argv[1], 'r') as fd: main(fd) -
NicolasT revised this gist
Jan 13, 2013 . 1 changed file with 3 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -74,7 +74,8 @@ def splice(fd_in, off_in, fd_out, off_out, len_, flags): `off_in` and `off_out` can be `None`, which is equivalent to `NULL`. If the call to `splice` fails (i.e. returns -1), an `OSError` is raised with the appropriate `errno`, unless the error is `EINTR`, which results in the call to be retried. ''' c_off_in = \ @@ -172,4 +173,4 @@ def main(host, port, path): if __name__ == '__main__': main('', 9009, os.path.abspath(__file__)) -
NicolasT revised this gist
Jan 13, 2013 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1 @@ *.pyc -
NicolasT created this gist
Jan 12, 2013 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,175 @@ ''' Demonstration of using splice from Python This code starts a TCP/IP server, waits for a connection, and once a connection has been made, launches a subprocess ('cat' of this file). Then, it transfers everything this subprocess outputs on stdout to the socket client. When no more data is available, everything is shut down. The server is fully blocking etc. etc. etc. even though splice(2) supports non-blocking execution. You should set any pipes to non-blocking mode (using fcntl or whatever) and call splice with the `SPLICE_F_NONBLOCK` flag set, then integrate FD read/write'ability with your mainloop and select/poll/epoll/... calls. This is very application/framework/library-specific, so I don't bother with it in this code. Notice you might need to wrap calls to splice in an exception handler to catch EWOULDBLOCK, EAGAIN,... The lot. Bindings to splice(2) are made using ctypes. This code is public domain as fully as possible in any applicable law, etc. etc. etc. It comes without warranty blah blah blah do whatever you want with it but don't blame me if anything breaks. If you find any errors, please let me know! ''' import os import os.path import errno import socket import subprocess import ctypes import ctypes.util def make_splice(): '''Set up a splice(2) wrapper''' # Load libc libc_name = ctypes.util.find_library('c') libc = ctypes.CDLL(libc_name, use_errno=True) # Get a handle to the 'splice' call c_splice = libc.splice # These should match for x86_64, might need some tweaking for other # platforms... c_loff_t = ctypes.c_uint64 c_loff_t_p = ctypes.POINTER(c_loff_t) # ssize_t splice(int fd_in, loff_t *off_in, int fd_out, # loff_t *off_out, size_t len, unsigned int flags) c_splice.argtypes = [ ctypes.c_int, c_loff_t_p, ctypes.c_int, c_loff_t_p, ctypes.c_size_t, ctypes.c_uint ] c_splice.restype = ctypes.c_ssize_t # Clean-up closure names. Yup, useless nit-picking. del libc del libc_name del c_loff_t_p # pylint: disable-msg=W0621,R0913 def splice(fd_in, off_in, fd_out, off_out, len_, flags): '''Wrapper for splice(2) See the syscall documentation ('man 2 splice') for more information about the arguments and return value. `off_in` and `off_out` can be `None`, which is equivalent to `NULL`. If the call to `splice` fails (i.e. returns -1), an `OSError` is raised with the appropriate `errno`. ''' c_off_in = \ ctypes.byref(c_loff_t(off_in)) if off_in is not None else None c_off_out = \ ctypes.byref(c_loff_t(off_out)) if off_out is not None else None # For handling EINTR... while True: res = c_splice(fd_in, c_off_in, fd_out, c_off_out, len_, flags) if res == -1: errno_ = ctypes.get_errno() # Try again on EINTR if errno_ == errno.EINTR: continue raise IOError(errno_, os.strerror(errno_)) return res return splice # Build and export wrapper splice = make_splice() #pylint: disable-msg=C0103 del make_splice # From bits/fcntl.h # Values for 'flags', can be OR'ed together SPLICE_F_MOVE = 1 SPLICE_F_NONBLOCK = 2 SPLICE_F_MORE = 4 SPLICE_F_GIFT = 8 def main(host, port, path): '''Server implementation''' # Set up a simple server socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) sock.listen(1) # Single accept, we'll clean up once this one connection has been handled. # Yes, this is a very stupid server indeed. conn, addr = sock.accept() print 'Connection from:', addr # Set up some subprocess which produces some output which should be # transferred to the client. # In this case, we just 'cat' this source file. argv = ['cat', path] # Might want to do something about stdin and stdout as well in a serious # application proc = subprocess.Popen(argv, close_fds=True, stdout=subprocess.PIPE) # We need the integer FDs for splice to work pipe_fd = proc.stdout.fileno() conn_fd = conn.fileno() #pylint: disable-msg=E1101 print 'Will splice data from FD', pipe_fd, 'to', conn_fd transferred = 0 # 32MB chunks chunksize = 32 * 1024 * 1024 # If you know the number of bytes to be transferred upfront, you could # change this into a 'while todo > 0', pass 'todo' to splice instead of the # arbitrary 'chunksize', and error out if splice returns 0 before all bytes # are transferred. # In this example, we just transfer as much as possible until the write-end # closes the pipe. while True: done = splice(pipe_fd, None, conn_fd, None, chunksize, SPLICE_F_MOVE | SPLICE_F_MORE) if done == 0: # Write-end of the source pipe has gone, no more data will be # available break transferred += done print 'Bytes transferred:', transferred # Close client and server socket conn.close() sock.close() # Wait for subprocess to finish (it should be finished by now anyway...) proc.wait() if __name__ == '__main__': main('', 9009, os.path.abspath(__file__))