gateway_api.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from __future__ import annotations
  2. from typing import Any
  3. from .auth import find_project_config
  4. from .http_client import request_json
  5. from .protocols import MODBUS_SPEC
  6. def modbus_point_collect_test(
  7. project_key: str,
  8. *,
  9. ip: str,
  10. port: int,
  11. slave_id: int,
  12. points: list[dict[str, Any]],
  13. device_type: str = "ModbusTCP",
  14. word_byte_order: str = "ABCD",
  15. address_base: int = 0,
  16. ) -> dict[str, Any]:
  17. project = find_project_config(project_key)
  18. if not MODBUS_SPEC.point_test_path:
  19. raise ValueError("modbus point test path is not configured")
  20. payload = {
  21. "device_type": device_type,
  22. "ip": ip,
  23. "port": port,
  24. "word_byte_order": word_byte_order,
  25. "address_base": address_base,
  26. "slave_id": slave_id,
  27. "points": points,
  28. }
  29. response_payload = request_json(
  30. "POST",
  31. f"{project['base_url']}{MODBUS_SPEC.point_test_path}",
  32. json_payload=payload,
  33. )
  34. if not isinstance(response_payload, dict):
  35. raise ValueError(f"gateway API returned invalid payload: {response_payload}")
  36. return response_payload