Skip to content

Instantly share code, notes, and snippets.

@vaibhav-sinha
Last active June 20, 2021 13:56
Show Gist options
  • Select an option

  • Save vaibhav-sinha/ea1606857a7f3d76467d81d94b250d12 to your computer and use it in GitHub Desktop.

Select an option

Save vaibhav-sinha/ea1606857a7f3d76467d81d94b250d12 to your computer and use it in GitHub Desktop.
Writing your own HTTP Server - Implementing WSGI: Worker Implementation Changes
import logging
import queue
import threading
from typing import Type
from http_parser.pyparser import HttpParser
from gateway import WSGI
# Module-level logger used by Worker and RequestProcessorThread below.
logger = logging.getLogger(__name__)
class Worker:
    """Accept client connections and hand them to request-processing threads.

    Lifecycle: call ``setup(config)`` once to build the bounded hand-off
    queue, the WSGI gateway and the processing threads, then ``run(listener)``
    to accept connections until ``shutdown()`` is called.
    """

    # Class-level default; shutdown() sets an instance attribute of the same
    # name that shadows it.
    is_stopped = False

    def __init__(self):
        # Everything is populated by setup(); the instance is unusable before.
        self.config = None
        self.queue = None
        self.gateway = None
        self.kill_pill = None

    def setup(self, config):
        """Create the queue, the gateway and start the processing threads.

        Args:
            config: Mapping-like object (only ``.get`` is used here). The
                ``'concurrency'`` entry (default 10) sets both the queue
                capacity and the number of threads started.
        """
        self.config = config
        concurrency = config.get('concurrency', 10)
        # What should be the size of this queue?
        self.queue = queue.Queue(maxsize=concurrency)
        self.gateway = WSGI(config)
        self.kill_pill = threading.Event()
        threads = [
            RequestProcessorThread(
                name=f'RequestProcessor {i}',
                queue=self.queue,
                kill_pill=self.kill_pill,
                gateway=self.gateway,
            )
            for i in range(concurrency)
        ]
        for t in threads:
            t.start()

    def run(self, listener):
        """Accept connections from *listener* and submit each one for processing.

        The stop flag is only re-checked between ``accept()`` calls, so the
        loop ends after the next connection that arrives once ``shutdown()``
        has been called.
        """
        logger.info("Accepting connections now")
        while not self.is_stopped:
            sock, _ = listener.accept()
            self.submit(sock)

    def submit(self, sock):
        """Queue *sock* for processing, or drop it if the queue stays full.

        Waits up to ``config['timeout']`` seconds (default 5) for a free slot.
        If none appears the connection is closed: nobody else holds a
        reference to the socket, so leaving it open would leak the descriptor
        and keep the client waiting for a response that never comes.
        """
        try:
            self.queue.put(sock, timeout=self.config.get('timeout', 5))
        except queue.Full:
            logger.warning("Request queue is full; dropping connection")
            sock.close()

    def shutdown(self):
        """Ask the accept loop and every processing thread to stop."""
        self.is_stopped = True
        self.kill_pill.set()
class RequestProcessorThread(threading.Thread):
    """Thread that takes client sockets off a shared queue and serves them.

    Each socket is read until the HTTP parser reports a complete message, the
    parsed request is handed to the gateway, and the socket is closed.
    """

    def __init__(self, group=None, target=None, name=None, queue: queue.Queue = None,
                 kill_pill=None, gateway=None, args=(), kwargs=None):
        """Store the collaborators; the remaining arguments go to ``Thread``.

        Args:
            queue: Queue from which client sockets are taken.
            kill_pill: Event-like object; ``run`` exits once ``is_set()`` is true.
            gateway: Object whose ``process(parser, write)`` produces the response.
        """
        super().__init__(group=group, target=target, name=name, args=args, kwargs=kwargs)
        # NOTE: inside this method the ``queue`` parameter shadows the
        # ``queue`` module, so no module-based annotation is used here.
        self.queue = queue
        self.kill_pill = kill_pill
        self.gateway = gateway

    def run(self) -> None:
        """Serve queued sockets until the kill pill is set."""
        logger.info(f"Running thread {self.name}")
        while not self.kill_pill.is_set():
            try:
                # Short timeout so the kill pill is re-checked about once a second.
                sock = self.queue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            try:
                self.process(sock)
            except Exception:
                # One failing connection must not take this thread out of the
                # pool; log it and move on to the next socket.
                logger.exception("Unhandled error while processing a request")

    def process(self, sock):
        """Read one HTTP request from *sock*, dispatch it, and close *sock*.

        The socket is closed on every exit path: normal completion, the
        client disconnecting before a full request arrived, or an exception
        raised while reading or by the gateway (which then propagates).
        """
        try:
            p = HttpParser()
            while True:
                data = sock.recv(1024)
                if not data:
                    # The client closed the connection. Nothing to do anymore
                    return
                p.execute(data, len(data))
                if p.is_message_complete():
                    break

            def write(data):
                # sendall() keeps sending until the whole buffer is written;
                # send() may stop after a partial write.
                sock.sendall(data)

            self.gateway.process(p, write)
        finally:
            sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment