modbus_server.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. """Modbus TCP server helpers."""
  2. import asyncio
  3. import logging
  4. from pymodbus.server.requesthandler import ServerRequestHandler
  5. from pymodbus.server.server import ModbusTcpServer
  6. logger = logging.getLogger(__name__)
  7. class LoggingServerRequestHandler(ServerRequestHandler):
  8. def _client_addr(self) -> str:
  9. if not self.transport:
  10. return "unknown"
  11. peer = self.transport.get_extra_info("peername")
  12. return "%s:%s" % peer if peer else "unknown"
  13. def callback_connected(self) -> None:
  14. super().callback_connected()
  15. logger.info("客户端已连接(Modbus): %s", self._client_addr())
  16. def callback_disconnected(self, exc: Exception | None) -> None:
  17. client_addr = self._client_addr()
  18. super().callback_disconnected(exc)
  19. if exc:
  20. logger.info("客户端已断开(Modbus): %s, 原因=%s", client_addr, exc)
  21. else:
  22. logger.info("客户端已断开(Modbus): %s", client_addr)
  23. class LoggingModbusTcpServer(ModbusTcpServer):
  24. def callback_new_connection(self):
  25. return LoggingServerRequestHandler(
  26. self,
  27. self.trace_packet,
  28. self.trace_pdu,
  29. self.trace_connect,
  30. )
  31. async def start_modbus_server(context, host: str, port: int) -> None:
  32. server = LoggingModbusTcpServer(
  33. context,
  34. address=(host, port),
  35. ignore_missing_devices=False,
  36. broadcast_enable=False,
  37. )
  38. logger.info("服务已启动监听(Modbus TCP),地址=%s:%s", host, port)
  39. await server.serve_forever()
  40. def run_modbus_server(context, host: str, port: int) -> None:
  41. asyncio.run(start_modbus_server(context, host, port))