"""Thread-safe Modbus register storage.""" from threading import RLock from pymodbus.constants import ExcCodes from modbus_codec import encode_registers from point_model import ModbusPoint class RegisterStore: def __init__(self): self._lock = RLock() self._registers: dict[int, dict[int, int]] = {} self._valid_addresses: dict[int, set[int]] = {} def initialize_slave(self, slave_id: int, registers: dict[int, int]) -> None: with self._lock: self._registers[slave_id] = dict(registers) self._valid_addresses[slave_id] = set(registers) def read_holding_registers(self, slave_id: int, address: int, count: int): with self._lock: slave_registers = self._registers.get(slave_id) valid_addresses = self._valid_addresses.get(slave_id) if not slave_registers or not valid_addresses: return ExcCodes.ILLEGAL_ADDRESS addresses = range(address, address + count) if any(item not in valid_addresses for item in addresses): return ExcCodes.ILLEGAL_ADDRESS return [slave_registers[item] for item in addresses] def write_internal(self, slave_id: int, address: int, values: list[int]) -> None: with self._lock: slave_registers = self._registers.get(slave_id) valid_addresses = self._valid_addresses.get(slave_id) if slave_registers is None or valid_addresses is None: raise KeyError(f"slave_id is not initialized: {slave_id}") for offset, register in enumerate(values): register_address = address + offset if register_address not in valid_addresses: raise KeyError(f"address is not configured: slave_id={slave_id}, address={register_address}") slave_registers[register_address] = register def describe(self) -> str: with self._lock: if not self._valid_addresses: return "无从站" parts = [] for slave_id in sorted(self._valid_addresses): addresses = self._valid_addresses[slave_id] if not addresses: parts.append(f"slave_id={slave_id}: 无地址") continue parts.append( f"slave_id={slave_id}: 地址范围={min(addresses)}-{max(addresses)}, 寄存器数量={len(addresses)}" ) return "; ".join(parts) def initialize_register_store(points: list[ModbusPoint], store: RegisterStore) -> None: by_slave: dict[int, dict[int, int]] = {} for point in points: registers = encode_registers(0, point.data_type) slave_registers = by_slave.setdefault(point.slave_id, {}) for offset, register in enumerate(registers): slave_registers[point.address + offset] = register for slave_id, registers in by_slave.items(): store.initialize_slave(slave_id, registers)