WIZnet has a board that adds Ethernet capability to a Raspberry Pi Pico: the W5500-EVB-Pico. I'd like to use it to build network-connected instruments for my home lab.
There are plenty of examples on the documentation site, but the TCP examples all seem to use low-level socket programming. The latest MicroPython releases all ship with asyncio, so why not use the facilities available there?
For example, Bytepawn's simple message queue server.
Nope. Doesn't work. The problem seems to be that, even though the MicroPython documentation says that asyncio.start_server works the same way as it does in the standard library, it doesn't. After some digging on the net, I came up with the following.
This code calls start_server, but instead of awaiting the result it schedules the returned coroutine as a task on the asyncio event loop and then runs that loop explicitly. There is an advantage to this, though: it also lets us start multiple tasks, or multiple servers, at the same time.
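To illustrate the point about several tasks sharing one event loop, here is a minimal sketch that runs without the board. The `ticker` coroutine is a hypothetical stand-in for a server or a measurement task, and it uses `asyncio.create_task` inside a running loop rather than the `loop.create_task` call used in the program in this post; the scheduling idea is the same.

```python
import asyncio

async def ticker(name, interval, count, log):
    """Hypothetical stand-in for a server or measurement task."""
    for i in range(count):
        await asyncio.sleep(interval)  # yields control to the other tasks
        log.append((name, i))

async def main():
    log = []
    # Each create_task call schedules a coroutine on the running loop,
    # so both tickers make progress while the other one is sleeping.
    fast = asyncio.create_task(ticker('fast', 0.01, 3, log))
    slow = asyncio.create_task(ticker('slow', 0.02, 2, log))
    await fast
    await slow
    return log

if __name__ == '__main__':
    print(asyncio.run(main()))
```

The entries from the two tickers end up interleaved in `log`, which is the behaviour we rely on when running two servers side by side.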
The create_handler function builds a parametrised handler that is passed to start_server. Its parameter is a function that maps a string to a string, which is applied to each received line before it is sent back. The default is just the identity function.
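Any function that takes a string and returns a string can serve as the transformer. As a sketch of what else could be plugged in, here are two hypothetical examples that are not part of the original program: `reverse_line`, which reverses the text but keeps the line ending, and `make_command_table`, which builds a transformer that looks up simple instrument-style commands. The command names and replies are made up for illustration.

```python
def reverse_line(line):
    """Reverse the text of a line while keeping its line ending in place."""
    body = line.rstrip('\r\n')
    ending = line[len(body):]
    return body[::-1] + ending

def make_command_table(commands):
    """Build a transformer from a dict mapping command -> zero-argument function."""
    def transform(line):
        handler = commands.get(line.strip().lower())
        if handler is None:
            return 'ERR unknown command\r\n'
        return handler() + '\r\n'
    return transform

# Hypothetical command set for a lab instrument
instrument = make_command_table({
    '*idn?': lambda: 'home-lab demo instrument',
    'ping': lambda: 'pong',
})
```

Either of these could then be passed in the same way as str.upper, for example `create_handler(reverse_line)` or `create_handler(instrument)`.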
Run the program and use telnet to connect to the board on either port 7777, which acts as an echo server, or port 7778, which uppercases the reply.
import sys, asyncio
#--------------------------------------------------------------------------
# Board and network configuration
#--------------------------------------------------------------------------
# on micropython systems, this can be placed in boot.py
from machine import Pin,SPI
import network
import time
# W5x00 interface configuration
def config():
    spi = SPI(0, 2_000_000, mosi=Pin(19), miso=Pin(16), sck=Pin(18))
    nic = network.WIZNET5K(spi, Pin(17), Pin(20))  # spi, cs, reset pin
    nic.active(True)
    # Use if static IP address is required
    # nic.ifconfig(('192.168.1.20','255.255.255.0','192.168.1.1','8.8.8.8'))
    # DHCP
    nic.ifconfig('dhcp')
    print('IP address :', nic.ifconfig())
    while not nic.isconnected():
        time.sleep(1)
        print(nic.regs())
#--------------------------------------------------------------------------
# END network configuration
#--------------------------------------------------------------------------
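
async def handle_client(reader, writer):
    print('New client connected...')
    line = str()
    while line.strip() != 'quit':
        data = await reader.readline()
        if not data:
            break  # empty read: the client closed the connection
        line = data.decode('utf8')
        if line.strip() == '':
            continue
        print(f'Received: {line.strip()}')
        writer.write(line.encode('utf8'))
        await writer.drain()  # flush the reply to the socket
    writer.close()
    await writer.wait_closed()  # wait until the connection is really closed
    print('Client disconnected...')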
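
def create_handler(transformer=lambda x: x):
    # Returns a client handler that applies transformer to every line
    async def handle_client(reader, writer):
        print('New client connected...')
        line = str()
        while line.strip() != 'quit':
            data = await reader.readline()
            if not data:
                break  # empty read: the client closed the connection
            line = data.decode('utf8')
            if line.strip() == '':
                continue
            print(f'Received: {line.strip()}')
            writer.write(transformer(line).encode('utf8'))
            await writer.drain()  # flush the reply to the socket
        writer.close()
        await writer.wait_closed()  # wait until the connection is really closed
        print('Client disconnected...')
    return handle_client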
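
def Main():
    # A plain function: the loop is started once, from non-async code
    config()
    loop = asyncio.get_event_loop()
    # start_server returns a coroutine; schedule one task per server
    loop.create_task(asyncio.start_server(create_handler(), '0.0.0.0', 7777))
    loop.create_task(asyncio.start_server(create_handler(str.upper), '0.0.0.0', 7778))
    loop.run_forever()

try:
    Main()
finally:
    asyncio.new_event_loop()  # reset the loop state so the script can be re-run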
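As an alternative to telnet, the board can be exercised from a PC with a few lines of standard-library asyncio. This is only a sketch under some assumptions: it is meant for desktop Python rather than MicroPython, the `query` helper is a name introduced here, and the address 192.168.1.20 is the static example address from the configuration above, so substitute whatever address your board reports.

```python
import asyncio

async def query(host, port, text):
    """Send one line to the server, return the reply, then say 'quit'."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write((text + '\r\n').encode('utf8'))
    await writer.drain()
    reply = (await reader.readline()).decode('utf8')
    # Ask the handler to leave its loop before we hang up
    writer.write(b'quit\r\n')
    await writer.drain()
    writer.close()
    await writer.wait_closed()
    return reply.strip()

if __name__ == '__main__':
    # Assumed board address; port 7778 is the uppercasing server
    print(asyncio.run(query('192.168.1.20', 7778, 'hello')))
```

Pointing the same call at port 7777 should return the text unchanged.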