topology_cache.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226
  1. from __future__ import annotations
  2. from collections import defaultdict, deque
  3. from datetime import datetime, timezone
  4. from typing import Any
  5. from sqlalchemy import Index, Integer, String, Text, UniqueConstraint, delete, select
  6. from sqlalchemy.orm import Mapped, Session, mapped_column
  7. from .config_api import list_topologies_with_group as api_list_topologies_with_group
  8. from .config_api import get_topology as api_get_topology
  9. from .db import Base, sql_engine
  10. def _utc_now_iso() -> str:
  11. return datetime.now(timezone.utc).isoformat()
  12. def _safe_int(raw_value: Any) -> int | None:
  13. if raw_value is None:
  14. return None
  15. try:
  16. return int(str(raw_value).strip())
  17. except Exception:
  18. return None
  19. def _text(raw_value: Any) -> str:
  20. return str(raw_value or "").strip()
  21. def _bool_as_int(raw_value: Any) -> int:
  22. return 1 if bool(raw_value) else 0
  23. def _normalize_entity_type(entity_type: str) -> str:
  24. normalized = _text(entity_type).lower()
  25. if normalized not in {"meter", "device"}:
  26. raise ValueError("entity_type must be 'meter' or 'device'")
  27. return normalized
class TopologyGroup(Base):
    """Cached node of a project's topology *group* tree (one row per group)."""

    __tablename__ = "topology_group"
    __table_args__ = (
        # One row per (project, group); the parent index drives tree rebuilds.
        UniqueConstraint(
            "project_key", "group_id", name="uq_topology_group_project_group"
        ),
        Index("ix_topology_group_project_parent", "project_key", "parent_group_id"),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    group_id: Mapped[int] = mapped_column(Integer, nullable=False)  # upstream group id
    group_name: Mapped[str] = mapped_column(String(255), nullable=False)
    parent_group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)  # None for roots
    group_path_text: Mapped[str] = mapped_column(Text, nullable=False)  # " / "-joined names from root
    level: Mapped[int] = mapped_column(Integer, nullable=False)  # 1-based depth in the tree
    sort_index: Mapped[int] = mapped_column(Integer, nullable=False)  # 1-based position among siblings
    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)  # ISO stamp of last refresh
    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)  # 0/1 flag
class TopologyRegistry(Base):
    """Cached metadata for one topology diagram of a project."""

    __tablename__ = "topology_registry"
    __table_args__ = (
        UniqueConstraint(
            "project_key", "topology_id", name="uq_topology_registry_project_topology"
        ),
        Index("ix_topology_registry_project_group", "project_key", "group_id"),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)  # upstream topology id
    topology_name: Mapped[str] = mapped_column(String(255), nullable=False)
    topology_type: Mapped[int] = mapped_column(Integer, nullable=False)
    object_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)  # owning group, if any
    root_shape: Mapped[str] = mapped_column(String(32), nullable=False)  # "tree" or "graph"
    # Upstream's last-modified stamp, stored verbatim as text.
    source_updated_time: Mapped[str] = mapped_column(
        String(64), nullable=False, default=""
    )
    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)  # ISO stamp of last refresh
    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)  # 0/1 flag
class TopologyNode(Base):
    """Cached node of a topology diagram (tree- or graph-shaped)."""

    __tablename__ = "topology_node"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "topology_id",
            "node_id",
            name="uq_topology_node_project_topology_node",
        ),
        Index(
            "ix_topology_node_project_topology_parent",
            "project_key",
            "topology_id",
            "parent_node_id",
        ),
        Index(
            "ix_topology_node_project_topology_refer",
            "project_key",
            "topology_id",
            "refer_id",
        ),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    node_id: Mapped[str] = mapped_column(Text, nullable=False)  # upstream diagram node id
    node_name: Mapped[str] = mapped_column(Text, nullable=False)
    parent_node_id: Mapped[str | None] = mapped_column(Text, nullable=True)  # None for roots
    level: Mapped[int | None] = mapped_column(Integer, nullable=True)  # 1-based depth
    node_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    refer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)  # referenced object id, if any
    refer_level: Mapped[int | None] = mapped_column(Integer, nullable=True)
    is_virtual: Mapped[int] = mapped_column(Integer, nullable=False, default=0)  # 0/1 flag
    path_text: Mapped[str] = mapped_column(Text, nullable=False, default="")  # " / "-joined names from root
    child_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    sort_index: Mapped[int | None] = mapped_column(Integer, nullable=True)  # position among siblings
class TopologyEdge(Base):
    """Cached directed edge (source/parent -> target/child) of a topology diagram."""

    __tablename__ = "topology_edge"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "topology_id",
            "source_node_id",
            "target_node_id",
            name="uq_topology_edge_project_topology_nodes",
        ),
        Index(
            "ix_topology_edge_project_topology_source",
            "project_key",
            "topology_id",
            "source_node_id",
        ),
        Index(
            "ix_topology_edge_project_topology_target",
            "project_key",
            "topology_id",
            "target_node_id",
        ),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    source_node_id: Mapped[str] = mapped_column(Text, nullable=False)
    target_node_id: Mapped[str] = mapped_column(Text, nullable=False)
    sort_index: Mapped[int] = mapped_column(Integer, nullable=False, default=0)  # edge ordering hint
class TopologyEntityIndex(Base):
    """Lookup row mapping an entity (meter/device) to the topology node(s) holding it."""

    __tablename__ = "topology_entity_index"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "entity_type",
            "entity_id",
            "topology_id",
            "node_id",
            name="uq_topology_entity_index_project_entity_topology_node",
        ),
        Index(
            "ix_topology_entity_index_project_entity",
            "project_key",
            "entity_type",
            "entity_id",
        ),
        Index(
            "ix_topology_entity_index_project_topology_node",
            "project_key",
            "topology_id",
            "node_id",
        ),
    )
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    entity_type: Mapped[str] = mapped_column(String(16), nullable=False)  # "meter" or "device"
    entity_id: Mapped[int] = mapped_column(Integer, nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    node_id: Mapped[str] = mapped_column(Text, nullable=False)
    depth: Mapped[int | None] = mapped_column(Integer, nullable=True)  # level of node_id when indexed
  163. def ensure_topology_cache_tables() -> None:
  164. Base.metadata.create_all(sql_engine())
  165. def _collect_group_and_topology_refs(
  166. project_key: str,
  167. items: list[Any],
  168. *,
  169. refreshed_at: str,
  170. parent_group_id: int | None = None,
  171. parent_group_path: tuple[str, ...] = (),
  172. ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
  173. group_rows: list[dict[str, Any]] = []
  174. topology_refs: list[dict[str, Any]] = []
  175. for sort_index, item in enumerate(items, start=1):
  176. if not isinstance(item, dict):
  177. continue
  178. item_id = _safe_int(item.get("id"))
  179. item_name = _text(item.get("name")) or str(item_id or "")
  180. item_type = _safe_int(item.get("type"))
  181. children = (
  182. item.get("children") if isinstance(item.get("children"), list) else []
  183. )
  184. if item_id is None:
  185. continue
  186. if item_type == 1:
  187. group_path = (*parent_group_path, item_name)
  188. group_rows.append(
  189. {
  190. "project_key": project_key,
  191. "group_id": item_id,
  192. "group_name": item_name,
  193. "parent_group_id": parent_group_id,
  194. "group_path_text": " / ".join(group_path),
  195. "level": len(group_path),
  196. "sort_index": sort_index,
  197. "refreshed_at": refreshed_at,
  198. "is_active": 1,
  199. }
  200. )
  201. nested_group_rows, nested_topology_refs = _collect_group_and_topology_refs(
  202. project_key,
  203. children,
  204. refreshed_at=refreshed_at,
  205. parent_group_id=item_id,
  206. parent_group_path=group_path,
  207. )
  208. group_rows.extend(nested_group_rows)
  209. topology_refs.extend(nested_topology_refs)
  210. continue
  211. topology_refs.append(
  212. {
  213. "topology_id": item_id,
  214. "topology_name": item_name,
  215. "group_id": parent_group_id,
  216. "sort_index": sort_index,
  217. }
  218. )
  219. return group_rows, topology_refs
  220. def _as_int_list(raw_value: Any) -> list[int]:
  221. if not isinstance(raw_value, list):
  222. return []
  223. result: list[int] = []
  224. for item in raw_value:
  225. value = _safe_int(item)
  226. if value is None:
  227. continue
  228. result.append(value)
  229. return result
  230. def _record_deepest_entities(
  231. entity_best: dict[tuple[str, int], tuple[int, list[str]]],
  232. entity_node_ids: dict[tuple[str, int], list[str]],
  233. entity_type: str,
  234. entity_ids: list[int],
  235. *,
  236. depth: int,
  237. node_id: str,
  238. ) -> None:
  239. for entity_id in entity_ids:
  240. key = (entity_type, entity_id)
  241. existing = entity_best.get(key)
  242. if existing is None or depth > existing[0]:
  243. entity_best[key] = (depth, [node_id])
  244. entity_node_ids[key] = [node_id]
  245. continue
  246. if depth == existing[0] and node_id not in entity_node_ids[key]:
  247. entity_node_ids[key].append(node_id)
def _parse_tree_topology(
    project_key: str,
    topology_id: int,
    diagram: list[Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Flatten a tree-shaped diagram into node, edge and entity-index row dicts.

    Each dict matches the TopologyNode / TopologyEdge / TopologyEntityIndex
    columns. Meters and devices are indexed only at the deepest node(s) that
    list them. Returns (node_rows, edge_rows, entity_rows).
    """
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    # Per (entity_type, entity_id): deepest level seen, and node ids at that level.
    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
    entity_node_ids: dict[tuple[str, int], list[str]] = {}

    def visit(
        node: dict[str, Any],
        parent_node_id: str | None,
        path_names: tuple[str, ...],
        sort_index: int,
    ) -> None:
        # Depth-first walk; nodes without an id (and their subtrees) are skipped.
        node_id = _text(node.get("id"))
        if not node_id:
            return
        node_name = _text(node.get("name")) or node_id
        children = (
            node.get("children") if isinstance(node.get("children"), list) else []
        )
        # Prefer the upstream-declared level; fall back to the walk depth.
        level = _safe_int(node.get("level")) or (len(path_names) + 1)
        path_text = " / ".join((*path_names, node_name))
        # Traversal parent wins; otherwise trust the node's own parent_id field.
        effective_parent_node_id = parent_node_id or (
            _text(node.get("parent_id")) or None
        )
        node_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "node_id": node_id,
                "node_name": node_name,
                "parent_node_id": effective_parent_node_id,
                "level": level,
                "node_type_code": _safe_int(node.get("type")),
                "refer_id": _safe_int(node.get("refer_id")),
                "refer_level": _safe_int(node.get("refer_level")),
                "is_virtual": _bool_as_int(node.get("is_virtual")),
                "path_text": path_text,
                "child_count": len(children),
                "sort_index": sort_index,
            }
        )
        if effective_parent_node_id:
            # Materialize the parent -> child link as an explicit edge row.
            edge_rows.append(
                {
                    "project_key": project_key,
                    "topology_id": topology_id,
                    "source_node_id": effective_parent_node_id,
                    "target_node_id": node_id,
                    "sort_index": sort_index,
                }
            )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "meter",
            _as_int_list(node.get("meter_list")),
            depth=level,
            node_id=node_id,
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "device",
            _as_int_list(node.get("device_list")),
            depth=level,
            node_id=node_id,
        )
        for child_sort_index, child in enumerate(children, start=1):
            if not isinstance(child, dict):
                continue
            visit(child, node_id, (*path_names, node_name), child_sort_index)

    for root_sort_index, root in enumerate(diagram, start=1):
        if not isinstance(root, dict):
            continue
        visit(root, None, (), root_sort_index)
    # Expand the deepest-node index into flat entity-index rows.
    entity_rows: list[dict[str, Any]] = []
    for (entity_type, entity_id), (depth, _) in entity_best.items():
        for node_id in entity_node_ids[(entity_type, entity_id)]:
            entity_rows.append(
                {
                    "project_key": project_key,
                    "entity_type": entity_type,
                    "entity_id": entity_id,
                    "topology_id": topology_id,
                    "node_id": node_id,
                    "depth": depth,
                }
            )
    return node_rows, edge_rows, entity_rows
  340. def _build_graph_node_context(
  341. nodes: list[dict[str, Any]],
  342. edges: list[dict[str, Any]],
  343. ) -> tuple[dict[str, int], dict[str, str | None], dict[str, str], dict[str, int]]:
  344. incoming: dict[str, list[str]] = defaultdict(list)
  345. outgoing: dict[str, list[str]] = defaultdict(list)
  346. for edge in edges:
  347. source_node_id = _text(edge.get("source"))
  348. target_node_id = _text(edge.get("target"))
  349. if not source_node_id or not target_node_id:
  350. continue
  351. outgoing[source_node_id].append(target_node_id)
  352. incoming[target_node_id].append(source_node_id)
  353. node_ids = [_text(node.get("id")) for node in nodes if _text(node.get("id"))]
  354. roots = [node_id for node_id in node_ids if not incoming.get(node_id)]
  355. if not roots:
  356. roots = node_ids[:1]
  357. level_map: dict[str, int] = {}
  358. path_map: dict[str, str] = {}
  359. parent_map: dict[str, str | None] = {node_id: None for node_id in node_ids}
  360. child_count_map: dict[str, int] = {
  361. node_id: len(outgoing.get(node_id, [])) for node_id in node_ids
  362. }
  363. node_name_map = {
  364. _text(node.get("id")): _text(node.get("name")) or _text(node.get("id"))
  365. for node in nodes
  366. }
  367. queue: deque[tuple[str, int, str]] = deque()
  368. for root_node_id in roots:
  369. root_name = node_name_map.get(root_node_id) or root_node_id
  370. queue.append((root_node_id, 1, root_name))
  371. while queue:
  372. node_id, depth, path_text = queue.popleft()
  373. previous = level_map.get(node_id)
  374. if previous is not None and previous >= depth:
  375. continue
  376. level_map[node_id] = depth
  377. path_map[node_id] = path_text
  378. for child_node_id in outgoing.get(node_id, []):
  379. if parent_map.get(child_node_id) is None:
  380. parent_map[child_node_id] = node_id
  381. child_name = node_name_map.get(child_node_id) or child_node_id
  382. queue.append((child_node_id, depth + 1, f"{path_text} / {child_name}"))
  383. for node in nodes:
  384. node_id = _text(node.get("id"))
  385. if not node_id:
  386. continue
  387. if node_id in level_map:
  388. continue
  389. fallback_level = _safe_int(node.get("level")) or 1
  390. node_name = _text(node.get("name")) or node_id
  391. level_map[node_id] = fallback_level
  392. path_map[node_id] = node_name
  393. return level_map, parent_map, path_map, child_count_map
def _parse_graph_topology(
    project_key: str,
    topology_id: int,
    diagram: dict[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Flatten a graph-shaped diagram ({"nodes": [...], "edges": [...]}) into rows.

    Levels, parents, paths and child counts come from the edge structure via
    _build_graph_node_context. Meters and devices are indexed at the deepest
    node(s) listing them, mirroring _parse_tree_topology. Returns
    (node_rows, edge_rows, entity_rows).
    """
    raw_nodes = [item for item in diagram.get("nodes", []) if isinstance(item, dict)]
    raw_edges = [item for item in diagram.get("edges", []) if isinstance(item, dict)]
    level_map, parent_map, path_map, child_count_map = _build_graph_node_context(
        raw_nodes, raw_edges
    )
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    # Per (entity_type, entity_id): deepest level seen, and node ids at that level.
    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
    entity_node_ids: dict[tuple[str, int], list[str]] = {}
    for sort_index, node in enumerate(raw_nodes, start=1):
        node_id = _text(node.get("id"))
        if not node_id:
            continue
        # Derived level wins; fall back to the upstream level, then 1.
        level = level_map.get(node_id) or _safe_int(node.get("level")) or 1
        node_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "node_id": node_id,
                "node_name": _text(node.get("name")) or node_id,
                "parent_node_id": parent_map.get(node_id),
                "level": level,
                "node_type_code": _safe_int(node.get("type")),
                "refer_id": _safe_int(node.get("refer_id")),
                "refer_level": _safe_int(node.get("refer_level")),
                "is_virtual": _bool_as_int(node.get("is_virtual")),
                "path_text": path_map.get(node_id, _text(node.get("name")) or node_id),
                "child_count": child_count_map.get(node_id, 0),
                "sort_index": sort_index,
            }
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "meter",
            _as_int_list(node.get("meter_list")),
            depth=level,
            node_id=node_id,
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "device",
            _as_int_list(node.get("device_list")),
            depth=level,
            node_id=node_id,
        )
    for sort_index, edge in enumerate(raw_edges, start=1):
        source_node_id = _text(edge.get("source"))
        target_node_id = _text(edge.get("target"))
        if not source_node_id or not target_node_id:
            continue  # skip malformed edges
        edge_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "source_node_id": source_node_id,
                "target_node_id": target_node_id,
                "sort_index": sort_index,
            }
        )
    # Expand the deepest-node index into flat entity-index rows.
    entity_rows: list[dict[str, Any]] = []
    for (entity_type, entity_id), (depth, _) in entity_best.items():
        for node_id in entity_node_ids[(entity_type, entity_id)]:
            entity_rows.append(
                {
                    "project_key": project_key,
                    "entity_type": entity_type,
                    "entity_id": entity_id,
                    "topology_id": topology_id,
                    "node_id": node_id,
                    "depth": depth,
                }
            )
    return node_rows, edge_rows, entity_rows
def refresh_topology_cache(
    project_key: str, topology_ids: list[int] | None = None
) -> dict[str, Any]:
    """Rebuild the local topology cache for *project_key* from the upstream API.

    When *topology_ids* is given, only those topologies (plus the full group
    tree) are re-fetched and swapped in; otherwise the whole project cache is
    replaced. Returns a summary dict with refresh counts. Raises ValueError
    on a blank project key, unknown topology ids, or malformed upstream
    payloads.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    refreshed_at = _utc_now_iso()
    # The upstream list call returns the group tree with topologies as leaves.
    list_payload = api_list_topologies_with_group(project_key, group_ids=[])
    raw_items = list_payload.get("data")
    if not isinstance(raw_items, list):
        raise ValueError("topology list returned invalid data")
    group_rows, topology_refs = _collect_group_and_topology_refs(
        project_key, raw_items, refreshed_at=refreshed_at
    )
    requested_topology_ids = {item for item in topology_ids or []}
    available_topology_map = {item["topology_id"]: item for item in topology_refs}
    if requested_topology_ids:
        # Fail fast if the caller asked for a topology upstream doesn't know.
        missing = sorted(requested_topology_ids - set(available_topology_map))
        if missing:
            raise ValueError(
                f"topology_id not found in upstream topology list: {missing}"
            )
        selected_topology_refs = [
            available_topology_map[item] for item in topology_ids or []
        ]
    else:
        selected_topology_refs = topology_refs
    registry_rows: list[dict[str, Any]] = []
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    entity_rows: list[dict[str, Any]] = []
    for topology_ref in selected_topology_refs:
        topology_id = topology_ref["topology_id"]
        detail_payload = api_get_topology(project_key, topology_id)
        detail_data = detail_payload.get("data")
        if not isinstance(detail_data, dict):
            raise ValueError(
                f"topology get returned invalid data for topology_id={topology_id}"
            )
        # The diagram's shape selects the parser: list => tree, dict => graph.
        diagram = detail_data.get("diagram")
        if isinstance(diagram, list):
            root_shape = "tree"
            topology_node_rows, topology_edge_rows, topology_entity_rows = (
                _parse_tree_topology(project_key, topology_id, diagram)
            )
        elif isinstance(diagram, dict):
            root_shape = "graph"
            topology_node_rows, topology_edge_rows, topology_entity_rows = (
                _parse_graph_topology(project_key, topology_id, diagram)
            )
        else:
            # Missing/unknown diagram: keep the registry row, cache no nodes.
            root_shape = "tree"
            topology_node_rows, topology_edge_rows, topology_entity_rows = ([], [], [])
        registry_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                # Detail payload wins; fall back to the list payload's name/group.
                "topology_name": _text(detail_data.get("name"))
                or topology_ref["topology_name"],
                "topology_type": _safe_int(detail_data.get("type")) or 0,
                "object_type_code": _safe_int(detail_data.get("object")),
                "group_id": _safe_int(detail_data.get("group_id"))
                or topology_ref.get("group_id"),
                "root_shape": root_shape,
                "source_updated_time": _text(detail_data.get("updated_time")),
                "refreshed_at": refreshed_at,
                "is_active": 1,
            }
        )
        node_rows.extend(topology_node_rows)
        edge_rows.extend(topology_edge_rows)
        entity_rows.extend(topology_entity_rows)
    # Swap the cache inside one transaction: delete stale rows, insert fresh ones.
    with Session(sql_engine()) as session:
        if requested_topology_ids:
            # Partial refresh: the group tree is always replaced wholesale, but
            # topology-scoped tables are cleared only for the requested ids.
            session.execute(
                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
            )
            session.add_all(TopologyGroup(**row) for row in group_rows)
            session.execute(
                delete(TopologyEntityIndex).where(
                    TopologyEntityIndex.project_key == project_key,
                    TopologyEntityIndex.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyEdge).where(
                    TopologyEdge.project_key == project_key,
                    TopologyEdge.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyNode).where(
                    TopologyNode.project_key == project_key,
                    TopologyNode.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyRegistry).where(
                    TopologyRegistry.project_key == project_key,
                    TopologyRegistry.topology_id.in_(requested_topology_ids),
                )
            )
        else:
            # Full refresh: wipe every cached table for the project.
            session.execute(
                delete(TopologyEntityIndex).where(
                    TopologyEntityIndex.project_key == project_key
                )
            )
            session.execute(
                delete(TopologyEdge).where(TopologyEdge.project_key == project_key)
            )
            session.execute(
                delete(TopologyNode).where(TopologyNode.project_key == project_key)
            )
            session.execute(
                delete(TopologyRegistry).where(
                    TopologyRegistry.project_key == project_key
                )
            )
            session.execute(
                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
            )
            session.add_all(TopologyGroup(**row) for row in group_rows)
        session.add_all(TopologyRegistry(**row) for row in registry_rows)
        session.add_all(TopologyNode(**row) for row in node_rows)
        session.add_all(TopologyEdge(**row) for row in edge_rows)
        session.add_all(TopologyEntityIndex(**row) for row in entity_rows)
        session.commit()
    result = {
        "project_key": project_key,
        "refreshed_group_count": len(group_rows),
        "refreshed_topology_count": len(registry_rows),
        "refreshed_node_count": len(node_rows),
        "refreshed_edge_count": len(edge_rows),
        "refreshed_entity_index_count": len(entity_rows),
        "topology_ids": [row["topology_id"] for row in registry_rows],
        "refreshed_at": refreshed_at,
    }
    if not registry_rows:
        result["mcp_note"] = (
            f"Project '{project_key}' currently has no topology data in upstream config."
        )
    return result
  618. def _load_group_rows(session: Session, project_key: str) -> list[TopologyGroup]:
  619. return list(
  620. session.scalars(
  621. select(TopologyGroup)
  622. .where(TopologyGroup.project_key == project_key)
  623. .order_by(
  624. TopologyGroup.level.asc(),
  625. TopologyGroup.sort_index.asc(),
  626. TopologyGroup.group_id.asc(),
  627. )
  628. )
  629. )
  630. def _load_registry_rows(session: Session, project_key: str) -> list[TopologyRegistry]:
  631. return list(
  632. session.scalars(
  633. select(TopologyRegistry)
  634. .where(TopologyRegistry.project_key == project_key)
  635. .order_by(
  636. TopologyRegistry.topology_name.asc(), TopologyRegistry.topology_id.asc()
  637. )
  638. )
  639. )
  640. def _has_topology_cache(session: Session, project_key: str) -> bool:
  641. registry_rows = _load_registry_rows(session, project_key)
  642. return bool(registry_rows)
  643. def _require_topology_cache(session: Session, project_key: str) -> None:
  644. if _has_topology_cache(session, project_key):
  645. return
  646. raise ValueError(
  647. f"Project '{project_key}' has no cached topologies. Refresh it via "
  648. f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
  649. "already succeeded, the upstream project likely has no topology data."
  650. )
def list_topology_groups(project_key: str) -> dict[str, Any]:
    """Return the cached group tree for *project_key* as nested payload dicts.

    Raises ValueError when project_key is blank. An uncached project yields
    an empty result carrying an "mcp_note" hint instead of raising.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        if not _has_topology_cache(session, project_key):
            return {
                "project_key": project_key,
                "groups": [],
                "total": 0,
                "mcp_note": (
                    f"Project '{project_key}' has no cached topologies. Refresh it via "
                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
                    "already succeeded, the upstream project likely has no topology data."
                ),
            }
        group_rows = _load_group_rows(session, project_key)
        # Rebuild the tree in memory: bucket payloads under their parent id.
        children_by_parent: dict[int | None, list[dict[str, Any]]] = defaultdict(list)
        node_by_group_id: dict[int, dict[str, Any]] = {}
        for group_row in group_rows:
            payload = {
                "group_id": group_row.group_id,
                "group_name": group_row.group_name,
                "parent_group_id": group_row.parent_group_id,
                "level": group_row.level,
                "group_path_text": group_row.group_path_text,
                "children": [],
            }
            node_by_group_id[group_row.group_id] = payload
            children_by_parent[group_row.parent_group_id].append(payload)
        # Wire each payload's children list to the bucket collected above.
        for group_payload in node_by_group_id.values():
            group_payload["children"] = children_by_parent.get(
                group_payload["group_id"], []
            )
        return {
            "project_key": project_key,
            # Top-level groups are those whose parent_group_id is None.
            "groups": children_by_parent.get(None, []),
            "total": len(group_rows),
        }
  691. def _group_path_map(group_rows: list[TopologyGroup]) -> dict[int, str]:
  692. return {group_row.group_id: group_row.group_path_text for group_row in group_rows}
  693. def _collect_descendant_group_ids(
  694. group_rows: list[TopologyGroup], group_id: int
  695. ) -> set[int]:
  696. group_children: dict[int | None, list[int]] = defaultdict(list)
  697. for row in group_rows:
  698. group_children[row.parent_group_id].append(row.group_id)
  699. result: set[int] = set()
  700. queue: deque[int] = deque([group_id])
  701. while queue:
  702. current_group_id = queue.popleft()
  703. if current_group_id in result:
  704. continue
  705. result.add(current_group_id)
  706. queue.extend(group_children.get(current_group_id, []))
  707. return result
def list_topologies(
    project_key: str,
    *,
    group_id: int | None = None,
    object_type_code: int | None = None,
) -> dict[str, Any]:
    """List cached topologies, optionally filtered by group subtree and object type.

    *group_id* filtering includes the group's whole descendant subtree.
    Raises ValueError when project_key is blank; an uncached project yields
    an empty result carrying an "mcp_note" hint instead of raising.
    """
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        if not _has_topology_cache(session, project_key):
            return {
                "project_key": project_key,
                "topologies": [],
                "total": 0,
                "mcp_note": (
                    f"Project '{project_key}' has no cached topologies. Refresh it via "
                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
                    "already succeeded, the upstream project likely has no topology data."
                ),
            }
        group_rows = _load_group_rows(session, project_key)
        registry_rows = _load_registry_rows(session, project_key)
        group_path_map = _group_path_map(group_rows)
        allowed_group_ids: set[int] | None = None
        if group_id is not None:
            # Include the requested group and its entire subtree.
            allowed_group_ids = _collect_descendant_group_ids(group_rows, group_id)
        topologies: list[dict[str, Any]] = []
        for registry_row in registry_rows:
            if (
                allowed_group_ids is not None
                and registry_row.group_id not in allowed_group_ids
            ):
                continue
            if (
                object_type_code is not None
                and registry_row.object_type_code != object_type_code
            ):
                continue
            topologies.append(
                {
                    "topology_id": registry_row.topology_id,
                    "topology_name": registry_row.topology_name,
                    "topology_type": registry_row.topology_type,
                    "object_type_code": registry_row.object_type_code,
                    "group_id": registry_row.group_id,
                    # -1 never matches a real group id, so ungrouped rows map to "".
                    "group_path_text": group_path_map.get(registry_row.group_id or -1, ""),
                    "root_shape": registry_row.root_shape,
                    "refreshed_at": registry_row.refreshed_at,
                }
            )
        return {
            "project_key": project_key,
            "topologies": topologies,
            "total": len(topologies),
        }
  765. def _get_registry_or_error(
  766. session: Session, project_key: str, topology_id: int
  767. ) -> TopologyRegistry:
  768. registry_row = session.scalar(
  769. select(TopologyRegistry).where(
  770. TopologyRegistry.project_key == project_key,
  771. TopologyRegistry.topology_id == topology_id,
  772. )
  773. )
  774. if registry_row is None:
  775. raise ValueError(f"topology_id not found in cache: {topology_id}")
  776. return registry_row
  777. def _get_node_or_error(
  778. session: Session, project_key: str, topology_id: int, node_id: str
  779. ) -> TopologyNode:
  780. node_row = session.scalar(
  781. select(TopologyNode).where(
  782. TopologyNode.project_key == project_key,
  783. TopologyNode.topology_id == topology_id,
  784. TopologyNode.node_id == node_id,
  785. )
  786. )
  787. if node_row is None:
  788. raise ValueError(f"node_id not found in cache: {node_id}")
  789. return node_row
  790. def _node_payload(node_row: TopologyNode) -> dict[str, Any]:
  791. return {
  792. "node_id": node_row.node_id,
  793. "node_name": node_row.node_name,
  794. "level": node_row.level,
  795. "parent_node_id": node_row.parent_node_id,
  796. "refer_id": node_row.refer_id,
  797. "refer_level": node_row.refer_level,
  798. "is_virtual": bool(node_row.is_virtual),
  799. "path_text": node_row.path_text,
  800. "child_count": node_row.child_count,
  801. }
  802. def _load_node_map(
  803. session: Session, project_key: str, topology_id: int
  804. ) -> dict[str, TopologyNode]:
  805. rows = session.scalars(
  806. select(TopologyNode).where(
  807. TopologyNode.project_key == project_key,
  808. TopologyNode.topology_id == topology_id,
  809. )
  810. )
  811. return {row.node_id: row for row in rows}
  812. def _load_adjacency(
  813. session: Session, project_key: str, topology_id: int
  814. ) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
  815. edges = session.scalars(
  816. select(TopologyEdge)
  817. .where(
  818. TopologyEdge.project_key == project_key,
  819. TopologyEdge.topology_id == topology_id,
  820. )
  821. .order_by(TopologyEdge.sort_index.asc(), TopologyEdge.id.asc())
  822. )
  823. parents_by_node: dict[str, list[str]] = defaultdict(list)
  824. children_by_node: dict[str, list[str]] = defaultdict(list)
  825. for edge in edges:
  826. parents_by_node[edge.target_node_id].append(edge.source_node_id)
  827. children_by_node[edge.source_node_id].append(edge.target_node_id)
  828. return parents_by_node, children_by_node
  829. def _dedupe_preserve_order(node_ids: list[str]) -> list[str]:
  830. result: list[str] = []
  831. seen: set[str] = set()
  832. for node_id in node_ids:
  833. if node_id in seen:
  834. continue
  835. seen.add(node_id)
  836. result.append(node_id)
  837. return result
  838. def _node_list_payload(
  839. node_map: dict[str, TopologyNode], node_ids: list[str]
  840. ) -> list[dict[str, Any]]:
  841. payload: list[dict[str, Any]] = []
  842. for node_id in _dedupe_preserve_order(node_ids):
  843. node_row = node_map.get(node_id)
  844. if node_row is None:
  845. continue
  846. payload.append(_node_payload(node_row))
  847. return payload
  848. def _collect_ancestors(
  849. node_map: dict[str, TopologyNode],
  850. parents_by_node: dict[str, list[str]],
  851. node_id: str,
  852. depth_limit: int,
  853. ) -> list[dict[str, Any]]:
  854. if depth_limit <= 0:
  855. return []
  856. visited: dict[str, int] = {}
  857. queue: deque[tuple[str, int]] = deque(
  858. (parent_id, 1) for parent_id in parents_by_node.get(node_id, [])
  859. )
  860. while queue:
  861. current_node_id, distance = queue.popleft()
  862. if distance > depth_limit:
  863. continue
  864. previous_distance = visited.get(current_node_id)
  865. if previous_distance is not None and previous_distance <= distance:
  866. continue
  867. visited[current_node_id] = distance
  868. for parent_id in parents_by_node.get(current_node_id, []):
  869. queue.append((parent_id, distance + 1))
  870. ordered_ids = sorted(
  871. visited,
  872. key=lambda item: (
  873. -visited[item],
  874. node_map[item].path_text or node_map[item].node_name,
  875. item,
  876. ),
  877. )
  878. result: list[dict[str, Any]] = []
  879. for current_node_id in ordered_ids:
  880. node_payload = _node_payload(node_map[current_node_id])
  881. node_payload["distance"] = visited[current_node_id]
  882. result.append(node_payload)
  883. return result
  884. def _collect_descendants(
  885. node_map: dict[str, TopologyNode],
  886. children_by_node: dict[str, list[str]],
  887. node_id: str,
  888. depth_limit: int,
  889. ) -> list[dict[str, Any]]:
  890. if depth_limit <= 0:
  891. return []
  892. visited: dict[str, int] = {}
  893. queue: deque[tuple[str, int]] = deque(
  894. (child_id, 1) for child_id in children_by_node.get(node_id, [])
  895. )
  896. while queue:
  897. current_node_id, distance = queue.popleft()
  898. if distance > depth_limit:
  899. continue
  900. previous_distance = visited.get(current_node_id)
  901. if previous_distance is not None and previous_distance <= distance:
  902. continue
  903. visited[current_node_id] = distance
  904. for child_id in children_by_node.get(current_node_id, []):
  905. queue.append((child_id, distance + 1))
  906. ordered_ids = sorted(
  907. visited,
  908. key=lambda item: (
  909. visited[item],
  910. node_map[item].path_text or node_map[item].node_name,
  911. item,
  912. ),
  913. )
  914. result: list[dict[str, Any]] = []
  915. for current_node_id in ordered_ids:
  916. node_payload = _node_payload(node_map[current_node_id])
  917. node_payload["distance"] = visited[current_node_id]
  918. result.append(node_payload)
  919. return result
  920. def _topology_metadata_payload(
  921. registry_row: TopologyRegistry, group_path_text: str
  922. ) -> dict[str, Any]:
  923. return {
  924. "topology_id": registry_row.topology_id,
  925. "topology_name": registry_row.topology_name,
  926. "topology_type": registry_row.topology_type,
  927. "object_type_code": registry_row.object_type_code,
  928. "group_id": registry_row.group_id,
  929. "group_path_text": group_path_text,
  930. "root_shape": registry_row.root_shape,
  931. }
  932. def get_topology_node(
  933. project_key: str,
  934. topology_id: int,
  935. node_id: str,
  936. *,
  937. include_siblings: bool = True,
  938. include_children: bool = True,
  939. ) -> dict[str, Any]:
  940. project_key = _text(project_key)
  941. node_id = _text(node_id)
  942. if not project_key:
  943. raise ValueError("project_key is required")
  944. if not node_id:
  945. raise ValueError("node_id is required")
  946. ensure_topology_cache_tables()
  947. with Session(sql_engine()) as session:
  948. _require_topology_cache(session, project_key)
  949. group_rows = _load_group_rows(session, project_key)
  950. group_path_map = _group_path_map(group_rows)
  951. registry_row = _get_registry_or_error(session, project_key, topology_id)
  952. node_row = _get_node_or_error(session, project_key, topology_id, node_id)
  953. node_map = _load_node_map(session, project_key, topology_id)
  954. parents_by_node, children_by_node = _load_adjacency(
  955. session, project_key, topology_id
  956. )
  957. parent_ids = parents_by_node.get(node_id, [])
  958. child_ids = children_by_node.get(node_id, []) if include_children else []
  959. sibling_ids: list[str] = []
  960. if include_siblings and len(parent_ids) == 1:
  961. sibling_ids = [
  962. candidate
  963. for candidate in children_by_node.get(parent_ids[0], [])
  964. if candidate != node_id
  965. ]
  966. return {
  967. "topology": _topology_metadata_payload(
  968. registry_row, group_path_map.get(registry_row.group_id or -1, "")
  969. ),
  970. "node": _node_payload(node_row),
  971. "parents": _node_list_payload(node_map, parent_ids),
  972. "children": _node_list_payload(node_map, child_ids),
  973. "siblings": _node_list_payload(node_map, sibling_ids),
  974. }
  975. def find_topology_context(
  976. project_key: str,
  977. entity_type: str,
  978. entity_id: int,
  979. *,
  980. topology_id: int | None = None,
  981. include_siblings: bool = True,
  982. ancestor_depth: int = 5,
  983. descendant_depth: int = 2,
  984. ) -> dict[str, Any]:
  985. project_key = _text(project_key)
  986. normalized_entity_type = _normalize_entity_type(entity_type)
  987. if not project_key:
  988. raise ValueError("project_key is required")
  989. if entity_id <= 0:
  990. raise ValueError("entity_id must be a positive integer")
  991. ensure_topology_cache_tables()
  992. with Session(sql_engine()) as session:
  993. _require_topology_cache(session, project_key)
  994. group_rows = _load_group_rows(session, project_key)
  995. group_path_map = _group_path_map(group_rows)
  996. query = select(TopologyEntityIndex).where(
  997. TopologyEntityIndex.project_key == project_key,
  998. TopologyEntityIndex.entity_type == normalized_entity_type,
  999. TopologyEntityIndex.entity_id == entity_id,
  1000. )
  1001. if topology_id is not None:
  1002. query = query.where(TopologyEntityIndex.topology_id == topology_id)
  1003. index_rows = list(
  1004. session.scalars(
  1005. query.order_by(
  1006. TopologyEntityIndex.depth.desc(),
  1007. TopologyEntityIndex.topology_id.asc(),
  1008. )
  1009. )
  1010. )
  1011. matches: list[dict[str, Any]] = []
  1012. topology_node_maps: dict[int, dict[str, TopologyNode]] = {}
  1013. topology_adjacency: dict[
  1014. int, tuple[dict[str, list[str]], dict[str, list[str]]]
  1015. ] = {}
  1016. topology_registry_rows: dict[int, TopologyRegistry] = {}
  1017. for index_row in index_rows:
  1018. current_topology_id = index_row.topology_id
  1019. if current_topology_id not in topology_registry_rows:
  1020. topology_registry_rows[current_topology_id] = _get_registry_or_error(
  1021. session, project_key, current_topology_id
  1022. )
  1023. topology_node_maps[current_topology_id] = _load_node_map(
  1024. session, project_key, current_topology_id
  1025. )
  1026. topology_adjacency[current_topology_id] = _load_adjacency(
  1027. session, project_key, current_topology_id
  1028. )
  1029. node_map = topology_node_maps[current_topology_id]
  1030. node_row = node_map.get(index_row.node_id)
  1031. if node_row is None:
  1032. continue
  1033. parents_by_node, children_by_node = topology_adjacency[current_topology_id]
  1034. parent_ids = parents_by_node.get(index_row.node_id, [])
  1035. child_ids = children_by_node.get(index_row.node_id, [])
  1036. sibling_ids: list[str] = []
  1037. if include_siblings and len(parent_ids) == 1:
  1038. sibling_ids = [
  1039. candidate
  1040. for candidate in children_by_node.get(parent_ids[0], [])
  1041. if candidate != index_row.node_id
  1042. ]
  1043. matches.append(
  1044. {
  1045. "topology": _topology_metadata_payload(
  1046. topology_registry_rows[current_topology_id],
  1047. group_path_map.get(
  1048. topology_registry_rows[current_topology_id].group_id or -1,
  1049. "",
  1050. ),
  1051. ),
  1052. "self": _node_payload(node_row),
  1053. "parents": _node_list_payload(node_map, parent_ids),
  1054. "children": _node_list_payload(node_map, child_ids),
  1055. "ancestors": _collect_ancestors(
  1056. node_map,
  1057. parents_by_node,
  1058. index_row.node_id,
  1059. max(ancestor_depth, 0),
  1060. ),
  1061. "descendants": _collect_descendants(
  1062. node_map,
  1063. children_by_node,
  1064. index_row.node_id,
  1065. max(descendant_depth, 0),
  1066. ),
  1067. "siblings": _node_list_payload(node_map, sibling_ids),
  1068. }
  1069. )
  1070. return {
  1071. "query": {
  1072. "project_key": project_key,
  1073. "entity_type": normalized_entity_type,
  1074. "entity_id": entity_id,
  1075. "topology_id": topology_id,
  1076. },
  1077. "matches": matches,
  1078. "total_matches": len(matches),
  1079. }