# config_api.py
  1. from __future__ import annotations
  2. from typing import Any
  3. from .auth import find_project_config, resolve_project_token, _request_json
# Value sent as the "operator" field of every request payload built in this module.
CONFIG_OPERATOR = "Dataturing"
  5. def _post_config(project_key: str, path: str, payload: dict[str, Any]) -> Any:
  6. project = find_project_config(project_key)
  7. authorization = resolve_project_token(project)
  8. response_payload = _request_json(
  9. "POST",
  10. f"{project['base_url']}{path}",
  11. authorization,
  12. json_payload=payload,
  13. )
  14. if not isinstance(response_payload, dict):
  15. raise ValueError(f"config API returned invalid payload for {path}: {response_payload}")
  16. state = response_payload.get("state")
  17. if str(state) not in {"0", "0.0"}:
  18. state_info = str(response_payload.get("state_info") or "").strip()
  19. raise ValueError(f"config API failed for {path}: {state_info or response_payload}")
  20. return response_payload
  21. def list_locations(project_key: str, keyword: str | None = None, page_size: int = 100, page_num: int = 1) -> Any:
  22. return _post_config(
  23. project_key,
  24. "/api/configapi/location/list",
  25. {
  26. "operator": CONFIG_OPERATOR,
  27. "page_size": page_size,
  28. "page_num": page_num,
  29. "keyword": keyword or "",
  30. },
  31. )
  32. def list_system_tree(project_key: str) -> Any:
  33. return _post_config(
  34. project_key,
  35. "/api/configapi/system/tree",
  36. {"operator": CONFIG_OPERATOR},
  37. )
  38. def list_systems(project_key: str, page_size: int = 100, page_num: int = 1, system_type_id: int = 0, show_below: bool = True) -> Any:
  39. return _post_config(
  40. project_key,
  41. "/api/configapi/system/list",
  42. {
  43. "operator": CONFIG_OPERATOR,
  44. "page_size": page_size,
  45. "page_num": page_num,
  46. "system_type_id": system_type_id,
  47. "show_below": show_below,
  48. },
  49. )
  50. def list_device_types(project_key: str) -> Any:
  51. return _post_config(
  52. project_key,
  53. "/api/configapi/devicetype/list",
  54. {
  55. "operator": CONFIG_OPERATOR,
  56. "page_size": -1,
  57. },
  58. )
  59. def list_meter_types(project_key: str) -> Any:
  60. return _post_config(
  61. project_key,
  62. "/api/configapi/metertype/list",
  63. {
  64. "operator": CONFIG_OPERATOR,
  65. "page_size": -1,
  66. },
  67. )
  68. def search_devices(
  69. project_key: str,
  70. page_size: int = 100,
  71. page_num: int = 1,
  72. keyword: str | None = None,
  73. location_id: int = 0,
  74. show_below: bool = True,
  75. system_ids: list[int] | None = None,
  76. device_type_ids: list[int] | None = None,
  77. ) -> Any:
  78. return _post_config(
  79. project_key,
  80. "/api/configapi/device/list",
  81. {
  82. "operator": CONFIG_OPERATOR,
  83. "page_size": page_size,
  84. "page_num": page_num,
  85. "location_id": location_id,
  86. "show_below": show_below,
  87. "keyword": keyword or "",
  88. "system_ids": system_ids or [],
  89. "device_type_ids": device_type_ids or [],
  90. },
  91. )
  92. def search_meters(
  93. project_key: str,
  94. page_size: int = 100,
  95. page_num: int = 1,
  96. keyword: str | None = None,
  97. location_id: int = 0,
  98. show_below: bool = True,
  99. meter_type_id: int = 0,
  100. measurement_location_ids: list[int] | None = None,
  101. measurement_system_ids: list[int] | None = None,
  102. measurement_device_type_ids: list[int] | None = None,
  103. status: int | None = None,
  104. ) -> Any:
  105. payload: dict[str, Any] = {
  106. "operator": CONFIG_OPERATOR,
  107. "page_size": page_size,
  108. "page_num": page_num,
  109. "location_id": location_id,
  110. "show_below": show_below,
  111. "keyword": keyword or "",
  112. "meter_type_id": meter_type_id,
  113. "measurement_location_ids": measurement_location_ids or [],
  114. "measurement_system_ids": measurement_system_ids or [],
  115. "measurement_device_type_ids": measurement_device_type_ids or [],
  116. }
  117. if status is not None:
  118. payload["status"] = status
  119. return _post_config(project_key, "/api/configapi/meter/list", payload)