Skip to content

Instantly share code, notes, and snippets.

@kamuridesu
Created October 19, 2022 10:50
Show Gist options
  • Select an option

  • Save kamuridesu/4fe27f1322003b5e46880f0d40b77030 to your computer and use it in GitHub Desktop.

Select an option

Save kamuridesu/4fe27f1322003b5e46880f0d40b77030 to your computer and use it in GitHub Desktop.
class Server:
def __init__(self, port: int | None = None):
self.HOST = "0.0.0.0"
self.PORT = PortSelector().choosen_port if port is None else port
self.__thread = None
self.__handle_threads = []
self.__TIPOS = Tipos()
self.__running = True
self.__server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if port is not None:
self.__server.bind((self.HOST, self.PORT))
else:
while True:
try:
self.__server.bind((self.HOST, self.PORT))
break
except OSError:
print("choosing random port")
self.PORT = PortSelector().choosen_port
self.__server.listen()
def stop(self):
if self.__thread is not None:
self.__thread.kill()
self.__thread.join()
print(self.__thread.pid)
self.__server.shutdown(socket.SHUT_RD)
self.__server.close()
self.__running = False
def start(self):
self.__thread = threading.Thread(target=self.run)
self.__thread.start()
def __handleIncoming(self, conn: socket.socket):
print("handling incoming")
while True:
data = conn.recv(1024)
if not data:
break
output = {
"status": "error",
}
if data == b"ref":
output = self.__TIPOS.reference
elif data == b"q":
self.stop()
else:
try:
tipo = self.__TIPOS.getTipo(int(data.decode("utf-8")))
output = tipo
except (IndexError, TypeError, ValueError, UnicodeDecodeError):
pass
data_to_send = json.dumps(output)
if not data_to_send.endswith("\n"):
data_to_send += "\n"
conn.sendall(data_to_send.encode("utf-8"))
conn.close()
def run(self):
print(f"Running on {self.HOST}:{self.PORT}")
try:
while self.__running:
[x.join() for x in self.__handle_threads if not x.is_alive()]
conn, addr = self.__server.accept()
p = threading.Thread(target=self.__handleIncoming, args=(conn,))
p.start()
self.__handle_threads.append(p)
except KeyboardInterrupt:
self.stop()
# dundlers start here
def __atexit__(self):
self.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment