| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- """Modbus TCP server helpers."""
- import asyncio
- import logging
- from pymodbus.server.requesthandler import ServerRequestHandler
- from pymodbus.server.server import ModbusTcpServer
- logger = logging.getLogger(__name__)
class LoggingServerRequestHandler(ServerRequestHandler):
    """Request handler that logs each client connect and disconnect.

    Every log line includes the peer address read from the transport's
    ``peername`` extra info.
    """

    def _client_addr(self) -> str:
        """Return the peer as ``host:port``, or ``"unknown"`` if unavailable.

        Only the first two fields of the peer address are used, so IPv6
        peers (4-tuples) are formatted the same way as IPv4 peers. A peer
        that is not a sequence of at least two items is returned via
        ``str()``.
        """
        if not self.transport:
            return "unknown"
        peer = self.transport.get_extra_info("peername")
        if not peer:
            return "unknown"
        # IPv6 peers are (host, port, flowinfo, scope_id) and Unix sockets
        # report a plain string; "%s:%s" % peer would raise TypeError for
        # both, breaking the connection callbacks just to build a log line.
        if isinstance(peer, (tuple, list)) and len(peer) >= 2:
            return "%s:%s" % (peer[0], peer[1])
        return str(peer)

    def callback_connected(self) -> None:
        """Run the base connect handling, then log the client address."""
        super().callback_connected()
        logger.info("客户端已连接(Modbus): %s", self._client_addr())

    def callback_disconnected(self, exc: Exception | None) -> None:
        """Log the disconnect, including ``exc`` as the reason when set.

        Args:
            exc: The exception that caused the disconnect, or ``None``.
        """
        # Read the address before delegating to the base class.
        # NOTE(review): presumably the base handler may drop the transport
        # during disconnect handling — confirm against pymodbus.
        client_addr = self._client_addr()
        super().callback_disconnected(exc)
        if exc:
            logger.info("客户端已断开(Modbus): %s, 原因=%s", client_addr, exc)
        else:
            logger.info("客户端已断开(Modbus): %s", client_addr)
class LoggingModbusTcpServer(ModbusTcpServer):
    """Modbus TCP server that creates logging request handlers."""

    def callback_new_connection(self) -> "LoggingServerRequestHandler":
        """Build the handler for a newly accepted connection.

        The handler receives this server plus its ``trace_packet``,
        ``trace_pdu`` and ``trace_connect`` attributes, in that order.
        """
        handler = LoggingServerRequestHandler(
            self, self.trace_packet, self.trace_pdu, self.trace_connect
        )
        return handler
async def start_modbus_server(context, host: str, port: int) -> None:
    """Create a logging Modbus TCP server and serve until it stops.

    Args:
        context: Server context passed straight to ``LoggingModbusTcpServer``.
        host: Address to listen on.
        port: TCP port to listen on.
    """
    listen_address = (host, port)
    modbus_server = LoggingModbusTcpServer(
        context,
        address=listen_address,
        ignore_missing_devices=False,
        broadcast_enable=False,
    )
    # Logged before serve_forever() is awaited, i.e. before serving begins.
    logger.info("服务已启动监听(Modbus TCP),地址=%s:%s", host, port)
    await modbus_server.serve_forever()
def run_modbus_server(context, host: str, port: int) -> None:
    """Run the Modbus TCP server via ``asyncio.run``; blocking entry point.

    Args:
        context: Server context forwarded to ``start_modbus_server``.
        host: Address to listen on.
        port: TCP port to listen on.
    """
    serve_coro = start_modbus_server(context, host, port)
    asyncio.run(serve_coro)
|