# NOTE(review): removed a run of concatenated line numbers (1..1783) that was
# pasted in by the extraction/viewer tool — it was not part of the module.
- from __future__ import annotations
- from collections import defaultdict, deque
- from concurrent.futures import ThreadPoolExecutor
- from datetime import datetime, timezone
- import json
- from typing import Any
- from sqlalchemy import (
- Index,
- Integer,
- String,
- Text,
- UniqueConstraint,
- delete,
- inspect,
- select,
- text,
- )
- from sqlalchemy.orm import Mapped, Session, mapped_column
- from .config_api import list_topologies_with_group as api_list_topologies_with_group
- from .config_api import list_locations as api_list_locations
- from .config_api import list_system_tree as api_list_system_tree
- from .config_api import list_systems as api_list_systems
- from .config_api import list_device_types as api_list_device_types
- from .config_api import list_meter_types as api_list_meter_types
- from .config_api import search_devices as api_search_devices
- from .config_api import search_meters as api_search_meters
- from .config_api import get_topology_data as api_get_topology_data
- from .config_api import get_topology as api_get_topology
- from .db import Base, sql_engine
# Object-type codes used by the upstream platform to tag what a topology
# dimension / filter condition refers to. The numeric values mirror the
# remote API's enum — presumably stable, but confirm against the API docs.
PT_OBJ_TYPE_LOCATION = 11
PT_OBJ_TYPE_SYSTEMTYPE = 12
PT_OBJ_TYPE_SYSTEM = 13
PT_OBJ_TYPE_DEVICETYPE = 14
PT_OBJ_TYPE_DEVICE = 15
PT_OBJ_TYPE_METERTYPE = 16
PT_OBJ_TYPE_METERMODEL = 17
PT_OBJ_TYPE_METER = 18
PT_OBJ_TYPE_TOPOGROUP = 19
PT_OBJ_TYPE_TOPODIAGRAM = 20

# Human-readable (Chinese) display label for each object-type code above.
OBJECT_TYPE_LABELS = {
    PT_OBJ_TYPE_LOCATION: "位置",
    PT_OBJ_TYPE_SYSTEMTYPE: "系统类型",
    PT_OBJ_TYPE_SYSTEM: "系统",
    PT_OBJ_TYPE_DEVICETYPE: "设备类型",
    PT_OBJ_TYPE_DEVICE: "设备",
    PT_OBJ_TYPE_METERTYPE: "仪表类型",
    PT_OBJ_TYPE_METERMODEL: "仪表型号",
    PT_OBJ_TYPE_METER: "仪表",
    PT_OBJ_TYPE_TOPOGROUP: "拓扑图分组",
    PT_OBJ_TYPE_TOPODIAGRAM: "拓扑图",
}

# Display labels for dimension sort order (1=ascending, 2=descending),
# filter combination mode (1=all conditions, 2=any condition) and filter
# match operators (equals / not equals / contains / not contains).
ORDER_LABELS = {1: "顺序", 2: "逆序"}
FILTER_TYPE_LABELS = {1: "所有", 2: "任一"}
MATCH_TYPE_LABELS = {1: "等于", 2: "不等于", 3: "包含", 4: "不包含"}
- def _utc_now_iso() -> str:
- return datetime.now(timezone.utc).isoformat()
- def _current_unix_ts() -> int:
- return int(datetime.now().timestamp())
- def _safe_int(raw_value: Any) -> int | None:
- if raw_value is None:
- return None
- try:
- return int(str(raw_value).strip())
- except Exception:
- return None
- def _text(raw_value: Any) -> str:
- return str(raw_value or "").strip()
- def _bool_as_int(raw_value: Any) -> int:
- return 1 if bool(raw_value) else 0
- def _normalize_entity_type(entity_type: str) -> str:
- normalized = _text(entity_type).lower()
- if normalized not in {"meter", "device"}:
- raise ValueError("entity_type must be 'meter' or 'device'")
- return normalized
class TopologyGroup(Base):
    """Cached topology group (folder) row, one per group per project."""

    __tablename__ = "topology_group"
    __table_args__ = (
        UniqueConstraint(
            "project_key", "group_id", name="uq_topology_group_project_group"
        ),
        Index("ix_topology_group_project_parent", "project_key", "parent_group_id"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Which remote project this cache row belongs to.
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    # Upstream group id; unique per project (see constraint above).
    group_id: Mapped[int] = mapped_column(Integer, nullable=False)
    group_name: Mapped[str] = mapped_column(String(255), nullable=False)
    # None for top-level groups.
    parent_group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Full " / "-joined path of group names from the root.
    group_path_text: Mapped[str] = mapped_column(Text, nullable=False)
    # Depth in the group tree (1 = top level).
    level: Mapped[int] = mapped_column(Integer, nullable=False)
    # Position among siblings in the source listing (1-based).
    sort_index: Mapped[int] = mapped_column(Integer, nullable=False)
    # ISO timestamp of the cache refresh that produced this row.
    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)
    # 1 = row was present in the latest refresh; 0 presumably soft-deleted.
    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
class TopologyRegistry(Base):
    """Cached registry row describing one topology diagram of a project."""

    __tablename__ = "topology_registry"
    __table_args__ = (
        UniqueConstraint(
            "project_key", "topology_id", name="uq_topology_registry_project_topology"
        ),
        Index("ix_topology_registry_project_group", "project_key", "group_id"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    # Upstream topology id; unique per project (see constraint above).
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    topology_name: Mapped[str] = mapped_column(String(255), nullable=False)
    # Upstream topology kind; only type 2 carries a dimension/grouping config
    # (see get_topology_group_config).
    topology_type: Mapped[int] = mapped_column(Integer, nullable=False)
    # PT_OBJ_TYPE_* code of whatever the topology is built over, if any.
    object_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Containing group, or None for ungrouped topologies.
    group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Shape discriminator of the stored diagram ("tree"/"graph" presumably —
    # confirm against the refresh code that writes it).
    root_shape: Mapped[str] = mapped_column(String(32), nullable=False)
    # Upstream last-modified marker, kept verbatim; "" when unknown.
    source_updated_time: Mapped[str] = mapped_column(
        String(64), nullable=False, default=""
    )
    # Raw JSON snapshots of the upstream metric options and dimension config;
    # parsed on demand via _load_json_text.
    data_options_json: Mapped[str] = mapped_column(Text, nullable=False, default="")
    dimension_config_json: Mapped[str] = mapped_column(
        Text, nullable=False, default=""
    )
    # ISO timestamp of the cache refresh that produced this row.
    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)
    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
class TopologyNode(Base):
    """Cached node row of a topology diagram (one per node per topology)."""

    __tablename__ = "topology_node"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "topology_id",
            "node_id",
            name="uq_topology_node_project_topology_node",
        ),
        Index(
            "ix_topology_node_project_topology_parent",
            "project_key",
            "topology_id",
            "parent_node_id",
        ),
        Index(
            "ix_topology_node_project_topology_refer",
            "project_key",
            "topology_id",
            "refer_id",
        ),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    # Upstream node id (opaque string; unique within one topology).
    node_id: Mapped[str] = mapped_column(Text, nullable=False)
    node_name: Mapped[str] = mapped_column(Text, nullable=False)
    # Parent node id, or None for roots.
    parent_node_id: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Depth in the diagram (1 = root); derived or taken from the payload.
    level: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Upstream "type" code of the node, when present.
    node_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # Id of the object the node refers to (semantics depend on node type —
    # see the refer index above); None when the node refers to nothing.
    refer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    refer_level: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # 1 when the upstream payload flags the node as virtual.
    is_virtual: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    # " / "-joined names from the root down to this node.
    path_text: Mapped[str] = mapped_column(Text, nullable=False, default="")
    child_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    # 1-based position among siblings (tree) or in the node list (graph).
    sort_index: Mapped[int | None] = mapped_column(Integer, nullable=True)
class TopologyEdge(Base):
    """Cached directed edge (source -> target) of a topology diagram."""

    __tablename__ = "topology_edge"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "topology_id",
            "source_node_id",
            "target_node_id",
            name="uq_topology_edge_project_topology_nodes",
        ),
        Index(
            "ix_topology_edge_project_topology_source",
            "project_key",
            "topology_id",
            "source_node_id",
        ),
        Index(
            "ix_topology_edge_project_topology_target",
            "project_key",
            "topology_id",
            "target_node_id",
        ),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    # Edge endpoints, referencing TopologyNode.node_id values.
    source_node_id: Mapped[str] = mapped_column(Text, nullable=False)
    target_node_id: Mapped[str] = mapped_column(Text, nullable=False)
    # 1-based position in the source edge/child listing.
    sort_index: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
class TopologyEntityIndex(Base):
    """Reverse index: which topology node(s) reference a meter/device.

    Rows are produced by the diagram parsers, which keep only the *deepest*
    node(s) referencing each entity within a topology.
    """

    __tablename__ = "topology_entity_index"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "entity_type",
            "entity_id",
            "topology_id",
            "node_id",
            name="uq_topology_entity_index_project_entity_topology_node",
        ),
        Index(
            "ix_topology_entity_index_project_entity",
            "project_key",
            "entity_type",
            "entity_id",
        ),
        Index(
            "ix_topology_entity_index_project_topology_node",
            "project_key",
            "topology_id",
            "node_id",
        ),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    # "meter" or "device" (see _normalize_entity_type).
    entity_type: Mapped[str] = mapped_column(String(16), nullable=False)
    entity_id: Mapped[int] = mapped_column(Integer, nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    node_id: Mapped[str] = mapped_column(Text, nullable=False)
    # Level of the referencing node at the time of indexing.
    depth: Mapped[int | None] = mapped_column(Integer, nullable=True)
def ensure_topology_cache_tables() -> None:
    """Create the topology cache tables and apply in-place column migrations.

    ``create_all`` is a no-op for tables that already exist, so the two
    ``ALTER TABLE`` statements below backfill columns that were added to
    ``topology_registry`` after the table first shipped (metadata-driven
    creation cannot add columns to an existing table).
    """
    # Resolve the engine once and reuse it (previously sql_engine() was
    # called twice for no benefit).
    engine = sql_engine()
    Base.metadata.create_all(engine)
    topology_registry_columns = {
        column["name"] for column in inspect(engine).get_columns("topology_registry")
    }
    with engine.begin() as connection:
        if "data_options_json" not in topology_registry_columns:
            connection.execute(
                text("ALTER TABLE topology_registry ADD COLUMN data_options_json TEXT")
            )
        if "dimension_config_json" not in topology_registry_columns:
            connection.execute(
                text(
                    "ALTER TABLE topology_registry ADD COLUMN dimension_config_json TEXT"
                )
            )
- def _dump_json_text(raw_value: Any) -> str:
- if raw_value is None:
- return ""
- return json.dumps(raw_value, ensure_ascii=False, separators=(",", ":"))
- def _load_json_text(raw_value: str) -> Any:
- text_value = _text(raw_value)
- if not text_value:
- return None
- try:
- return json.loads(text_value)
- except json.JSONDecodeError:
- return text_value
def _build_metric_definitions(data_options: Any) -> dict[str, dict[str, Any]]:
    """Index metric descriptors from a raw ``data_options`` payload by code.

    Returns ``{"instant": {code: meta}, "accu": {code: meta}}``; a display
    bucket missing from the payload (or malformed) stays an empty map.
    """
    result: dict[str, dict[str, Any]] = {"instant": {}, "accu": {}}
    if not isinstance(data_options, dict):
        return result
    for display in ("instant", "accu"):
        raw_items = data_options.get(display)
        if not isinstance(raw_items, list):
            continue
        bucket: dict[str, Any] = {}
        for raw_item in raw_items:
            if not isinstance(raw_item, dict):
                continue
            code = _text(raw_item.get("code"))
            if not code:
                # Entries without a code cannot be addressed; drop them.
                continue
            bucket[code] = {
                "name": _text(raw_item.get("name")),
                "unit": _text(raw_item.get("unit")),
                "type": _safe_int(raw_item.get("type")),
                "display": bool(raw_item.get("display")),
                "value_key": _text(raw_item.get("value")),
            }
        result[display] = bucket
    return result
- def _extract_page_items(payload: Any) -> list[dict[str, Any]]:
- if not isinstance(payload, dict):
- return []
- data = payload.get("data")
- if isinstance(data, dict):
- items = data.get("data")
- if isinstance(items, list):
- return [item for item in items if isinstance(item, dict)]
- if isinstance(data, list):
- return [item for item in data if isinstance(item, dict)]
- return []
def _load_all_pages(fetch_page: Any) -> list[dict[str, Any]]:
    """Exhaust a paged endpoint by calling ``fetch_page(page_num)`` per page.

    Paging stops when the envelope is missing/malformed or the reported
    ``total_page`` has been reached; all page items are concatenated.
    """
    collected: list[dict[str, Any]] = []
    current_page = 1
    while True:
        payload = fetch_page(current_page)
        collected.extend(_extract_page_items(payload))
        envelope = payload.get("data") if isinstance(payload, dict) else None
        if not isinstance(envelope, dict):
            return collected
        # A missing/unparsable total_page means "this is the only page".
        last_page = _safe_int(envelope.get("total_page")) or current_page
        if current_page >= last_page:
            return collected
        current_page += 1
- def _flatten_tree_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
- result: list[dict[str, Any]] = []
- queue = deque(items)
- while queue:
- item = queue.popleft()
- result.append(item)
- children = item.get("children")
- if isinstance(children, list):
- queue.extend(child for child in children if isinstance(child, dict))
- return result
def _to_id_name_map(items: list[dict[str, Any]]) -> dict[int, str]:
    """Map each item's integer ``id`` to its stripped ``name``.

    Items without a parsable id are skipped; duplicate ids keep the
    last occurrence, matching plain dict assignment order.
    """
    return {
        item_id: _text(item.get("name"))
        for item in items
        if (item_id := _safe_int(item.get("id"))) is not None
    }
def _load_location_name_map(project_key: str) -> dict[int, str]:
    """All locations (every page, tree flattened) as id -> name."""

    def fetch(page_num: int) -> Any:
        return api_list_locations(
            project_key, keyword="", page_size=100, page_num=page_num
        )

    return _to_id_name_map(_flatten_tree_items(_load_all_pages(fetch)))


def _load_system_type_name_map(project_key: str) -> dict[int, str]:
    """System types (tree entries with ``type == 1``) as id -> name."""
    payload = api_list_system_tree(project_key)
    flattened = _flatten_tree_items(_extract_page_items(payload))
    type_nodes = [node for node in flattened if _safe_int(node.get("type")) == 1]
    return _to_id_name_map(type_nodes)


def _load_system_name_map(project_key: str) -> dict[int, str]:
    """All systems (every page, no type filter) as id -> name."""

    def fetch(page_num: int) -> Any:
        return api_list_systems(
            project_key,
            page_size=100,
            page_num=page_num,
            system_type_id=0,
            show_below=True,
        )

    return _to_id_name_map(_load_all_pages(fetch))


def _load_device_type_name_map(project_key: str) -> dict[int, str]:
    """All device types as id -> name."""
    return _to_id_name_map(_extract_page_items(api_list_device_types(project_key)))


def _load_meter_type_name_map(project_key: str) -> dict[int, str]:
    """All meter types as id -> name."""
    return _to_id_name_map(_extract_page_items(api_list_meter_types(project_key)))
def _load_device_name_map(project_key: str) -> dict[int, str]:
    """All devices (every page of an unfiltered search) as id -> name."""

    def fetch(page_num: int) -> Any:
        return api_search_devices(
            project_key,
            page_size=100,
            page_num=page_num,
            keyword="",
            location_id=0,
            show_below=True,
            system_ids=[],
            device_type_ids=[],
        )

    return _to_id_name_map(_load_all_pages(fetch))


def _load_meter_name_map(project_key: str) -> dict[int, str]:
    """All meters (every page of an unfiltered search) as id -> name."""

    def fetch(page_num: int) -> Any:
        return api_search_meters(
            project_key,
            page_size=100,
            page_num=page_num,
            keyword="",
            location_id=0,
            show_below=True,
            meter_type_id=0,
            measurement_location_ids=[],
            measurement_system_ids=[],
            measurement_device_type_ids=[],
            status=None,
        )

    return _to_id_name_map(_load_all_pages(fetch))
def _load_topology_group_name_map(session: Session, project_key: str) -> dict[int, str]:
    """Cached topology-group ids mapped to their names for *project_key*."""
    rows = _load_group_rows(session, project_key)
    return {row.group_id: row.group_name for row in rows}


def _load_topology_name_map(session: Session, project_key: str) -> dict[int, str]:
    """Cached topology ids mapped to their names for *project_key*."""
    rows = _load_registry_rows(session, project_key)
    return {row.topology_id: row.topology_name for row in rows}
def _object_type_label(type_code: int | None) -> str:
    """Chinese display label for an object-type code; "" when unknown/None."""
    lookup_key = 0 if type_code is None else type_code
    return OBJECT_TYPE_LABELS.get(lookup_key, "")
def _resolve_field_name_map(
    session: Session, project_key: str, type_code: int
) -> dict[int, str]:
    """Resolve an id -> name lookup table for *type_code*'s object family.

    Unknown codes resolve to an empty mapping.

    Raises:
        ValueError: for type 17 (meter model), which has no confirmed
            source API mapping yet.
    """
    if type_code == PT_OBJ_TYPE_METERMODEL:
        raise ValueError("type=17 (仪表型号) needs additional API mapping confirmation")
    # These two need the DB session; the rest only need the project key.
    if type_code == PT_OBJ_TYPE_TOPOGROUP:
        return _load_topology_group_name_map(session, project_key)
    if type_code == PT_OBJ_TYPE_TOPODIAGRAM:
        return _load_topology_name_map(session, project_key)
    loaders = {
        PT_OBJ_TYPE_LOCATION: _load_location_name_map,
        PT_OBJ_TYPE_SYSTEMTYPE: _load_system_type_name_map,
        PT_OBJ_TYPE_SYSTEM: _load_system_name_map,
        PT_OBJ_TYPE_DEVICETYPE: _load_device_type_name_map,
        PT_OBJ_TYPE_DEVICE: _load_device_name_map,
        PT_OBJ_TYPE_METERTYPE: _load_meter_type_name_map,
        PT_OBJ_TYPE_METER: _load_meter_name_map,
    }
    loader = loaders.get(type_code)
    return loader(project_key) if loader is not None else {}
def _resolve_field_items(
    session: Session, project_key: str, type_code: int, field_ids: list[int]
) -> list[dict[str, Any]]:
    """Attach display names to raw field ids; unknown ids get name ""."""
    names = _resolve_field_name_map(session, project_key, type_code)
    resolved: list[dict[str, Any]] = []
    for field_id in field_ids:
        resolved.append({"id": field_id, "name": names.get(field_id, "")})
    return resolved
def _normalize_int_list(raw_value: Any) -> list[int]:
    """Coerce a raw list into ints, dropping non-numeric entries.

    Non-list input yields [].
    NOTE(review): behaviourally identical to ``_as_int_list`` — the two
    are candidates for merging.
    """
    if not isinstance(raw_value, list):
        return []
    return [
        value
        for value in (_safe_int(item) for item in raw_value)
        if value is not None
    ]
def _grouping_entries(dimensions: list[Any]) -> list[dict[str, Any]]:
    """Build one labelled grouping entry per dimension dict (helper)."""
    entries: list[dict[str, Any]] = []
    for item in dimensions:
        if not isinstance(item, dict):
            continue
        type_code = _safe_int(item.get("type"))
        order = _safe_int(item.get("order"))
        entries.append(
            {
                "name": _text(item.get("name")),
                "type": type_code,
                "type_label": _object_type_label(type_code),
                "level": _safe_int(item.get("level")),
                "order": order,
                "order_label": ORDER_LABELS.get(order or 0, ""),
            }
        )
    return entries


def _condition_entries(
    session: Session, project_key: str, conditions: list[Any]
) -> list[dict[str, Any]]:
    """Resolve labelled filter-condition entries including field names (helper)."""
    entries: list[dict[str, Any]] = []
    for item in conditions:
        if not isinstance(item, dict):
            continue
        type_code = _safe_int(item.get("type")) or 0
        match_type = _safe_int(item.get("match_type"))
        field_ids = _normalize_int_list(item.get("fields"))
        entries.append(
            {
                "type": type_code,
                "type_label": _object_type_label(type_code),
                "level": _safe_int(item.get("level")),
                "match_type": match_type,
                "match_type_label": MATCH_TYPE_LABELS.get(match_type or 0, ""),
                "fields": field_ids,
                "field_items": _resolve_field_items(
                    session, project_key, type_code, field_ids
                ),
            }
        )
    return entries


def get_topology_group_config(project_key: str, topology_id: int) -> dict[str, Any]:
    """Return the grouping dimensions and filter config of a topology.

    Only ``topology_type == 2`` topologies carry a dimension config; any
    other type returns a ``supported: False`` payload with an explanatory
    ``mcp_note`` instead of raising.

    Raises:
        ValueError: when *project_key* is blank, or via the cache/registry
            lookup helpers for unknown topologies.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        _require_topology_cache(session, project_key)
        # Dead work removed: a group-path map used to be preloaded here but
        # its result was never read, costing an extra query per call.
        registry_row = _get_registry_or_error(session, project_key, topology_id)
        if registry_row.topology_type != 2:
            return {
                "supported": False,
                "raw_dimension_config": _load_json_text(
                    registry_row.dimension_config_json
                ),
                "groupings": [],
                "filter": {
                    "filter_type": None,
                    "filter_type_label": "",
                    "conditions": [],
                },
                "mcp_note": "topology.get_group_config only applies to topology_type=2 topologies.",
            }
        raw_dimension_config = _load_json_text(registry_row.dimension_config_json)
        if not isinstance(raw_dimension_config, dict):
            raw_dimension_config = {}
        raw_dimensions = raw_dimension_config.get("dimensions")
        raw_filter = raw_dimension_config.get("filter")
        dimensions = raw_dimensions if isinstance(raw_dimensions, list) else []
        filter_payload = raw_filter if isinstance(raw_filter, dict) else {}
        conditions_payload = filter_payload.get("conditions")
        conditions = (
            conditions_payload if isinstance(conditions_payload, list) else []
        )
        filter_type = _safe_int(filter_payload.get("filter_type"))
        return {
            "supported": True,
            "raw_dimension_config": raw_dimension_config,
            "groupings": _grouping_entries(dimensions),
            "filter": {
                "filter_type": filter_type,
                "filter_type_label": FILTER_TYPE_LABELS.get(filter_type or 0, ""),
                "conditions": _condition_entries(session, project_key, conditions),
            },
        }
def _collect_group_and_topology_refs(
    project_key: str,
    items: list[Any],
    *,
    refreshed_at: str,
    parent_group_id: int | None = None,
    parent_group_path: tuple[str, ...] = (),
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Walk a group/topology tree into flat group rows and topology refs.

    Entries with ``type == 1`` are groups and are recursed into; every
    other entry is recorded as a topology reference under the current
    parent group. ``sort_index`` is the 1-based position among siblings.
    """
    groups: list[dict[str, Any]] = []
    refs: list[dict[str, Any]] = []
    for position, entry in enumerate(items, start=1):
        if not isinstance(entry, dict):
            continue
        entry_id = _safe_int(entry.get("id"))
        if entry_id is None:
            continue
        # Fall back to the stringified id when the name is blank (an id of
        # 0 intentionally falls back to "" — preserved from the original).
        entry_name = _text(entry.get("name")) or str(entry_id or "")
        if _safe_int(entry.get("type")) == 1:
            path = (*parent_group_path, entry_name)
            groups.append(
                {
                    "project_key": project_key,
                    "group_id": entry_id,
                    "group_name": entry_name,
                    "parent_group_id": parent_group_id,
                    "group_path_text": " / ".join(path),
                    "level": len(path),
                    "sort_index": position,
                    "refreshed_at": refreshed_at,
                    "is_active": 1,
                }
            )
            raw_children = entry.get("children")
            child_groups, child_refs = _collect_group_and_topology_refs(
                project_key,
                raw_children if isinstance(raw_children, list) else [],
                refreshed_at=refreshed_at,
                parent_group_id=entry_id,
                parent_group_path=path,
            )
            groups.extend(child_groups)
            refs.extend(child_refs)
        else:
            refs.append(
                {
                    "topology_id": entry_id,
                    "topology_name": entry_name,
                    "group_id": parent_group_id,
                    "sort_index": position,
                }
            )
    return groups, refs
def _as_int_list(raw_value: Any) -> list[int]:
    """Coerce a raw list into ints, skipping non-numeric entries.

    Non-list input yields [].
    """
    if not isinstance(raw_value, list):
        return []
    converted = (_safe_int(item) for item in raw_value)
    return [value for value in converted if value is not None]
- def _record_deepest_entities(
- entity_best: dict[tuple[str, int], tuple[int, list[str]]],
- entity_node_ids: dict[tuple[str, int], list[str]],
- entity_type: str,
- entity_ids: list[int],
- *,
- depth: int,
- node_id: str,
- ) -> None:
- for entity_id in entity_ids:
- key = (entity_type, entity_id)
- existing = entity_best.get(key)
- if existing is None or depth > existing[0]:
- entity_best[key] = (depth, [node_id])
- entity_node_ids[key] = [node_id]
- continue
- if depth == existing[0] and node_id not in entity_node_ids[key]:
- entity_node_ids[key].append(node_id)
def _parse_tree_topology(
    project_key: str,
    topology_id: int,
    diagram: list[Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Flatten a tree-shaped topology diagram into cache rows.

    Returns ``(node_rows, edge_rows, entity_rows)`` ready for bulk insert:
    one node row per tree node, one edge row per parent->child link, and
    one entity-index row per (meter/device, deepest node referencing it).
    """
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    # Per (entity_type, entity_id): the deepest depth seen and the node ids
    # at that depth — kept as parallel dicts by _record_deepest_entities.
    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
    entity_node_ids: dict[tuple[str, int], list[str]] = {}

    def visit(
        node: dict[str, Any],
        parent_node_id: str | None,
        path_names: tuple[str, ...],
        sort_index: int,
    ) -> None:
        # Depth-first walk: emit rows for this node, then recurse into its
        # dict children. Nodes without an id are skipped entirely.
        node_id = _text(node.get("id"))
        if not node_id:
            return
        node_name = _text(node.get("name")) or node_id
        children = (
            node.get("children") if isinstance(node.get("children"), list) else []
        )
        # Prefer the node's declared level; fall back to the walk depth.
        level = _safe_int(node.get("level")) or (len(path_names) + 1)
        path_text = " / ".join((*path_names, node_name))
        # The parent derived from the walk wins; otherwise trust an inline
        # parent_id attribute if the payload carries one.
        effective_parent_node_id = parent_node_id or (
            _text(node.get("parent_id")) or None
        )
        node_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "node_id": node_id,
                "node_name": node_name,
                "parent_node_id": effective_parent_node_id,
                "level": level,
                "node_type_code": _safe_int(node.get("type")),
                "refer_id": _safe_int(node.get("refer_id")),
                "refer_level": _safe_int(node.get("refer_level")),
                "is_virtual": _bool_as_int(node.get("is_virtual")),
                "path_text": path_text,
                "child_count": len(children),
                "sort_index": sort_index,
            }
        )
        if effective_parent_node_id:
            edge_rows.append(
                {
                    "project_key": project_key,
                    "topology_id": topology_id,
                    "source_node_id": effective_parent_node_id,
                    "target_node_id": node_id,
                    "sort_index": sort_index,
                }
            )
        # Record meter/device references at this node's depth so only the
        # deepest referencing node(s) survive per entity.
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "meter",
            _as_int_list(node.get("meter_list")),
            depth=level,
            node_id=node_id,
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "device",
            _as_int_list(node.get("device_list")),
            depth=level,
            node_id=node_id,
        )
        for child_sort_index, child in enumerate(children, start=1):
            if not isinstance(child, dict):
                continue
            visit(child, node_id, (*path_names, node_name), child_sort_index)

    for root_sort_index, root in enumerate(diagram, start=1):
        if not isinstance(root, dict):
            continue
        visit(root, None, (), root_sort_index)
    # Expand the per-entity "deepest node" bookkeeping into flat index rows.
    entity_rows: list[dict[str, Any]] = []
    for (entity_type, entity_id), (depth, _) in entity_best.items():
        for node_id in entity_node_ids[(entity_type, entity_id)]:
            entity_rows.append(
                {
                    "project_key": project_key,
                    "entity_type": entity_type,
                    "entity_id": entity_id,
                    "topology_id": topology_id,
                    "node_id": node_id,
                    "depth": depth,
                }
            )
    return node_rows, edge_rows, entity_rows
def _build_graph_node_context(
    nodes: list[dict[str, Any]],
    edges: list[dict[str, Any]],
) -> tuple[dict[str, int], dict[str, str | None], dict[str, str], dict[str, int]]:
    """Derive per-node level/parent/path/child-count from a raw node-edge graph.

    Levels follow the *longest* root-to-node path, so a node reachable via
    both a short and a long branch is placed under the deeper one; the
    parent map keeps the first parent that reached each node.

    Returns:
        ``(level_map, parent_map, path_map, child_count_map)`` keyed by
        node id.
    """
    incoming: dict[str, list[str]] = defaultdict(list)
    outgoing: dict[str, list[str]] = defaultdict(list)
    for edge in edges:
        source_node_id = _text(edge.get("source"))
        target_node_id = _text(edge.get("target"))
        if not source_node_id or not target_node_id:
            continue
        outgoing[source_node_id].append(target_node_id)
        incoming[target_node_id].append(source_node_id)
    node_ids = [_text(node.get("id")) for node in nodes if _text(node.get("id"))]
    # Roots are nodes with no inbound edges; a fully cyclic graph has none,
    # so fall back to the first node to keep the traversal anchored.
    roots = [node_id for node_id in node_ids if not incoming.get(node_id)]
    if not roots:
        roots = node_ids[:1]
    # Upper bound on any simple path length; used as the cycle guard below.
    max_depth = len(node_ids)
    level_map: dict[str, int] = {}
    path_map: dict[str, str] = {}
    parent_map: dict[str, str | None] = {node_id: None for node_id in node_ids}
    child_count_map: dict[str, int] = {
        node_id: len(outgoing.get(node_id, [])) for node_id in node_ids
    }
    node_name_map = {
        _text(node.get("id")): _text(node.get("name")) or _text(node.get("id"))
        for node in nodes
    }
    queue: deque[tuple[str, int, str]] = deque()
    for root_node_id in roots:
        root_name = node_name_map.get(root_node_id) or root_node_id
        queue.append((root_node_id, 1, root_name))
    while queue:
        node_id, depth, path_text = queue.popleft()
        # BUGFIX: a node is re-enqueued whenever a deeper path reaches it, so
        # a cycle in the edge list made depths grow without bound and the
        # loop never terminated. Depths beyond the node count are impossible
        # on acyclic input, so this cap is behavior-neutral for valid graphs.
        if depth > max_depth:
            continue
        previous = level_map.get(node_id)
        if previous is not None and previous >= depth:
            continue
        level_map[node_id] = depth
        path_map[node_id] = path_text
        for child_node_id in outgoing.get(node_id, []):
            if parent_map.get(child_node_id) is None:
                parent_map[child_node_id] = node_id
            child_name = node_name_map.get(child_node_id) or child_node_id
            queue.append((child_node_id, depth + 1, f"{path_text} / {child_name}"))
    # Nodes unreachable from any root keep their own declared level (or 1)
    # and a single-segment path of their own name.
    for node in nodes:
        node_id = _text(node.get("id"))
        if not node_id or node_id in level_map:
            continue
        level_map[node_id] = _safe_int(node.get("level")) or 1
        path_map[node_id] = _text(node.get("name")) or node_id
    return level_map, parent_map, path_map, child_count_map
def _parse_graph_topology(
    project_key: str,
    topology_id: int,
    diagram: dict[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Flatten a graph-shaped (``nodes`` + ``edges``) diagram into cache rows.

    Returns ``(node_rows, edge_rows, entity_rows)``: one node row per raw
    node with an id, one edge row per fully-specified edge, and one
    entity-index row per (meter/device, deepest node referencing it).
    """
    raw_nodes = [item for item in diagram.get("nodes", []) if isinstance(item, dict)]
    raw_edges = [item for item in diagram.get("edges", []) if isinstance(item, dict)]
    # Derive level / parent / path / child-count from the edge structure.
    level_map, parent_map, path_map, child_count_map = _build_graph_node_context(
        raw_nodes, raw_edges
    )
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    # Per (entity_type, entity_id): deepest depth seen plus the node ids at
    # that depth — kept as parallel dicts by _record_deepest_entities.
    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
    entity_node_ids: dict[tuple[str, int], list[str]] = {}
    for sort_index, node in enumerate(raw_nodes, start=1):
        node_id = _text(node.get("id"))
        if not node_id:
            continue
        # Prefer the traversal-derived level; fall back to the node's own
        # declared level, then to 1.
        level = level_map.get(node_id) or _safe_int(node.get("level")) or 1
        node_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "node_id": node_id,
                "node_name": _text(node.get("name")) or node_id,
                "parent_node_id": parent_map.get(node_id),
                "level": level,
                "node_type_code": _safe_int(node.get("type")),
                "refer_id": _safe_int(node.get("refer_id")),
                "refer_level": _safe_int(node.get("refer_level")),
                "is_virtual": _bool_as_int(node.get("is_virtual")),
                "path_text": path_map.get(node_id, _text(node.get("name")) or node_id),
                "child_count": child_count_map.get(node_id, 0),
                "sort_index": sort_index,
            }
        )
        # Record meter/device references at this node's depth so only the
        # deepest referencing node(s) survive per entity.
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "meter",
            _as_int_list(node.get("meter_list")),
            depth=level,
            node_id=node_id,
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "device",
            _as_int_list(node.get("device_list")),
            depth=level,
            node_id=node_id,
        )
    for sort_index, edge in enumerate(raw_edges, start=1):
        source_node_id = _text(edge.get("source"))
        target_node_id = _text(edge.get("target"))
        # Skip malformed edges missing either endpoint.
        if not source_node_id or not target_node_id:
            continue
        edge_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "source_node_id": source_node_id,
                "target_node_id": target_node_id,
                "sort_index": sort_index,
            }
        )
    # Expand the per-entity "deepest node" bookkeeping into flat index rows.
    entity_rows: list[dict[str, Any]] = []
    for (entity_type, entity_id), (depth, _) in entity_best.items():
        for node_id in entity_node_ids[(entity_type, entity_id)]:
            entity_rows.append(
                {
                    "project_key": project_key,
                    "entity_type": entity_type,
                    "entity_id": entity_id,
                    "topology_id": topology_id,
                    "node_id": node_id,
                    "depth": depth,
                }
            )
    return node_rows, edge_rows, entity_rows
def refresh_topology_cache(
    project_key: str, topology_ids: list[int] | None = None
) -> dict[str, Any]:
    """Rebuild the local topology cache for *project_key* from the upstream API.

    With ``topology_ids`` given, only those topologies are re-fetched and
    replaced (group rows are always fully replaced); otherwise the whole
    project cache is wiped and rebuilt. Returns a summary of refreshed counts.

    Raises ValueError for a blank project key, malformed upstream payloads,
    or requested topology ids absent from the upstream list.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    # Single timestamp stamped on every row written by this refresh run.
    refreshed_at = _utc_now_iso()
    list_payload = api_list_topologies_with_group(project_key, group_ids=[])
    raw_items = list_payload.get("data")
    if not isinstance(raw_items, list):
        raise ValueError("topology list returned invalid data")
    group_rows, topology_refs = _collect_group_and_topology_refs(
        project_key, raw_items, refreshed_at=refreshed_at
    )
    requested_topology_ids = {item for item in topology_ids or []}
    available_topology_map = {item["topology_id"]: item for item in topology_refs}
    if requested_topology_ids:
        # Fail fast when the caller asked for ids upstream does not know.
        missing = sorted(requested_topology_ids - set(available_topology_map))
        if missing:
            raise ValueError(
                f"topology_id not found in upstream topology list: {missing}"
            )
        # Preserve the caller's requested order (duplicates kept as given).
        selected_topology_refs = [
            available_topology_map[item] for item in topology_ids or []
        ]
    else:
        selected_topology_refs = topology_refs
    registry_rows: list[dict[str, Any]] = []
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    entity_rows: list[dict[str, Any]] = []
    for topology_ref in selected_topology_refs:
        topology_id = topology_ref["topology_id"]
        detail_payload = api_get_topology(project_key, topology_id)
        detail_data = detail_payload.get("data")
        if not isinstance(detail_data, dict):
            raise ValueError(
                f"topology get returned invalid data for topology_id={topology_id}"
            )
        # The diagram's JSON shape selects the parser: a list is the tree
        # form, a dict is the graph form; anything else caches no rows.
        diagram = detail_data.get("diagram")
        if isinstance(diagram, list):
            root_shape = "tree"
            topology_node_rows, topology_edge_rows, topology_entity_rows = (
                _parse_tree_topology(project_key, topology_id, diagram)
            )
        elif isinstance(diagram, dict):
            root_shape = "graph"
            topology_node_rows, topology_edge_rows, topology_entity_rows = (
                _parse_graph_topology(project_key, topology_id, diagram)
            )
        else:
            root_shape = "tree"
            topology_node_rows, topology_edge_rows, topology_entity_rows = ([], [], [])
        registry_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                # Detail payload wins; fall back to the list payload's name.
                "topology_name": _text(detail_data.get("name"))
                or topology_ref["topology_name"],
                "topology_type": _safe_int(detail_data.get("type")) or 0,
                "object_type_code": _safe_int(detail_data.get("object")),
                "group_id": _safe_int(detail_data.get("group_id"))
                or topology_ref.get("group_id"),
                "root_shape": root_shape,
                "source_updated_time": _text(detail_data.get("updated_time")),
                "data_options_json": _dump_json_text(detail_data.get("data_options")),
                "dimension_config_json": _dump_json_text(
                    detail_data.get("dimension_config")
                ),
                "refreshed_at": refreshed_at,
                "is_active": 1,
            }
        )
        node_rows.extend(topology_node_rows)
        edge_rows.extend(topology_edge_rows)
        entity_rows.extend(topology_entity_rows)
    # Replace-then-insert inside one session; a single commit at the end.
    with Session(sql_engine()) as session:
        if requested_topology_ids:
            # Partial refresh: groups are still replaced wholesale, but only
            # the requested topologies' rows are deleted (dependent tables
            # first, registry last).
            session.execute(
                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
            )
            session.add_all(TopologyGroup(**row) for row in group_rows)
            session.execute(
                delete(TopologyEntityIndex).where(
                    TopologyEntityIndex.project_key == project_key,
                    TopologyEntityIndex.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyEdge).where(
                    TopologyEdge.project_key == project_key,
                    TopologyEdge.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyNode).where(
                    TopologyNode.project_key == project_key,
                    TopologyNode.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyRegistry).where(
                    TopologyRegistry.project_key == project_key,
                    TopologyRegistry.topology_id.in_(requested_topology_ids),
                )
            )
        else:
            # Full refresh: wipe everything belonging to the project.
            session.execute(
                delete(TopologyEntityIndex).where(
                    TopologyEntityIndex.project_key == project_key
                )
            )
            session.execute(
                delete(TopologyEdge).where(TopologyEdge.project_key == project_key)
            )
            session.execute(
                delete(TopologyNode).where(TopologyNode.project_key == project_key)
            )
            session.execute(
                delete(TopologyRegistry).where(
                    TopologyRegistry.project_key == project_key
                )
            )
            session.execute(
                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
            )
            session.add_all(TopologyGroup(**row) for row in group_rows)
        session.add_all(TopologyRegistry(**row) for row in registry_rows)
        session.add_all(TopologyNode(**row) for row in node_rows)
        session.add_all(TopologyEdge(**row) for row in edge_rows)
        session.add_all(TopologyEntityIndex(**row) for row in entity_rows)
        session.commit()
    result = {
        "project_key": project_key,
        "refreshed_group_count": len(group_rows),
        "refreshed_topology_count": len(registry_rows),
        "refreshed_node_count": len(node_rows),
        "refreshed_edge_count": len(edge_rows),
        "refreshed_entity_index_count": len(entity_rows),
        "topology_ids": [row["topology_id"] for row in registry_rows],
        "refreshed_at": refreshed_at,
    }
    if not registry_rows:
        result["mcp_note"] = (
            f"Project '{project_key}' currently has no topology data in upstream config."
        )
    return result
def _load_group_rows(session: Session, project_key: str) -> list[TopologyGroup]:
    """Fetch every cached group row for *project_key* in stable traversal order."""
    stmt = (
        select(TopologyGroup)
        .where(TopologyGroup.project_key == project_key)
        .order_by(
            TopologyGroup.level.asc(),
            TopologyGroup.sort_index.asc(),
            TopologyGroup.group_id.asc(),
        )
    )
    return list(session.scalars(stmt))
def _load_registry_rows(session: Session, project_key: str) -> list[TopologyRegistry]:
    """Fetch all cached registry rows for *project_key*, name-sorted with id tie-break."""
    stmt = (
        select(TopologyRegistry)
        .where(TopologyRegistry.project_key == project_key)
        .order_by(
            TopologyRegistry.topology_name.asc(), TopologyRegistry.topology_id.asc()
        )
    )
    return list(session.scalars(stmt))
def _has_topology_cache(session: Session, project_key: str) -> bool:
    """Return True when at least one topology registry row is cached for the project.

    Uses an existence probe (SELECT ... LIMIT 1) instead of loading and
    materializing every registry row as ORM objects, which the previous
    implementation did just to test emptiness.
    """
    probe = session.scalar(
        select(TopologyRegistry.topology_id)
        .where(TopologyRegistry.project_key == project_key)
        .limit(1)
    )
    return probe is not None
def _require_topology_cache(session: Session, project_key: str) -> None:
    """Raise ValueError when *project_key* has no cached topologies; no-op otherwise."""
    if not _has_topology_cache(session, project_key):
        raise ValueError(
            f"Project '{project_key}' has no cached topologies. Refresh it via "
            f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
            "already succeeded, the upstream project likely has no topology data."
        )
def list_topology_groups(project_key: str) -> dict[str, Any]:
    """Return the cached group hierarchy for *project_key* as a nested tree.

    ``groups`` holds the root groups (those with no parent) with children
    nested recursively; ``total`` counts every cached group row.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        if not _has_topology_cache(session, project_key):
            return {
                "project_key": project_key,
                "groups": [],
                "total": 0,
                "mcp_note": (
                    f"Project '{project_key}' has no cached topologies. Refresh it via "
                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
                    "already succeeded, the upstream project likely has no topology data."
                ),
            }
        group_rows = _load_group_rows(session, project_key)
        # First pass: one payload per group, bucketed under its parent id.
        # Rows arrive pre-sorted, so each bucket keeps a stable order.
        buckets: dict[int | None, list[dict[str, Any]]] = defaultdict(list)
        payload_by_id: dict[int, dict[str, Any]] = {}
        for row in group_rows:
            entry: dict[str, Any] = {
                "group_id": row.group_id,
                "group_name": row.group_name,
                "parent_group_id": row.parent_group_id,
                "level": row.level,
                "group_path_text": row.group_path_text,
                "children": [],
            }
            payload_by_id[row.group_id] = entry
            buckets[row.parent_group_id].append(entry)
        # Second pass: wire each payload to the bucket of its children.
        for entry in payload_by_id.values():
            entry["children"] = buckets.get(entry["group_id"], [])
        return {
            "project_key": project_key,
            "groups": buckets.get(None, []),
            "total": len(group_rows),
        }
def _group_path_map(group_rows: list[TopologyGroup]) -> dict[int, str]:
    """Map each cached group id to its human-readable path text."""
    path_map: dict[int, str] = {}
    for row in group_rows:
        path_map[row.group_id] = row.group_path_text
    return path_map
def _collect_descendant_group_ids(
    group_rows: list[TopologyGroup], group_id: int
) -> set[int]:
    """Return *group_id* plus every transitive child group id (cycle-safe)."""
    children_of: dict[int | None, list[int]] = defaultdict(list)
    for row in group_rows:
        children_of[row.parent_group_id].append(row.group_id)
    seen: set[int] = set()
    # Iterative DFS; visit order differs from BFS but the result set is the same.
    stack = [group_id]
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        stack.extend(children_of.get(current, []))
    return seen
def list_topologies(
    project_key: str,
    *,
    group_id: int | None = None,
    object_type_code: int | None = None,
) -> dict[str, Any]:
    """List cached topologies, optionally filtered by group subtree and object type.

    ``group_id`` matches the group itself plus all of its descendant groups.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        if not _has_topology_cache(session, project_key):
            return {
                "project_key": project_key,
                "topologies": [],
                "total": 0,
                "mcp_note": (
                    f"Project '{project_key}' has no cached topologies. Refresh it via "
                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
                    "already succeeded, the upstream project likely has no topology data."
                ),
            }
        group_rows = _load_group_rows(session, project_key)
        registry_rows = _load_registry_rows(session, project_key)
        path_by_group = _group_path_map(group_rows)
        subtree_ids: set[int] | None = None
        if group_id is not None:
            subtree_ids = _collect_descendant_group_ids(group_rows, group_id)

        def matches_filters(row: TopologyRegistry) -> bool:
            # Group filter: keep rows whose group falls inside the subtree.
            if subtree_ids is not None and row.group_id not in subtree_ids:
                return False
            # Object-type filter: exact match when requested.
            return object_type_code is None or row.object_type_code == object_type_code

        topologies = [
            {
                "topology_id": row.topology_id,
                "topology_name": row.topology_name,
                "topology_type": row.topology_type,
                "object_type_code": row.object_type_code,
                "group_id": row.group_id,
                "group_path_text": path_by_group.get(row.group_id or -1, ""),
                "root_shape": row.root_shape,
                "refreshed_at": row.refreshed_at,
            }
            for row in registry_rows
            if matches_filters(row)
        ]
        return {
            "project_key": project_key,
            "topologies": topologies,
            "total": len(topologies),
        }
def _get_registry_or_error(
    session: Session, project_key: str, topology_id: int
) -> TopologyRegistry:
    """Load one cached registry row or raise ValueError when it is missing."""
    stmt = select(TopologyRegistry).where(
        TopologyRegistry.project_key == project_key,
        TopologyRegistry.topology_id == topology_id,
    )
    row = session.scalar(stmt)
    if row is None:
        raise ValueError(f"topology_id not found in cache: {topology_id}")
    return row
def _get_node_or_error(
    session: Session, project_key: str, topology_id: int, node_id: str
) -> TopologyNode:
    """Load one cached node row or raise ValueError when it is not cached."""
    stmt = select(TopologyNode).where(
        TopologyNode.project_key == project_key,
        TopologyNode.topology_id == topology_id,
        TopologyNode.node_id == node_id,
    )
    row = session.scalar(stmt)
    if row is None:
        raise ValueError(f"node_id not found in cache: {node_id}")
    return row
def _resolve_root_node_id(
    node_map: dict[str, TopologyNode], parents_by_node: dict[str, list[str]]
) -> str:
    """Pick the deterministic root: a parentless node with the smallest
    (level, path text, node id) ordering key.

    Raises ValueError when no node is parentless.
    """
    candidates = [nid for nid in node_map if not parents_by_node.get(nid)]
    if not candidates:
        raise ValueError("root node not found in cache")

    def order_key(nid: str):
        row = node_map[nid]
        # Nodes without a level sort last.
        level = row.level if row.level is not None else 10**9
        return (level, row.path_text or row.node_name, nid)

    # min() with a total ordering key matches sort-then-take-first, including
    # first-occurrence behavior on ties.
    return min(candidates, key=order_key)
def _normalize_requested_node_id(
    raw_node_id: str,
    node_map: dict[str, TopologyNode],
    parents_by_node: dict[str, list[str]],
) -> str:
    """Translate the case-insensitive alias "root" into the actual root node id."""
    cleaned = _text(raw_node_id)
    if cleaned.lower() == "root":
        return _resolve_root_node_id(node_map, parents_by_node)
    return cleaned
def _node_payload(node_row: TopologyNode) -> dict[str, Any]:
    """Serialize a cached node row into the plain-dict shape used in responses."""
    payload: dict[str, Any] = {
        "node_id": node_row.node_id,
        "node_name": node_row.node_name,
        "level": node_row.level,
        "parent_node_id": node_row.parent_node_id,
        "refer_id": node_row.refer_id,
        "refer_level": node_row.refer_level,
        # Stored as an int flag in SQL; exposed as a real boolean.
        "is_virtual": bool(node_row.is_virtual),
        "path_text": node_row.path_text,
        "child_count": node_row.child_count,
    }
    return payload
- def _floor_hour_ts(ts: int) -> int:
- return max(0, int(ts) - (int(ts) % 3600))
- def _floor_day_ts(ts: int) -> int:
- current = datetime.fromtimestamp(int(ts)).astimezone()
- return int(
- current.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
- )
def _hourly_window_timestamps(base_ts: int) -> list[int]:
    """Return the 12 hour-start timestamps preceding *base_ts*'s hour, newest first.

    The hour containing *base_ts* itself is excluded.
    """
    anchor = _floor_hour_ts(base_ts)
    return list(range(anchor - 3600, anchor - 3600 * 13, -3600))
def _daily_window_timestamps(base_ts: int) -> list[int]:
    """Return 7 local-midnight timestamps: today's plus the previous six, newest first."""
    anchor = _floor_day_ts(base_ts)
    return list(range(anchor, anchor - 86400 * 7, -86400))
- def _extract_topology_data_map(payload: Any) -> dict[str, Any]:
- if not isinstance(payload, dict):
- return {}
- data = payload.get("data")
- if not isinstance(data, dict):
- return {}
- return {str(node_id): node_values for node_id, node_values in data.items()}
def _fetch_topology_runtime_data(
    project_key: str, topology_id: int, *, base_ts: int | None = None
) -> dict[str, Any]:
    """Fetch instant plus hourly/daily accumulated topology data concurrently.

    *base_ts* defaults to the current unix time. The hourly window covers the
    12 hours before the current hour; the daily window covers the last 7
    local days including today. All upstream calls are independent HTTP
    requests, so they are fanned out on a thread pool.
    """
    effective_base_ts = int(base_ts if base_ts is not None else _current_unix_ts())
    hourly_timestamps = _hourly_window_timestamps(effective_base_ts)
    daily_timestamps = _daily_window_timestamps(effective_base_ts)

    def fetch_instant() -> dict[str, Any]:
        # Live snapshot ("instant" display mode) for every node at once.
        return _extract_topology_data_map(
            api_get_topology_data(project_key, topology_id, display="instant")
        )

    def fetch_accu(accu_step: int, ts: int) -> tuple[int, dict[str, Any]]:
        # Accumulated values at one timestamp. accu_step selects the bucket
        # size — 2 and 3 are used below for hourly/daily; confirm the exact
        # codes against the upstream API contract.
        payload = api_get_topology_data(
            project_key,
            topology_id,
            display="accu",
            accu_step=accu_step,
            ts=ts,
        )
        return ts, _extract_topology_data_map(payload)

    # One worker per request, capped at 8 to avoid hammering the upstream.
    max_workers = max(1, min(8, 1 + len(hourly_timestamps) + len(daily_timestamps)))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        instant_future = executor.submit(fetch_instant)
        hourly_futures = [
            executor.submit(fetch_accu, 2, ts_value) for ts_value in hourly_timestamps
        ]
        daily_futures = [
            executor.submit(fetch_accu, 3, ts_value) for ts_value in daily_timestamps
        ]
        # .result() re-raises any worker exception in the caller's thread.
        instant_map = instant_future.result()
        hourly_series = [future.result() for future in hourly_futures]
        daily_series = [future.result() for future in daily_futures]
    return {
        "base_ts": effective_base_ts,
        "instant": instant_map,
        "hourly": [
            {"ts": ts_value, "data_map": data_map} for ts_value, data_map in hourly_series
        ],
        "daily": [
            {"ts": ts_value, "data_map": data_map} for ts_value, data_map in daily_series
        ],
        "data_window": {
            "hourly_ts": hourly_timestamps,
            "daily_ts": daily_timestamps,
        },
    }
- def _attach_runtime_data_to_node(
- node_payload: dict[str, Any], runtime_bundle: dict[str, Any]
- ) -> dict[str, Any]:
- node_id = str(node_payload.get("node_id") or "").strip()
- instant_map = runtime_bundle.get("instant") or {}
- hourly_series = runtime_bundle.get("hourly") or []
- daily_series = runtime_bundle.get("daily") or []
- node_payload["data"] = {
- "instant": instant_map.get(node_id),
- "accu": {
- "hourly": [
- {
- "ts": item["ts"],
- "values": item["data_map"].get(node_id),
- }
- for item in hourly_series
- ],
- "daily": [
- {
- "ts": item["ts"],
- "values": item["data_map"].get(node_id),
- }
- for item in daily_series
- ],
- },
- }
- return node_payload
def _attach_runtime_data_to_nodes(
    nodes: list[dict[str, Any]], runtime_bundle: dict[str, Any]
) -> list[dict[str, Any]]:
    """Attach runtime data to every node payload in order; returns the mutated payloads."""
    enriched: list[dict[str, Any]] = []
    for payload in nodes:
        enriched.append(_attach_runtime_data_to_node(payload, runtime_bundle))
    return enriched
def _load_node_map(
    session: Session, project_key: str, topology_id: int
) -> dict[str, TopologyNode]:
    """Load every cached node of one topology, keyed by node_id."""
    stmt = select(TopologyNode).where(
        TopologyNode.project_key == project_key,
        TopologyNode.topology_id == topology_id,
    )
    return {node.node_id: node for node in session.scalars(stmt)}
def _load_adjacency(
    session: Session, project_key: str, topology_id: int
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Build ``(parents_by_node, children_by_node)`` from the cached edge table.

    Edges are consumed in (sort_index, id) order so adjacency lists preserve
    the cached ordering.
    """
    stmt = (
        select(TopologyEdge)
        .where(
            TopologyEdge.project_key == project_key,
            TopologyEdge.topology_id == topology_id,
        )
        .order_by(TopologyEdge.sort_index.asc(), TopologyEdge.id.asc())
    )
    parents: dict[str, list[str]] = defaultdict(list)
    children: dict[str, list[str]] = defaultdict(list)
    for edge in session.scalars(stmt):
        parents[edge.target_node_id].append(edge.source_node_id)
        children[edge.source_node_id].append(edge.target_node_id)
    return parents, children
- def _dedupe_preserve_order(node_ids: list[str]) -> list[str]:
- result: list[str] = []
- seen: set[str] = set()
- for node_id in node_ids:
- if node_id in seen:
- continue
- seen.add(node_id)
- result.append(node_id)
- return result
def _node_list_payload(
    node_map: dict[str, TopologyNode], node_ids: list[str]
) -> list[dict[str, Any]]:
    """Serialize the given node ids (deduped, order preserved), skipping unknown ids."""
    return [
        _node_payload(node_map[node_id])
        for node_id in _dedupe_preserve_order(node_ids)
        if node_id in node_map
    ]
def _collect_ancestors(
    node_map: dict[str, TopologyNode],
    parents_by_node: dict[str, list[str]],
    node_id: str,
    depth_limit: int,
) -> list[dict[str, Any]]:
    """BFS upwards from *node_id*, returning ancestor payloads within *depth_limit*.

    Each payload carries a ``distance`` (edges away from *node_id*). Results
    are ordered farthest-first, then by path text, then node id. A
    non-positive *depth_limit* yields an empty list.
    """
    if depth_limit <= 0:
        return []
    # visited maps ancestor id -> smallest distance found so far.
    visited: dict[str, int] = {}
    queue: deque[tuple[str, int]] = deque(
        (parent_id, 1) for parent_id in parents_by_node.get(node_id, [])
    )
    while queue:
        current_node_id, distance = queue.popleft()
        if distance > depth_limit:
            continue
        previous_distance = visited.get(current_node_id)
        # Skip when an equal-or-shorter route already recorded this ancestor;
        # this also lets the traversal terminate on cyclic edge data.
        if previous_distance is not None and previous_distance <= distance:
            continue
        visited[current_node_id] = distance
        for parent_id in parents_by_node.get(current_node_id, []):
            queue.append((parent_id, distance + 1))
    # Farthest ancestors first (-distance), deterministic tie-breaking after.
    # NOTE(review): node_map[item] assumes every edge endpoint exists in the
    # node table — a dangling parent id would raise KeyError here; confirm
    # the refresh path guarantees this.
    ordered_ids = sorted(
        visited,
        key=lambda item: (
            -visited[item],
            node_map[item].path_text or node_map[item].node_name,
            item,
        ),
    )
    result: list[dict[str, Any]] = []
    for current_node_id in ordered_ids:
        node_payload = _node_payload(node_map[current_node_id])
        node_payload["distance"] = visited[current_node_id]
        result.append(node_payload)
    return result
def _collect_descendants(
    node_map: dict[str, TopologyNode],
    children_by_node: dict[str, list[str]],
    node_id: str,
    depth_limit: int,
) -> list[dict[str, Any]]:
    """BFS downwards from *node_id*, returning descendant payloads within *depth_limit*.

    Each payload carries a ``distance`` (edges away from *node_id*). Results
    are ordered nearest-first, then by path text, then node id. A
    non-positive *depth_limit* yields an empty list.
    """
    if depth_limit <= 0:
        return []
    # visited maps descendant id -> smallest distance found so far.
    visited: dict[str, int] = {}
    queue: deque[tuple[str, int]] = deque(
        (child_id, 1) for child_id in children_by_node.get(node_id, [])
    )
    while queue:
        current_node_id, distance = queue.popleft()
        if distance > depth_limit:
            continue
        previous_distance = visited.get(current_node_id)
        # Skip when an equal-or-shorter route already recorded this node;
        # this also lets the traversal terminate on cyclic edge data.
        if previous_distance is not None and previous_distance <= distance:
            continue
        visited[current_node_id] = distance
        for child_id in children_by_node.get(current_node_id, []):
            queue.append((child_id, distance + 1))
    # Nearest descendants first, deterministic tie-breaking after.
    # NOTE(review): node_map[item] assumes every edge endpoint exists in the
    # node table — a dangling child id would raise KeyError here; confirm
    # the refresh path guarantees this.
    ordered_ids = sorted(
        visited,
        key=lambda item: (
            visited[item],
            node_map[item].path_text or node_map[item].node_name,
            item,
        ),
    )
    result: list[dict[str, Any]] = []
    for current_node_id in ordered_ids:
        node_payload = _node_payload(node_map[current_node_id])
        node_payload["distance"] = visited[current_node_id]
        result.append(node_payload)
    return result
def _topology_metadata_payload(
    registry_row: TopologyRegistry, group_path_text: str
) -> dict[str, Any]:
    """Shape a cached registry row (plus its group path text) for API responses."""
    # Decode once; both "data_options" and the derived metric definitions use it.
    data_options = _load_json_text(registry_row.data_options_json)
    payload: dict[str, Any] = {
        "topology_id": registry_row.topology_id,
        "topology_name": registry_row.topology_name,
        "topology_type": registry_row.topology_type,
        "object_type_code": registry_row.object_type_code,
        "group_id": registry_row.group_id,
        "group_path_text": group_path_text,
        "root_shape": registry_row.root_shape,
        "data_options": data_options,
        "metric_definitions": _build_metric_definitions(data_options),
        "dimension_config": _load_json_text(registry_row.dimension_config_json),
    }
    return payload
def get_topology_node(
    project_key: str,
    topology_id: int,
    node_id: str = "root",
    *,
    include_siblings: bool = True,
    include_children: bool = True,
) -> dict[str, Any]:
    """Return one cached node with its neighborhood plus live runtime data.

    ``node_id="root"`` (case-insensitive) resolves to the topology's root
    node. Raises ValueError for blank arguments, an un-refreshed cache, or an
    unknown topology/node id.
    """
    project_key = _text(project_key)
    node_id = _text(node_id)
    if not project_key:
        raise ValueError("project_key is required")
    if not node_id:
        raise ValueError("node_id is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        _require_topology_cache(session, project_key)
        group_rows = _load_group_rows(session, project_key)
        group_path_map = _group_path_map(group_rows)
        registry_row = _get_registry_or_error(session, project_key, topology_id)
        node_map = _load_node_map(session, project_key, topology_id)
        parents_by_node, children_by_node = _load_adjacency(
            session, project_key, topology_id
        )
        # Resolve the "root" alias before the existence check below.
        resolved_node_id = _normalize_requested_node_id(
            node_id, node_map, parents_by_node
        )
        node_row = _get_node_or_error(
            session, project_key, topology_id, resolved_node_id
        )
        # Live upstream fetch (instant + accumulated series) for the whole topology.
        runtime_bundle = _fetch_topology_runtime_data(project_key, topology_id)
        parent_ids = parents_by_node.get(resolved_node_id, [])
        child_ids = children_by_node.get(resolved_node_id, []) if include_children else []
        sibling_ids: list[str] = []
        # Siblings are only well-defined for single-parent nodes: the other
        # children of that sole parent.
        if include_siblings and len(parent_ids) == 1:
            sibling_ids = [
                candidate
                for candidate in children_by_node.get(parent_ids[0], [])
                if candidate != resolved_node_id
            ]
        return {
            "data_window": runtime_bundle["data_window"],
            "topology": _topology_metadata_payload(
                registry_row, group_path_map.get(registry_row.group_id or -1, "")
            ),
            "node": _attach_runtime_data_to_node(_node_payload(node_row), runtime_bundle),
            "parents": _attach_runtime_data_to_nodes(
                _node_list_payload(node_map, parent_ids), runtime_bundle
            ),
            "children": _attach_runtime_data_to_nodes(
                _node_list_payload(node_map, child_ids), runtime_bundle
            ),
            "siblings": _attach_runtime_data_to_nodes(
                _node_list_payload(node_map, sibling_ids), runtime_bundle
            ),
        }
def find_topology_context(
    project_key: str,
    entity_type: str,
    entity_id: int,
    *,
    topology_id: int | None = None,
    include_siblings: bool = True,
    ancestor_depth: int = 5,
    descendant_depth: int = 2,
) -> dict[str, Any]:
    """Locate an entity (e.g. a meter or device) across the cached topologies.

    Returns every cached placement of the entity, each with the owning
    topology's metadata and the surrounding nodes: parents, children,
    ancestors/descendants up to the given depths, and optional siblings.
    Matches are ordered deepest placement first, then by topology id.
    """
    project_key = _text(project_key)
    normalized_entity_type = _normalize_entity_type(entity_type)
    if not project_key:
        raise ValueError("project_key is required")
    if entity_id <= 0:
        raise ValueError("entity_id must be a positive integer")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        _require_topology_cache(session, project_key)
        group_rows = _load_group_rows(session, project_key)
        group_path_map = _group_path_map(group_rows)
        query = select(TopologyEntityIndex).where(
            TopologyEntityIndex.project_key == project_key,
            TopologyEntityIndex.entity_type == normalized_entity_type,
            TopologyEntityIndex.entity_id == entity_id,
        )
        if topology_id is not None:
            query = query.where(TopologyEntityIndex.topology_id == topology_id)
        index_rows = list(
            session.scalars(
                query.order_by(
                    TopologyEntityIndex.depth.desc(),
                    TopologyEntityIndex.topology_id.asc(),
                )
            )
        )
        matches: list[dict[str, Any]] = []
        # Per-topology caches so each topology's registry/node/edge rows are
        # loaded only once even when the entity appears on several of its nodes.
        topology_node_maps: dict[int, dict[str, TopologyNode]] = {}
        topology_adjacency: dict[
            int, tuple[dict[str, list[str]], dict[str, list[str]]]
        ] = {}
        topology_registry_rows: dict[int, TopologyRegistry] = {}
        for index_row in index_rows:
            current_topology_id = index_row.topology_id
            if current_topology_id not in topology_registry_rows:
                topology_registry_rows[current_topology_id] = _get_registry_or_error(
                    session, project_key, current_topology_id
                )
                topology_node_maps[current_topology_id] = _load_node_map(
                    session, project_key, current_topology_id
                )
                topology_adjacency[current_topology_id] = _load_adjacency(
                    session, project_key, current_topology_id
                )
            node_map = topology_node_maps[current_topology_id]
            node_row = node_map.get(index_row.node_id)
            if node_row is None:
                # Stale index entry pointing at a node no longer cached.
                continue
            parents_by_node, children_by_node = topology_adjacency[current_topology_id]
            parent_ids = parents_by_node.get(index_row.node_id, [])
            child_ids = children_by_node.get(index_row.node_id, [])
            sibling_ids: list[str] = []
            # Siblings are only well-defined for single-parent nodes.
            if include_siblings and len(parent_ids) == 1:
                sibling_ids = [
                    candidate
                    for candidate in children_by_node.get(parent_ids[0], [])
                    if candidate != index_row.node_id
                ]
            matches.append(
                {
                    "topology": _topology_metadata_payload(
                        topology_registry_rows[current_topology_id],
                        group_path_map.get(
                            topology_registry_rows[current_topology_id].group_id or -1,
                            "",
                        ),
                    ),
                    "self": _node_payload(node_row),
                    "parents": _node_list_payload(node_map, parent_ids),
                    "children": _node_list_payload(node_map, child_ids),
                    # Negative depths are clamped to 0 (empty result).
                    "ancestors": _collect_ancestors(
                        node_map,
                        parents_by_node,
                        index_row.node_id,
                        max(ancestor_depth, 0),
                    ),
                    "descendants": _collect_descendants(
                        node_map,
                        children_by_node,
                        index_row.node_id,
                        max(descendant_depth, 0),
                    ),
                    "siblings": _node_list_payload(node_map, sibling_ids),
                }
            )
        return {
            "query": {
                "project_key": project_key,
                "entity_type": normalized_entity_type,
                "entity_id": entity_id,
                "topology_id": topology_id,
            },
            "matches": matches,
            "total_matches": len(matches),
        }
|