config_api.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. from __future__ import annotations
  2. from typing import Any
  3. from .auth import find_project_config, resolve_project_token, _request_json
  4. CONFIG_OPERATOR = "Dataturing"
  5. def _post_config(project_key: str, path: str, payload: dict[str, Any]) -> Any:
  6. project = find_project_config(project_key)
  7. authorization = resolve_project_token(project)
  8. response_payload = _request_json(
  9. "POST",
  10. f"{project['base_url']}{path}",
  11. authorization,
  12. json_payload=payload,
  13. )
  14. if not isinstance(response_payload, dict):
  15. raise ValueError(
  16. f"config API returned invalid payload for {path}: {response_payload}"
  17. )
  18. state = response_payload.get("state")
  19. if str(state) not in {"0", "0.0"}:
  20. state_info = str(response_payload.get("state_info") or "").strip()
  21. raise ValueError(
  22. f"config API failed for {path}: {state_info or response_payload}"
  23. )
  24. return response_payload
  25. def list_locations(
  26. project_key: str,
  27. keyword: str | None = None,
  28. page_size: int = 100,
  29. page_num: int = 1,
  30. ) -> Any:
  31. return _post_config(
  32. project_key,
  33. "/api/configapi/location/list",
  34. {
  35. "operator": CONFIG_OPERATOR,
  36. "page_size": page_size,
  37. "page_num": page_num,
  38. "keyword": keyword or "",
  39. },
  40. )
  41. def list_system_tree(project_key: str) -> Any:
  42. return _post_config(
  43. project_key,
  44. "/api/configapi/system/tree",
  45. {"operator": CONFIG_OPERATOR},
  46. )
  47. def list_systems(
  48. project_key: str,
  49. page_size: int = 100,
  50. page_num: int = 1,
  51. system_type_id: int = 0,
  52. show_below: bool = True,
  53. ) -> Any:
  54. return _post_config(
  55. project_key,
  56. "/api/configapi/system/list",
  57. {
  58. "operator": CONFIG_OPERATOR,
  59. "page_size": page_size,
  60. "page_num": page_num,
  61. "system_type_id": system_type_id,
  62. "show_below": show_below,
  63. },
  64. )
  65. def list_device_types(project_key: str) -> Any:
  66. return _post_config(
  67. project_key,
  68. "/api/configapi/devicetype/list",
  69. {
  70. "operator": CONFIG_OPERATOR,
  71. "page_size": -1,
  72. },
  73. )
  74. def list_meter_types(project_key: str) -> Any:
  75. return _post_config(
  76. project_key,
  77. "/api/configapi/metertype/list",
  78. {
  79. "operator": CONFIG_OPERATOR,
  80. "page_size": -1,
  81. },
  82. )
  83. def search_devices(
  84. project_key: str,
  85. page_size: int = 100,
  86. page_num: int = 1,
  87. keyword: str | None = None,
  88. location_id: int = 0,
  89. show_below: bool = True,
  90. system_ids: list[int] | None = None,
  91. device_type_ids: list[int] | None = None,
  92. ) -> Any:
  93. return _post_config(
  94. project_key,
  95. "/api/configapi/device/list",
  96. {
  97. "operator": CONFIG_OPERATOR,
  98. "page_size": page_size,
  99. "page_num": page_num,
  100. "location_id": location_id,
  101. "show_below": show_below,
  102. "keyword": keyword or "",
  103. "system_ids": system_ids or [],
  104. "device_type_ids": device_type_ids or [],
  105. },
  106. )
  107. def search_meters(
  108. project_key: str,
  109. page_size: int = 100,
  110. page_num: int = 1,
  111. keyword: str | None = None,
  112. location_id: int = 0,
  113. show_below: bool = True,
  114. meter_type_id: int = 0,
  115. measurement_location_ids: list[int] | None = None,
  116. measurement_system_ids: list[int] | None = None,
  117. measurement_device_type_ids: list[int] | None = None,
  118. status: int | None = None,
  119. ) -> Any:
  120. payload: dict[str, Any] = {
  121. "operator": CONFIG_OPERATOR,
  122. "page_size": page_size,
  123. "page_num": page_num,
  124. "location_id": location_id,
  125. "show_below": show_below,
  126. "keyword": keyword or "",
  127. "meter_type_id": meter_type_id,
  128. "measurement_location_ids": measurement_location_ids or [],
  129. "measurement_system_ids": measurement_system_ids or [],
  130. "measurement_device_type_ids": measurement_device_type_ids or [],
  131. }
  132. if status is not None:
  133. payload["status"] = status
  134. return _post_config(project_key, "/api/configapi/meter/list", payload)
  135. def search_points(
  136. project_key: str, id: int, page_size: int = 100, page_num: int = 1
  137. ) -> Any:
  138. return _post_config(
  139. project_key,
  140. "/api/configapi/meter/search_point",
  141. {
  142. "operator": CONFIG_OPERATOR,
  143. "page_size": page_size,
  144. "page_num": page_num,
  145. "id": id,
  146. },
  147. )
  148. def list_topologies_with_group(
  149. project_key: str, group_ids: list[int] | None = None
  150. ) -> Any:
  151. return _post_config(
  152. project_key,
  153. "/api/configapi/topo/list_with_group",
  154. {
  155. "operator": CONFIG_OPERATOR,
  156. "group_ids": group_ids or [],
  157. },
  158. )
  159. def get_topology(project_key: str, id: int) -> Any:
  160. return _post_config(
  161. project_key,
  162. "/api/configapi/topo/get",
  163. {
  164. "operator": CONFIG_OPERATOR,
  165. "id": id,
  166. },
  167. )