Skip to content

Instantly share code, notes, and snippets.

@btctcn
Created November 20, 2019 12:17
Show Gist options
  • Select an option

  • Save btctcn/1c32e7500ec0c88a4127021f1de2e12a to your computer and use it in GitHub Desktop.

Select an option

Save btctcn/1c32e7500ec0c88a4127021f1de2e12a to your computer and use it in GitHub Desktop.
import threading
import socket
class ClientThread(threading.Thread): # при тестах это будет мок-обьект
def __init__(self, conn, addr):
super().__init__()
self._connection = conn # экземпляр класса socket
self._address = addr # ip-адрес и порт
def run(self):
print('Connection from address {}'.format(self._address)) # соединение через ip-адрес и порт
data = self._connection.recv(1024) # байт сообщение из сети, которое принимается с помощью метода recv
print('Received {}'.format(data.decode())) # декодируем байт-сообщение в строковое сообщение
self._connection.send(data) # передаем сообщение серверу
self._connection.close() # закрываем оединение
print('Closed connection from {}'.format(self._address))
class TcpServer:
# def __init__(self, host, port):
# self.host = host # ip-адрес
# self.port = port # номер порта
# self._socket = None # соединения пока нет
# self._runnning = False
def __init__(self, host, port, my_socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)):
self._socket = my_socket
self.host = host # ip-адрес
self.port = port # номер порта
self._runnning = False
def run(self):
# self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # создаем экземпляр класса соекет TCP
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # позволяет нескольким приложениям
# «слушать» сокет.
self._socket.bind(self.host, self.port) # устанавливаем соединение.Где-то сдесь нужно начать мок клиента
self._socket.listen(5)
self._runnning = True # открываем передачу сообщений
print('Server is up')
while self._runnning: # пока отрыта передача сообщений, запускаем потоки клиента, для
conn, addr = self._socket.accept() # где-то здесь должен начинаться мок клиента-трединга
ClientThread(conn, addr).start()
def stop(self):
self._runnning = False
self._socket.close()
print('Server is down')
if __name__ == '__main__':
srv = TcpServer(host='127.0.0.1', port=5555, my_socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM))
try:
srv.run()
except KeyboardInterrupt:
srv.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment