| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- """Load and validate Modbus point configuration."""
- import logging
- from constants import DEFAULT_BATCH_SIZE, SUPPORTED_DATA_TYPES
- from point_model import ModbusPoint
- logger = logging.getLogger(__name__)
def load_points(conn) -> list[ModbusPoint]:
    """Read all Modbus point definitions from ``modbus_server_point``.

    Args:
        conn: Database connection whose ``cursor()`` result works as a
            context manager and offers ``execute`` and ``fetchall``.

    Returns:
        One ``ModbusPoint`` per fetched row, in the order the query returns
        them (the query sorts by ``slave_id``, ``address``, ``point_id``).
    """
    query = """
        SELECT point_id, name, data_type, slave_id, address
        FROM modbus_server_point
        ORDER BY slave_id, address, point_id
    """
    with conn.cursor() as cur:
        cur.execute(query)
        records = cur.fetchall()

    loaded: list[ModbusPoint] = []
    for record in records:
        # Columns are read positionally, matching the SELECT list above;
        # identifiers and type names are coerced to str, numbers to int.
        loaded.append(
            ModbusPoint(
                point_id=str(record[0]),
                name=str(record[1]),
                data_type=str(record[2]),
                slave_id=int(record[3]),
                address=int(record[4]),
            )
        )
    return loaded
def validate_data_types(points: list[ModbusPoint]) -> list[str]:
    """List the points whose ``data_type`` is not in ``SUPPORTED_DATA_TYPES``.

    Args:
        points: Points to check.

    Returns:
        One ``"point_id=..., data_type=..."`` string per offending point, in
        input order; an empty list when every data type is supported.
    """
    unsupported = (
        candidate
        for candidate in points
        if candidate.data_type not in SUPPORTED_DATA_TYPES
    )
    return [
        f"point_id={bad.point_id}, data_type={bad.data_type}"
        for bad in unsupported
    ]
def validate_address_overlaps(points: list[ModbusPoint]) -> list[str]:
    """Report points on the same slave whose address ranges overlap.

    Points are grouped by ``slave_id`` and swept in ascending ``address``
    order. Each point is compared against the earlier point that reaches
    the highest ``end_address`` so far; ``end_address`` is treated as
    inclusive, so a point starting exactly at it counts as overlapping.

    Args:
        points: Points to check; each must expose ``slave_id``, ``address``,
            ``end_address``, ``point_id`` and ``data_type``.

    Returns:
        One formatted message per detected overlap, grouped by slave in
        first-seen order; an empty list when nothing overlaps.
    """
    template = "从站=%s 地址重叠: %s(%s) 范围=%s-%s, %s(%s) 范围=%s-%s"

    # Bucket the points per slave, keeping first-seen slave order.
    grouped: dict[int, list[ModbusPoint]] = {}
    for pt in points:
        sid = pt.slave_id
        if sid not in grouped:
            grouped[sid] = []
        grouped[sid].append(pt)

    problems: list[str] = []
    for sid, members in grouped.items():
        ordered = sorted(members, key=lambda member: member.address)
        # The earlier point extending furthest; any later point starting at
        # or before its end collides with it.
        widest: ModbusPoint | None = None
        for candidate in ordered:
            if widest and candidate.address <= widest.end_address:
                details = (
                    sid,
                    widest.point_id,
                    widest.data_type,
                    widest.address,
                    widest.end_address,
                    candidate.point_id,
                    candidate.data_type,
                    candidate.address,
                    candidate.end_address,
                )
                problems.append(template % details)
            # Only advance the reference when the candidate reaches further,
            # so a long range keeps shadowing the short ones nested in it.
            if widest is None or candidate.end_address > widest.end_address:
                widest = candidate
    return problems
def check_point_exists(conn, point_ids: list[str]) -> list[str]:
    """Find which of ``point_ids`` have no matching row in ``pt_point``.

    The IDs are looked up in slices of ``DEFAULT_BATCH_SIZE`` so a single
    query never carries the whole list.

    Args:
        conn: Database connection whose ``cursor()`` result works as a
            context manager and offers ``execute`` and ``fetchall``.
        point_ids: Point IDs to look up.

    Returns:
        The sorted, de-duplicated IDs that were not found; an empty list
        when ``point_ids`` is empty.
    """
    if not point_ids:
        return []

    lookup_sql = "SELECT point_id FROM pt_point WHERE point_id = ANY(%s)"
    found: set[str] = set()
    with conn.cursor() as cur:
        chunks = (
            point_ids[offset:offset + DEFAULT_BATCH_SIZE]
            for offset in range(0, len(point_ids), DEFAULT_BATCH_SIZE)
        )
        for chunk in chunks:
            # The whole chunk is bound as one parameter for ANY(...).
            cur.execute(lookup_sql, (chunk,))
            found |= {str(record[0]) for record in cur.fetchall()}

    logger.info("校验pt_point点位完成,数量=%d", len(point_ids))
    missing = set(point_ids).difference(found)
    return sorted(missing)
|