"""One-off script to import mock Modbus server points from Excel into PostgreSQL.""" from __future__ import annotations from dataclasses import dataclass from pathlib import Path import psycopg2 from openpyxl import load_workbook from psycopg2.extras import execute_values BASE_DIR = Path(__file__).resolve().parent EXCEL_PATH = BASE_DIR / "mock_modbus_server_points.xlsx" DB_CONFIG = { "host": "192.168.1.109", "port": 48324, "dbname": "proj_dev2024_config", "user": "postgres", "password": "aragronprod", } TABLE_NAME = "modbus_server_point" BATCH_SIZE = 200 EXPECTED_HEADERS = ["point_id", "name", "data_type", "slave_id", "address"] SUPPORTED_DATA_TYPES = {"int16", "int32", "float32"} @dataclass(frozen=True) class ModbusPoint: point_id: str name: str data_type: str slave_id: int address: int def main() -> None: points = load_points_from_xlsx(EXCEL_PATH) validate_points(points) insert_points(points) print(f"Inserted {len(points)} rows into {TABLE_NAME}") def load_points_from_xlsx(path: Path) -> list[ModbusPoint]: if not path.exists(): raise FileNotFoundError(path) workbook = load_workbook(path, read_only=True, data_only=True) worksheet = workbook.active try: rows = worksheet.iter_rows(values_only=True) headers = [normalize_header(value) for value in next(rows)] if headers != EXPECTED_HEADERS: raise ValueError(f"Excel headers must be {EXPECTED_HEADERS}, got {headers}") points: list[ModbusPoint] = [] for row_number, row in enumerate(rows, start=2): if not any(value is not None and str(value).strip() for value in row): continue values = pad_row(list(row), len(EXPECTED_HEADERS)) points.append( ModbusPoint( point_id=require_text(values[0], row_number, "point_id"), name=require_text(values[1], row_number, "name"), data_type=require_text(values[2], row_number, "data_type"), slave_id=require_int(values[3], row_number, "slave_id"), address=require_int(values[4], row_number, "address"), ) ) return points finally: workbook.close() def normalize_header(value: object) -> str: return "" if value is None else str(value).strip() def pad_row(row: list[object], length: int) -> list[object | None]: return row[:length] + [None] * max(0, length - len(row)) def require_text(value: object, row_number: int, field: str) -> str: if value is None or str(value).strip() == "": raise ValueError(f"row {row_number}: {field} is required") return str(value).strip() def require_int(value: object, row_number: int, field: str) -> int: if isinstance(value, bool): raise ValueError(f"row {row_number}: {field} must be an integer") if isinstance(value, int): return value if isinstance(value, float) and value.is_integer(): return int(value) if isinstance(value, str) and value.strip().isdigit(): return int(value.strip()) raise ValueError(f"row {row_number}: {field} must be an integer") def validate_points(points: list[ModbusPoint]) -> None: if not points: raise ValueError("Excel has no data rows") errors: list[str] = [] seen_point_ids: set[str] = set() seen_slave_addresses: set[tuple[int, int]] = set() for point in points: if len(point.point_id) > 128: errors.append(f"point_id too long: {point.point_id}") if len(point.name) > 128: errors.append(f"name too long: {point.point_id}") if point.data_type not in SUPPORTED_DATA_TYPES: errors.append(f"unsupported data_type: {point.point_id}={point.data_type}") if not 1 <= point.slave_id <= 247: errors.append(f"slave_id out of range: {point.point_id}={point.slave_id}") if point.address < 0: errors.append(f"address out of range: {point.point_id}={point.address}") if point.point_id in seen_point_ids: errors.append(f"duplicate point_id: {point.point_id}") slave_address = (point.slave_id, point.address) if slave_address in seen_slave_addresses: errors.append(f"duplicate slave_id/address: {point.slave_id}/{point.address}") seen_point_ids.add(point.point_id) seen_slave_addresses.add(slave_address) if errors: preview = "\n".join(errors[:50]) suffix = "" if len(errors) <= 50 else f"\n... total errors={len(errors)}" raise ValueError(f"Point validation failed:\n{preview}{suffix}") def insert_points(points: list[ModbusPoint]) -> None: sql = f""" INSERT INTO {TABLE_NAME} (point_id, name, data_type, slave_id, address) VALUES %s """ rows = [(point.point_id, point.name, point.data_type, point.slave_id, point.address) for point in points] conn = psycopg2.connect(**DB_CONFIG) try: with conn.cursor() as cursor: for start in range(0, len(rows), BATCH_SIZE): execute_values(cursor, sql, rows[start:start + BATCH_SIZE]) conn.commit() except Exception: conn.rollback() raise finally: conn.close() if __name__ == "__main__": main()