| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- from __future__ import annotations
- from typing import Any
- from .auth import find_project_config, resolve_project_token
- from .http_client import request_json
- from .protocols import MODBUS_SPEC
- from .protocols.modbus import MODBUS_POINT_TYPE_ALIASES, MODBUS_REGISTER_TYPE_ALIASES
- def _merge_defaults(defaults: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
- merged = dict(defaults)
- merged.update(payload)
- return merged
- def _require_non_empty_text(payload: dict[str, Any], field_name: str) -> str:
- value = str(payload.get(field_name) or "").strip()
- if not value:
- raise ValueError(f"payload.{field_name} is required")
- return value
- def _require_present(payload: dict[str, Any], field_name: str) -> Any:
- if field_name not in payload or payload.get(field_name) is None:
- raise ValueError(f"payload.{field_name} is required")
- return payload[field_name]
def _normalize_modbus_device_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a device-creation payload and return a normalised copy.

    ``name`` and ``ip`` must be non-blank text and are stored stripped.
    ``device_type``, ``port``, ``slave_id``, ``word_order``, ``byte_order``
    and ``address_base`` must be present and not ``None``. The
    ``address_base`` key is renamed to ``address_offset`` in the result.
    The input dict is not modified.

    Raises:
        ValueError: If any required field is missing or blank.
    """
    device = dict(payload)

    for text_field in ("name", "ip"):
        device[text_field] = _require_non_empty_text(device, text_field)

    # Checked in this order so the first missing field reported is stable.
    for required_field in (
        "device_type",
        "port",
        "slave_id",
        "word_order",
        "byte_order",
        "address_base",
    ):
        _require_present(device, required_field)

    device["address_offset"] = device.pop("address_base")
    return device
def _normalize_modbus_device_edit_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a device-edit payload and return a normalised copy.

    The result carries ``ori_id`` as a positive int, a stripped ``name``
    and an integer ``type`` in 1..5 (taken from ``type`` or, when that is
    missing or ``None``, from ``device_type``, which is then dropped).
    Type 2 requires ``serial_port``; every other type requires ``ip`` and
    ``port``. ``slave_id``, ``word_order`` and ``byte_order`` are always
    required. ``address_base`` is renamed to ``address_offset`` and
    ``group_id`` to ``device_group_id`` (an explicit ``device_group_id``
    wins). The input dict is not modified.

    Raises:
        ValueError: If a required field is missing/blank or ``type`` is invalid.
    """
    device = dict(payload)

    ori_id = _require_present(device, "ori_id")
    device["ori_id"] = _normalize_positive_int(ori_id, "payload.ori_id")
    device["name"] = _require_non_empty_text(device, "name")

    raw_type = device.get("type")
    if raw_type is None:
        raw_type = _require_present(device, "device_type")
    try:
        type_code = int(raw_type)
    except Exception as exc:
        raise ValueError("payload.type must be one of 1, 2, 3, 4, 5") from exc
    if type_code not in {1, 2, 3, 4, 5}:
        raise ValueError("payload.type must be one of 1, 2, 3, 4, 5")
    device["type"] = type_code
    device.pop("device_type", None)

    if type_code == 2:
        device["serial_port"] = _require_non_empty_text(device, "serial_port")
    else:
        device["ip"] = _require_non_empty_text(device, "ip")
        # NOTE(review): `port` is assumed to be required only on this
        # non-serial branch - confirm against the collector API.
        _require_present(device, "port")

    for required_field in ("slave_id", "word_order", "byte_order"):
        _require_present(device, required_field)

    if "address_base" in device:
        device["address_offset"] = device.pop("address_base")

    if "group_id" in device:
        group_id = device.pop("group_id")
        # An explicitly supplied device_group_id takes precedence.
        device.setdefault("device_group_id", group_id)

    return device
def _normalize_modbus_point_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a point payload and return a normalised copy.

    ``name`` must be non-blank and ``address`` present. ``type`` is
    resolved through ``MODBUS_POINT_TYPE_ALIASES`` (exact match first,
    then lower-cased). ``func_code`` is either taken from the payload and
    coerced to an int in 1..4, or, when absent/``None``, resolved from
    ``register_type`` through ``MODBUS_REGISTER_TYPE_ALIASES`` the same
    way. ``register_type`` is removed from the result. The input dict is
    not modified.

    Raises:
        ValueError: If a required field is missing/blank, or the type,
            register type or function code is not recognised.
    """
    point = dict(payload)
    point["name"] = _require_non_empty_text(point, "name")
    _require_present(point, "address")

    raw_type = _require_non_empty_text(point, "type")
    point_type = None
    for candidate in (raw_type, raw_type.lower()):
        point_type = MODBUS_POINT_TYPE_ALIASES.get(candidate)
        if point_type is not None:
            break
    if point_type is None:
        raise ValueError(
            "payload.type is invalid; use one of bool, int16, uint16, int32, "
            "uint32, int64, uint64, float32, float64, or a documented alias "
            "such as SHORT, WORD, LONG, DWORD, FLOAT, DOUBLE"
        )
    point["type"] = point_type

    supplied_code = point.get("func_code")
    if supplied_code is None:
        # No explicit function code: derive it from the register type name.
        register_type = _require_non_empty_text(point, "register_type")
        func_code = None
        for candidate in (register_type, register_type.lower()):
            func_code = MODBUS_REGISTER_TYPE_ALIASES.get(candidate)
            if func_code is not None:
                break
        if func_code is None:
            raise ValueError(
                "payload.register_type is invalid; use coil, discrete_input, "
                "holding_register, input_register, or func_code 1/2/3/4"
            )
    else:
        try:
            func_code = int(str(supplied_code).strip())
        except Exception as exc:
            raise ValueError("payload.func_code must be one of 1, 2, 3, 4") from exc
        if func_code not in {1, 2, 3, 4}:
            raise ValueError("payload.func_code must be one of 1, 2, 3, 4")
    point["func_code"] = func_code

    point.pop("register_type", None)
    return point
def _normalize_modbus_point_edit_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a point-edit payload and return a normalised copy.

    Applies the same rules as ``_normalize_modbus_point_payload`` and
    additionally requires ``ori_id``, stored as a positive int.

    Raises:
        ValueError: If ``ori_id`` is missing or not a positive integer, or
            if the point fields fail validation.
    """
    # Fail on a missing ori_id before any other field is validated.
    _require_present(payload, "ori_id")
    point = _normalize_modbus_point_payload(payload)
    ori_id = _require_present(point, "ori_id")
    point["ori_id"] = _normalize_positive_int(ori_id, "payload.ori_id")
    return point
def _request_collector(
    project_key: str,
    method: str,
    path: str,
    *,
    json_payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Send one request to the project's collector API and return its dict reply.

    The project config is looked up from ``project_key``, a token is
    resolved for it, and ``path`` is appended verbatim to the config's
    ``data_collector_base_url``.

    Args:
        project_key: Key passed to ``find_project_config``.
        method: HTTP method forwarded to ``request_json``.
        path: Path appended to the collector base URL.
        json_payload: Optional body forwarded to ``request_json``.

    Raises:
        ValueError: If the response is not a dict.
    """
    project = find_project_config(project_key)
    authorization = resolve_project_token(project)
    url = f"{project['data_collector_base_url']}{path}"

    response_payload = request_json(method, url, authorization, json_payload=json_payload)
    if isinstance(response_payload, dict):
        return response_payload
    raise ValueError(f"collector API returned invalid payload for {path}: {response_payload}")
def create_modbus_device(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a Modbus device through the collector API.

    ``payload`` is validated and normalised, overlaid on
    ``MODBUS_SPEC.device_defaults`` and POSTed to
    ``MODBUS_SPEC.create_device_path``.

    Raises:
        ValueError: If the payload is invalid or the response is not a dict.
    """
    endpoint = MODBUS_SPEC.create_device_path
    defaults = MODBUS_SPEC.device_defaults
    body = _merge_defaults(defaults, _normalize_modbus_device_payload(payload))
    return _request_collector(project_key, "POST", endpoint, json_payload=body)
def create_modbus_point(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Create a Modbus collection point through the collector API.

    ``payload`` is validated and normalised, overlaid on
    ``MODBUS_SPEC.point_defaults`` and POSTed to
    ``MODBUS_SPEC.create_point_path``.

    Raises:
        ValueError: If the payload is invalid or the response is not a dict.
    """
    endpoint = MODBUS_SPEC.create_point_path
    defaults = MODBUS_SPEC.point_defaults
    body = _merge_defaults(defaults, _normalize_modbus_point_payload(payload))
    return _request_collector(project_key, "POST", endpoint, json_payload=body)
def edit_modbus_device(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Edit an existing Modbus device through the collector API.

    ``payload`` is validated and normalised, overlaid on the edit defaults
    below and POSTed to the device-edit endpoint.

    Raises:
        ValueError: If the payload is invalid or the response is not a dict.
    """
    # Values used for any of these keys the caller does not supply.
    edit_defaults: dict[str, Any] = {
        "timeout": 3,
        "is_persistent": True,
        "device_group_id": 0,
        "alarm_interval": 90,
        "collect_interval": 5,
        "address_offset": 0,
        "retry_times": 0,
        "mode": 0,
    }
    body = _merge_defaults(edit_defaults, _normalize_modbus_device_edit_payload(payload))
    return _request_collector(
        project_key,
        "POST",
        "/api/collector/modbus/device/edit",
        json_payload=body,
    )
def edit_modbus_point(project_key: str, payload: dict[str, Any]) -> dict[str, Any]:
    """Edit an existing Modbus collection point through the collector API.

    ``payload`` is validated and normalised (including ``ori_id``),
    overlaid on ``MODBUS_SPEC.point_defaults`` and POSTed to the
    point-edit endpoint.

    Raises:
        ValueError: If the payload is invalid or the response is not a dict.
    """
    defaults = MODBUS_SPEC.point_defaults
    body = _merge_defaults(defaults, _normalize_modbus_point_edit_payload(payload))
    return _request_collector(
        project_key,
        "POST",
        "/api/collector/modbus/point/edit_collect_point",
        json_payload=body,
    )
def list_devices(project_key: str, num_points: bool = False) -> dict[str, Any]:
    """Fetch the collector's device list for a project.

    Args:
        project_key: Key identifying the project configuration.
        num_points: Sent as the ``num_points`` query flag, rendered as
            ``"true"`` when truthy and ``"false"`` otherwise.

    Raises:
        ValueError: If the response is not a dict.
    """
    flag = str(bool(num_points)).lower()
    endpoint = f"/api/collector/device?num_points={flag}"
    return _request_collector(project_key, "GET", endpoint)
def connect_device(project_key: str, device_id: int, device_type: str = "modbus") -> dict[str, Any]:
    """Request connect status 2 for a device and return the collector's reply.

    Raises:
        ValueError: If ``device_id`` or ``device_type`` is invalid, or the
            response is not a dict.
    """
    connect_status = 2
    return _set_device_connect_status(
        project_key,
        device_id=device_id,
        device_type=device_type,
        status=connect_status,
    )
def disconnect_device(project_key: str, device_id: int, device_type: str = "modbus") -> dict[str, Any]:
    """Request connect status 1 for a device and return the collector's reply.

    Raises:
        ValueError: If ``device_id`` or ``device_type`` is invalid, or the
            response is not a dict.
    """
    disconnect_status = 1
    return _set_device_connect_status(
        project_key,
        device_id=device_id,
        device_type=device_type,
        status=disconnect_status,
    )
def list_device_points(
    project_key: str,
    device_id: int,
    device_type: str = "modbus",
    group_id: int = 0,
) -> dict[str, Any]:
    """Fetch the collection points of one device.

    Args:
        project_key: Key identifying the project configuration.
        device_id: Device identifier; must be a positive integer.
        device_type: Non-blank device type text.
        group_id: Group identifier; must be a non-negative integer.

    Raises:
        ValueError: If an argument is invalid or the response is not a dict.
    """
    # Validated in this order so the first reported error stays id, type, group.
    normalized_id = _normalize_positive_int(device_id, "device_id")
    normalized_type = _normalize_device_type(device_type)
    normalized_group = _normalize_non_negative_int(group_id, "group_id")

    request_body = {
        "id": normalized_id,
        "type": normalized_type,
        "group_id": normalized_group,
    }
    return _request_collector(
        project_key,
        "POST",
        "/api/collector/common/device/get_collect_point",
        json_payload=request_body,
    )
def _set_device_connect_status(
    project_key: str,
    *,
    device_id: int,
    device_type: str,
    status: int,
) -> dict[str, Any]:
    """POST a connect-status change for one device.

    ``device_id`` and ``device_type`` are validated; ``status`` is sent
    unchanged.

    Raises:
        ValueError: If ``device_id`` or ``device_type`` is invalid, or the
            response is not a dict.
    """
    normalized_id = _normalize_positive_int(device_id, "device_id")
    normalized_type = _normalize_device_type(device_type)

    request_body = {
        "id": normalized_id,
        "type": normalized_type,
        "status": status,
    }
    return _request_collector(
        project_key,
        "POST",
        "/api/collector/common/device/set_connect_status",
        json_payload=request_body,
    )
- def _normalize_device_type(device_type: str) -> str:
- normalized = str(device_type or "").strip()
- if not normalized:
- raise ValueError("device_type is required")
- return normalized
- def _normalize_positive_int(value: int, field_name: str) -> int:
- try:
- normalized = int(value)
- except Exception as exc:
- raise ValueError(f"{field_name} must be a positive integer") from exc
- if normalized <= 0:
- raise ValueError(f"{field_name} must be a positive integer")
- return normalized
- def _normalize_non_negative_int(value: int, field_name: str) -> int:
- try:
- normalized = int(value)
- except Exception as exc:
- raise ValueError(f"{field_name} must be a non-negative integer") from exc
- if normalized < 0:
- raise ValueError(f"{field_name} must be a non-negative integer")
- return normalized
|