import_modbus_server_points.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. """One-off script to import mock Modbus server points from Excel into PostgreSQL."""
  2. from __future__ import annotations
  3. from dataclasses import dataclass
  4. from pathlib import Path
  5. import psycopg2
  6. from openpyxl import load_workbook
  7. from psycopg2.extras import execute_values
  8. BASE_DIR = Path(__file__).resolve().parent
  9. EXCEL_PATH = BASE_DIR / "mock_modbus_server_points.xlsx"
  10. DB_CONFIG = {
  11. "host": "192.168.1.109",
  12. "port": 48324,
  13. "dbname": "proj_dev2024_config",
  14. "user": "postgres",
  15. "password": "aragronprod",
  16. }
  17. TABLE_NAME = "modbus_server_point"
  18. BATCH_SIZE = 200
  19. EXPECTED_HEADERS = ["point_id", "name", "data_type", "slave_id", "address"]
  20. SUPPORTED_DATA_TYPES = {"int16", "int32", "float32"}
  21. @dataclass(frozen=True)
  22. class ModbusPoint:
  23. point_id: str
  24. name: str
  25. data_type: str
  26. slave_id: int
  27. address: int
  28. def main() -> None:
  29. points = load_points_from_xlsx(EXCEL_PATH)
  30. validate_points(points)
  31. insert_points(points)
  32. print(f"Inserted {len(points)} rows into {TABLE_NAME}")
  33. def load_points_from_xlsx(path: Path) -> list[ModbusPoint]:
  34. if not path.exists():
  35. raise FileNotFoundError(path)
  36. workbook = load_workbook(path, read_only=True, data_only=True)
  37. worksheet = workbook.active
  38. try:
  39. rows = worksheet.iter_rows(values_only=True)
  40. headers = [normalize_header(value) for value in next(rows)]
  41. if headers != EXPECTED_HEADERS:
  42. raise ValueError(f"Excel headers must be {EXPECTED_HEADERS}, got {headers}")
  43. points: list[ModbusPoint] = []
  44. for row_number, row in enumerate(rows, start=2):
  45. if not any(value is not None and str(value).strip() for value in row):
  46. continue
  47. values = pad_row(list(row), len(EXPECTED_HEADERS))
  48. points.append(
  49. ModbusPoint(
  50. point_id=require_text(values[0], row_number, "point_id"),
  51. name=require_text(values[1], row_number, "name"),
  52. data_type=require_text(values[2], row_number, "data_type"),
  53. slave_id=require_int(values[3], row_number, "slave_id"),
  54. address=require_int(values[4], row_number, "address"),
  55. )
  56. )
  57. return points
  58. finally:
  59. workbook.close()
  60. def normalize_header(value: object) -> str:
  61. return "" if value is None else str(value).strip()
  62. def pad_row(row: list[object], length: int) -> list[object | None]:
  63. return row[:length] + [None] * max(0, length - len(row))
  64. def require_text(value: object, row_number: int, field: str) -> str:
  65. if value is None or str(value).strip() == "":
  66. raise ValueError(f"row {row_number}: {field} is required")
  67. return str(value).strip()
  68. def require_int(value: object, row_number: int, field: str) -> int:
  69. if isinstance(value, bool):
  70. raise ValueError(f"row {row_number}: {field} must be an integer")
  71. if isinstance(value, int):
  72. return value
  73. if isinstance(value, float) and value.is_integer():
  74. return int(value)
  75. if isinstance(value, str) and value.strip().isdigit():
  76. return int(value.strip())
  77. raise ValueError(f"row {row_number}: {field} must be an integer")
  78. def validate_points(points: list[ModbusPoint]) -> None:
  79. if not points:
  80. raise ValueError("Excel has no data rows")
  81. errors: list[str] = []
  82. seen_point_ids: set[str] = set()
  83. seen_slave_addresses: set[tuple[int, int]] = set()
  84. for point in points:
  85. if len(point.point_id) > 128:
  86. errors.append(f"point_id too long: {point.point_id}")
  87. if len(point.name) > 128:
  88. errors.append(f"name too long: {point.point_id}")
  89. if point.data_type not in SUPPORTED_DATA_TYPES:
  90. errors.append(f"unsupported data_type: {point.point_id}={point.data_type}")
  91. if not 1 <= point.slave_id <= 247:
  92. errors.append(f"slave_id out of range: {point.point_id}={point.slave_id}")
  93. if point.address < 0:
  94. errors.append(f"address out of range: {point.point_id}={point.address}")
  95. if point.point_id in seen_point_ids:
  96. errors.append(f"duplicate point_id: {point.point_id}")
  97. slave_address = (point.slave_id, point.address)
  98. if slave_address in seen_slave_addresses:
  99. errors.append(f"duplicate slave_id/address: {point.slave_id}/{point.address}")
  100. seen_point_ids.add(point.point_id)
  101. seen_slave_addresses.add(slave_address)
  102. if errors:
  103. preview = "\n".join(errors[:50])
  104. suffix = "" if len(errors) <= 50 else f"\n... total errors={len(errors)}"
  105. raise ValueError(f"Point validation failed:\n{preview}{suffix}")
  106. def insert_points(points: list[ModbusPoint]) -> None:
  107. sql = f"""
  108. INSERT INTO {TABLE_NAME} (point_id, name, data_type, slave_id, address)
  109. VALUES %s
  110. """
  111. rows = [(point.point_id, point.name, point.data_type, point.slave_id, point.address) for point in points]
  112. conn = psycopg2.connect(**DB_CONFIG)
  113. try:
  114. with conn.cursor() as cursor:
  115. for start in range(0, len(rows), BATCH_SIZE):
  116. execute_values(cursor, sql, rows[start:start + BATCH_SIZE])
  117. conn.commit()
  118. except Exception:
  119. conn.rollback()
  120. raise
  121. finally:
  122. conn.close()
  123. if __name__ == "__main__":
  124. main()