"""Modbus TCP server helpers.""" import asyncio import logging from pymodbus.server.requesthandler import ServerRequestHandler from pymodbus.server.server import ModbusTcpServer logger = logging.getLogger(__name__) class LoggingServerRequestHandler(ServerRequestHandler): def _client_addr(self) -> str: if not self.transport: return "unknown" peer = self.transport.get_extra_info("peername") return "%s:%s" % peer if peer else "unknown" def callback_connected(self) -> None: super().callback_connected() logger.info("客户端已连接(Modbus): %s", self._client_addr()) def callback_disconnected(self, exc: Exception | None) -> None: client_addr = self._client_addr() super().callback_disconnected(exc) if exc: logger.info("客户端已断开(Modbus): %s, 原因=%s", client_addr, exc) else: logger.info("客户端已断开(Modbus): %s", client_addr) class LoggingModbusTcpServer(ModbusTcpServer): def callback_new_connection(self): return LoggingServerRequestHandler( self, self.trace_packet, self.trace_pdu, self.trace_connect, ) async def start_modbus_server(context, host: str, port: int) -> None: server = LoggingModbusTcpServer( context, address=(host, port), ignore_missing_devices=False, broadcast_enable=False, ) logger.info("服务已启动监听(Modbus TCP),地址=%s:%s", host, port) await server.serve_forever() def run_modbus_server(context, host: str, port: int) -> None: asyncio.run(start_modbus_server(context, host, port))