from __future__ import annotations

from typing import Any

from .auth import find_project_config, resolve_project_token
from .http_client import request_json
from .protocols import MODBUS_SPEC
from .protocols.modbus import MODBUS_POINT_TYPE_ALIASES, MODBUS_REGISTER_TYPE_ALIASES


def _merge_defaults(defaults: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict holding ``defaults`` overlaid with ``payload``.

    Keys present in ``payload`` win, including keys whose value is ``None``.
    Neither input mapping is mutated.
    """
    return {**defaults, **payload}


def _require_non_empty_text(payload: dict[str, Any], field_name: str) -> str:
    """Return ``payload[field_name]`` as stripped text.

    Raises:
        ValueError: if the field is missing, falsy, or blank after stripping.
    """
    # ``or ""`` maps None (and any other falsy value) to the empty string so
    # that it is rejected below instead of being stringified as "None".
    value = str(payload.get(field_name) or "").strip()
    if not value:
        raise ValueError(f"payload.{field_name} is required")
    return value


def _require_present(payload: dict[str, Any], field_name: str) -> Any:
    """Return ``payload[field_name]``, requiring it to exist and not be None.

    Unlike ``_require_non_empty_text`` this accepts falsy values such as ``0``.

    Raises:
        ValueError: if the key is absent or its value is ``None``.
    """
    if payload.get(field_name) is None:
        raise ValueError(f"payload.{field_name} is required")
    return payload[field_name]


def _lookup_alias(aliases: dict[str, Any], raw: str) -> Any:
    """Look ``raw`` up in ``aliases``: exact key first, then its lowercase form.

    Returns ``None`` when neither spelling is a known alias.
    """
    resolved = aliases.get(raw)
    if resolved is None:
        resolved = aliases.get(raw.lower())
    return resolved


def _normalize_modbus_device_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a Modbus device payload and return a normalized copy.

    ``name`` and ``ip`` are stripped. The caller-facing ``address_base`` key
    is renamed to ``address_offset``. The input dict is not mutated.

    Raises:
        ValueError: if any required field is missing (or blank, for the
            text fields).
    """
    normalized = dict(payload)
    normalized["name"] = _require_non_empty_text(normalized, "name")
    normalized["ip"] = _require_non_empty_text(normalized, "ip")

    # Presence-only checks: the values are forwarded untouched.
    for field_name in ("device_type", "port", "slave_id", "word_order", "byte_order"):
        _require_present(normalized, field_name)

    _require_present(normalized, "address_base")
    normalized["address_offset"] = normalized.pop("address_base")
    return normalized


def _normalize_modbus_point_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a Modbus point payload and return a normalized copy.

    * ``name`` is stripped and must be non-empty; ``address`` must be present.
    * ``type`` is resolved through ``MODBUS_POINT_TYPE_ALIASES`` (exact match,
      then lowercase).
    * ``func_code`` is taken from the payload when given (coerced to ``int``
      and restricted to 1-4); otherwise it is derived from ``register_type``
      through ``MODBUS_REGISTER_TYPE_ALIASES``.
    * ``register_type`` is removed from the result.

    The input dict is not mutated.

    Raises:
        ValueError: on a missing required field, an unknown ``type`` or
            ``register_type``, or an invalid ``func_code``.
    """
    func_code_error = "payload.func_code must be one of 1, 2, 3, 4"

    normalized = dict(payload)
    normalized["name"] = _require_non_empty_text(normalized, "name")
    _require_present(normalized, "address")

    raw_type = _require_non_empty_text(normalized, "type")
    normalized_type = _lookup_alias(MODBUS_POINT_TYPE_ALIASES, raw_type)
    if normalized_type is None:
        raise ValueError(
            "payload.type is invalid; use one of bool, int16, uint16, int32, "
            "uint32, int64, uint64, float32, float64, or a documented alias "
            "such as SHORT, WORD, LONG, DWORD, FLOAT, DOUBLE"
        )
    normalized["type"] = normalized_type

    if normalized.get("func_code") is None:
        # No explicit function code: derive it from the register type name.
        register_type = _require_non_empty_text(normalized, "register_type")
        func_code = _lookup_alias(MODBUS_REGISTER_TYPE_ALIASES, register_type)
        if func_code is None:
            raise ValueError(
                "payload.register_type is invalid; use coil, discrete_input, "
                "holding_register, input_register, or func_code 1/2/3/4"
            )
    else:
        # Going through str() lets both 3 and " 3 " be accepted.
        try:
            func_code = int(str(normalized["func_code"]).strip())
        except (TypeError, ValueError) as exc:
            raise ValueError(func_code_error) from exc
        if func_code not in {1, 2, 3, 4}:
            raise ValueError(func_code_error)
    normalized["func_code"] = func_code

    # NOTE(review): register_type is dropped on both paths, on the assumption
    # that the collector API only takes func_code - confirm against the API.
    normalized.pop("register_type", None)
    return normalized


def _request_collector(
    project_key: str,
    method: str,
    path: str,
    *,
    json_payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Send a request to the project's data collector and return its payload.

    The project configuration is looked up by ``project_key``; ``path`` is
    appended verbatim to the project's ``data_collector_base_url``.

    Raises:
        ValueError: if the decoded response is not a dict.
    """
    project = find_project_config(project_key)
    authorization = resolve_project_token(project)
    url = f"{project['data_collector_base_url']}{path}"
    response_payload = request_json(
        method,
        url,
        authorization,
        json_payload=json_payload,
    )
    if not isinstance(response_payload, dict):
        raise ValueError(f"collector API returned invalid payload for {path}: {response_payload}")
    return response_payload


def create_modbus_device(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a Modbus device from ``payload`` and return the API response.

    The payload is validated and normalized first, then laid over
    ``MODBUS_SPEC.device_defaults`` before being POSTed.

    Raises:
        ValueError: if the payload is invalid or the response is not a dict.
    """
    body = _merge_defaults(
        MODBUS_SPEC.device_defaults,
        _normalize_modbus_device_payload(payload),
    )
    return _request_collector(
        project_key,
        "POST",
        MODBUS_SPEC.create_device_path,
        json_payload=body,
    )


def create_modbus_point(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a Modbus point from ``payload`` and return the API response.

    The payload is validated and normalized first, then laid over
    ``MODBUS_SPEC.point_defaults`` before being POSTed.

    Raises:
        ValueError: if the payload is invalid or the response is not a dict.
    """
    body = _merge_defaults(
        MODBUS_SPEC.point_defaults,
        _normalize_modbus_point_payload(payload),
    )
    return _request_collector(
        project_key,
        "POST",
        MODBUS_SPEC.create_point_path,
        json_payload=body,
    )


def list_devices(project_key: str, num_points: bool = False) -> dict[str, Any]:
    """Fetch the project's devices from the collector API.

    ``num_points`` is forwarded as the ``num_points=true|false`` query flag.

    Raises:
        ValueError: if the response is not a dict.
    """
    num_points_text = "true" if num_points else "false"
    return _request_collector(
        project_key,
        "GET",
        f"/api/collector/device?num_points={num_points_text}",
    )