
add topology tools

Guangyu · 3 weeks ago
commit
15c562be7a

+ 400 - 2
README.md

@@ -17,8 +17,6 @@
 - Pagination follows the backend as-is; no automatic page traversal
 - Responses keep the backend's original structure as much as possible
 
-Detailed design: see `mcp-design.md`.
-
 ## Requirements
 
 - Python 3.11 through 3.13
@@ -124,9 +122,409 @@ uv run --python 3.13 python -m instrument_config_mcp
 - `search_devices`
 - `search_meters`
 - `search_points`
+- `topology.group_list`
+- `topology.list`
+- `topology.get_node`
+- `topology.find_context`
 
 All tools except `project.list` require `project_key`.
 
+## Topology capability overview
+
+The topology capability follows an "upstream pull + local cache index + cache-backed query tools" pattern.
+
+Goals:
+
+- Replace real-time full topology scans with a local cache index
+- Quickly locate the topology and node context of a given device or meter
+- Provide stable, repeatable queries for anomaly-analysis scenarios
+
+The implementation lives mainly in these files:
+
+- `instrument_config_mcp/config_api.py`
+- `instrument_config_mcp/topology_cache.py`
+- `instrument_config_mcp/server.py`
+
+Core capabilities implemented so far:
+
+- Fetch the topology group list from upstream
+- Fetch a single topology structure from upstream
+- Parse both tree-shaped and graph-shaped topologies
+- Cache topology groups, topology metadata, nodes, edges, and the entity index
+- Query the topology list from the cache
+- Query a node's local context from the cache
+- Locate a device's or meter's topology context from the cache
+
+## Topology upstream APIs
+
+Only two upstream endpoints are currently used, both defined in `instrument_config_mcp/config_api.py`:
+
+| Python function | Upstream API | Purpose |
+| --- | --- | --- |
+| `list_topologies_with_group(project_key, group_ids=None)` | `POST /api/configapi/topo/list_with_group` | Fetch topology groups and the topology list |
+| `get_topology(project_key, id)` | `POST /api/configapi/topo/get` | Fetch a single topology's details |
+
+Notes:
+
+- No wrapper for `topo/get_data` is implemented or used yet
+- The cache targets structural queries and anomaly localization; display-layer time-series data is not cached
+
+## Topology cache refresh
+
+The entry point is `refresh_topology_cache(project_key, topology_ids=None)`.
+
+What it actually does:
+
+1. Validate `project_key`
+2. Call `list_topologies_with_group(project_key, group_ids=[])` to pull the full topology tree
+3. Extract group records and topology candidates from the upstream response
+4. If `topology_ids` is given, refresh only those topologies; otherwise refresh every topology in the project
+5. Call `get_topology(project_key, topology_id)` for each target topology
+6. Choose a parsing strategy from the shape of `data.diagram`: a `list` takes the tree path, a `dict` takes the graph path
+7. Write the parsed result into the local cache tables
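
The shape check in step 6 can be sketched as a tiny dispatcher (`pick_parser` is a hypothetical helper for illustration, not a function in the repo):

```python
def pick_parser(diagram):
    """Mirror of step 6: choose a parse path from the diagram's shape."""
    if isinstance(diagram, list):
        return "tree"   # tree topologies arrive as a list of root nodes
    if isinstance(diagram, dict):
        return "graph"  # graph topologies arrive as {"nodes": [...], "edges": [...]}
    raise ValueError(f"unsupported diagram shape: {type(diagram).__name__}")
```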
+
+The refresh result includes these statistics:
+
+- `refreshed_group_count`
+- `refreshed_topology_count`
+- `refreshed_node_count`
+- `refreshed_edge_count`
+- `refreshed_entity_index_count`
+- `topology_ids`
+- `refreshed_at`
+
+Refresh modes:
+
+| Mode | Trigger | Behavior |
+| --- | --- | --- |
+| Full refresh | `topology_ids is None` | Delete all cached topologies for the project, then rebuild every group, topology, node, edge, and entity-index row |
+| Partial refresh | `topology_ids` non-empty | Delete only the nodes, edges, entity-index rows, and registry entries of the listed topologies, but rebuild the full group table |
+
+Notes:
+
+- The group table is rewritten in full even on partial refresh, so the group tree and topology registry never drift apart
+- There is no incremental merge logic; delete-then-rebuild is simple and predictable
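
The delete-then-rebuild contract can be illustrated with an in-memory stand-in for the cache tables (a sketch only; the real implementation operates on SQLAlchemy-mapped rows):

```python
def rebuild(cache, project_key, fresh_rows, topology_ids=None):
    """cache maps (project_key, topology_id) -> row; mimics delete-then-rebuild."""
    if topology_ids is None:
        # Full refresh: drop every cached topology of this project first.
        doomed = [key for key in cache if key[0] == project_key]
    else:
        # Partial refresh: drop only the listed topologies.
        doomed = [(project_key, tid) for tid in topology_ids
                  if (project_key, tid) in cache]
    for key in doomed:
        del cache[key]
    for row in fresh_rows:
        cache[(project_key, row["topology_id"])] = row
    return cache
```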
+
+## Topology parsing design
+
+### Tree topologies
+
+Entry point: `_parse_tree_topology(project_key, topology_id, diagram)`
+
+Logic:
+
+- Recursively walk the tree nodes
+- Write `TopologyNode` rows
+- Write `TopologyEdge` rows from parent-child relations
+- Extract entity references from each node's `meter_list` / `device_list`
+- Keep only the deepest matching node for the entity index
+
+### Graph topologies
+
+Entry point: `_parse_graph_topology(project_key, topology_id, diagram)`
+
+Logic:
+
+- Read `diagram.nodes` and `diagram.edges`
+- Use `_build_graph_node_context(...)` to derive node levels, parent relations, path text, and child counts
+- Write `TopologyNode` rows
+- Write `TopologyEdge` rows
+- Extract entity references from each node's `meter_list` / `device_list`
+- Again, only the deepest matching node enters the entity index
+
+### Entity index rules
+
+A single unified entity index table is used:
+
+- `entity_type = meter | device`
+- `entity_id`
+- `topology_id`
+- `node_id`
+- `depth`
+
+`depth` is the depth of the node where the entity was matched.
+
+`_record_deepest_entities(...)` guarantees:
+
+- If an entity appears on multiple nodes, the deeper node wins
+- On a depth tie, all tying `node_id`s are kept
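
The deeper-wins / tie-accumulates rule can be demonstrated with a condensed single-entity-type version of the recorder (a sketch of the same idea, not the repo's function):

```python
def record_deepest(best, entity_id, depth, node_id):
    """best maps entity_id -> (depth, [node_ids]); deeper wins, ties accumulate."""
    current = best.get(entity_id)
    if current is None or depth > current[0]:
        best[entity_id] = (depth, [node_id])   # deeper match replaces shallower ones
    elif depth == current[0] and node_id not in current[1]:
        current[1].append(node_id)             # same depth: keep every node id
```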
+
+## Topology cache schema
+
+The database models are defined in `instrument_config_mcp/topology_cache.py`.
+
+### `topology_group`
+
+Stores the topology group tree.
+
+Key fields:
+
+- `project_key`
+- `group_id`
+- `group_name`
+- `parent_group_id`
+- `group_path_text`
+- `level`
+- `sort_index`
+- `refreshed_at`
+- `is_active`
+
+Key constraints and indexes:
+
+- `uq_topology_group_project_group`
+- `ix_topology_group_project_parent`
+
+### `topology_registry`
+
+Stores topology metadata.
+
+Key fields:
+
+- `project_key`
+- `topology_id`
+- `topology_name`
+- `topology_type`
+- `object_type_code`
+- `group_id`
+- `root_shape`
+- `source_updated_time`
+- `refreshed_at`
+- `is_active`
+
+Key constraints and indexes:
+
+- `uq_topology_registry_project_topology`
+- `ix_topology_registry_project_group`
+
+### `topology_node`
+
+Stores topology nodes.
+
+Key fields:
+
+- `project_key`
+- `topology_id`
+- `node_id`
+- `node_name`
+- `parent_node_id`
+- `level`
+- `node_type_code`
+- `refer_id`
+- `refer_level`
+- `is_virtual`
+- `path_text`
+- `child_count`
+- `sort_index`
+
+Key constraints and indexes:
+
+- `uq_topology_node_project_topology_node`
+- `ix_topology_node_project_topology_parent`
+- `ix_topology_node_project_topology_refer`
+
+### `topology_edge`
+
+Stores node-to-node edge relations.
+
+Key fields:
+
+- `project_key`
+- `topology_id`
+- `source_node_id`
+- `target_node_id`
+- `sort_index`
+
+Key constraints and indexes:
+
+- `uq_topology_edge_project_topology_nodes`
+- `ix_topology_edge_project_topology_source`
+- `ix_topology_edge_project_topology_target`
+
+### `topology_entity_index`
+
+Stores the reverse lookup index from devices/meters to their nodes.
+
+Key fields:
+
+- `project_key`
+- `entity_type`
+- `entity_id`
+- `topology_id`
+- `node_id`
+- `depth`
+
+Key constraints and indexes:
+
+- `uq_topology_entity_index_project_entity_topology_node`
+- `ix_topology_entity_index_project_entity`
+- `ix_topology_entity_index_project_topology_node`
+
+Schema notes:
+
+- `topology_entity_index` carries both `meter` and `device` entity relations
+- Refresh timestamps are stored in the group table and the topology registry
+- Refresh deletes and rebuilds in whole batches; no separate refresh-state table is maintained
+
+## Topology tool reference
+
+### `topology.group_list(project_key)`
+
+Purpose:
+
+- Return the cached topology group tree
+
+Output structure:
+
+- `project_key`
+- `groups`
+- `total`
+
+### `topology.list(project_key, group_id=None, object_type_code=None)`
+
+Purpose:
+
+- Return the cached topology list
+- Optionally filter by group or object type
+
+Output structure:
+
+- `project_key`
+- `topologies`
+- `total`
+
+Each entry in `topologies[]` contains:
+
+- `topology_id`
+- `topology_name`
+- `topology_type`
+- `object_type_code`
+- `group_id`
+- `group_path_text`
+- `root_shape`
+- `refreshed_at`
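
For illustration, a `topology.list` response assembled from the fields above might look like this (all values hypothetical):

```python
# Hypothetical topology.list response; field names follow the list above.
example = {
    "project_key": "demo-project",
    "topologies": [
        {
            "topology_id": 42,
            "topology_name": "Main feeder",
            "topology_type": 1,
            "object_type_code": 3,
            "group_id": 7,
            "group_path_text": "Site / Electrical",
            "root_shape": "tree",
            "refreshed_at": "2024-01-01T00:00:00+00:00",
        }
    ],
    "total": 1,
}
```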
+
+### `topology.get_node(project_key, topology_id, node_id, include_siblings=True, include_children=True)`
+
+Purpose:
+
+- Fetch a single node together with its immediate neighborhood
+
+Output structure:
+
+- `topology`
+- `node`
+- `parents`
+- `children`
+- `siblings`
+
+### `topology.find_context(project_key, entity_type, entity_id, topology_id=None, include_siblings=True, ancestor_depth=5, descendant_depth=2)`
+
+Purpose:
+
+- Reverse-look up the topology node context matched by a device or meter
+
+Output structure:
+
+- `query`
+- `matches`
+- `total_matches`
+
+Each entry in `matches[]` contains:
+
+- `topology`
+- `self`
+- `parents`
+- `children`
+- `ancestors`
+- `descendants`
+- `siblings`
+
+## Topology query behavior
+
+### Group queries
+
+`topology.group_list` assembles the tree directly from the cached `topology_group` rows; no real-time upstream call is involved.
+
+### Topology list queries
+
+Filtering in `topology.list`:
+
+- If `group_id` is passed, that group and all of its descendant groups become visible
+- If `object_type_code` is passed, the object-type filter is applied on top
+### 节点邻域查询
+
+`topology.get_node` 返回的是一个节点的直接邻域:
+
+- 直接父节点
+- 直接子节点
+- 同父兄弟节点
+
+它不做深层遍历。
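
Given cached parent links, that neighborhood reduces to three lookups (a sketch over a plain dict; the real code queries the `topology_node` table):

```python
def node_neighborhood(parent_of, node_id):
    """parent_of: node_id -> parent_node_id (None for roots)."""
    parent = parent_of.get(node_id)
    children = sorted(n for n, p in parent_of.items() if p == node_id)
    siblings = sorted(
        n for n, p in parent_of.items()
        if parent is not None and p == parent and n != node_id
    )
    return parent, children, siblings
```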
+
+### Entity context queries
+
+`topology.find_context`:
+
+1. Queries `topology_entity_index` first
+2. Loads node graph relations for every hit
+3. Returns the matched node, its direct parent and children, ancestor chain, descendant chain, and siblings
+
+## Topology storage and initialization
+
+The topology cache is not written to standalone JSON files; it lives in the same database the service uses.
+
+Database connection source:
+
+- `database_url()` in `instrument_config_mcp/db.py`
+- The default is `sqlite:///llm_proxy.db`
+- If `DATABASE_URL` is set at deploy time, the environment variable wins
+
+With the default configuration:
+
+- The SQLite file sits in the working directory as `llm_proxy.db`
+- The topology cache tables and the `sys_config` table share that database
+
+The repository ships no standalone `.sql` DDL scripts and no migration directory.
+
+Table creation happens in two places:
+
+- `scripts/init_local_sys_config.py` runs `Base.metadata.create_all(engine)`, suitable for creating all ORM-registered tables once at deploy time
+- The topology cache code calls `ensure_topology_cache_tables()` before running, which likewise uses `Base.metadata.create_all(sql_engine())`
+
+This means:
+
+- Starting the MCP service alone does not perform a global table creation pass
+- The first topology cache refresh or query automatically ensures the cache tables exist
+- To prepare the database explicitly at deploy time, run the init script once before starting the service
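
The precedence described above suggests a helper of roughly this shape (an assumed sketch of `db.py`'s `database_url()`, not its verified source):

```python
import os

def database_url():
    # Assumed shape: the DATABASE_URL environment variable wins;
    # otherwise fall back to the local SQLite file llm_proxy.db.
    return os.getenv("DATABASE_URL", "").strip() or "sqlite:///llm_proxy.db"
```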
+
+## Manual refresh endpoint
+
+Besides the MCP tools, one internal route is exposed:
+
+- `GET /topology/cache/refresh?project_key=...`
+
+Handler: `refresh_topology_cache_route`
+
+Capabilities:
+
+- Triggers a cache refresh via query string
+- Accepts multiple `topology_id` parameters for a partial refresh
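
A refresh URL with repeated `topology_id` parameters (which `_parse_int_list_query` collects server-side) might be built like this; host and ids are placeholders, and 8500 is the `MCP_PORT` default:

```python
from urllib.parse import urlencode

# Repeated ("topology_id", ...) pairs become repeated query parameters.
params = [("project_key", "demo-project"), ("topology_id", "42"), ("topology_id", "43")]
refresh_url = "http://localhost:8500/topology/cache/refresh?" + urlencode(params)
```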
+
+Notes:
+
+- The route is registered with `include_in_schema=False`
+- It is not a formal MCP tool; it is an entry point for manual testing and operational cache refreshes
+
+## Current limitations
+
+Comparing the code against the design, these gaps remain:
+
+- Topology refresh is not yet exposed under a formal MCP tool name
+- There is no `topology.get_structure`-style tool that returns a whole cached topology structure
+- There is no standalone refresh-state table, so a project's last refresh status cannot be queried directly
+- There is no dedicated tool exposing the entity list bound to a node
 ## Pagination rules
 
 - Default `page_size=100`

+ 46 - 5
instrument_config_mcp/config_api.py

@@ -18,17 +18,26 @@ def _post_config(project_key: str, path: str, payload: dict[str, Any]) -> Any:
     )
 
     if not isinstance(response_payload, dict):
-        raise ValueError(f"config API returned invalid payload for {path}: {response_payload}")
+        raise ValueError(
+            f"config API returned invalid payload for {path}: {response_payload}"
+        )
 
     state = response_payload.get("state")
     if str(state) not in {"0", "0.0"}:
         state_info = str(response_payload.get("state_info") or "").strip()
-        raise ValueError(f"config API failed for {path}: {state_info or response_payload}")
+        raise ValueError(
+            f"config API failed for {path}: {state_info or response_payload}"
+        )
 
     return response_payload
 
 
-def list_locations(project_key: str, keyword: str | None = None, page_size: int = 100, page_num: int = 1) -> Any:
+def list_locations(
+    project_key: str,
+    keyword: str | None = None,
+    page_size: int = 100,
+    page_num: int = 1,
+) -> Any:
     return _post_config(
         project_key,
         "/api/configapi/location/list",
@@ -49,7 +58,13 @@ def list_system_tree(project_key: str) -> Any:
     )
 
 
-def list_systems(project_key: str, page_size: int = 100, page_num: int = 1, system_type_id: int = 0, show_below: bool = True) -> Any:
+def list_systems(
+    project_key: str,
+    page_size: int = 100,
+    page_num: int = 1,
+    system_type_id: int = 0,
+    show_below: bool = True,
+) -> Any:
     return _post_config(
         project_key,
         "/api/configapi/system/list",
@@ -141,7 +156,9 @@ def search_meters(
     return _post_config(project_key, "/api/configapi/meter/list", payload)
 
 
-def search_points(project_key: str, id: int, page_size: int = 100, page_num: int = 1) -> Any:
+def search_points(
+    project_key: str, id: int, page_size: int = 100, page_num: int = 1
+) -> Any:
     return _post_config(
         project_key,
         "/api/configapi/meter/search_point",
@@ -152,3 +169,27 @@ def search_points(project_key: str, id: int, page_size: int = 100, page_num: int
             "id": id,
         },
     )
+
+
+def list_topologies_with_group(
+    project_key: str, group_ids: list[int] | None = None
+) -> Any:
+    return _post_config(
+        project_key,
+        "/api/configapi/topo/list_with_group",
+        {
+            "operator": CONFIG_OPERATOR,
+            "group_ids": group_ids or [],
+        },
+    )
+
+
+def get_topology(project_key: str, id: int) -> Any:
+    return _post_config(
+        project_key,
+        "/api/configapi/topo/get",
+        {
+            "operator": CONFIG_OPERATOR,
+            "id": id,
+        },
+    )

+ 115 - 4
instrument_config_mcp/server.py

@@ -4,6 +4,8 @@ import os
 from typing import Any
 
 from fastmcp import FastMCP
+from starlette.requests import Request
+from starlette.responses import JSONResponse, Response
 
 from .auth import load_projects_config
 from .config_api import (
@@ -16,6 +18,13 @@ from .config_api import (
     search_meters as api_search_meters,
     search_points as api_search_points,
 )
+from .topology_cache import (
+    find_topology_context,
+    get_topology_node,
+    list_topologies,
+    list_topology_groups,
+    refresh_topology_cache,
+)
 
 
 mcp = FastMCP("instrument-config")
@@ -63,10 +72,52 @@ def _append_next_page_hint(payload: Any, page_num: int) -> Any:
     return payload
 
 
+def _parse_bool_query(raw_value: str | None) -> bool:
+    text = str(raw_value or "").strip().lower()
+    if text in {"1", "true", "yes", "y", "on"}:
+        return True
+    if text in {"0", "false", "no", "n", "off", ""}:
+        return False
+    raise ValueError(f"invalid boolean query value: {raw_value}")
+
+
+def _parse_int_list_query(values: list[str]) -> list[int]:
+    result: list[int] = []
+    for item in values:
+        text = str(item or "").strip()
+        if not text:
+            continue
+        result.append(int(text))
+    return result
+
+
+@mcp.custom_route("/topology/cache/refresh", methods=["GET"], include_in_schema=False)
+async def refresh_topology_cache_route(request: Request) -> Response:
+    try:
+        project_key = str(request.query_params.get("project_key") or "").strip()
+        if not project_key:
+            raise ValueError("project_key is required")
+
+        topology_ids = _parse_int_list_query(
+            request.query_params.getlist("topology_id")
+        )
+        force = _parse_bool_query(request.query_params.get("force"))
+        del force  # Manual refresh always rebuilds the requested cache scope.
+
+        payload = refresh_topology_cache(project_key, topology_ids=topology_ids or None)
+        return JSONResponse(payload)
+    except Exception as exc:
+        return JSONResponse({"error": str(exc)}, status_code=400)
+
+
 @mcp.tool()
-def list_locations(project_key: str, keyword: str = "", page_size: int = 100, page_num: int = 1) -> Any:
+def list_locations(
+    project_key: str, keyword: str = "", page_size: int = 100, page_num: int = 1
+) -> Any:
     """List location data from the config API."""
-    payload = api_list_locations(project_key, keyword=keyword, page_size=page_size, page_num=page_num)
+    payload = api_list_locations(
+        project_key, keyword=keyword, page_size=page_size, page_num=page_num
+    )
     return _append_next_page_hint(payload, page_num)
 
 
@@ -164,12 +215,72 @@ def search_meters(
 
 
 @mcp.tool()
-def search_points(project_key: str, id: int, page_size: int = 100, page_num: int = 1) -> Any:
+def search_points(
+    project_key: str, id: int, page_size: int = 100, page_num: int = 1
+) -> Any:
     """Search points under a meter by meter id."""
-    payload = api_search_points(project_key, id=id, page_size=page_size, page_num=page_num)
+    payload = api_search_points(
+        project_key, id=id, page_size=page_size, page_num=page_num
+    )
     return _append_next_page_hint(payload, page_num)
 
 
+@mcp.tool(name="topology.group_list")
+def topology_group_list(project_key: str) -> dict[str, Any]:
+    """List cached topology groups as a tree."""
+    return list_topology_groups(project_key)
+
+
+@mcp.tool(name="topology.list")
+def topology_list(
+    project_key: str, group_id: int | None = None, object_type_code: int | None = None
+) -> dict[str, Any]:
+    """List cached topologies, optionally filtered by group or object type."""
+    return list_topologies(
+        project_key, group_id=group_id, object_type_code=object_type_code
+    )
+
+
+@mcp.tool(name="topology.get_node")
+def topology_get_node(
+    project_key: str,
+    topology_id: int,
+    node_id: str,
+    include_siblings: bool = True,
+    include_children: bool = True,
+) -> dict[str, Any]:
+    """Get one cached topology node with its immediate neighborhood."""
+    return get_topology_node(
+        project_key,
+        topology_id,
+        node_id,
+        include_siblings=include_siblings,
+        include_children=include_children,
+    )
+
+
+@mcp.tool(name="topology.find_context")
+def topology_find_context(
+    project_key: str,
+    entity_type: str,
+    entity_id: int,
+    topology_id: int | None = None,
+    include_siblings: bool = True,
+    ancestor_depth: int = 5,
+    descendant_depth: int = 2,
+) -> dict[str, Any]:
+    """Find cached topology context for a device or meter."""
+    return find_topology_context(
+        project_key,
+        entity_type,
+        entity_id,
+        topology_id=topology_id,
+        include_siblings=include_siblings,
+        ancestor_depth=ancestor_depth,
+        descendant_depth=descendant_depth,
+    )
+
+
 def main() -> None:
     host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0"
     port = int(os.getenv("MCP_PORT", "8500"))

+ 1216 - 0
instrument_config_mcp/topology_cache.py

@@ -0,0 +1,1216 @@
+from __future__ import annotations
+
+from collections import defaultdict, deque
+from datetime import datetime, timezone
+from typing import Any
+
+from sqlalchemy import Index, Integer, String, Text, UniqueConstraint, delete, select
+from sqlalchemy.orm import Mapped, Session, mapped_column
+
+from .config_api import list_topologies_with_group as api_list_topologies_with_group
+from .config_api import get_topology as api_get_topology
+from .db import Base, sql_engine
+
+
+def _utc_now_iso() -> str:
+    return datetime.now(timezone.utc).isoformat()
+
+
+def _safe_int(raw_value: Any) -> int | None:
+    if raw_value is None:
+        return None
+    try:
+        return int(str(raw_value).strip())
+    except Exception:
+        return None
+
+
+def _text(raw_value: Any) -> str:
+    return str(raw_value or "").strip()
+
+
+def _bool_as_int(raw_value: Any) -> int:
+    return 1 if bool(raw_value) else 0
+
+
+def _normalize_entity_type(entity_type: str) -> str:
+    normalized = _text(entity_type).lower()
+    if normalized not in {"meter", "device"}:
+        raise ValueError("entity_type must be 'meter' or 'device'")
+    return normalized
+
+
+class TopologyGroup(Base):
+    __tablename__ = "topology_group"
+    __table_args__ = (
+        UniqueConstraint(
+            "project_key", "group_id", name="uq_topology_group_project_group"
+        ),
+        Index("ix_topology_group_project_parent", "project_key", "parent_group_id"),
+    )
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
+    group_id: Mapped[int] = mapped_column(Integer, nullable=False)
+    group_name: Mapped[str] = mapped_column(String(255), nullable=False)
+    parent_group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    group_path_text: Mapped[str] = mapped_column(Text, nullable=False)
+    level: Mapped[int] = mapped_column(Integer, nullable=False)
+    sort_index: Mapped[int] = mapped_column(Integer, nullable=False)
+    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)
+    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
+
+
+class TopologyRegistry(Base):
+    __tablename__ = "topology_registry"
+    __table_args__ = (
+        UniqueConstraint(
+            "project_key", "topology_id", name="uq_topology_registry_project_topology"
+        ),
+        Index("ix_topology_registry_project_group", "project_key", "group_id"),
+    )
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
+    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
+    topology_name: Mapped[str] = mapped_column(String(255), nullable=False)
+    topology_type: Mapped[int] = mapped_column(Integer, nullable=False)
+    object_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    root_shape: Mapped[str] = mapped_column(String(32), nullable=False)
+    source_updated_time: Mapped[str] = mapped_column(
+        String(64), nullable=False, default=""
+    )
+    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)
+    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
+
+
+class TopologyNode(Base):
+    __tablename__ = "topology_node"
+    __table_args__ = (
+        UniqueConstraint(
+            "project_key",
+            "topology_id",
+            "node_id",
+            name="uq_topology_node_project_topology_node",
+        ),
+        Index(
+            "ix_topology_node_project_topology_parent",
+            "project_key",
+            "topology_id",
+            "parent_node_id",
+        ),
+        Index(
+            "ix_topology_node_project_topology_refer",
+            "project_key",
+            "topology_id",
+            "refer_id",
+        ),
+    )
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
+    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
+    node_id: Mapped[str] = mapped_column(Text, nullable=False)
+    node_name: Mapped[str] = mapped_column(Text, nullable=False)
+    parent_node_id: Mapped[str | None] = mapped_column(Text, nullable=True)
+    level: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    node_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    refer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    refer_level: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    is_virtual: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    path_text: Mapped[str] = mapped_column(Text, nullable=False, default="")
+    child_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    sort_index: Mapped[int | None] = mapped_column(Integer, nullable=True)
+
+
+class TopologyEdge(Base):
+    __tablename__ = "topology_edge"
+    __table_args__ = (
+        UniqueConstraint(
+            "project_key",
+            "topology_id",
+            "source_node_id",
+            "target_node_id",
+            name="uq_topology_edge_project_topology_nodes",
+        ),
+        Index(
+            "ix_topology_edge_project_topology_source",
+            "project_key",
+            "topology_id",
+            "source_node_id",
+        ),
+        Index(
+            "ix_topology_edge_project_topology_target",
+            "project_key",
+            "topology_id",
+            "target_node_id",
+        ),
+    )
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
+    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
+    source_node_id: Mapped[str] = mapped_column(Text, nullable=False)
+    target_node_id: Mapped[str] = mapped_column(Text, nullable=False)
+    sort_index: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+
+
+class TopologyEntityIndex(Base):
+    __tablename__ = "topology_entity_index"
+    __table_args__ = (
+        UniqueConstraint(
+            "project_key",
+            "entity_type",
+            "entity_id",
+            "topology_id",
+            "node_id",
+            name="uq_topology_entity_index_project_entity_topology_node",
+        ),
+        Index(
+            "ix_topology_entity_index_project_entity",
+            "project_key",
+            "entity_type",
+            "entity_id",
+        ),
+        Index(
+            "ix_topology_entity_index_project_topology_node",
+            "project_key",
+            "topology_id",
+            "node_id",
+        ),
+    )
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
+    entity_type: Mapped[str] = mapped_column(String(16), nullable=False)
+    entity_id: Mapped[int] = mapped_column(Integer, nullable=False)
+    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
+    node_id: Mapped[str] = mapped_column(Text, nullable=False)
+    depth: Mapped[int | None] = mapped_column(Integer, nullable=True)
+
+
+def ensure_topology_cache_tables() -> None:
+    Base.metadata.create_all(sql_engine())
+
+
+def _collect_group_and_topology_refs(
+    project_key: str,
+    items: list[Any],
+    *,
+    refreshed_at: str,
+    parent_group_id: int | None = None,
+    parent_group_path: tuple[str, ...] = (),
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+    group_rows: list[dict[str, Any]] = []
+    topology_refs: list[dict[str, Any]] = []
+
+    for sort_index, item in enumerate(items, start=1):
+        if not isinstance(item, dict):
+            continue
+
+        item_id = _safe_int(item.get("id"))
+        item_name = _text(item.get("name")) or str(item_id or "")
+        item_type = _safe_int(item.get("type"))
+        children = (
+            item.get("children") if isinstance(item.get("children"), list) else []
+        )
+
+        if item_id is None:
+            continue
+
+        if item_type == 1:
+            group_path = (*parent_group_path, item_name)
+            group_rows.append(
+                {
+                    "project_key": project_key,
+                    "group_id": item_id,
+                    "group_name": item_name,
+                    "parent_group_id": parent_group_id,
+                    "group_path_text": " / ".join(group_path),
+                    "level": len(group_path),
+                    "sort_index": sort_index,
+                    "refreshed_at": refreshed_at,
+                    "is_active": 1,
+                }
+            )
+            nested_group_rows, nested_topology_refs = _collect_group_and_topology_refs(
+                project_key,
+                children,
+                refreshed_at=refreshed_at,
+                parent_group_id=item_id,
+                parent_group_path=group_path,
+            )
+            group_rows.extend(nested_group_rows)
+            topology_refs.extend(nested_topology_refs)
+            continue
+
+        topology_refs.append(
+            {
+                "topology_id": item_id,
+                "topology_name": item_name,
+                "group_id": parent_group_id,
+                "sort_index": sort_index,
+            }
+        )
+
+    return group_rows, topology_refs
+
+
+def _as_int_list(raw_value: Any) -> list[int]:
+    if not isinstance(raw_value, list):
+        return []
+    result: list[int] = []
+    for item in raw_value:
+        value = _safe_int(item)
+        if value is None:
+            continue
+        result.append(value)
+    return result
+
+
+def _record_deepest_entities(
+    entity_best: dict[tuple[str, int], tuple[int, list[str]]],
+    entity_node_ids: dict[tuple[str, int], list[str]],
+    entity_type: str,
+    entity_ids: list[int],
+    *,
+    depth: int,
+    node_id: str,
+) -> None:
+    for entity_id in entity_ids:
+        key = (entity_type, entity_id)
+        existing = entity_best.get(key)
+        if existing is None or depth > existing[0]:
+            entity_best[key] = (depth, [node_id])
+            entity_node_ids[key] = [node_id]
+            continue
+        if depth == existing[0] and node_id not in entity_node_ids[key]:
+            entity_node_ids[key].append(node_id)
+
+
+def _parse_tree_topology(
+    project_key: str,
+    topology_id: int,
+    diagram: list[Any],
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
+    node_rows: list[dict[str, Any]] = []
+    edge_rows: list[dict[str, Any]] = []
+    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
+    entity_node_ids: dict[tuple[str, int], list[str]] = {}
+
+    def visit(
+        node: dict[str, Any],
+        parent_node_id: str | None,
+        path_names: tuple[str, ...],
+        sort_index: int,
+    ) -> None:
+        node_id = _text(node.get("id"))
+        if not node_id:
+            return
+
+        node_name = _text(node.get("name")) or node_id
+        children = (
+            node.get("children") if isinstance(node.get("children"), list) else []
+        )
+        level = _safe_int(node.get("level")) or (len(path_names) + 1)
+        path_text = " / ".join((*path_names, node_name))
+        effective_parent_node_id = parent_node_id or (
+            _text(node.get("parent_id")) or None
+        )
+
+        node_rows.append(
+            {
+                "project_key": project_key,
+                "topology_id": topology_id,
+                "node_id": node_id,
+                "node_name": node_name,
+                "parent_node_id": effective_parent_node_id,
+                "level": level,
+                "node_type_code": _safe_int(node.get("type")),
+                "refer_id": _safe_int(node.get("refer_id")),
+                "refer_level": _safe_int(node.get("refer_level")),
+                "is_virtual": _bool_as_int(node.get("is_virtual")),
+                "path_text": path_text,
+                "child_count": len(children),
+                "sort_index": sort_index,
+            }
+        )
+
+        if effective_parent_node_id:
+            edge_rows.append(
+                {
+                    "project_key": project_key,
+                    "topology_id": topology_id,
+                    "source_node_id": effective_parent_node_id,
+                    "target_node_id": node_id,
+                    "sort_index": sort_index,
+                }
+            )
+
+        _record_deepest_entities(
+            entity_best,
+            entity_node_ids,
+            "meter",
+            _as_int_list(node.get("meter_list")),
+            depth=level,
+            node_id=node_id,
+        )
+        _record_deepest_entities(
+            entity_best,
+            entity_node_ids,
+            "device",
+            _as_int_list(node.get("device_list")),
+            depth=level,
+            node_id=node_id,
+        )
+
+        for child_sort_index, child in enumerate(children, start=1):
+            if not isinstance(child, dict):
+                continue
+            visit(child, node_id, (*path_names, node_name), child_sort_index)
+
+    for root_sort_index, root in enumerate(diagram, start=1):
+        if not isinstance(root, dict):
+            continue
+        visit(root, None, (), root_sort_index)
+
+    entity_rows: list[dict[str, Any]] = []
+    for (entity_type, entity_id), (depth, _) in entity_best.items():
+        for node_id in entity_node_ids[(entity_type, entity_id)]:
+            entity_rows.append(
+                {
+                    "project_key": project_key,
+                    "entity_type": entity_type,
+                    "entity_id": entity_id,
+                    "topology_id": topology_id,
+                    "node_id": node_id,
+                    "depth": depth,
+                }
+            )
+
+    return node_rows, edge_rows, entity_rows
+
+
+def _build_graph_node_context(
+    nodes: list[dict[str, Any]],
+    edges: list[dict[str, Any]],
+) -> tuple[dict[str, int], dict[str, str | None], dict[str, str], dict[str, int]]:
+    incoming: dict[str, list[str]] = defaultdict(list)
+    outgoing: dict[str, list[str]] = defaultdict(list)
+    for edge in edges:
+        source_node_id = _text(edge.get("source"))
+        target_node_id = _text(edge.get("target"))
+        if not source_node_id or not target_node_id:
+            continue
+        outgoing[source_node_id].append(target_node_id)
+        incoming[target_node_id].append(source_node_id)
+
+    node_ids = [_text(node.get("id")) for node in nodes if _text(node.get("id"))]
+    roots = [node_id for node_id in node_ids if not incoming.get(node_id)]
+    if not roots:
+        roots = node_ids[:1]
+
+    level_map: dict[str, int] = {}
+    path_map: dict[str, str] = {}
+    parent_map: dict[str, str | None] = {node_id: None for node_id in node_ids}
+    child_count_map: dict[str, int] = {
+        node_id: len(outgoing.get(node_id, [])) for node_id in node_ids
+    }
+    node_name_map = {
+        _text(node.get("id")): _text(node.get("name")) or _text(node.get("id"))
+        for node in nodes
+    }
+
+    queue: deque[tuple[str, int, str]] = deque()
+    for root_node_id in roots:
+        root_name = node_name_map.get(root_node_id) or root_node_id
+        queue.append((root_node_id, 1, root_name))
+
+    while queue:
+        node_id, depth, path_text = queue.popleft()
+        previous = level_map.get(node_id)
+        if previous is not None and previous >= depth:
+            continue
+        level_map[node_id] = depth
+        path_map[node_id] = path_text
+        for child_node_id in outgoing.get(node_id, []):
+            if parent_map.get(child_node_id) is None:
+                parent_map[child_node_id] = node_id
+            child_name = node_name_map.get(child_node_id) or child_node_id
+            queue.append((child_node_id, depth + 1, f"{path_text} / {child_name}"))
+
+    for node in nodes:
+        node_id = _text(node.get("id"))
+        if not node_id:
+            continue
+        if node_id in level_map:
+            continue
+        fallback_level = _safe_int(node.get("level")) or 1
+        node_name = _text(node.get("name")) or node_id
+        level_map[node_id] = fallback_level
+        path_map[node_id] = node_name
+
+    return level_map, parent_map, path_map, child_count_map
+
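+# A hand-traced sketch of the maps built by _build_graph_node_context for a
+# minimal two-node diagram (node ids and names below are made up):
+#
+#   nodes = [{"id": "A", "name": "Root"}, {"id": "B", "name": "Feeder"}]
+#   edges = [{"source": "A", "target": "B"}]
+#
+#   level_map   -> {"A": 1, "B": 2}
+#   parent_map  -> {"A": None, "B": "A"}
+#   path_map    -> {"A": "Root", "B": "Root / Feeder"}
+#   child_count -> {"A": 1, "B": 0}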
+
+def _parse_graph_topology(
+    project_key: str,
+    topology_id: int,
+    diagram: dict[str, Any],
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
+    """Flatten a graph-shaped diagram into node, edge, and entity-index rows."""
+    raw_nodes = [item for item in diagram.get("nodes", []) if isinstance(item, dict)]
+    raw_edges = [item for item in diagram.get("edges", []) if isinstance(item, dict)]
+    level_map, parent_map, path_map, child_count_map = _build_graph_node_context(
+        raw_nodes, raw_edges
+    )
+
+    node_rows: list[dict[str, Any]] = []
+    edge_rows: list[dict[str, Any]] = []
+    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
+    entity_node_ids: dict[tuple[str, int], list[str]] = {}
+
+    for sort_index, node in enumerate(raw_nodes, start=1):
+        node_id = _text(node.get("id"))
+        if not node_id:
+            continue
+        level = level_map.get(node_id) or _safe_int(node.get("level")) or 1
+        node_rows.append(
+            {
+                "project_key": project_key,
+                "topology_id": topology_id,
+                "node_id": node_id,
+                "node_name": _text(node.get("name")) or node_id,
+                "parent_node_id": parent_map.get(node_id),
+                "level": level,
+                "node_type_code": _safe_int(node.get("type")),
+                "refer_id": _safe_int(node.get("refer_id")),
+                "refer_level": _safe_int(node.get("refer_level")),
+                "is_virtual": _bool_as_int(node.get("is_virtual")),
+                "path_text": path_map.get(node_id, _text(node.get("name")) or node_id),
+                "child_count": child_count_map.get(node_id, 0),
+                "sort_index": sort_index,
+            }
+        )
+
+        _record_deepest_entities(
+            entity_best,
+            entity_node_ids,
+            "meter",
+            _as_int_list(node.get("meter_list")),
+            depth=level,
+            node_id=node_id,
+        )
+        _record_deepest_entities(
+            entity_best,
+            entity_node_ids,
+            "device",
+            _as_int_list(node.get("device_list")),
+            depth=level,
+            node_id=node_id,
+        )
+
+    for sort_index, edge in enumerate(raw_edges, start=1):
+        source_node_id = _text(edge.get("source"))
+        target_node_id = _text(edge.get("target"))
+        if not source_node_id or not target_node_id:
+            continue
+        edge_rows.append(
+            {
+                "project_key": project_key,
+                "topology_id": topology_id,
+                "source_node_id": source_node_id,
+                "target_node_id": target_node_id,
+                "sort_index": sort_index,
+            }
+        )
+
+    entity_rows: list[dict[str, Any]] = []
+    for (entity_type, entity_id), (depth, _) in entity_best.items():
+        for node_id in entity_node_ids[(entity_type, entity_id)]:
+            entity_rows.append(
+                {
+                    "project_key": project_key,
+                    "entity_type": entity_type,
+                    "entity_id": entity_id,
+                    "topology_id": topology_id,
+                    "node_id": node_id,
+                    "depth": depth,
+                }
+            )
+
+    return node_rows, edge_rows, entity_rows
+
+
+def refresh_topology_cache(
+    project_key: str, topology_ids: list[int] | None = None
+) -> dict[str, Any]:
+    """Pull topologies from upstream and rebuild the local cache; a full
+    refresh (``topology_ids is None``) replaces every cached topology for
+    the project, a partial refresh only the listed ones."""
+    project_key = _text(project_key)
+    if not project_key:
+        raise ValueError("project_key is required")
+
+    ensure_topology_cache_tables()
+
+    refreshed_at = _utc_now_iso()
+    list_payload = api_list_topologies_with_group(project_key, group_ids=[])
+    raw_items = list_payload.get("data")
+    if not isinstance(raw_items, list):
+        raise ValueError("topology list returned invalid data")
+
+    group_rows, topology_refs = _collect_group_and_topology_refs(
+        project_key, raw_items, refreshed_at=refreshed_at
+    )
+    requested_topology_ids = set(topology_ids or [])
+    available_topology_map = {item["topology_id"]: item for item in topology_refs}
+
+    if requested_topology_ids:
+        missing = sorted(requested_topology_ids - set(available_topology_map))
+        if missing:
+            raise ValueError(
+                f"topology_id not found in upstream topology list: {missing}"
+            )
+        # dict.fromkeys de-duplicates repeated ids while keeping request order
+        selected_topology_refs = [
+            available_topology_map[item]
+            for item in dict.fromkeys(topology_ids or [])
+        ]
+    else:
+        selected_topology_refs = topology_refs
+
+    registry_rows: list[dict[str, Any]] = []
+    node_rows: list[dict[str, Any]] = []
+    edge_rows: list[dict[str, Any]] = []
+    entity_rows: list[dict[str, Any]] = []
+
+    for topology_ref in selected_topology_refs:
+        topology_id = topology_ref["topology_id"]
+        detail_payload = api_get_topology(project_key, topology_id)
+        detail_data = detail_payload.get("data")
+        if not isinstance(detail_data, dict):
+            raise ValueError(
+                f"topology get returned invalid data for topology_id={topology_id}"
+            )
+
+        diagram = detail_data.get("diagram")
+        if isinstance(diagram, list):
+            root_shape = "tree"
+            topology_node_rows, topology_edge_rows, topology_entity_rows = (
+                _parse_tree_topology(project_key, topology_id, diagram)
+            )
+        elif isinstance(diagram, dict):
+            root_shape = "graph"
+            topology_node_rows, topology_edge_rows, topology_entity_rows = (
+                _parse_graph_topology(project_key, topology_id, diagram)
+            )
+        else:
+            root_shape = "tree"
+            topology_node_rows, topology_edge_rows, topology_entity_rows = ([], [], [])
+
+        registry_rows.append(
+            {
+                "project_key": project_key,
+                "topology_id": topology_id,
+                "topology_name": _text(detail_data.get("name"))
+                or topology_ref["topology_name"],
+                "topology_type": _safe_int(detail_data.get("type")) or 0,
+                "object_type_code": _safe_int(detail_data.get("object")),
+                "group_id": _safe_int(detail_data.get("group_id"))
+                or topology_ref.get("group_id"),
+                "root_shape": root_shape,
+                "source_updated_time": _text(detail_data.get("updated_time")),
+                "refreshed_at": refreshed_at,
+                "is_active": 1,
+            }
+        )
+        node_rows.extend(topology_node_rows)
+        edge_rows.extend(topology_edge_rows)
+        entity_rows.extend(topology_entity_rows)
+
+    with Session(sql_engine()) as session:
+        if requested_topology_ids:
+            session.execute(
+                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
+            )
+            session.add_all(TopologyGroup(**row) for row in group_rows)
+
+            session.execute(
+                delete(TopologyEntityIndex).where(
+                    TopologyEntityIndex.project_key == project_key,
+                    TopologyEntityIndex.topology_id.in_(requested_topology_ids),
+                )
+            )
+            session.execute(
+                delete(TopologyEdge).where(
+                    TopologyEdge.project_key == project_key,
+                    TopologyEdge.topology_id.in_(requested_topology_ids),
+                )
+            )
+            session.execute(
+                delete(TopologyNode).where(
+                    TopologyNode.project_key == project_key,
+                    TopologyNode.topology_id.in_(requested_topology_ids),
+                )
+            )
+            session.execute(
+                delete(TopologyRegistry).where(
+                    TopologyRegistry.project_key == project_key,
+                    TopologyRegistry.topology_id.in_(requested_topology_ids),
+                )
+            )
+        else:
+            session.execute(
+                delete(TopologyEntityIndex).where(
+                    TopologyEntityIndex.project_key == project_key
+                )
+            )
+            session.execute(
+                delete(TopologyEdge).where(TopologyEdge.project_key == project_key)
+            )
+            session.execute(
+                delete(TopologyNode).where(TopologyNode.project_key == project_key)
+            )
+            session.execute(
+                delete(TopologyRegistry).where(
+                    TopologyRegistry.project_key == project_key
+                )
+            )
+            session.execute(
+                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
+            )
+            session.add_all(TopologyGroup(**row) for row in group_rows)
+
+        session.add_all(TopologyRegistry(**row) for row in registry_rows)
+        session.add_all(TopologyNode(**row) for row in node_rows)
+        session.add_all(TopologyEdge(**row) for row in edge_rows)
+        session.add_all(TopologyEntityIndex(**row) for row in entity_rows)
+        session.commit()
+
+    result = {
+        "project_key": project_key,
+        "refreshed_group_count": len(group_rows),
+        "refreshed_topology_count": len(registry_rows),
+        "refreshed_node_count": len(node_rows),
+        "refreshed_edge_count": len(edge_rows),
+        "refreshed_entity_index_count": len(entity_rows),
+        "topology_ids": [row["topology_id"] for row in registry_rows],
+        "refreshed_at": refreshed_at,
+    }
+    if not registry_rows:
+        result["mcp_note"] = (
+            f"Project '{project_key}' currently has no topology data in upstream config."
+        )
+    return result
+
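+# Assumed usage (project key and topology ids are illustrative):
+#
+#   refresh_topology_cache("demo")              # full rebuild for the project
+#   refresh_topology_cache("demo", [101, 102])  # replace only these topologies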
+
+def _load_group_rows(session: Session, project_key: str) -> list[TopologyGroup]:
+    return list(
+        session.scalars(
+            select(TopologyGroup)
+            .where(TopologyGroup.project_key == project_key)
+            .order_by(
+                TopologyGroup.level.asc(),
+                TopologyGroup.sort_index.asc(),
+                TopologyGroup.group_id.asc(),
+            )
+        )
+    )
+
+
+def _load_registry_rows(session: Session, project_key: str) -> list[TopologyRegistry]:
+    return list(
+        session.scalars(
+            select(TopologyRegistry)
+            .where(TopologyRegistry.project_key == project_key)
+            .order_by(
+                TopologyRegistry.topology_name.asc(), TopologyRegistry.topology_id.asc()
+            )
+        )
+    )
+
+
+def _has_topology_cache(session: Session, project_key: str) -> bool:
+    # Existence check only; avoid loading and ordering every registry row.
+    stmt = select(TopologyRegistry.topology_id).where(
+        TopologyRegistry.project_key == project_key
+    )
+    return session.scalar(stmt.limit(1)) is not None
+
+
+def list_topology_groups(project_key: str) -> dict[str, Any]:
+    """Return the cached topology group tree for a project."""
+    project_key = _text(project_key)
+    if not project_key:
+        raise ValueError("project_key is required")
+
+    ensure_topology_cache_tables()
+    with Session(sql_engine()) as session:
+        if not _has_topology_cache(session, project_key):
+            return {
+                "project_key": project_key,
+                "groups": [],
+                "total": 0,
+                "mcp_note": (
+                    f"Project '{project_key}' has no cached topologies. Refresh it via "
+                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
+                    "already succeeded, the upstream project likely has no topology data."
+                ),
+            }
+        group_rows = _load_group_rows(session, project_key)
+
+    children_by_parent: dict[int | None, list[dict[str, Any]]] = defaultdict(list)
+    node_by_group_id: dict[int, dict[str, Any]] = {}
+    for group_row in group_rows:
+        payload = {
+            "group_id": group_row.group_id,
+            "group_name": group_row.group_name,
+            "parent_group_id": group_row.parent_group_id,
+            "level": group_row.level,
+            "group_path_text": group_row.group_path_text,
+            "children": [],
+        }
+        node_by_group_id[group_row.group_id] = payload
+        children_by_parent[group_row.parent_group_id].append(payload)
+
+    for group_payload in node_by_group_id.values():
+        group_payload["children"] = children_by_parent.get(
+            group_payload["group_id"], []
+        )
+
+    return {
+        "project_key": project_key,
+        "groups": children_by_parent.get(None, []),
+        "total": len(group_rows),
+    }
+
+
+def _group_path_map(group_rows: list[TopologyGroup]) -> dict[int, str]:
+    return {group_row.group_id: group_row.group_path_text for group_row in group_rows}
+
+
+def _collect_descendant_group_ids(
+    group_rows: list[TopologyGroup], group_id: int
+) -> set[int]:
+    group_children: dict[int | None, list[int]] = defaultdict(list)
+    for row in group_rows:
+        group_children[row.parent_group_id].append(row.group_id)
+
+    result: set[int] = set()
+    queue: deque[int] = deque([group_id])
+    while queue:
+        current_group_id = queue.popleft()
+        if current_group_id in result:
+            continue
+        result.add(current_group_id)
+        queue.extend(group_children.get(current_group_id, []))
+    return result
+
+
+def list_topologies(
+    project_key: str,
+    *,
+    group_id: int | None = None,
+    object_type_code: int | None = None,
+) -> dict[str, Any]:
+    """List cached topologies, optionally filtered by group subtree and object type."""
+    project_key = _text(project_key)
+    if not project_key:
+        raise ValueError("project_key is required")
+
+    ensure_topology_cache_tables()
+    with Session(sql_engine()) as session:
+        if not _has_topology_cache(session, project_key):
+            return {
+                "project_key": project_key,
+                "topologies": [],
+                "total": 0,
+                "mcp_note": (
+                    f"Project '{project_key}' has no cached topologies. Refresh it via "
+                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
+                    "already succeeded, the upstream project likely has no topology data."
+                ),
+            }
+        group_rows = _load_group_rows(session, project_key)
+        registry_rows = _load_registry_rows(session, project_key)
+
+    group_path_map = _group_path_map(group_rows)
+    allowed_group_ids: set[int] | None = None
+    if group_id is not None:
+        allowed_group_ids = _collect_descendant_group_ids(group_rows, group_id)
+
+    topologies: list[dict[str, Any]] = []
+    for registry_row in registry_rows:
+        if (
+            allowed_group_ids is not None
+            and registry_row.group_id not in allowed_group_ids
+        ):
+            continue
+        if (
+            object_type_code is not None
+            and registry_row.object_type_code != object_type_code
+        ):
+            continue
+        topologies.append(
+            {
+                "topology_id": registry_row.topology_id,
+                "topology_name": registry_row.topology_name,
+                "topology_type": registry_row.topology_type,
+                "object_type_code": registry_row.object_type_code,
+                "group_id": registry_row.group_id,
+                "group_path_text": group_path_map.get(registry_row.group_id or -1, ""),
+                "root_shape": registry_row.root_shape,
+                "refreshed_at": registry_row.refreshed_at,
+            }
+        )
+
+    return {
+        "project_key": project_key,
+        "topologies": topologies,
+        "total": len(topologies),
+    }
+
+
+def _get_registry_or_error(
+    session: Session, project_key: str, topology_id: int
+) -> TopologyRegistry:
+    registry_row = session.scalar(
+        select(TopologyRegistry).where(
+            TopologyRegistry.project_key == project_key,
+            TopologyRegistry.topology_id == topology_id,
+        )
+    )
+    if registry_row is None:
+        raise ValueError(f"topology_id not found in cache: {topology_id}")
+    return registry_row
+
+
+def _get_node_or_error(
+    session: Session, project_key: str, topology_id: int, node_id: str
+) -> TopologyNode:
+    node_row = session.scalar(
+        select(TopologyNode).where(
+            TopologyNode.project_key == project_key,
+            TopologyNode.topology_id == topology_id,
+            TopologyNode.node_id == node_id,
+        )
+    )
+    if node_row is None:
+        raise ValueError(f"node_id not found in cache: {node_id}")
+    return node_row
+
+
+def _node_payload(node_row: TopologyNode) -> dict[str, Any]:
+    return {
+        "node_id": node_row.node_id,
+        "node_name": node_row.node_name,
+        "level": node_row.level,
+        "parent_node_id": node_row.parent_node_id,
+        "refer_id": node_row.refer_id,
+        "refer_level": node_row.refer_level,
+        "is_virtual": bool(node_row.is_virtual),
+        "path_text": node_row.path_text,
+        "child_count": node_row.child_count,
+    }
+
+
+def _load_node_map(
+    session: Session, project_key: str, topology_id: int
+) -> dict[str, TopologyNode]:
+    rows = session.scalars(
+        select(TopologyNode).where(
+            TopologyNode.project_key == project_key,
+            TopologyNode.topology_id == topology_id,
+        )
+    )
+    return {row.node_id: row for row in rows}
+
+
+def _load_adjacency(
+    session: Session, project_key: str, topology_id: int
+) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
+    edges = session.scalars(
+        select(TopologyEdge)
+        .where(
+            TopologyEdge.project_key == project_key,
+            TopologyEdge.topology_id == topology_id,
+        )
+        .order_by(TopologyEdge.sort_index.asc(), TopologyEdge.id.asc())
+    )
+    parents_by_node: dict[str, list[str]] = defaultdict(list)
+    children_by_node: dict[str, list[str]] = defaultdict(list)
+    for edge in edges:
+        parents_by_node[edge.target_node_id].append(edge.source_node_id)
+        children_by_node[edge.source_node_id].append(edge.target_node_id)
+    return parents_by_node, children_by_node
+
+
+def _dedupe_preserve_order(node_ids: list[str]) -> list[str]:
+    result: list[str] = []
+    seen: set[str] = set()
+    for node_id in node_ids:
+        if node_id in seen:
+            continue
+        seen.add(node_id)
+        result.append(node_id)
+    return result
+
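+# e.g. _dedupe_preserve_order(["n1", "n2", "n1"]) -> ["n1", "n2"]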
+
+def _node_list_payload(
+    node_map: dict[str, TopologyNode], node_ids: list[str]
+) -> list[dict[str, Any]]:
+    payload: list[dict[str, Any]] = []
+    for node_id in _dedupe_preserve_order(node_ids):
+        node_row = node_map.get(node_id)
+        if node_row is None:
+            continue
+        payload.append(_node_payload(node_row))
+    return payload
+
+
+def _collect_ancestors(
+    node_map: dict[str, TopologyNode],
+    parents_by_node: dict[str, list[str]],
+    node_id: str,
+    depth_limit: int,
+) -> list[dict[str, Any]]:
+    if depth_limit <= 0:
+        return []
+
+    visited: dict[str, int] = {}
+    queue: deque[tuple[str, int]] = deque(
+        (parent_id, 1) for parent_id in parents_by_node.get(node_id, [])
+    )
+    while queue:
+        current_node_id, distance = queue.popleft()
+        if distance > depth_limit:
+            continue
+        previous_distance = visited.get(current_node_id)
+        if previous_distance is not None and previous_distance <= distance:
+            continue
+        visited[current_node_id] = distance
+        for parent_id in parents_by_node.get(current_node_id, []):
+            queue.append((parent_id, distance + 1))
+
+    ordered_ids = sorted(
+        visited,
+        key=lambda item: (
+            -visited[item],
+            node_map[item].path_text or node_map[item].node_name,
+            item,
+        ),
+    )
+    result: list[dict[str, Any]] = []
+    for current_node_id in ordered_ids:
+        node_payload = _node_payload(node_map[current_node_id])
+        node_payload["distance"] = visited[current_node_id]
+        result.append(node_payload)
+    return result
+
+
+def _collect_descendants(
+    node_map: dict[str, TopologyNode],
+    children_by_node: dict[str, list[str]],
+    node_id: str,
+    depth_limit: int,
+) -> list[dict[str, Any]]:
+    if depth_limit <= 0:
+        return []
+
+    visited: dict[str, int] = {}
+    queue: deque[tuple[str, int]] = deque(
+        (child_id, 1) for child_id in children_by_node.get(node_id, [])
+    )
+    while queue:
+        current_node_id, distance = queue.popleft()
+        if distance > depth_limit:
+            continue
+        previous_distance = visited.get(current_node_id)
+        if previous_distance is not None and previous_distance <= distance:
+            continue
+        visited[current_node_id] = distance
+        for child_id in children_by_node.get(current_node_id, []):
+            queue.append((child_id, distance + 1))
+
+    ordered_ids = sorted(
+        visited,
+        key=lambda item: (
+            visited[item],
+            node_map[item].path_text or node_map[item].node_name,
+            item,
+        ),
+    )
+    result: list[dict[str, Any]] = []
+    for current_node_id in ordered_ids:
+        node_payload = _node_payload(node_map[current_node_id])
+        node_payload["distance"] = visited[current_node_id]
+        result.append(node_payload)
+    return result
+
+
+def _topology_metadata_payload(
+    registry_row: TopologyRegistry, group_path_text: str
+) -> dict[str, Any]:
+    return {
+        "topology_id": registry_row.topology_id,
+        "topology_name": registry_row.topology_name,
+        "topology_type": registry_row.topology_type,
+        "object_type_code": registry_row.object_type_code,
+        "group_id": registry_row.group_id,
+        "group_path_text": group_path_text,
+        "root_shape": registry_row.root_shape,
+    }
+
+
+def get_topology_node(
+    project_key: str,
+    topology_id: int,
+    node_id: str,
+    *,
+    include_siblings: bool = True,
+    include_children: bool = True,
+) -> dict[str, Any]:
+    """Return one cached node together with its parents, children, and siblings."""
+    project_key = _text(project_key)
+    node_id = _text(node_id)
+    if not project_key:
+        raise ValueError("project_key is required")
+    if not node_id:
+        raise ValueError("node_id is required")
+
+    ensure_topology_cache_tables()
+    with Session(sql_engine()) as session:
+        _require_topology_cache(session, project_key)
+        group_rows = _load_group_rows(session, project_key)
+        group_path_map = _group_path_map(group_rows)
+        registry_row = _get_registry_or_error(session, project_key, topology_id)
+        node_row = _get_node_or_error(session, project_key, topology_id, node_id)
+        node_map = _load_node_map(session, project_key, topology_id)
+        parents_by_node, children_by_node = _load_adjacency(
+            session, project_key, topology_id
+        )
+
+    parent_ids = parents_by_node.get(node_id, [])
+    child_ids = children_by_node.get(node_id, []) if include_children else []
+    sibling_ids: list[str] = []
+    if include_siblings and len(parent_ids) == 1:
+        sibling_ids = [
+            candidate
+            for candidate in children_by_node.get(parent_ids[0], [])
+            if candidate != node_id
+        ]
+
+    return {
+        "topology": _topology_metadata_payload(
+            registry_row, group_path_map.get(registry_row.group_id or -1, "")
+        ),
+        "node": _node_payload(node_row),
+        "parents": _node_list_payload(node_map, parent_ids),
+        "children": _node_list_payload(node_map, child_ids),
+        "siblings": _node_list_payload(node_map, sibling_ids),
+    }
+
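+# Assumed usage: inspect one cached node with its immediate neighbourhood
+# (topology and node ids are illustrative):
+#
+#   get_topology_node("demo", 101, "node-1", include_siblings=False)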
+
+def find_topology_context(
+    project_key: str,
+    entity_type: str,
+    entity_id: int,
+    *,
+    topology_id: int | None = None,
+    include_siblings: bool = True,
+    ancestor_depth: int = 5,
+    descendant_depth: int = 2,
+) -> dict[str, Any]:
+    """Locate a device or meter in cached topologies and return node context per match."""
+    project_key = _text(project_key)
+    normalized_entity_type = _normalize_entity_type(entity_type)
+    if not project_key:
+        raise ValueError("project_key is required")
+    if entity_id <= 0:
+        raise ValueError("entity_id must be a positive integer")
+
+    ensure_topology_cache_tables()
+    with Session(sql_engine()) as session:
+        _require_topology_cache(session, project_key)
+        group_rows = _load_group_rows(session, project_key)
+        group_path_map = _group_path_map(group_rows)
+
+        query = select(TopologyEntityIndex).where(
+            TopologyEntityIndex.project_key == project_key,
+            TopologyEntityIndex.entity_type == normalized_entity_type,
+            TopologyEntityIndex.entity_id == entity_id,
+        )
+        if topology_id is not None:
+            query = query.where(TopologyEntityIndex.topology_id == topology_id)
+        index_rows = list(
+            session.scalars(
+                query.order_by(
+                    TopologyEntityIndex.depth.desc(),
+                    TopologyEntityIndex.topology_id.asc(),
+                )
+            )
+        )
+
+        matches: list[dict[str, Any]] = []
+        topology_node_maps: dict[int, dict[str, TopologyNode]] = {}
+        topology_adjacency: dict[
+            int, tuple[dict[str, list[str]], dict[str, list[str]]]
+        ] = {}
+        topology_registry_rows: dict[int, TopologyRegistry] = {}
+
+        for index_row in index_rows:
+            current_topology_id = index_row.topology_id
+            if current_topology_id not in topology_registry_rows:
+                topology_registry_rows[current_topology_id] = _get_registry_or_error(
+                    session, project_key, current_topology_id
+                )
+                topology_node_maps[current_topology_id] = _load_node_map(
+                    session, project_key, current_topology_id
+                )
+                topology_adjacency[current_topology_id] = _load_adjacency(
+                    session, project_key, current_topology_id
+                )
+
+            node_map = topology_node_maps[current_topology_id]
+            node_row = node_map.get(index_row.node_id)
+            if node_row is None:
+                continue
+
+            parents_by_node, children_by_node = topology_adjacency[current_topology_id]
+            parent_ids = parents_by_node.get(index_row.node_id, [])
+            child_ids = children_by_node.get(index_row.node_id, [])
+            sibling_ids: list[str] = []
+            if include_siblings and len(parent_ids) == 1:
+                sibling_ids = [
+                    candidate
+                    for candidate in children_by_node.get(parent_ids[0], [])
+                    if candidate != index_row.node_id
+                ]
+
+            matches.append(
+                {
+                    "topology": _topology_metadata_payload(
+                        topology_registry_rows[current_topology_id],
+                        group_path_map.get(
+                            topology_registry_rows[current_topology_id].group_id or -1,
+                            "",
+                        ),
+                    ),
+                    "self": _node_payload(node_row),
+                    "parents": _node_list_payload(node_map, parent_ids),
+                    "children": _node_list_payload(node_map, child_ids),
+                    "ancestors": _collect_ancestors(
+                        node_map,
+                        parents_by_node,
+                        index_row.node_id,
+                        max(ancestor_depth, 0),
+                    ),
+                    "descendants": _collect_descendants(
+                        node_map,
+                        children_by_node,
+                        index_row.node_id,
+                        max(descendant_depth, 0),
+                    ),
+                    "siblings": _node_list_payload(node_map, sibling_ids),
+                }
+            )
+
+    return {
+        "query": {
+            "project_key": project_key,
+            "entity_type": normalized_entity_type,
+            "entity_id": entity_id,
+            "topology_id": topology_id,
+        },
+        "matches": matches,
+        "total_matches": len(matches),
+    }
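+
+
+# Assumed usage: locate every cached topology that references meter 42
+# (project key and entity id are illustrative):
+#
+#   find_topology_context("demo", "meter", 42, descendant_depth=1)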