| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- """One-off script to import mock Modbus server points from Excel into PostgreSQL."""
- from __future__ import annotations
- from dataclasses import dataclass
- from pathlib import Path
- import psycopg2
- from openpyxl import load_workbook
- from psycopg2.extras import execute_values
# Directory holding this script; the workbook is looked up right beside it.
BASE_DIR = Path(__file__).resolve().parent
EXCEL_PATH = BASE_DIR.joinpath("mock_modbus_server_points.xlsx")

# Keyword arguments handed verbatim to psycopg2.connect().
# NOTE(review): these credentials are committed in plain text — consider
# loading them from the environment or a secrets store instead.
DB_CONFIG = {
    "host": "192.168.1.109",
    "port": 48324,
    "dbname": "proj_dev2024_config",
    "user": "postgres",
    "password": "aragronprod",
}

# Destination table and the row-batch size used when inserting.
TABLE_NAME = "modbus_server_point"
BATCH_SIZE = 200

# The worksheet's first row must equal this list, in this order.
EXPECTED_HEADERS = ["point_id", "name", "data_type", "slave_id", "address"]

# Values of the data_type column that pass validation.
SUPPORTED_DATA_TYPES = {"int16", "int32", "float32"}
@dataclass(frozen=True)
class ModbusPoint:
    """Immutable record describing one data row of the points workbook."""

    # Text identifier of the point.
    point_id: str
    # Display name of the point.
    name: str
    # Type label, e.g. "int16"; checked against SUPPORTED_DATA_TYPES later.
    data_type: str
    # Integer taken from the slave_id column.
    slave_id: int
    # Integer taken from the address column.
    address: int
def main() -> None:
    """Load the workbook, validate its points, insert them, and report."""
    loaded = load_points_from_xlsx(EXCEL_PATH)
    # Validation raises before anything is written to the database.
    validate_points(loaded)
    insert_points(loaded)
    inserted = len(loaded)
    print(f"Inserted {inserted} rows into {TABLE_NAME}")
def load_points_from_xlsx(path: Path) -> list[ModbusPoint]:
    """Read Modbus points from the active worksheet of an Excel workbook.

    The first row must equal ``EXPECTED_HEADERS`` after each cell is trimmed.
    Rows whose cells are all empty or whitespace-only are skipped. Every
    other row becomes a ``ModbusPoint``; cells beyond the expected columns
    are ignored and missing trailing cells are treated as empty.

    Args:
        path: Location of the ``.xlsx`` file.

    Returns:
        The points in worksheet order.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If the worksheet is empty, the header row does not
            match, or a cell fails ``require_text`` / ``require_int``.
    """
    if not path.exists():
        raise FileNotFoundError(path)

    workbook = load_workbook(path, read_only=True, data_only=True)
    try:
        rows = workbook.active.iter_rows(values_only=True)
        # Default to an empty header row so a sheet with no rows fails the
        # header check below instead of leaking a bare StopIteration.
        header_row = next(rows, ())
        headers = [normalize_header(value) for value in header_row]
        if headers != EXPECTED_HEADERS:
            raise ValueError(f"Excel headers must be {EXPECTED_HEADERS}, got {headers}")

        points: list[ModbusPoint] = []
        # Data starts on spreadsheet row 2; row 1 is the header.
        for row_number, row in enumerate(rows, start=2):
            if not any(value is not None and str(value).strip() for value in row):
                continue  # blank row
            values = pad_row(list(row), len(EXPECTED_HEADERS))
            points.append(
                ModbusPoint(
                    point_id=require_text(values[0], row_number, "point_id"),
                    name=require_text(values[1], row_number, "name"),
                    data_type=require_text(values[2], row_number, "data_type"),
                    slave_id=require_int(values[3], row_number, "slave_id"),
                    address=require_int(values[4], row_number, "address"),
                )
            )
        return points
    finally:
        workbook.close()
def normalize_header(value: object) -> str:
    """Return a header cell as trimmed text; an empty cell becomes ``""``."""
    if value is None:
        return ""
    return str(value).strip()
def pad_row(row: list[object], length: int) -> list[object | None]:
    """Return a copy of ``row`` cut to ``length`` cells, ``None``-padded if short.

    The input list is never modified.
    """
    trimmed = row[:length]
    # Multiplying a list by a non-positive count yields [], so rows that are
    # already long enough receive no padding.
    trimmed.extend([None] * (length - len(row)))
    return trimmed
def require_text(value: object, row_number: int, field: str) -> str:
    """Return ``value`` as trimmed text, rejecting empty cells.

    Args:
        value: Raw cell value.
        row_number: Spreadsheet row number, used in the error message.
        field: Column name, used in the error message.

    Raises:
        ValueError: If the cell is ``None`` or blank after trimming.
    """
    text = "" if value is None else str(value).strip()
    if not text:
        raise ValueError(f"row {row_number}: {field} is required")
    return text
def require_int(value: object, row_number: int, field: str) -> int:
    """Return ``value`` as an ``int``, rejecting anything that is not integral.

    Accepted inputs are ``int`` (but not ``bool``), ``float`` with no
    fractional part, and text made of decimal digits with an optional
    leading sign and surrounding whitespace.

    Args:
        value: Raw cell value.
        row_number: Spreadsheet row number, used in the error message.
        field: Column name, used in the error message.

    Raises:
        ValueError: If the cell cannot be read as an integer.
    """
    # bool is a subclass of int, so it has to be excluded before the int test.
    if isinstance(value, bool):
        raise ValueError(f"row {row_number}: {field} must be an integer")
    if isinstance(value, int):
        return value
    if isinstance(value, float) and value.is_integer():
        return int(value)
    if isinstance(value, str):
        text = value.strip()
        # Signed text is accepted so that "-5" is treated like a numeric -5.
        digits = text[1:] if text[:1] in ("+", "-") else text
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() cannot parse, which would raise a ValueError
        # without the row/field context.
        if digits.isdecimal():
            return int(text)
    raise ValueError(f"row {row_number}: {field} must be an integer")
def validate_points(points: list[ModbusPoint]) -> None:
    """Check every point and raise one error listing all problems found.

    Per point, the checks are: ``point_id`` and ``name`` no longer than 128
    characters, ``data_type`` in ``SUPPORTED_DATA_TYPES``, ``slave_id`` within
    1..247, ``address`` not negative, and no earlier point sharing the same
    ``point_id`` or the same ``(slave_id, address)`` pair.

    Raises:
        ValueError: If ``points`` is empty, or if any check fails. At most
            the first 50 messages are included, followed by the total count
            when there are more.
    """
    if not points:
        raise ValueError("Excel has no data rows")

    problems: list[str] = []
    known_ids: set[str] = set()
    known_locations: set[tuple[int, int]] = set()

    for item in points:
        location = (item.slave_id, item.address)
        # (failed?, message) pairs, listed in the order they are reported.
        checks = (
            (len(item.point_id) > 128, f"point_id too long: {item.point_id}"),
            (len(item.name) > 128, f"name too long: {item.point_id}"),
            (
                item.data_type not in SUPPORTED_DATA_TYPES,
                f"unsupported data_type: {item.point_id}={item.data_type}",
            ),
            (
                item.slave_id < 1 or item.slave_id > 247,
                f"slave_id out of range: {item.point_id}={item.slave_id}",
            ),
            (item.address < 0, f"address out of range: {item.point_id}={item.address}"),
            (item.point_id in known_ids, f"duplicate point_id: {item.point_id}"),
            (
                location in known_locations,
                f"duplicate slave_id/address: {item.slave_id}/{item.address}",
            ),
        )
        problems.extend(message for failed, message in checks if failed)
        # Record the point only after checking it, so it is compared against
        # earlier rows and never against itself.
        known_ids.add(item.point_id)
        known_locations.add(location)

    if not problems:
        return

    shown = "\n".join(problems[:50])
    tail = f"\n... total errors={len(problems)}" if len(problems) > 50 else ""
    raise ValueError(f"Point validation failed:\n{shown}{tail}")
def insert_points(points: list[ModbusPoint]) -> None:
    """Insert all points into ``TABLE_NAME`` within a single transaction.

    A connection is opened from ``DB_CONFIG``. The rows are committed only
    after every batch has been sent; on any exception the transaction is
    rolled back and the exception re-raised. The connection is always closed.

    Args:
        points: Points to insert, already validated by the caller.
    """
    sql = f"""
        INSERT INTO {TABLE_NAME} (point_id, name, data_type, slave_id, address)
        VALUES %s
    """
    rows = [
        (point.point_id, point.name, point.data_type, point.slave_id, point.address)
        for point in points
    ]
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            # execute_values() splits the rows into statements itself, with a
            # default page_size of 100. Passing BATCH_SIZE makes each
            # statement carry the configured number of rows.
            execute_values(cursor, sql, rows, page_size=BATCH_SIZE)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
# Run the import only when this file is executed as a script.
if __name__ == "__main__":
    main()
|