from __future__ import annotations from collections import defaultdict, deque from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timezone import json from typing import Any from sqlalchemy import ( Index, Integer, String, Text, UniqueConstraint, delete, inspect, select, text, ) from sqlalchemy.orm import Mapped, Session, mapped_column from .config_api import list_topologies_with_group as api_list_topologies_with_group from .config_api import list_locations as api_list_locations from .config_api import list_system_tree as api_list_system_tree from .config_api import list_systems as api_list_systems from .config_api import list_device_types as api_list_device_types from .config_api import list_meter_types as api_list_meter_types from .config_api import search_devices as api_search_devices from .config_api import search_meters as api_search_meters from .config_api import get_topology_data as api_get_topology_data from .config_api import get_topology as api_get_topology from .db import Base, sql_engine PT_OBJ_TYPE_LOCATION = 11 PT_OBJ_TYPE_SYSTEMTYPE = 12 PT_OBJ_TYPE_SYSTEM = 13 PT_OBJ_TYPE_DEVICETYPE = 14 PT_OBJ_TYPE_DEVICE = 15 PT_OBJ_TYPE_METERTYPE = 16 PT_OBJ_TYPE_METERMODEL = 17 PT_OBJ_TYPE_METER = 18 PT_OBJ_TYPE_TOPOGROUP = 19 PT_OBJ_TYPE_TOPODIAGRAM = 20 OBJECT_TYPE_LABELS = { PT_OBJ_TYPE_LOCATION: "位置", PT_OBJ_TYPE_SYSTEMTYPE: "系统类型", PT_OBJ_TYPE_SYSTEM: "系统", PT_OBJ_TYPE_DEVICETYPE: "设备类型", PT_OBJ_TYPE_DEVICE: "设备", PT_OBJ_TYPE_METERTYPE: "仪表类型", PT_OBJ_TYPE_METERMODEL: "仪表型号", PT_OBJ_TYPE_METER: "仪表", PT_OBJ_TYPE_TOPOGROUP: "拓扑图分组", PT_OBJ_TYPE_TOPODIAGRAM: "拓扑图", } ORDER_LABELS = {1: "顺序", 2: "逆序"} FILTER_TYPE_LABELS = {1: "所有", 2: "任一"} MATCH_TYPE_LABELS = {1: "等于", 2: "不等于", 3: "包含", 4: "不包含"} def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _current_unix_ts() -> int: return int(datetime.now().timestamp()) def _safe_int(raw_value: Any) -> int | None: if raw_value is None: return None try: return int(str(raw_value).strip()) except Exception: return None def _text(raw_value: Any) -> str: return str(raw_value or "").strip() def _bool_as_int(raw_value: Any) -> int: return 1 if bool(raw_value) else 0 def _normalize_entity_type(entity_type: str) -> str: normalized = _text(entity_type).lower() if normalized not in {"meter", "device"}: raise ValueError("entity_type must be 'meter' or 'device'") return normalized class TopologyGroup(Base): __tablename__ = "topology_group" __table_args__ = ( UniqueConstraint( "project_key", "group_id", name="uq_topology_group_project_group" ), Index("ix_topology_group_project_parent", "project_key", "parent_group_id"), ) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) project_key: Mapped[str] = mapped_column(String(128), nullable=False) group_id: Mapped[int] = mapped_column(Integer, nullable=False) group_name: Mapped[str] = mapped_column(String(255), nullable=False) parent_group_id: Mapped[int | None] = mapped_column(Integer, nullable=True) group_path_text: Mapped[str] = mapped_column(Text, nullable=False) level: Mapped[int] = mapped_column(Integer, nullable=False) sort_index: Mapped[int] = mapped_column(Integer, nullable=False) refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False) is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1) class TopologyRegistry(Base): __tablename__ = "topology_registry" __table_args__ = ( UniqueConstraint( "project_key", "topology_id", name="uq_topology_registry_project_topology" ), Index("ix_topology_registry_project_group", "project_key", "group_id"), ) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) project_key: Mapped[str] = mapped_column(String(128), nullable=False) topology_id: Mapped[int] = mapped_column(Integer, nullable=False) topology_name: Mapped[str] = mapped_column(String(255), nullable=False) topology_type: Mapped[int] = mapped_column(Integer, nullable=False) object_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True) group_id: Mapped[int | None] = mapped_column(Integer, nullable=True) root_shape: Mapped[str] = mapped_column(String(32), nullable=False) source_updated_time: Mapped[str] = mapped_column( String(64), nullable=False, default="" ) data_options_json: Mapped[str] = mapped_column(Text, nullable=False, default="") dimension_config_json: Mapped[str] = mapped_column( Text, nullable=False, default="" ) refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False) is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1) class TopologyNode(Base): __tablename__ = "topology_node" __table_args__ = ( UniqueConstraint( "project_key", "topology_id", "node_id", name="uq_topology_node_project_topology_node", ), Index( "ix_topology_node_project_topology_parent", "project_key", "topology_id", "parent_node_id", ), Index( "ix_topology_node_project_topology_refer", "project_key", "topology_id", "refer_id", ), ) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) project_key: Mapped[str] = mapped_column(String(128), nullable=False) topology_id: Mapped[int] = mapped_column(Integer, nullable=False) node_id: Mapped[str] = mapped_column(Text, nullable=False) node_name: Mapped[str] = mapped_column(Text, nullable=False) parent_node_id: Mapped[str | None] = mapped_column(Text, nullable=True) level: Mapped[int | None] = mapped_column(Integer, nullable=True) node_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True) refer_id: Mapped[int | None] = mapped_column(Integer, nullable=True) refer_level: Mapped[int | None] = mapped_column(Integer, nullable=True) is_virtual: Mapped[int] = mapped_column(Integer, nullable=False, default=0) path_text: Mapped[str] = mapped_column(Text, nullable=False, default="") child_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0) sort_index: Mapped[int | None] = mapped_column(Integer, nullable=True) class TopologyEdge(Base): __tablename__ = "topology_edge" __table_args__ = ( UniqueConstraint( "project_key", "topology_id", "source_node_id", "target_node_id", name="uq_topology_edge_project_topology_nodes", ), Index( "ix_topology_edge_project_topology_source", "project_key", "topology_id", "source_node_id", ), Index( "ix_topology_edge_project_topology_target", "project_key", "topology_id", "target_node_id", ), ) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) project_key: Mapped[str] = mapped_column(String(128), nullable=False) topology_id: Mapped[int] = mapped_column(Integer, nullable=False) source_node_id: Mapped[str] = mapped_column(Text, nullable=False) target_node_id: Mapped[str] = mapped_column(Text, nullable=False) sort_index: Mapped[int] = mapped_column(Integer, nullable=False, default=0) class TopologyEntityIndex(Base): __tablename__ = "topology_entity_index" __table_args__ = ( UniqueConstraint( "project_key", "entity_type", "entity_id", "topology_id", "node_id", name="uq_topology_entity_index_project_entity_topology_node", ), Index( "ix_topology_entity_index_project_entity", "project_key", "entity_type", "entity_id", ), Index( "ix_topology_entity_index_project_topology_node", "project_key", "topology_id", "node_id", ), ) id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) project_key: Mapped[str] = mapped_column(String(128), nullable=False) entity_type: Mapped[str] = mapped_column(String(16), nullable=False) entity_id: Mapped[int] = mapped_column(Integer, nullable=False) topology_id: Mapped[int] = mapped_column(Integer, nullable=False) node_id: Mapped[str] = mapped_column(Text, nullable=False) depth: Mapped[int | None] = mapped_column(Integer, nullable=True) def ensure_topology_cache_tables() -> None: Base.metadata.create_all(sql_engine()) engine = sql_engine() topology_registry_columns = { item["name"] for item in inspect(engine).get_columns("topology_registry") } with engine.begin() as connection: if "data_options_json" not in topology_registry_columns: connection.execute( text("ALTER TABLE topology_registry ADD COLUMN data_options_json TEXT") ) if "dimension_config_json" not in topology_registry_columns: connection.execute( text( "ALTER TABLE topology_registry ADD COLUMN dimension_config_json TEXT" ) ) def _dump_json_text(raw_value: Any) -> str: if raw_value is None: return "" return json.dumps(raw_value, ensure_ascii=False, separators=(",", ":")) def _load_json_text(raw_value: str) -> Any: text_value = _text(raw_value) if not text_value: return None try: return json.loads(text_value) except json.JSONDecodeError: return text_value def _build_metric_definitions(data_options: Any) -> dict[str, dict[str, Any]]: if not isinstance(data_options, dict): return {"instant": {}, "accu": {}} result: dict[str, dict[str, Any]] = {"instant": {}, "accu": {}} for display in ("instant", "accu"): items = data_options.get(display) if not isinstance(items, list): continue mapped: dict[str, Any] = {} for item in items: if not isinstance(item, dict): continue code = _text(item.get("code")) if not code: continue mapped[code] = { "name": _text(item.get("name")), "unit": _text(item.get("unit")), "type": _safe_int(item.get("type")), "display": bool(item.get("display")), "value_key": _text(item.get("value")), } result[display] = mapped return result def _extract_page_items(payload: Any) -> list[dict[str, Any]]: if not isinstance(payload, dict): return [] data = payload.get("data") if isinstance(data, dict): items = data.get("data") if isinstance(items, list): return [item for item in items if isinstance(item, dict)] if isinstance(data, list): return [item for item in data if isinstance(item, dict)] return [] def _load_all_pages(fetch_page: Any) -> list[dict[str, Any]]: page_num = 1 result: list[dict[str, Any]] = [] while True: payload = fetch_page(page_num) result.extend(_extract_page_items(payload)) data = payload.get("data") if isinstance(payload, dict) else None if not isinstance(data, dict): break total_page = _safe_int(data.get("total_page")) or page_num if page_num >= total_page: break page_num += 1 return result def _flatten_tree_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] queue = deque(items) while queue: item = queue.popleft() result.append(item) children = item.get("children") if isinstance(children, list): queue.extend(child for child in children if isinstance(child, dict)) return result def _to_id_name_map(items: list[dict[str, Any]]) -> dict[int, str]: result: dict[int, str] = {} for item in items: item_id = _safe_int(item.get("id")) if item_id is None: continue result[item_id] = _text(item.get("name")) return result def _load_location_name_map(project_key: str) -> dict[int, str]: items = _load_all_pages( lambda page_num: api_list_locations( project_key, keyword="", page_size=100, page_num=page_num ) ) return _to_id_name_map(_flatten_tree_items(items)) def _load_system_type_name_map(project_key: str) -> dict[int, str]: payload = api_list_system_tree(project_key) items = _extract_page_items(payload) return _to_id_name_map( [item for item in _flatten_tree_items(items) if _safe_int(item.get("type")) == 1] ) def _load_system_name_map(project_key: str) -> dict[int, str]: items = _load_all_pages( lambda page_num: api_list_systems( project_key, page_size=100, page_num=page_num, system_type_id=0, show_below=True, ) ) return _to_id_name_map(items) def _load_device_type_name_map(project_key: str) -> dict[int, str]: return _to_id_name_map(_extract_page_items(api_list_device_types(project_key))) def _load_meter_type_name_map(project_key: str) -> dict[int, str]: return _to_id_name_map(_extract_page_items(api_list_meter_types(project_key))) def _load_device_name_map(project_key: str) -> dict[int, str]: items = _load_all_pages( lambda page_num: api_search_devices( project_key, page_size=100, page_num=page_num, keyword="", location_id=0, show_below=True, system_ids=[], device_type_ids=[], ) ) return _to_id_name_map(items) def _load_meter_name_map(project_key: str) -> dict[int, str]: items = _load_all_pages( lambda page_num: api_search_meters( project_key, page_size=100, page_num=page_num, keyword="", location_id=0, show_below=True, meter_type_id=0, measurement_location_ids=[], measurement_system_ids=[], measurement_device_type_ids=[], status=None, ) ) return _to_id_name_map(items) def _load_topology_group_name_map(session: Session, project_key: str) -> dict[int, str]: return {row.group_id: row.group_name for row in _load_group_rows(session, project_key)} def _load_topology_name_map(session: Session, project_key: str) -> dict[int, str]: return { row.topology_id: row.topology_name for row in _load_registry_rows(session, project_key) } def _object_type_label(type_code: int | None) -> str: return OBJECT_TYPE_LABELS.get(type_code or 0, "") def _resolve_field_name_map( session: Session, project_key: str, type_code: int ) -> dict[int, str]: if type_code == PT_OBJ_TYPE_LOCATION: return _load_location_name_map(project_key) if type_code == PT_OBJ_TYPE_SYSTEMTYPE: return _load_system_type_name_map(project_key) if type_code == PT_OBJ_TYPE_SYSTEM: return _load_system_name_map(project_key) if type_code == PT_OBJ_TYPE_DEVICETYPE: return _load_device_type_name_map(project_key) if type_code == PT_OBJ_TYPE_DEVICE: return _load_device_name_map(project_key) if type_code == PT_OBJ_TYPE_METERTYPE: return _load_meter_type_name_map(project_key) if type_code == PT_OBJ_TYPE_METERMODEL: raise ValueError("type=17 (仪表型号) needs additional API mapping confirmation") if type_code == PT_OBJ_TYPE_METER: return _load_meter_name_map(project_key) if type_code == PT_OBJ_TYPE_TOPOGROUP: return _load_topology_group_name_map(session, project_key) if type_code == PT_OBJ_TYPE_TOPODIAGRAM: return _load_topology_name_map(session, project_key) return {} def _resolve_field_items( session: Session, project_key: str, type_code: int, field_ids: list[int] ) -> list[dict[str, Any]]: name_map = _resolve_field_name_map(session, project_key, type_code) return [ {"id": field_id, "name": name_map.get(field_id, "")} for field_id in field_ids ] def _normalize_int_list(raw_value: Any) -> list[int]: if not isinstance(raw_value, list): return [] result: list[int] = [] for item in raw_value: normalized = _safe_int(item) if normalized is None: continue result.append(normalized) return result def get_topology_group_config(project_key: str, topology_id: int) -> dict[str, Any]: project_key = _text(project_key) if not project_key: raise ValueError("project_key is required") ensure_topology_cache_tables() with Session(sql_engine()) as session: _require_topology_cache(session, project_key) group_path_map = _group_path_map(_load_group_rows(session, project_key)) registry_row = _get_registry_or_error(session, project_key, topology_id) if registry_row.topology_type != 2: return { "supported": False, "raw_dimension_config": _load_json_text( registry_row.dimension_config_json ), "groupings": [], "filter": { "filter_type": None, "filter_type_label": "", "conditions": [], }, "mcp_note": "topology.get_group_config only applies to topology_type=2 topologies.", } raw_dimension_config = _load_json_text(registry_row.dimension_config_json) if not isinstance(raw_dimension_config, dict): raw_dimension_config = {} raw_dimensions = raw_dimension_config.get("dimensions") raw_filter = raw_dimension_config.get("filter") dimensions = raw_dimensions if isinstance(raw_dimensions, list) else [] filter_payload = raw_filter if isinstance(raw_filter, dict) else {} groupings: list[dict[str, Any]] = [] for item in dimensions: if not isinstance(item, dict): continue type_code = _safe_int(item.get("type")) level = _safe_int(item.get("level")) order = _safe_int(item.get("order")) groupings.append( { "name": _text(item.get("name")), "type": type_code, "type_label": _object_type_label(type_code), "level": level, "order": order, "order_label": ORDER_LABELS.get(order or 0, ""), } ) conditions_payload = filter_payload.get("conditions") conditions = conditions_payload if isinstance(conditions_payload, list) else [] resolved_conditions: list[dict[str, Any]] = [] for item in conditions: if not isinstance(item, dict): continue type_code = _safe_int(item.get("type")) or 0 level = _safe_int(item.get("level")) match_type = _safe_int(item.get("match_type")) field_ids = _normalize_int_list(item.get("fields")) resolved_conditions.append( { "type": type_code, "type_label": _object_type_label(type_code), "level": level, "match_type": match_type, "match_type_label": MATCH_TYPE_LABELS.get(match_type or 0, ""), "fields": field_ids, "field_items": _resolve_field_items( session, project_key, type_code, field_ids ), } ) filter_type = _safe_int(filter_payload.get("filter_type")) return { "supported": True, "raw_dimension_config": raw_dimension_config, "groupings": groupings, "filter": { "filter_type": filter_type, "filter_type_label": FILTER_TYPE_LABELS.get(filter_type or 0, ""), "conditions": resolved_conditions, }, } def _collect_group_and_topology_refs( project_key: str, items: list[Any], *, refreshed_at: str, parent_group_id: int | None = None, parent_group_path: tuple[str, ...] = (), ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: group_rows: list[dict[str, Any]] = [] topology_refs: list[dict[str, Any]] = [] for sort_index, item in enumerate(items, start=1): if not isinstance(item, dict): continue item_id = _safe_int(item.get("id")) item_name = _text(item.get("name")) or str(item_id or "") item_type = _safe_int(item.get("type")) children = ( item.get("children") if isinstance(item.get("children"), list) else [] ) if item_id is None: continue if item_type == 1: group_path = (*parent_group_path, item_name) group_rows.append( { "project_key": project_key, "group_id": item_id, "group_name": item_name, "parent_group_id": parent_group_id, "group_path_text": " / ".join(group_path), "level": len(group_path), "sort_index": sort_index, "refreshed_at": refreshed_at, "is_active": 1, } ) nested_group_rows, nested_topology_refs = _collect_group_and_topology_refs( project_key, children, refreshed_at=refreshed_at, parent_group_id=item_id, parent_group_path=group_path, ) group_rows.extend(nested_group_rows) topology_refs.extend(nested_topology_refs) continue topology_refs.append( { "topology_id": item_id, "topology_name": item_name, "group_id": parent_group_id, "sort_index": sort_index, } ) return group_rows, topology_refs def _as_int_list(raw_value: Any) -> list[int]: if not isinstance(raw_value, list): return [] result: list[int] = [] for item in raw_value: value = _safe_int(item) if value is None: continue result.append(value) return result def _record_deepest_entities( entity_best: dict[tuple[str, int], tuple[int, list[str]]], entity_node_ids: dict[tuple[str, int], list[str]], entity_type: str, entity_ids: list[int], *, depth: int, node_id: str, ) -> None: for entity_id in entity_ids: key = (entity_type, entity_id) existing = entity_best.get(key) if existing is None or depth > existing[0]: entity_best[key] = (depth, [node_id]) entity_node_ids[key] = [node_id] continue if depth == existing[0] and node_id not in entity_node_ids[key]: entity_node_ids[key].append(node_id) def _parse_tree_topology( project_key: str, topology_id: int, diagram: list[Any], ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: node_rows: list[dict[str, Any]] = [] edge_rows: list[dict[str, Any]] = [] entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {} entity_node_ids: dict[tuple[str, int], list[str]] = {} def visit( node: dict[str, Any], parent_node_id: str | None, path_names: tuple[str, ...], sort_index: int, ) -> None: node_id = _text(node.get("id")) if not node_id: return node_name = _text(node.get("name")) or node_id children = ( node.get("children") if isinstance(node.get("children"), list) else [] ) level = _safe_int(node.get("level")) or (len(path_names) + 1) path_text = " / ".join((*path_names, node_name)) effective_parent_node_id = parent_node_id or ( _text(node.get("parent_id")) or None ) node_rows.append( { "project_key": project_key, "topology_id": topology_id, "node_id": node_id, "node_name": node_name, "parent_node_id": effective_parent_node_id, "level": level, "node_type_code": _safe_int(node.get("type")), "refer_id": _safe_int(node.get("refer_id")), "refer_level": _safe_int(node.get("refer_level")), "is_virtual": _bool_as_int(node.get("is_virtual")), "path_text": path_text, "child_count": len(children), "sort_index": sort_index, } ) if effective_parent_node_id: edge_rows.append( { "project_key": project_key, "topology_id": topology_id, "source_node_id": effective_parent_node_id, "target_node_id": node_id, "sort_index": sort_index, } ) _record_deepest_entities( entity_best, entity_node_ids, "meter", _as_int_list(node.get("meter_list")), depth=level, node_id=node_id, ) _record_deepest_entities( entity_best, entity_node_ids, "device", _as_int_list(node.get("device_list")), depth=level, node_id=node_id, ) for child_sort_index, child in enumerate(children, start=1): if not isinstance(child, dict): continue visit(child, node_id, (*path_names, node_name), child_sort_index) for root_sort_index, root in enumerate(diagram, start=1): if not isinstance(root, dict): continue visit(root, None, (), root_sort_index) entity_rows: list[dict[str, Any]] = [] for (entity_type, entity_id), (depth, _) in entity_best.items(): for node_id in entity_node_ids[(entity_type, entity_id)]: entity_rows.append( { "project_key": project_key, "entity_type": entity_type, "entity_id": entity_id, "topology_id": topology_id, "node_id": node_id, "depth": depth, } ) return node_rows, edge_rows, entity_rows def _build_graph_node_context( nodes: list[dict[str, Any]], edges: list[dict[str, Any]], ) -> tuple[dict[str, int], dict[str, str | None], dict[str, str], dict[str, int]]: incoming: dict[str, list[str]] = defaultdict(list) outgoing: dict[str, list[str]] = defaultdict(list) for edge in edges: source_node_id = _text(edge.get("source")) target_node_id = _text(edge.get("target")) if not source_node_id or not target_node_id: continue outgoing[source_node_id].append(target_node_id) incoming[target_node_id].append(source_node_id) node_ids = [_text(node.get("id")) for node in nodes if _text(node.get("id"))] roots = [node_id for node_id in node_ids if not incoming.get(node_id)] if not roots: roots = node_ids[:1] level_map: dict[str, int] = {} path_map: dict[str, str] = {} parent_map: dict[str, str | None] = {node_id: None for node_id in node_ids} child_count_map: dict[str, int] = { node_id: len(outgoing.get(node_id, [])) for node_id in node_ids } node_name_map = { _text(node.get("id")): _text(node.get("name")) or _text(node.get("id")) for node in nodes } queue: deque[tuple[str, int, str]] = deque() for root_node_id in roots: root_name = node_name_map.get(root_node_id) or root_node_id queue.append((root_node_id, 1, root_name)) while queue: node_id, depth, path_text = queue.popleft() previous = level_map.get(node_id) if previous is not None and previous >= depth: continue level_map[node_id] = depth path_map[node_id] = path_text for child_node_id in outgoing.get(node_id, []): if parent_map.get(child_node_id) is None: parent_map[child_node_id] = node_id child_name = node_name_map.get(child_node_id) or child_node_id queue.append((child_node_id, depth + 1, f"{path_text} / {child_name}")) for node in nodes: node_id = _text(node.get("id")) if not node_id: continue if node_id in level_map: continue fallback_level = _safe_int(node.get("level")) or 1 node_name = _text(node.get("name")) or node_id level_map[node_id] = fallback_level path_map[node_id] = node_name return level_map, parent_map, path_map, child_count_map def _parse_graph_topology( project_key: str, topology_id: int, diagram: dict[str, Any], ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]: raw_nodes = [item for item in diagram.get("nodes", []) if isinstance(item, dict)] raw_edges = [item for item in diagram.get("edges", []) if isinstance(item, dict)] level_map, parent_map, path_map, child_count_map = _build_graph_node_context( raw_nodes, raw_edges ) node_rows: list[dict[str, Any]] = [] edge_rows: list[dict[str, Any]] = [] entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {} entity_node_ids: dict[tuple[str, int], list[str]] = {} for sort_index, node in enumerate(raw_nodes, start=1): node_id = _text(node.get("id")) if not node_id: continue level = level_map.get(node_id) or _safe_int(node.get("level")) or 1 node_rows.append( { "project_key": project_key, "topology_id": topology_id, "node_id": node_id, "node_name": _text(node.get("name")) or node_id, "parent_node_id": parent_map.get(node_id), "level": level, "node_type_code": _safe_int(node.get("type")), "refer_id": _safe_int(node.get("refer_id")), "refer_level": _safe_int(node.get("refer_level")), "is_virtual": _bool_as_int(node.get("is_virtual")), "path_text": path_map.get(node_id, _text(node.get("name")) or node_id), "child_count": child_count_map.get(node_id, 0), "sort_index": sort_index, } ) _record_deepest_entities( entity_best, entity_node_ids, "meter", _as_int_list(node.get("meter_list")), depth=level, node_id=node_id, ) _record_deepest_entities( entity_best, entity_node_ids, "device", _as_int_list(node.get("device_list")), depth=level, node_id=node_id, ) for sort_index, edge in enumerate(raw_edges, start=1): source_node_id = _text(edge.get("source")) target_node_id = _text(edge.get("target")) if not source_node_id or not target_node_id: continue edge_rows.append( { "project_key": project_key, "topology_id": topology_id, "source_node_id": source_node_id, "target_node_id": target_node_id, "sort_index": sort_index, } ) entity_rows: list[dict[str, Any]] = [] for (entity_type, entity_id), (depth, _) in entity_best.items(): for node_id in entity_node_ids[(entity_type, entity_id)]: entity_rows.append( { "project_key": project_key, "entity_type": entity_type, "entity_id": entity_id, "topology_id": topology_id, "node_id": node_id, "depth": depth, } ) return node_rows, edge_rows, entity_rows def refresh_topology_cache( project_key: str, topology_ids: list[int] | None = None ) -> dict[str, Any]: project_key = _text(project_key) if not project_key: raise ValueError("project_key is required") ensure_topology_cache_tables() refreshed_at = _utc_now_iso() list_payload = api_list_topologies_with_group(project_key, group_ids=[]) raw_items = list_payload.get("data") if not isinstance(raw_items, list): raise ValueError("topology list returned invalid data") group_rows, topology_refs = _collect_group_and_topology_refs( project_key, raw_items, refreshed_at=refreshed_at ) requested_topology_ids = {item for item in topology_ids or []} available_topology_map = {item["topology_id"]: item for item in topology_refs} if requested_topology_ids: missing = sorted(requested_topology_ids - set(available_topology_map)) if missing: raise ValueError( f"topology_id not found in upstream topology list: {missing}" ) selected_topology_refs = [ available_topology_map[item] for item in topology_ids or [] ] else: selected_topology_refs = topology_refs registry_rows: list[dict[str, Any]] = [] node_rows: list[dict[str, Any]] = [] edge_rows: list[dict[str, Any]] = [] entity_rows: list[dict[str, Any]] = [] for topology_ref in selected_topology_refs: topology_id = topology_ref["topology_id"] detail_payload = api_get_topology(project_key, topology_id) detail_data = detail_payload.get("data") if not isinstance(detail_data, dict): raise ValueError( f"topology get returned invalid data for topology_id={topology_id}" ) diagram = detail_data.get("diagram") if isinstance(diagram, list): root_shape = "tree" topology_node_rows, topology_edge_rows, topology_entity_rows = ( _parse_tree_topology(project_key, topology_id, diagram) ) elif isinstance(diagram, dict): root_shape = "graph" topology_node_rows, topology_edge_rows, topology_entity_rows = ( _parse_graph_topology(project_key, topology_id, diagram) ) else: root_shape = "tree" topology_node_rows, topology_edge_rows, topology_entity_rows = ([], [], []) registry_rows.append( { "project_key": project_key, "topology_id": topology_id, "topology_name": _text(detail_data.get("name")) or topology_ref["topology_name"], "topology_type": _safe_int(detail_data.get("type")) or 0, "object_type_code": _safe_int(detail_data.get("object")), "group_id": _safe_int(detail_data.get("group_id")) or topology_ref.get("group_id"), "root_shape": root_shape, "source_updated_time": _text(detail_data.get("updated_time")), "data_options_json": _dump_json_text(detail_data.get("data_options")), "dimension_config_json": _dump_json_text( detail_data.get("dimension_config") ), "refreshed_at": refreshed_at, "is_active": 1, } ) node_rows.extend(topology_node_rows) edge_rows.extend(topology_edge_rows) entity_rows.extend(topology_entity_rows) with Session(sql_engine()) as session: if requested_topology_ids: session.execute( delete(TopologyGroup).where(TopologyGroup.project_key == project_key) ) session.add_all(TopologyGroup(**row) for row in group_rows) session.execute( delete(TopologyEntityIndex).where( TopologyEntityIndex.project_key == project_key, TopologyEntityIndex.topology_id.in_(requested_topology_ids), ) ) session.execute( delete(TopologyEdge).where( TopologyEdge.project_key == project_key, TopologyEdge.topology_id.in_(requested_topology_ids), ) ) session.execute( delete(TopologyNode).where( TopologyNode.project_key == project_key, TopologyNode.topology_id.in_(requested_topology_ids), ) ) session.execute( delete(TopologyRegistry).where( TopologyRegistry.project_key == project_key, TopologyRegistry.topology_id.in_(requested_topology_ids), ) ) else: session.execute( delete(TopologyEntityIndex).where( TopologyEntityIndex.project_key == project_key ) ) session.execute( delete(TopologyEdge).where(TopologyEdge.project_key == project_key) ) session.execute( delete(TopologyNode).where(TopologyNode.project_key == project_key) ) session.execute( delete(TopologyRegistry).where( TopologyRegistry.project_key == project_key ) ) session.execute( delete(TopologyGroup).where(TopologyGroup.project_key == project_key) ) session.add_all(TopologyGroup(**row) for row in group_rows) session.add_all(TopologyRegistry(**row) for row in registry_rows) session.add_all(TopologyNode(**row) for row in node_rows) session.add_all(TopologyEdge(**row) for row in edge_rows) session.add_all(TopologyEntityIndex(**row) for row in entity_rows) session.commit() result = { "project_key": project_key, "refreshed_group_count": len(group_rows), "refreshed_topology_count": len(registry_rows), "refreshed_node_count": len(node_rows), "refreshed_edge_count": len(edge_rows), "refreshed_entity_index_count": len(entity_rows), "topology_ids": [row["topology_id"] for row in registry_rows], "refreshed_at": refreshed_at, } if not registry_rows: result["mcp_note"] = ( f"Project '{project_key}' currently has no topology data in upstream config." ) return result def _load_group_rows(session: Session, project_key: str) -> list[TopologyGroup]: return list( session.scalars( select(TopologyGroup) .where(TopologyGroup.project_key == project_key) .order_by( TopologyGroup.level.asc(), TopologyGroup.sort_index.asc(), TopologyGroup.group_id.asc(), ) ) ) def _load_registry_rows(session: Session, project_key: str) -> list[TopologyRegistry]: return list( session.scalars( select(TopologyRegistry) .where(TopologyRegistry.project_key == project_key) .order_by( TopologyRegistry.topology_name.asc(), TopologyRegistry.topology_id.asc() ) ) ) def _has_topology_cache(session: Session, project_key: str) -> bool: registry_rows = _load_registry_rows(session, project_key) return bool(registry_rows) def _require_topology_cache(session: Session, project_key: str) -> None: if _has_topology_cache(session, project_key): return raise ValueError( f"Project '{project_key}' has no cached topologies. Refresh it via " f"GET /topology/cache/refresh?project_key={project_key}. If the refresh " "already succeeded, the upstream project likely has no topology data." ) def list_topology_groups(project_key: str) -> dict[str, Any]: project_key = _text(project_key) if not project_key: raise ValueError("project_key is required") ensure_topology_cache_tables() with Session(sql_engine()) as session: if not _has_topology_cache(session, project_key): return { "project_key": project_key, "groups": [], "total": 0, "mcp_note": ( f"Project '{project_key}' has no cached topologies. Refresh it via " f"GET /topology/cache/refresh?project_key={project_key}. If the refresh " "already succeeded, the upstream project likely has no topology data." ), } group_rows = _load_group_rows(session, project_key) children_by_parent: dict[int | None, list[dict[str, Any]]] = defaultdict(list) node_by_group_id: dict[int, dict[str, Any]] = {} for group_row in group_rows: payload = { "group_id": group_row.group_id, "group_name": group_row.group_name, "parent_group_id": group_row.parent_group_id, "level": group_row.level, "group_path_text": group_row.group_path_text, "children": [], } node_by_group_id[group_row.group_id] = payload children_by_parent[group_row.parent_group_id].append(payload) for group_payload in node_by_group_id.values(): group_payload["children"] = children_by_parent.get( group_payload["group_id"], [] ) return { "project_key": project_key, "groups": children_by_parent.get(None, []), "total": len(group_rows), } def _group_path_map(group_rows: list[TopologyGroup]) -> dict[int, str]: return {group_row.group_id: group_row.group_path_text for group_row in group_rows} def _collect_descendant_group_ids( group_rows: list[TopologyGroup], group_id: int ) -> set[int]: group_children: dict[int | None, list[int]] = defaultdict(list) for row in group_rows: group_children[row.parent_group_id].append(row.group_id) result: set[int] = set() queue: deque[int] = deque([group_id]) while queue: current_group_id = queue.popleft() if current_group_id in result: continue result.add(current_group_id) queue.extend(group_children.get(current_group_id, [])) return result def list_topologies( project_key: str, *, group_id: int | None = None, object_type_code: int | None = None, ) -> dict[str, Any]: project_key = _text(project_key) if not project_key: raise ValueError("project_key is required") ensure_topology_cache_tables() with Session(sql_engine()) as session: if not _has_topology_cache(session, project_key): return { "project_key": project_key, "topologies": [], "total": 0, "mcp_note": ( f"Project '{project_key}' has no cached topologies. Refresh it via " f"GET /topology/cache/refresh?project_key={project_key}. If the refresh " "already succeeded, the upstream project likely has no topology data." ), } group_rows = _load_group_rows(session, project_key) registry_rows = _load_registry_rows(session, project_key) group_path_map = _group_path_map(group_rows) allowed_group_ids: set[int] | None = None if group_id is not None: allowed_group_ids = _collect_descendant_group_ids(group_rows, group_id) topologies: list[dict[str, Any]] = [] for registry_row in registry_rows: if ( allowed_group_ids is not None and registry_row.group_id not in allowed_group_ids ): continue if ( object_type_code is not None and registry_row.object_type_code != object_type_code ): continue topologies.append( { "topology_id": registry_row.topology_id, "topology_name": registry_row.topology_name, "topology_type": registry_row.topology_type, "object_type_code": registry_row.object_type_code, "group_id": registry_row.group_id, "group_path_text": group_path_map.get(registry_row.group_id or -1, ""), "root_shape": registry_row.root_shape, "refreshed_at": registry_row.refreshed_at, } ) return { "project_key": project_key, "topologies": topologies, "total": len(topologies), } def _get_registry_or_error( session: Session, project_key: str, topology_id: int ) -> TopologyRegistry: registry_row = session.scalar( select(TopologyRegistry).where( TopologyRegistry.project_key == project_key, TopologyRegistry.topology_id == topology_id, ) ) if registry_row is None: raise ValueError(f"topology_id not found in cache: {topology_id}") return registry_row def _get_node_or_error( session: Session, project_key: str, topology_id: int, node_id: str ) -> TopologyNode: node_row = session.scalar( select(TopologyNode).where( TopologyNode.project_key == project_key, TopologyNode.topology_id == topology_id, TopologyNode.node_id == node_id, ) ) if node_row is None: raise ValueError(f"node_id not found in cache: {node_id}") return node_row def _resolve_root_node_id( node_map: dict[str, TopologyNode], parents_by_node: dict[str, list[str]] ) -> str: root_ids = [node_id for node_id in node_map if not parents_by_node.get(node_id)] if not root_ids: raise ValueError("root node not found in cache") root_ids.sort( key=lambda item: ( node_map[item].level if node_map[item].level is not None else 10**9, node_map[item].path_text or node_map[item].node_name, item, ) ) return root_ids[0] def _normalize_requested_node_id( raw_node_id: str, node_map: dict[str, TopologyNode], parents_by_node: dict[str, list[str]], ) -> str: normalized = _text(raw_node_id) if normalized.lower() != "root": return normalized return _resolve_root_node_id(node_map, parents_by_node) def _node_payload(node_row: TopologyNode) -> dict[str, Any]: return { "node_id": node_row.node_id, "node_name": node_row.node_name, "level": node_row.level, "parent_node_id": node_row.parent_node_id, "refer_id": node_row.refer_id, "refer_level": node_row.refer_level, "is_virtual": bool(node_row.is_virtual), "path_text": node_row.path_text, "child_count": node_row.child_count, } def _floor_hour_ts(ts: int) -> int: return max(0, int(ts) - (int(ts) % 3600)) def _floor_day_ts(ts: int) -> int: current = datetime.fromtimestamp(int(ts)).astimezone() return int( current.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() ) def _hourly_window_timestamps(base_ts: int) -> list[int]: current_hour_ts = _floor_hour_ts(base_ts) return [current_hour_ts - 3600 * offset for offset in range(1, 13)] def _daily_window_timestamps(base_ts: int) -> list[int]: current_day_ts = _floor_day_ts(base_ts) return [current_day_ts - 86400 * offset for offset in range(0, 7)] def _extract_topology_data_map(payload: Any) -> dict[str, Any]: if not isinstance(payload, dict): return {} data = payload.get("data") if not isinstance(data, dict): return {} return {str(node_id): node_values for node_id, node_values in data.items()} def _fetch_topology_runtime_data( project_key: str, topology_id: int, *, base_ts: int | None = None ) -> dict[str, Any]: effective_base_ts = int(base_ts if base_ts is not None else _current_unix_ts()) hourly_timestamps = _hourly_window_timestamps(effective_base_ts) daily_timestamps = _daily_window_timestamps(effective_base_ts) def fetch_instant() -> dict[str, Any]: return _extract_topology_data_map( api_get_topology_data(project_key, topology_id, display="instant") ) def fetch_accu(accu_step: int, ts: int) -> tuple[int, dict[str, Any]]: payload = api_get_topology_data( project_key, topology_id, display="accu", accu_step=accu_step, ts=ts, ) return ts, _extract_topology_data_map(payload) max_workers = max(1, min(8, 1 + len(hourly_timestamps) + len(daily_timestamps))) with ThreadPoolExecutor(max_workers=max_workers) as executor: instant_future = executor.submit(fetch_instant) hourly_futures = [ executor.submit(fetch_accu, 2, ts_value) for ts_value in hourly_timestamps ] daily_futures = [ executor.submit(fetch_accu, 3, ts_value) for ts_value in daily_timestamps ] instant_map = instant_future.result() hourly_series = [future.result() for future in hourly_futures] daily_series = [future.result() for future in daily_futures] return { "base_ts": effective_base_ts, "instant": instant_map, "hourly": [ {"ts": ts_value, "data_map": data_map} for ts_value, data_map in hourly_series ], "daily": [ {"ts": ts_value, "data_map": data_map} for ts_value, data_map in daily_series ], "data_window": { "hourly_ts": hourly_timestamps, "daily_ts": daily_timestamps, }, } def _attach_runtime_data_to_node( node_payload: dict[str, Any], runtime_bundle: dict[str, Any] ) -> dict[str, Any]: node_id = str(node_payload.get("node_id") or "").strip() instant_map = runtime_bundle.get("instant") or {} hourly_series = runtime_bundle.get("hourly") or [] daily_series = runtime_bundle.get("daily") or [] node_payload["data"] = { "instant": instant_map.get(node_id), "accu": { "hourly": [ { "ts": item["ts"], "values": item["data_map"].get(node_id), } for item in hourly_series ], "daily": [ { "ts": item["ts"], "values": item["data_map"].get(node_id), } for item in daily_series ], }, } return node_payload def _attach_runtime_data_to_nodes( nodes: list[dict[str, Any]], runtime_bundle: dict[str, Any] ) -> list[dict[str, Any]]: return [_attach_runtime_data_to_node(node_payload, runtime_bundle) for node_payload in nodes] def _load_node_map( session: Session, project_key: str, topology_id: int ) -> dict[str, TopologyNode]: rows = session.scalars( select(TopologyNode).where( TopologyNode.project_key == project_key, TopologyNode.topology_id == topology_id, ) ) return {row.node_id: row for row in rows} def _load_adjacency( session: Session, project_key: str, topology_id: int ) -> tuple[dict[str, list[str]], dict[str, list[str]]]: edges = session.scalars( select(TopologyEdge) .where( TopologyEdge.project_key == project_key, TopologyEdge.topology_id == topology_id, ) .order_by(TopologyEdge.sort_index.asc(), TopologyEdge.id.asc()) ) parents_by_node: dict[str, list[str]] = defaultdict(list) children_by_node: dict[str, list[str]] = defaultdict(list) for edge in edges: parents_by_node[edge.target_node_id].append(edge.source_node_id) children_by_node[edge.source_node_id].append(edge.target_node_id) return parents_by_node, children_by_node def _dedupe_preserve_order(node_ids: list[str]) -> list[str]: result: list[str] = [] seen: set[str] = set() for node_id in node_ids: if node_id in seen: continue seen.add(node_id) result.append(node_id) return result def _node_list_payload( node_map: dict[str, TopologyNode], node_ids: list[str] ) -> list[dict[str, Any]]: payload: list[dict[str, Any]] = [] for node_id in _dedupe_preserve_order(node_ids): node_row = node_map.get(node_id) if node_row is None: continue payload.append(_node_payload(node_row)) return payload def _collect_ancestors( node_map: dict[str, TopologyNode], parents_by_node: dict[str, list[str]], node_id: str, depth_limit: int, ) -> list[dict[str, Any]]: if depth_limit <= 0: return [] visited: dict[str, int] = {} queue: deque[tuple[str, int]] = deque( (parent_id, 1) for parent_id in parents_by_node.get(node_id, []) ) while queue: current_node_id, distance = queue.popleft() if distance > depth_limit: continue previous_distance = visited.get(current_node_id) if previous_distance is not None and previous_distance <= distance: continue visited[current_node_id] = distance for parent_id in parents_by_node.get(current_node_id, []): queue.append((parent_id, distance + 1)) ordered_ids = sorted( visited, key=lambda item: ( -visited[item], node_map[item].path_text or node_map[item].node_name, item, ), ) result: list[dict[str, Any]] = [] for current_node_id in ordered_ids: node_payload = _node_payload(node_map[current_node_id]) node_payload["distance"] = visited[current_node_id] result.append(node_payload) return result def _collect_descendants( node_map: dict[str, TopologyNode], children_by_node: dict[str, list[str]], node_id: str, depth_limit: int, ) -> list[dict[str, Any]]: if depth_limit <= 0: return [] visited: dict[str, int] = {} queue: deque[tuple[str, int]] = deque( (child_id, 1) for child_id in children_by_node.get(node_id, []) ) while queue: current_node_id, distance = queue.popleft() if distance > depth_limit: continue previous_distance = visited.get(current_node_id) if previous_distance is not None and previous_distance <= distance: continue visited[current_node_id] = distance for child_id in children_by_node.get(current_node_id, []): queue.append((child_id, distance + 1)) ordered_ids = sorted( visited, key=lambda item: ( visited[item], node_map[item].path_text or node_map[item].node_name, item, ), ) result: list[dict[str, Any]] = [] for current_node_id in ordered_ids: node_payload = _node_payload(node_map[current_node_id]) node_payload["distance"] = visited[current_node_id] result.append(node_payload) return result def _topology_metadata_payload( registry_row: TopologyRegistry, group_path_text: str ) -> dict[str, Any]: data_options = _load_json_text(registry_row.data_options_json) return { "topology_id": registry_row.topology_id, "topology_name": registry_row.topology_name, "topology_type": registry_row.topology_type, "object_type_code": registry_row.object_type_code, "group_id": registry_row.group_id, "group_path_text": group_path_text, "root_shape": registry_row.root_shape, "data_options": data_options, "metric_definitions": _build_metric_definitions(data_options), "dimension_config": _load_json_text(registry_row.dimension_config_json), } def get_topology_node( project_key: str, topology_id: int, node_id: str = "root", *, include_siblings: bool = True, include_children: bool = True, ) -> dict[str, Any]: project_key = _text(project_key) node_id = _text(node_id) if not project_key: raise ValueError("project_key is required") if not node_id: raise ValueError("node_id is required") ensure_topology_cache_tables() with Session(sql_engine()) as session: _require_topology_cache(session, project_key) group_rows = _load_group_rows(session, project_key) group_path_map = _group_path_map(group_rows) registry_row = _get_registry_or_error(session, project_key, topology_id) node_map = _load_node_map(session, project_key, topology_id) parents_by_node, children_by_node = _load_adjacency( session, project_key, topology_id ) resolved_node_id = _normalize_requested_node_id( node_id, node_map, parents_by_node ) node_row = _get_node_or_error( session, project_key, topology_id, resolved_node_id ) runtime_bundle = _fetch_topology_runtime_data(project_key, topology_id) parent_ids = parents_by_node.get(resolved_node_id, []) child_ids = children_by_node.get(resolved_node_id, []) if include_children else [] sibling_ids: list[str] = [] if include_siblings and len(parent_ids) == 1: sibling_ids = [ candidate for candidate in children_by_node.get(parent_ids[0], []) if candidate != resolved_node_id ] return { "data_window": runtime_bundle["data_window"], "topology": _topology_metadata_payload( registry_row, group_path_map.get(registry_row.group_id or -1, "") ), "node": _attach_runtime_data_to_node(_node_payload(node_row), runtime_bundle), "parents": _attach_runtime_data_to_nodes( _node_list_payload(node_map, parent_ids), runtime_bundle ), "children": _attach_runtime_data_to_nodes( _node_list_payload(node_map, child_ids), runtime_bundle ), "siblings": _attach_runtime_data_to_nodes( _node_list_payload(node_map, sibling_ids), runtime_bundle ), } def find_topology_context( project_key: str, entity_type: str, entity_id: int, *, topology_id: int | None = None, include_siblings: bool = True, ancestor_depth: int = 5, descendant_depth: int = 2, ) -> dict[str, Any]: project_key = _text(project_key) normalized_entity_type = _normalize_entity_type(entity_type) if not project_key: raise ValueError("project_key is required") if entity_id <= 0: raise ValueError("entity_id must be a positive integer") ensure_topology_cache_tables() with Session(sql_engine()) as session: _require_topology_cache(session, project_key) group_rows = _load_group_rows(session, project_key) group_path_map = _group_path_map(group_rows) query = select(TopologyEntityIndex).where( TopologyEntityIndex.project_key == project_key, TopologyEntityIndex.entity_type == normalized_entity_type, TopologyEntityIndex.entity_id == entity_id, ) if topology_id is not None: query = query.where(TopologyEntityIndex.topology_id == topology_id) index_rows = list( session.scalars( query.order_by( TopologyEntityIndex.depth.desc(), TopologyEntityIndex.topology_id.asc(), ) ) ) matches: list[dict[str, Any]] = [] topology_node_maps: dict[int, dict[str, TopologyNode]] = {} topology_adjacency: dict[ int, tuple[dict[str, list[str]], dict[str, list[str]]] ] = {} topology_registry_rows: dict[int, TopologyRegistry] = {} for index_row in index_rows: current_topology_id = index_row.topology_id if current_topology_id not in topology_registry_rows: topology_registry_rows[current_topology_id] = _get_registry_or_error( session, project_key, current_topology_id ) topology_node_maps[current_topology_id] = _load_node_map( session, project_key, current_topology_id ) topology_adjacency[current_topology_id] = _load_adjacency( session, project_key, current_topology_id ) node_map = topology_node_maps[current_topology_id] node_row = node_map.get(index_row.node_id) if node_row is None: continue parents_by_node, children_by_node = topology_adjacency[current_topology_id] parent_ids = parents_by_node.get(index_row.node_id, []) child_ids = children_by_node.get(index_row.node_id, []) sibling_ids: list[str] = [] if include_siblings and len(parent_ids) == 1: sibling_ids = [ candidate for candidate in children_by_node.get(parent_ids[0], []) if candidate != index_row.node_id ] matches.append( { "topology": _topology_metadata_payload( topology_registry_rows[current_topology_id], group_path_map.get( topology_registry_rows[current_topology_id].group_id or -1, "", ), ), "self": _node_payload(node_row), "parents": _node_list_payload(node_map, parent_ids), "children": _node_list_payload(node_map, child_ids), "ancestors": _collect_ancestors( node_map, parents_by_node, index_row.node_id, max(ancestor_depth, 0), ), "descendants": _collect_descendants( node_map, children_by_node, index_row.node_id, max(descendant_depth, 0), ), "siblings": _node_list_payload(node_map, sibling_ids), } ) return { "query": { "project_key": project_key, "entity_type": normalized_entity_type, "entity_id": entity_id, "topology_id": topology_id, }, "matches": matches, "total_matches": len(matches), }