| 123456789101112131415161718192021222324252627282930313233343536373839404142 |
- from __future__ import annotations
- from typing import Any
- from .auth import find_project_config
- from .http_client import request_json
- from .protocols import MODBUS_SPEC
def modbus_point_collect_test(
    project_key: str,
    *,
    ip: str,
    port: int,
    slave_id: int,
    points: list[dict[str, Any]],
    device_type: str = "ModbusTCP",
    word_byte_order: str = "ABCD",
    address_base: int = 0,
) -> dict[str, Any]:
    """POST a Modbus point test request and return the decoded response.

    The project configuration is looked up with ``find_project_config`` and
    the request is sent to the project's ``base_url`` joined directly with
    ``MODBUS_SPEC.point_test_path`` (plain string concatenation, no
    separator is inserted).

    Args:
        project_key: Key passed to ``find_project_config``.
        ip: Sent as the ``ip`` field of the request body.
        port: Sent as the ``port`` field of the request body.
        slave_id: Sent as the ``slave_id`` field of the request body.
        points: Sent unchanged as the ``points`` field; the per-point schema
            is defined by the receiving API and is not checked here.
        device_type: Sent as the ``device_type`` field.
        word_byte_order: Sent as the ``word_byte_order`` field.
        address_base: Sent as the ``address_base`` field.

    Returns:
        The value returned by ``request_json``, which must be a ``dict``.

    Raises:
        ValueError: If ``MODBUS_SPEC.point_test_path`` is falsy, or if the
            response is not a ``dict``.
    """
    # Resolve the project first so that lookup failures surface before the
    # path-configuration check, as callers may rely on that ordering.
    project_cfg = find_project_config(project_key)

    test_path = MODBUS_SPEC.point_test_path
    if not test_path:
        raise ValueError("modbus point test path is not configured")

    # Key order is kept stable so the serialized body is unchanged.
    request_body = dict(
        device_type=device_type,
        ip=ip,
        port=port,
        word_byte_order=word_byte_order,
        address_base=address_base,
        slave_id=slave_id,
        points=points,
    )
    endpoint = f"{project_cfg['base_url']}{test_path}"

    result = request_json("POST", endpoint, json_payload=request_body)
    if isinstance(result, dict):
        return result
    raise ValueError(f"gateway API returned invalid payload: {result}")
|