#!/usr/bin/env python
# Copyright (C) 2024 Abhijit Bose (aka. Boseji)
# SPDX: MIT
"""Minimal Modbus TCP server built on uModbus.

Serves a small in-memory data store for slave ID 1.  Addresses 0-9 can be
read with function codes 3 and 4 and written with function codes 6 and 16.
Addresses that have never been written read back as 0.
"""

import logging
from collections import defaultdict
from socketserver import TCPServer

from umodbus import conf
from umodbus.server.tcp import RequestHandler, get_server
from umodbus.utils import log_to_stream

# Add stream handler to logger 'uModbus'.
log_to_stream(level=logging.DEBUG)

# Server detail: (host, port) the server binds to.
svr = ('localhost', 50222)

# Addresses served by the routes below (0 through 9 inclusive).
REGISTER_ADDRESSES = list(range(0, 10))

# A very simple data store which maps addresses to their values.
# defaultdict(int) makes any address that was never written read as 0.
data_store = defaultdict(int)

# Enable values to be signed (default is False).
conf.SIGNED_VALUES = True

# Must be set before the server is created so the listening socket is
# bound with address reuse enabled.
TCPServer.allow_reuse_address = True

# Open socket.
app = get_server(TCPServer, svr, RequestHandler)


@app.route(slave_ids=[1], function_codes=[3, 4], addresses=REGISTER_ADDRESSES)
def read_data_store(slave_id, function_code, address):
    """Return the value stored at ``address``.

    ``slave_id`` and ``function_code`` are part of the route callback
    signature but are not used here.
    """
    print("Fetch ADDR:", address)
    return data_store[address]


@app.route(slave_ids=[1], function_codes=[6, 16], addresses=REGISTER_ADDRESSES)
def write_data_store(slave_id, function_code, address, value):
    """Store ``value`` at ``address``.

    ``slave_id`` and ``function_code`` are part of the route callback
    signature but are not used here.
    """
    print("Store ADDR:", address, " Data:", value)
    data_store[address] = value


def main():
    """Serve requests until interrupted, then release the listening socket."""
    print("\n Starting Server ", svr)
    try:
        app.serve_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop this script; exit without a
        # traceback.
        pass
    finally:
        print("\n\n Closing down server...\n")
        # shutdown() is intentionally not called: it is meant to stop a
        # serve_forever() loop running in another thread, and by this point
        # the loop has already exited.  Only the socket needs releasing.
        app.server_close()


if __name__ == '__main__':
    main()