# point_loader.py 2.7 KB

  1. """Load and validate Modbus point configuration."""
  2. import logging
  3. from constants import DEFAULT_BATCH_SIZE, SUPPORTED_DATA_TYPES
  4. from point_model import ModbusPoint
  5. logger = logging.getLogger(__name__)
  6. def load_points(conn) -> list[ModbusPoint]:
  7. sql = """
  8. SELECT point_id, name, data_type, slave_id, address
  9. FROM modbus_server_point
  10. ORDER BY slave_id, address, point_id
  11. """
  12. with conn.cursor() as cursor:
  13. cursor.execute(sql)
  14. rows = cursor.fetchall()
  15. return [
  16. ModbusPoint(
  17. point_id=str(row[0]),
  18. name=str(row[1]),
  19. data_type=str(row[2]),
  20. slave_id=int(row[3]),
  21. address=int(row[4]),
  22. )
  23. for row in rows
  24. ]
  25. def validate_data_types(points: list[ModbusPoint]) -> list[str]:
  26. errors = [
  27. f"point_id={point.point_id}, data_type={point.data_type}"
  28. for point in points
  29. if point.data_type not in SUPPORTED_DATA_TYPES
  30. ]
  31. return errors
  32. def validate_address_overlaps(points: list[ModbusPoint]) -> list[str]:
  33. errors: list[str] = []
  34. by_slave: dict[int, list[ModbusPoint]] = {}
  35. for point in points:
  36. by_slave.setdefault(point.slave_id, []).append(point)
  37. for slave_id, slave_points in by_slave.items():
  38. previous: ModbusPoint | None = None
  39. for current in sorted(slave_points, key=lambda item: item.address):
  40. if previous and current.address <= previous.end_address:
  41. errors.append(
  42. "从站=%s 地址重叠: %s(%s) 范围=%s-%s, %s(%s) 范围=%s-%s"
  43. % (
  44. slave_id,
  45. previous.point_id,
  46. previous.data_type,
  47. previous.address,
  48. previous.end_address,
  49. current.point_id,
  50. current.data_type,
  51. current.address,
  52. current.end_address,
  53. )
  54. )
  55. if previous is None or current.end_address > previous.end_address:
  56. previous = current
  57. return errors
  58. def check_point_exists(conn, point_ids: list[str]) -> list[str]:
  59. if not point_ids:
  60. return []
  61. existing: set[str] = set()
  62. with conn.cursor() as cursor:
  63. for start in range(0, len(point_ids), DEFAULT_BATCH_SIZE):
  64. batch = point_ids[start:start + DEFAULT_BATCH_SIZE]
  65. cursor.execute("SELECT point_id FROM pt_point WHERE point_id = ANY(%s)", (batch,))
  66. existing.update(str(row[0]) for row in cursor.fetchall())
  67. logger.info("校验pt_point点位完成,数量=%d", len(point_ids))
  68. return sorted(set(point_ids) - existing)