| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- from __future__ import annotations
- from typing import Any
- from .auth import find_project_config, resolve_project_token, _request_json
- CONFIG_OPERATOR = "Dataturing"
- def _post_config(project_key: str, path: str, payload: dict[str, Any]) -> Any:
- project = find_project_config(project_key)
- authorization = resolve_project_token(project)
- response_payload = _request_json(
- "POST",
- f"{project['base_url']}{path}",
- authorization,
- json_payload=payload,
- )
- if not isinstance(response_payload, dict):
- raise ValueError(
- f"config API returned invalid payload for {path}: {response_payload}"
- )
- state = response_payload.get("state")
- if str(state) not in {"0", "0.0"}:
- state_info = str(response_payload.get("state_info") or "").strip()
- raise ValueError(
- f"config API failed for {path}: {state_info or response_payload}"
- )
- return response_payload
- def list_locations(
- project_key: str,
- keyword: str | None = None,
- page_size: int = 100,
- page_num: int = 1,
- ) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/location/list",
- {
- "operator": CONFIG_OPERATOR,
- "page_size": page_size,
- "page_num": page_num,
- "keyword": keyword or "",
- },
- )
- def list_system_tree(project_key: str) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/system/tree",
- {"operator": CONFIG_OPERATOR},
- )
- def list_systems(
- project_key: str,
- page_size: int = 100,
- page_num: int = 1,
- system_type_id: int = 0,
- show_below: bool = True,
- ) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/system/list",
- {
- "operator": CONFIG_OPERATOR,
- "page_size": page_size,
- "page_num": page_num,
- "system_type_id": system_type_id,
- "show_below": show_below,
- },
- )
- def list_device_types(project_key: str) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/devicetype/list",
- {
- "operator": CONFIG_OPERATOR,
- "page_size": -1,
- },
- )
- def list_meter_types(project_key: str) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/metertype/list",
- {
- "operator": CONFIG_OPERATOR,
- "page_size": -1,
- },
- )
- def search_devices(
- project_key: str,
- page_size: int = 100,
- page_num: int = 1,
- keyword: str | None = None,
- location_id: int = 0,
- show_below: bool = True,
- system_ids: list[int] | None = None,
- device_type_ids: list[int] | None = None,
- ) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/device/list",
- {
- "operator": CONFIG_OPERATOR,
- "page_size": page_size,
- "page_num": page_num,
- "location_id": location_id,
- "show_below": show_below,
- "keyword": keyword or "",
- "system_ids": system_ids or [],
- "device_type_ids": device_type_ids or [],
- },
- )
- def search_meters(
- project_key: str,
- page_size: int = 100,
- page_num: int = 1,
- keyword: str | None = None,
- location_id: int = 0,
- show_below: bool = True,
- meter_type_id: int = 0,
- measurement_location_ids: list[int] | None = None,
- measurement_system_ids: list[int] | None = None,
- measurement_device_type_ids: list[int] | None = None,
- status: int | None = None,
- ) -> Any:
- payload: dict[str, Any] = {
- "operator": CONFIG_OPERATOR,
- "page_size": page_size,
- "page_num": page_num,
- "location_id": location_id,
- "show_below": show_below,
- "keyword": keyword or "",
- "meter_type_id": meter_type_id,
- "measurement_location_ids": measurement_location_ids or [],
- "measurement_system_ids": measurement_system_ids or [],
- "measurement_device_type_ids": measurement_device_type_ids or [],
- }
- if status is not None:
- payload["status"] = status
- return _post_config(project_key, "/api/configapi/meter/list", payload)
- def search_points(
- project_key: str, id: int, page_size: int = 100, page_num: int = 1
- ) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/meter/search_point",
- {
- "operator": CONFIG_OPERATOR,
- "page_size": page_size,
- "page_num": page_num,
- "id": id,
- },
- )
- def list_topologies_with_group(
- project_key: str, group_ids: list[int] | None = None
- ) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/topo/list_with_group",
- {
- "operator": CONFIG_OPERATOR,
- "group_ids": group_ids or [],
- },
- )
- def get_topology(project_key: str, id: int) -> Any:
- return _post_config(
- project_key,
- "/api/configapi/topo/get",
- {
- "operator": CONFIG_OPERATOR,
- "id": id,
- },
- )
|