| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- from __future__ import annotations
- from typing import Any
- from .auth import find_project_config, resolve_project_token
- from .http_client import request_json
- from .protocols import MODBUS_SPEC
- from .protocols.modbus import MODBUS_POINT_TYPE_ALIASES, MODBUS_REGISTER_TYPE_ALIASES
- def _merge_defaults(defaults: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
- merged = dict(defaults)
- merged.update(payload)
- return merged
- def _require_non_empty_text(payload: dict[str, Any], field_name: str) -> str:
- value = str(payload.get(field_name) or "").strip()
- if not value:
- raise ValueError(f"payload.{field_name} is required")
- return value
- def _require_present(payload: dict[str, Any], field_name: str) -> Any:
- if field_name not in payload or payload.get(field_name) is None:
- raise ValueError(f"payload.{field_name} is required")
- return payload[field_name]
- def _normalize_modbus_device_payload(payload: dict[str, Any]) -> dict[str, Any]:
- normalized = dict(payload)
- normalized["name"] = _require_non_empty_text(normalized, "name")
- normalized["ip"] = _require_non_empty_text(normalized, "ip")
- _require_present(normalized, "device_type")
- _require_present(normalized, "port")
- _require_present(normalized, "slave_id")
- _require_present(normalized, "word_order")
- _require_present(normalized, "byte_order")
- address_base = _require_present(normalized, "address_base")
- normalized["address_offset"] = address_base
- normalized.pop("address_base", None)
- return normalized
- def _normalize_modbus_point_payload(payload: dict[str, Any]) -> dict[str, Any]:
- normalized = dict(payload)
- normalized["name"] = _require_non_empty_text(normalized, "name")
- _require_present(normalized, "address")
- raw_type = _require_non_empty_text(normalized, "type")
- normalized_type = MODBUS_POINT_TYPE_ALIASES.get(raw_type)
- if normalized_type is None:
- normalized_type = MODBUS_POINT_TYPE_ALIASES.get(raw_type.lower())
- if normalized_type is None:
- raise ValueError(
- "payload.type is invalid; use one of bool, int16, uint16, int32, "
- "uint32, int64, uint64, float32, float64, or a documented alias "
- "such as SHORT, WORD, LONG, DWORD, FLOAT, DOUBLE"
- )
- normalized["type"] = normalized_type
- if normalized.get("func_code") is None:
- register_type = _require_non_empty_text(normalized, "register_type")
- func_code = MODBUS_REGISTER_TYPE_ALIASES.get(register_type)
- if func_code is None:
- func_code = MODBUS_REGISTER_TYPE_ALIASES.get(register_type.lower())
- if func_code is None:
- raise ValueError(
- "payload.register_type is invalid; use coil, discrete_input, "
- "holding_register, input_register, or func_code 1/2/3/4"
- )
- normalized["func_code"] = func_code
- else:
- try:
- normalized["func_code"] = int(str(normalized["func_code"]).strip())
- except Exception as exc:
- raise ValueError("payload.func_code must be one of 1, 2, 3, 4") from exc
- if normalized["func_code"] not in {1, 2, 3, 4}:
- raise ValueError("payload.func_code must be one of 1, 2, 3, 4")
- if "register_type" in normalized:
- normalized.pop("register_type")
- return normalized
- def _request_collector(
- project_key: str,
- method: str,
- path: str,
- *,
- json_payload: dict[str, Any] | None = None,
- ) -> dict[str, Any]:
- project = find_project_config(project_key)
- authorization = resolve_project_token(project)
- response_payload = request_json(
- method,
- f"{project['data_collector_base_url']}{path}",
- authorization,
- json_payload=json_payload,
- )
- if not isinstance(response_payload, dict):
- raise ValueError(f"collector API returned invalid payload for {path}: {response_payload}")
- return response_payload
- def create_modbus_device(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
- return _request_collector(
- project_key,
- "POST",
- MODBUS_SPEC.create_device_path,
- json_payload=_merge_defaults(
- MODBUS_SPEC.device_defaults,
- _normalize_modbus_device_payload(payload),
- ),
- )
- def create_modbus_point(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
- return _request_collector(
- project_key,
- "POST",
- MODBUS_SPEC.create_point_path,
- json_payload=_merge_defaults(
- MODBUS_SPEC.point_defaults,
- _normalize_modbus_point_payload(payload),
- ),
- )
- def list_devices(project_key: str, num_points: bool = False) -> dict[str, Any]:
- num_points_text = "true" if num_points else "false"
- return _request_collector(
- project_key,
- "GET",
- f"/api/collector/device?num_points={num_points_text}",
- )
|