| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- """Thread-safe Modbus register storage."""
- from threading import RLock
- from pymodbus.constants import ExcCodes
- from modbus_codec import encode_registers
- from point_model import ModbusPoint
class RegisterStore:
    """Per-slave holding-register table guarded by a re-entrant lock.

    Each slave id maps to a dictionary of ``address -> register value`` plus
    the set of addresses that were configured for it. Every method acquires
    the same ``RLock`` before touching either table.
    """

    def __init__(self):
        """Create an empty store with no slaves configured."""
        self._lock = RLock()
        # slave_id -> {register address -> current register value}
        self._registers: dict[int, dict[int, int]] = {}
        # slave_id -> addresses that may be read or written for that slave
        self._valid_addresses: dict[int, set[int]] = {}

    def initialize_slave(self, slave_id: int, registers: dict[int, int]) -> None:
        """Install (or replace) the register table of one slave.

        Args:
            slave_id: Identifier of the slave to configure.
            registers: Initial ``address -> value`` mapping. It is copied, so
                later changes to the caller's dictionary do not leak in. Its
                keys become the slave's set of valid addresses.
        """
        with self._lock:
            self._registers[slave_id] = dict(registers)
            self._valid_addresses[slave_id] = set(registers)

    def read_holding_registers(self, slave_id: int, address: int, count: int):
        """Read ``count`` consecutive registers starting at ``address``.

        Args:
            slave_id: Slave whose registers are read.
            address: First register address.
            count: Number of consecutive registers to read.

        Returns:
            A list with the register values in address order, or
            ``ExcCodes.ILLEGAL_ADDRESS`` when the slave is unknown, has no
            registers, or any address in the span is not configured. A
            non-positive ``count`` produces an empty span and therefore an
            empty list.
        """
        with self._lock:
            slave_registers = self._registers.get(slave_id)
            valid_addresses = self._valid_addresses.get(slave_id)
            # Unknown slaves and slaves configured with no registers are
            # treated alike: there is nothing addressable.
            if not slave_registers or not valid_addresses:
                return ExcCodes.ILLEGAL_ADDRESS
            addresses = range(address, address + count)
            if any(item not in valid_addresses for item in addresses):
                return ExcCodes.ILLEGAL_ADDRESS
            return [slave_registers[item] for item in addresses]

    def write_internal(self, slave_id: int, address: int, values: list[int]) -> None:
        """Overwrite consecutive registers starting at ``address``.

        The write is all-or-nothing: every target address is checked before
        any register is modified, so a failed call leaves the store unchanged.

        Args:
            slave_id: Slave whose registers are written.
            address: Address that receives ``values[0]``.
            values: Register values for ``address``, ``address + 1``, ...

        Raises:
            KeyError: If the slave was never initialized, or if any target
                address is not configured for it (the first offending address
                is reported).
        """
        with self._lock:
            slave_registers = self._registers.get(slave_id)
            valid_addresses = self._valid_addresses.get(slave_id)
            if slave_registers is None or valid_addresses is None:
                raise KeyError(f"slave_id is not initialized: {slave_id}")
            # Stage the write first; dict order keeps addresses ascending so
            # the first invalid address is the one reported.
            staged = {
                address + offset: register
                for offset, register in enumerate(values)
            }
            for register_address in staged:
                if register_address not in valid_addresses:
                    raise KeyError(f"address is not configured: slave_id={slave_id}, address={register_address}")
            # Only reached when the whole span is valid: commit in one step.
            slave_registers.update(staged)

    def describe(self) -> str:
        """Return a one-line summary of every configured slave.

        Slaves are listed in ascending id order and joined with ``"; "``.
        Each entry shows the lowest and highest configured address and the
        number of configured registers; a slave with no addresses and a store
        with no slaves each get a dedicated placeholder text.
        """
        with self._lock:
            if not self._valid_addresses:
                return "无从站"
            parts = []
            for slave_id in sorted(self._valid_addresses):
                addresses = self._valid_addresses[slave_id]
                if not addresses:
                    parts.append(f"slave_id={slave_id}: 无地址")
                    continue
                parts.append(
                    f"slave_id={slave_id}: 地址范围={min(addresses)}-{max(addresses)}, 寄存器数量={len(addresses)}"
                )
            return "; ".join(parts)
def initialize_register_store(points: list[ModbusPoint], store: RegisterStore) -> None:
    """Populate ``store`` with a zero-valued register table for each slave.

    For every point, the value ``0`` is encoded with the point's data type
    via ``encode_registers`` and the resulting registers are laid out at
    consecutive addresses starting at ``point.address``. Points are grouped
    by ``slave_id``; when two points of one slave cover the same address,
    the later point in ``points`` wins. Each group is then handed to
    ``store.initialize_slave``.

    Args:
        points: Point definitions providing ``slave_id``, ``address`` and
            ``data_type``.
        store: Register store that receives one table per slave.
    """
    tables: dict[int, dict[int, int]] = {}
    for point in points:
        zero_words = encode_registers(0, point.data_type)
        table = tables.setdefault(point.slave_id, {})
        table.update(
            (point.address + index, word)
            for index, word in enumerate(zero_words)
        )
    for slave_id in tables:
        store.initialize_slave(slave_id, tables[slave_id])
|