register_store.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. """Thread-safe Modbus register storage."""
  2. from threading import RLock
  3. from pymodbus.constants import ExcCodes
  4. from modbus_codec import encode_registers
  5. from point_model import ModbusPoint
  6. class RegisterStore:
  7. def __init__(self):
  8. self._lock = RLock()
  9. self._registers: dict[int, dict[int, int]] = {}
  10. self._valid_addresses: dict[int, set[int]] = {}
  11. def initialize_slave(self, slave_id: int, registers: dict[int, int]) -> None:
  12. with self._lock:
  13. self._registers[slave_id] = dict(registers)
  14. self._valid_addresses[slave_id] = set(registers)
  15. def read_holding_registers(self, slave_id: int, address: int, count: int):
  16. with self._lock:
  17. slave_registers = self._registers.get(slave_id)
  18. valid_addresses = self._valid_addresses.get(slave_id)
  19. if not slave_registers or not valid_addresses:
  20. return ExcCodes.ILLEGAL_ADDRESS
  21. addresses = range(address, address + count)
  22. if any(item not in valid_addresses for item in addresses):
  23. return ExcCodes.ILLEGAL_ADDRESS
  24. return [slave_registers[item] for item in addresses]
  25. def write_internal(self, slave_id: int, address: int, values: list[int]) -> None:
  26. with self._lock:
  27. slave_registers = self._registers.get(slave_id)
  28. valid_addresses = self._valid_addresses.get(slave_id)
  29. if slave_registers is None or valid_addresses is None:
  30. raise KeyError(f"slave_id is not initialized: {slave_id}")
  31. for offset, register in enumerate(values):
  32. register_address = address + offset
  33. if register_address not in valid_addresses:
  34. raise KeyError(f"address is not configured: slave_id={slave_id}, address={register_address}")
  35. slave_registers[register_address] = register
  36. def describe(self) -> str:
  37. with self._lock:
  38. if not self._valid_addresses:
  39. return "无从站"
  40. parts = []
  41. for slave_id in sorted(self._valid_addresses):
  42. addresses = self._valid_addresses[slave_id]
  43. if not addresses:
  44. parts.append(f"slave_id={slave_id}: 无地址")
  45. continue
  46. parts.append(
  47. f"slave_id={slave_id}: 地址范围={min(addresses)}-{max(addresses)}, 寄存器数量={len(addresses)}"
  48. )
  49. return "; ".join(parts)
  50. def initialize_register_store(points: list[ModbusPoint], store: RegisterStore) -> None:
  51. by_slave: dict[int, dict[int, int]] = {}
  52. for point in points:
  53. registers = encode_registers(0, point.data_type)
  54. slave_registers = by_slave.setdefault(point.slave_id, {})
  55. for offset, register in enumerate(registers):
  56. slave_registers[point.address + offset] = register
  57. for slave_id, registers in by_slave.items():
  58. store.initialize_slave(slave_id, registers)