# topology_cache.py

from __future__ import annotations

import json
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import Any

from sqlalchemy import (
    Index,
    Integer,
    String,
    Text,
    UniqueConstraint,
    delete,
    inspect,
    select,
    text,
)
from sqlalchemy.orm import Mapped, Session, mapped_column

from .config_api import get_topology as api_get_topology
from .config_api import get_topology_data as api_get_topology_data
from .config_api import list_device_types as api_list_device_types
from .config_api import list_locations as api_list_locations
from .config_api import list_meter_types as api_list_meter_types
from .config_api import list_system_tree as api_list_system_tree
from .config_api import list_systems as api_list_systems
from .config_api import list_topologies_with_group as api_list_topologies_with_group
from .config_api import search_devices as api_search_devices
from .config_api import search_meters as api_search_meters
from .db import Base, sql_engine
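
# Object type codes assigned by the upstream configuration service to the
# entities that can appear in a topology diagram.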
PT_OBJ_TYPE_LOCATION = 11
PT_OBJ_TYPE_SYSTEMTYPE = 12
PT_OBJ_TYPE_SYSTEM = 13
PT_OBJ_TYPE_DEVICETYPE = 14
PT_OBJ_TYPE_DEVICE = 15
PT_OBJ_TYPE_METERTYPE = 16
PT_OBJ_TYPE_METERMODEL = 17
PT_OBJ_TYPE_METER = 18
PT_OBJ_TYPE_TOPOGROUP = 19
PT_OBJ_TYPE_TOPODIAGRAM = 20

# Chinese display labels for the object type codes above: location, system
# type, system, device type, device, meter type, meter model, meter,
# topology group, topology diagram.
OBJECT_TYPE_LABELS = {
    PT_OBJ_TYPE_LOCATION: "位置",
    PT_OBJ_TYPE_SYSTEMTYPE: "系统类型",
    PT_OBJ_TYPE_SYSTEM: "系统",
    PT_OBJ_TYPE_DEVICETYPE: "设备类型",
    PT_OBJ_TYPE_DEVICE: "设备",
    PT_OBJ_TYPE_METERTYPE: "仪表类型",
    PT_OBJ_TYPE_METERMODEL: "仪表型号",
    PT_OBJ_TYPE_METER: "仪表",
    PT_OBJ_TYPE_TOPOGROUP: "拓扑图分组",
    PT_OBJ_TYPE_TOPODIAGRAM: "拓扑图",
}
ORDER_LABELS = {1: "顺序", 2: "逆序"}  # in order / reverse order
FILTER_TYPE_LABELS = {1: "所有", 2: "任一"}  # all conditions / any condition
MATCH_TYPE_LABELS = {1: "等于", 2: "不等于", 3: "包含", 4: "不包含"}  # eq / ne / contains / not contains
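
# Small coercion helpers shared by the parsers below; upstream payloads are
# loosely typed, so values are defensively converted.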
def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _current_unix_ts() -> int:
    return int(datetime.now().timestamp())


def _safe_int(raw_value: Any) -> int | None:
    if raw_value is None:
        return None
    try:
        return int(str(raw_value).strip())
    except Exception:
        return None


def _text(raw_value: Any) -> str:
    return str(raw_value or "").strip()


def _bool_as_int(raw_value: Any) -> int:
    return 1 if bool(raw_value) else 0


def _normalize_entity_type(entity_type: str) -> str:
    normalized = _text(entity_type).lower()
    if normalized not in {"meter", "device"}:
        raise ValueError("entity_type must be 'meter' or 'device'")
    return normalized
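
# ORM models for the local topology cache. Every table is scoped by
# project_key so multiple projects can be cached side by side.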
class TopologyGroup(Base):
    __tablename__ = "topology_group"
    __table_args__ = (
        UniqueConstraint(
            "project_key", "group_id", name="uq_topology_group_project_group"
        ),
        Index("ix_topology_group_project_parent", "project_key", "parent_group_id"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    group_id: Mapped[int] = mapped_column(Integer, nullable=False)
    group_name: Mapped[str] = mapped_column(String(255), nullable=False)
    parent_group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    group_path_text: Mapped[str] = mapped_column(Text, nullable=False)
    level: Mapped[int] = mapped_column(Integer, nullable=False)
    sort_index: Mapped[int] = mapped_column(Integer, nullable=False)
    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)
    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)


class TopologyRegistry(Base):
    __tablename__ = "topology_registry"
    __table_args__ = (
        UniqueConstraint(
            "project_key", "topology_id", name="uq_topology_registry_project_topology"
        ),
        Index("ix_topology_registry_project_group", "project_key", "group_id"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    topology_name: Mapped[str] = mapped_column(String(255), nullable=False)
    topology_type: Mapped[int] = mapped_column(Integer, nullable=False)
    object_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    group_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    root_shape: Mapped[str] = mapped_column(String(32), nullable=False)
    source_updated_time: Mapped[str] = mapped_column(
        String(64), nullable=False, default=""
    )
    data_options_json: Mapped[str] = mapped_column(Text, nullable=False, default="")
    dimension_config_json: Mapped[str] = mapped_column(
        Text, nullable=False, default=""
    )
    refreshed_at: Mapped[str] = mapped_column(String(64), nullable=False)
    is_active: Mapped[int] = mapped_column(Integer, nullable=False, default=1)


class TopologyNode(Base):
    __tablename__ = "topology_node"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "topology_id",
            "node_id",
            name="uq_topology_node_project_topology_node",
        ),
        Index(
            "ix_topology_node_project_topology_parent",
            "project_key",
            "topology_id",
            "parent_node_id",
        ),
        Index(
            "ix_topology_node_project_topology_refer",
            "project_key",
            "topology_id",
            "refer_id",
        ),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    node_id: Mapped[str] = mapped_column(Text, nullable=False)
    node_name: Mapped[str] = mapped_column(Text, nullable=False)
    parent_node_id: Mapped[str | None] = mapped_column(Text, nullable=True)
    level: Mapped[int | None] = mapped_column(Integer, nullable=True)
    node_type_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
    refer_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
    refer_level: Mapped[int | None] = mapped_column(Integer, nullable=True)
    is_virtual: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    path_text: Mapped[str] = mapped_column(Text, nullable=False, default="")
    child_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    sort_index: Mapped[int | None] = mapped_column(Integer, nullable=True)


class TopologyEdge(Base):
    __tablename__ = "topology_edge"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "topology_id",
            "source_node_id",
            "target_node_id",
            name="uq_topology_edge_project_topology_nodes",
        ),
        Index(
            "ix_topology_edge_project_topology_source",
            "project_key",
            "topology_id",
            "source_node_id",
        ),
        Index(
            "ix_topology_edge_project_topology_target",
            "project_key",
            "topology_id",
            "target_node_id",
        ),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    source_node_id: Mapped[str] = mapped_column(Text, nullable=False)
    target_node_id: Mapped[str] = mapped_column(Text, nullable=False)
    sort_index: Mapped[int] = mapped_column(Integer, nullable=False, default=0)


class TopologyEntityIndex(Base):
    __tablename__ = "topology_entity_index"
    __table_args__ = (
        UniqueConstraint(
            "project_key",
            "entity_type",
            "entity_id",
            "topology_id",
            "node_id",
            name="uq_topology_entity_index_project_entity_topology_node",
        ),
        Index(
            "ix_topology_entity_index_project_entity",
            "project_key",
            "entity_type",
            "entity_id",
        ),
        Index(
            "ix_topology_entity_index_project_topology_node",
            "project_key",
            "topology_id",
            "node_id",
        ),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    project_key: Mapped[str] = mapped_column(String(128), nullable=False)
    entity_type: Mapped[str] = mapped_column(String(16), nullable=False)
    entity_id: Mapped[int] = mapped_column(Integer, nullable=False)
    topology_id: Mapped[int] = mapped_column(Integer, nullable=False)
    node_id: Mapped[str] = mapped_column(Text, nullable=False)
    depth: Mapped[int | None] = mapped_column(Integer, nullable=True)
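
# Create the cache tables if missing, plus a lightweight in-place migration
# for the two registry columns added after the initial schema.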
def ensure_topology_cache_tables() -> None:
    Base.metadata.create_all(sql_engine())
    engine = sql_engine()
    topology_registry_columns = {
        item["name"] for item in inspect(engine).get_columns("topology_registry")
    }
    with engine.begin() as connection:
        if "data_options_json" not in topology_registry_columns:
            connection.execute(
                text("ALTER TABLE topology_registry ADD COLUMN data_options_json TEXT")
            )
        if "dimension_config_json" not in topology_registry_columns:
            connection.execute(
                text(
                    "ALTER TABLE topology_registry ADD COLUMN dimension_config_json TEXT"
                )
            )
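
# JSON columns are stored as compact text; an empty string means "no value".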
def _dump_json_text(raw_value: Any) -> str:
    if raw_value is None:
        return ""
    return json.dumps(raw_value, ensure_ascii=False, separators=(",", ":"))


def _load_json_text(raw_value: str) -> Any:
    text_value = _text(raw_value)
    if not text_value:
        return None
    try:
        return json.loads(text_value)
    except json.JSONDecodeError:
        return text_value
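
# Metric definitions come from a topology's data_options payload; "instant"
# entries appear to be point-in-time readings and "accu" entries accumulated
# totals.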
def _build_metric_definitions(data_options: Any) -> dict[str, dict[str, Any]]:
    if not isinstance(data_options, dict):
        return {"instant": {}, "accu": {}}
    result: dict[str, dict[str, Any]] = {"instant": {}, "accu": {}}
    for display in ("instant", "accu"):
        items = data_options.get(display)
        if not isinstance(items, list):
            continue
        mapped: dict[str, Any] = {}
        for item in items:
            if not isinstance(item, dict):
                continue
            code = _text(item.get("code"))
            if not code:
                continue
            mapped[code] = {
                "name": _text(item.get("name")),
                "unit": _text(item.get("unit")),
                "type": _safe_int(item.get("type")),
                "display": bool(item.get("display")),
                "value_key": _text(item.get("value")),
            }
        result[display] = mapped
    return result
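
# Upstream list endpoints are paginated and come in two response shapes
# ({"data": {"data": [...]}} and {"data": [...]}); these helpers normalize
# both and walk every page.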
def _extract_page_items(payload: Any) -> list[dict[str, Any]]:
    if not isinstance(payload, dict):
        return []
    data = payload.get("data")
    if isinstance(data, dict):
        items = data.get("data")
        if isinstance(items, list):
            return [item for item in items if isinstance(item, dict)]
    if isinstance(data, list):
        return [item for item in data if isinstance(item, dict)]
    return []


def _load_all_pages(fetch_page: Any) -> list[dict[str, Any]]:
    page_num = 1
    result: list[dict[str, Any]] = []
    while True:
        payload = fetch_page(page_num)
        result.extend(_extract_page_items(payload))
        data = payload.get("data") if isinstance(payload, dict) else None
        if not isinstance(data, dict):
            break
        total_page = _safe_int(data.get("total_page")) or page_num
        if page_num >= total_page:
            break
        page_num += 1
    return result


def _flatten_tree_items(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    result: list[dict[str, Any]] = []
    queue = deque(items)
    while queue:
        item = queue.popleft()
        result.append(item)
        children = item.get("children")
        if isinstance(children, list):
            queue.extend(child for child in children if isinstance(child, dict))
    return result


def _to_id_name_map(items: list[dict[str, Any]]) -> dict[int, str]:
    result: dict[int, str] = {}
    for item in items:
        item_id = _safe_int(item.get("id"))
        if item_id is None:
            continue
        result[item_id] = _text(item.get("name"))
    return result
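
# id -> name lookup maps for each object type, used to resolve the numeric
# field ids found in filter conditions into human-readable names.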
def _load_location_name_map(project_key: str) -> dict[int, str]:
    items = _load_all_pages(
        lambda page_num: api_list_locations(
            project_key, keyword="", page_size=100, page_num=page_num
        )
    )
    return _to_id_name_map(_flatten_tree_items(items))


def _load_system_type_name_map(project_key: str) -> dict[int, str]:
    payload = api_list_system_tree(project_key)
    items = _extract_page_items(payload)
    return _to_id_name_map(
        [item for item in _flatten_tree_items(items) if _safe_int(item.get("type")) == 1]
    )


def _load_system_name_map(project_key: str) -> dict[int, str]:
    items = _load_all_pages(
        lambda page_num: api_list_systems(
            project_key,
            page_size=100,
            page_num=page_num,
            system_type_id=0,
            show_below=True,
        )
    )
    return _to_id_name_map(items)


def _load_device_type_name_map(project_key: str) -> dict[int, str]:
    return _to_id_name_map(_extract_page_items(api_list_device_types(project_key)))


def _load_meter_type_name_map(project_key: str) -> dict[int, str]:
    return _to_id_name_map(_extract_page_items(api_list_meter_types(project_key)))


def _load_device_name_map(project_key: str) -> dict[int, str]:
    items = _load_all_pages(
        lambda page_num: api_search_devices(
            project_key,
            page_size=100,
            page_num=page_num,
            keyword="",
            location_id=0,
            show_below=True,
            system_ids=[],
            device_type_ids=[],
        )
    )
    return _to_id_name_map(items)


def _load_meter_name_map(project_key: str) -> dict[int, str]:
    items = _load_all_pages(
        lambda page_num: api_search_meters(
            project_key,
            page_size=100,
            page_num=page_num,
            keyword="",
            location_id=0,
            show_below=True,
            meter_type_id=0,
            measurement_location_ids=[],
            measurement_system_ids=[],
            measurement_device_type_ids=[],
            status=None,
        )
    )
    return _to_id_name_map(items)


def _load_topology_group_name_map(session: Session, project_key: str) -> dict[int, str]:
    return {
        row.group_id: row.group_name for row in _load_group_rows(session, project_key)
    }


def _load_topology_name_map(session: Session, project_key: str) -> dict[int, str]:
    return {
        row.topology_id: row.topology_name
        for row in _load_registry_rows(session, project_key)
    }


def _object_type_label(type_code: int | None) -> str:
    return OBJECT_TYPE_LABELS.get(type_code or 0, "")


def _resolve_field_name_map(
    session: Session, project_key: str, type_code: int
) -> dict[int, str]:
    if type_code == PT_OBJ_TYPE_LOCATION:
        return _load_location_name_map(project_key)
    if type_code == PT_OBJ_TYPE_SYSTEMTYPE:
        return _load_system_type_name_map(project_key)
    if type_code == PT_OBJ_TYPE_SYSTEM:
        return _load_system_name_map(project_key)
    if type_code == PT_OBJ_TYPE_DEVICETYPE:
        return _load_device_type_name_map(project_key)
    if type_code == PT_OBJ_TYPE_DEVICE:
        return _load_device_name_map(project_key)
    if type_code == PT_OBJ_TYPE_METERTYPE:
        return _load_meter_type_name_map(project_key)
    if type_code == PT_OBJ_TYPE_METERMODEL:
        raise ValueError(
            "type=17 (仪表型号 / meter model) needs additional API mapping confirmation"
        )
    if type_code == PT_OBJ_TYPE_METER:
        return _load_meter_name_map(project_key)
    if type_code == PT_OBJ_TYPE_TOPOGROUP:
        return _load_topology_group_name_map(session, project_key)
    if type_code == PT_OBJ_TYPE_TOPODIAGRAM:
        return _load_topology_name_map(session, project_key)
    return {}
def _resolve_field_items(
    session: Session, project_key: str, type_code: int, field_ids: list[int]
) -> list[dict[str, Any]]:
    name_map = _resolve_field_name_map(session, project_key, type_code)
    return [
        {"id": field_id, "name": name_map.get(field_id, "")}
        for field_id in field_ids
    ]


def _normalize_int_list(raw_value: Any) -> list[int]:
    if not isinstance(raw_value, list):
        return []
    result: list[int] = []
    for item in raw_value:
        normalized = _safe_int(item)
        if normalized is None:
            continue
        result.append(normalized)
    return result
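
# Decode a topology's cached dimension_config into labeled groupings and
# resolved filter conditions; only topology_type == 2 supports this.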
def get_topology_group_config(project_key: str, topology_id: int) -> dict[str, Any]:
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        _require_topology_cache(session, project_key)
        registry_row = _get_registry_or_error(session, project_key, topology_id)
        if registry_row.topology_type != 2:
            return {
                "supported": False,
                "raw_dimension_config": _load_json_text(
                    registry_row.dimension_config_json
                ),
                "groupings": [],
                "filter": {
                    "filter_type": None,
                    "filter_type_label": "",
                    "conditions": [],
                },
                "mcp_note": "topology.get_group_config only applies to topology_type=2 topologies.",
            }
        raw_dimension_config = _load_json_text(registry_row.dimension_config_json)
        if not isinstance(raw_dimension_config, dict):
            raw_dimension_config = {}
        raw_dimensions = raw_dimension_config.get("dimensions")
        raw_filter = raw_dimension_config.get("filter")
        dimensions = raw_dimensions if isinstance(raw_dimensions, list) else []
        filter_payload = raw_filter if isinstance(raw_filter, dict) else {}
        groupings: list[dict[str, Any]] = []
        for item in dimensions:
            if not isinstance(item, dict):
                continue
            type_code = _safe_int(item.get("type"))
            level = _safe_int(item.get("level"))
            order = _safe_int(item.get("order"))
            groupings.append(
                {
                    "name": _text(item.get("name")),
                    "type": type_code,
                    "type_label": _object_type_label(type_code),
                    "level": level,
                    "order": order,
                    "order_label": ORDER_LABELS.get(order or 0, ""),
                }
            )
        conditions_payload = filter_payload.get("conditions")
        conditions = conditions_payload if isinstance(conditions_payload, list) else []
        resolved_conditions: list[dict[str, Any]] = []
        for item in conditions:
            if not isinstance(item, dict):
                continue
            type_code = _safe_int(item.get("type")) or 0
            level = _safe_int(item.get("level"))
            match_type = _safe_int(item.get("match_type"))
            field_ids = _normalize_int_list(item.get("fields"))
            resolved_conditions.append(
                {
                    "type": type_code,
                    "type_label": _object_type_label(type_code),
                    "level": level,
                    "match_type": match_type,
                    "match_type_label": MATCH_TYPE_LABELS.get(match_type or 0, ""),
                    "fields": field_ids,
                    "field_items": _resolve_field_items(
                        session, project_key, type_code, field_ids
                    ),
                }
            )
        filter_type = _safe_int(filter_payload.get("filter_type"))
        return {
            "supported": True,
            "raw_dimension_config": raw_dimension_config,
            "groupings": groupings,
            "filter": {
                "filter_type": filter_type,
                "filter_type_label": FILTER_TYPE_LABELS.get(filter_type or 0, ""),
                "conditions": resolved_conditions,
            },
        }
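
# Walk the nested group/topology listing: type == 1 items are groups (which
# may nest further), everything else is a topology reference attached to its
# parent group.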
def _collect_group_and_topology_refs(
    project_key: str,
    items: list[Any],
    *,
    refreshed_at: str,
    parent_group_id: int | None = None,
    parent_group_path: tuple[str, ...] = (),
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    group_rows: list[dict[str, Any]] = []
    topology_refs: list[dict[str, Any]] = []
    for sort_index, item in enumerate(items, start=1):
        if not isinstance(item, dict):
            continue
        item_id = _safe_int(item.get("id"))
        item_name = _text(item.get("name")) or str(item_id or "")
        item_type = _safe_int(item.get("type"))
        children = (
            item.get("children") if isinstance(item.get("children"), list) else []
        )
        if item_id is None:
            continue
        if item_type == 1:
            group_path = (*parent_group_path, item_name)
            group_rows.append(
                {
                    "project_key": project_key,
                    "group_id": item_id,
                    "group_name": item_name,
                    "parent_group_id": parent_group_id,
                    "group_path_text": " / ".join(group_path),
                    "level": len(group_path),
                    "sort_index": sort_index,
                    "refreshed_at": refreshed_at,
                    "is_active": 1,
                }
            )
            nested_group_rows, nested_topology_refs = _collect_group_and_topology_refs(
                project_key,
                children,
                refreshed_at=refreshed_at,
                parent_group_id=item_id,
                parent_group_path=group_path,
            )
            group_rows.extend(nested_group_rows)
            topology_refs.extend(nested_topology_refs)
            continue
        topology_refs.append(
            {
                "topology_id": item_id,
                "topology_name": item_name,
                "group_id": parent_group_id,
                "sort_index": sort_index,
            }
        )
    return group_rows, topology_refs
def _as_int_list(raw_value: Any) -> list[int]:
    if not isinstance(raw_value, list):
        return []
    result: list[int] = []
    for item in raw_value:
        value = _safe_int(item)
        if value is None:
            continue
        result.append(value)
    return result


def _record_deepest_entities(
    entity_best: dict[tuple[str, int], tuple[int, list[str]]],
    entity_node_ids: dict[tuple[str, int], list[str]],
    entity_type: str,
    entity_ids: list[int],
    *,
    depth: int,
    node_id: str,
) -> None:
    # Keep only the deepest occurrence of each entity; ties at the same depth
    # accumulate every node id where the entity appears.
    for entity_id in entity_ids:
        key = (entity_type, entity_id)
        existing = entity_best.get(key)
        if existing is None or depth > existing[0]:
            entity_best[key] = (depth, [node_id])
            entity_node_ids[key] = [node_id]
            continue
        if depth == existing[0] and node_id not in entity_node_ids[key]:
            entity_node_ids[key].append(node_id)
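
# Parse a tree-shaped diagram (list of root nodes with nested children) into
# flat node, edge and entity-index rows.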
def _parse_tree_topology(
    project_key: str,
    topology_id: int,
    diagram: list[Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
    entity_node_ids: dict[tuple[str, int], list[str]] = {}

    def visit(
        node: dict[str, Any],
        parent_node_id: str | None,
        path_names: tuple[str, ...],
        sort_index: int,
    ) -> None:
        node_id = _text(node.get("id"))
        if not node_id:
            return
        node_name = _text(node.get("name")) or node_id
        children = (
            node.get("children") if isinstance(node.get("children"), list) else []
        )
        level = _safe_int(node.get("level")) or (len(path_names) + 1)
        path_text = " / ".join((*path_names, node_name))
        effective_parent_node_id = parent_node_id or (
            _text(node.get("parent_id")) or None
        )
        node_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "node_id": node_id,
                "node_name": node_name,
                "parent_node_id": effective_parent_node_id,
                "level": level,
                "node_type_code": _safe_int(node.get("type")),
                "refer_id": _safe_int(node.get("refer_id")),
                "refer_level": _safe_int(node.get("refer_level")),
                "is_virtual": _bool_as_int(node.get("is_virtual")),
                "path_text": path_text,
                "child_count": len(children),
                "sort_index": sort_index,
            }
        )
        if effective_parent_node_id:
            edge_rows.append(
                {
                    "project_key": project_key,
                    "topology_id": topology_id,
                    "source_node_id": effective_parent_node_id,
                    "target_node_id": node_id,
                    "sort_index": sort_index,
                }
            )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "meter",
            _as_int_list(node.get("meter_list")),
            depth=level,
            node_id=node_id,
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "device",
            _as_int_list(node.get("device_list")),
            depth=level,
            node_id=node_id,
        )
        for child_sort_index, child in enumerate(children, start=1):
            if not isinstance(child, dict):
                continue
            visit(child, node_id, (*path_names, node_name), child_sort_index)

    for root_sort_index, root in enumerate(diagram, start=1):
        if not isinstance(root, dict):
            continue
        visit(root, None, (), root_sort_index)
    entity_rows: list[dict[str, Any]] = []
    for (entity_type, entity_id), (depth, _) in entity_best.items():
        for node_id in entity_node_ids[(entity_type, entity_id)]:
            entity_rows.append(
                {
                    "project_key": project_key,
                    "entity_type": entity_type,
                    "entity_id": entity_id,
                    "topology_id": topology_id,
                    "node_id": node_id,
                    "depth": depth,
                }
            )
    return node_rows, edge_rows, entity_rows
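
# Derive per-node level, parent, path and child count for a graph-shaped
# diagram via BFS from the roots (nodes with no incoming edge).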
def _build_graph_node_context(
    nodes: list[dict[str, Any]],
    edges: list[dict[str, Any]],
) -> tuple[dict[str, int], dict[str, str | None], dict[str, str], dict[str, int]]:
    incoming: dict[str, list[str]] = defaultdict(list)
    outgoing: dict[str, list[str]] = defaultdict(list)
    for edge in edges:
        source_node_id = _text(edge.get("source"))
        target_node_id = _text(edge.get("target"))
        if not source_node_id or not target_node_id:
            continue
        outgoing[source_node_id].append(target_node_id)
        incoming[target_node_id].append(source_node_id)
    node_ids = [_text(node.get("id")) for node in nodes if _text(node.get("id"))]
    roots = [node_id for node_id in node_ids if not incoming.get(node_id)]
    if not roots:
        roots = node_ids[:1]
    level_map: dict[str, int] = {}
    path_map: dict[str, str] = {}
    parent_map: dict[str, str | None] = {node_id: None for node_id in node_ids}
    child_count_map: dict[str, int] = {
        node_id: len(outgoing.get(node_id, [])) for node_id in node_ids
    }
    node_name_map = {
        _text(node.get("id")): _text(node.get("name")) or _text(node.get("id"))
        for node in nodes
    }
    queue: deque[tuple[str, int, str]] = deque()
    for root_node_id in roots:
        root_name = node_name_map.get(root_node_id) or root_node_id
        queue.append((root_node_id, 1, root_name))
    while queue:
        node_id, depth, path_text = queue.popleft()
        previous = level_map.get(node_id)
        if previous is not None and previous >= depth:
            continue
        level_map[node_id] = depth
        path_map[node_id] = path_text
        for child_node_id in outgoing.get(node_id, []):
            if parent_map.get(child_node_id) is None:
                parent_map[child_node_id] = node_id
            child_name = node_name_map.get(child_node_id) or child_node_id
            queue.append((child_node_id, depth + 1, f"{path_text} / {child_name}"))
    for node in nodes:
        node_id = _text(node.get("id"))
        if not node_id:
            continue
        if node_id in level_map:
            continue
        fallback_level = _safe_int(node.get("level")) or 1
        node_name = _text(node.get("name")) or node_id
        level_map[node_id] = fallback_level
        path_map[node_id] = node_name
    return level_map, parent_map, path_map, child_count_map
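
# Parse a graph-shaped diagram ({"nodes": [...], "edges": [...]}) into the
# same flat row shapes as the tree parser.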
def _parse_graph_topology(
    project_key: str,
    topology_id: int,
    diagram: dict[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    raw_nodes = [item for item in diagram.get("nodes", []) if isinstance(item, dict)]
    raw_edges = [item for item in diagram.get("edges", []) if isinstance(item, dict)]
    level_map, parent_map, path_map, child_count_map = _build_graph_node_context(
        raw_nodes, raw_edges
    )
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    entity_best: dict[tuple[str, int], tuple[int, list[str]]] = {}
    entity_node_ids: dict[tuple[str, int], list[str]] = {}
    for sort_index, node in enumerate(raw_nodes, start=1):
        node_id = _text(node.get("id"))
        if not node_id:
            continue
        level = level_map.get(node_id) or _safe_int(node.get("level")) or 1
        node_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "node_id": node_id,
                "node_name": _text(node.get("name")) or node_id,
                "parent_node_id": parent_map.get(node_id),
                "level": level,
                "node_type_code": _safe_int(node.get("type")),
                "refer_id": _safe_int(node.get("refer_id")),
                "refer_level": _safe_int(node.get("refer_level")),
                "is_virtual": _bool_as_int(node.get("is_virtual")),
                "path_text": path_map.get(node_id, _text(node.get("name")) or node_id),
                "child_count": child_count_map.get(node_id, 0),
                "sort_index": sort_index,
            }
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "meter",
            _as_int_list(node.get("meter_list")),
            depth=level,
            node_id=node_id,
        )
        _record_deepest_entities(
            entity_best,
            entity_node_ids,
            "device",
            _as_int_list(node.get("device_list")),
            depth=level,
            node_id=node_id,
        )
    for sort_index, edge in enumerate(raw_edges, start=1):
        source_node_id = _text(edge.get("source"))
        target_node_id = _text(edge.get("target"))
        if not source_node_id or not target_node_id:
            continue
        edge_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "source_node_id": source_node_id,
                "target_node_id": target_node_id,
                "sort_index": sort_index,
            }
        )
    entity_rows: list[dict[str, Any]] = []
    for (entity_type, entity_id), (depth, _) in entity_best.items():
        for node_id in entity_node_ids[(entity_type, entity_id)]:
            entity_rows.append(
                {
                    "project_key": project_key,
                    "entity_type": entity_type,
                    "entity_id": entity_id,
                    "topology_id": topology_id,
                    "node_id": node_id,
                    "depth": depth,
                }
            )
    return node_rows, edge_rows, entity_rows
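
# Full cache refresh: pull the group/topology listing and each topology's
# detail from upstream, parse them, then swap the cached rows inside one
# transaction. When topology_ids is given, only those diagrams are replaced;
# the group table is always refreshed as a whole.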
def refresh_topology_cache(
    project_key: str, topology_ids: list[int] | None = None
) -> dict[str, Any]:
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    refreshed_at = _utc_now_iso()
    list_payload = api_list_topologies_with_group(project_key, group_ids=[])
    raw_items = list_payload.get("data")
    if not isinstance(raw_items, list):
        raise ValueError("topology list returned invalid data")
    group_rows, topology_refs = _collect_group_and_topology_refs(
        project_key, raw_items, refreshed_at=refreshed_at
    )
    requested_topology_ids = set(topology_ids or [])
    available_topology_map = {item["topology_id"]: item for item in topology_refs}
    if requested_topology_ids:
        missing = sorted(requested_topology_ids - set(available_topology_map))
        if missing:
            raise ValueError(
                f"topology_id not found in upstream topology list: {missing}"
            )
        selected_topology_refs = [
            available_topology_map[item] for item in topology_ids or []
        ]
    else:
        selected_topology_refs = topology_refs
    registry_rows: list[dict[str, Any]] = []
    node_rows: list[dict[str, Any]] = []
    edge_rows: list[dict[str, Any]] = []
    entity_rows: list[dict[str, Any]] = []
    for topology_ref in selected_topology_refs:
        topology_id = topology_ref["topology_id"]
        detail_payload = api_get_topology(project_key, topology_id)
        detail_data = detail_payload.get("data")
        if not isinstance(detail_data, dict):
            raise ValueError(
                f"topology get returned invalid data for topology_id={topology_id}"
            )
        diagram = detail_data.get("diagram")
        if isinstance(diagram, list):
            root_shape = "tree"
            topology_node_rows, topology_edge_rows, topology_entity_rows = (
                _parse_tree_topology(project_key, topology_id, diagram)
            )
        elif isinstance(diagram, dict):
            root_shape = "graph"
            topology_node_rows, topology_edge_rows, topology_entity_rows = (
                _parse_graph_topology(project_key, topology_id, diagram)
            )
        else:
            root_shape = "tree"
            topology_node_rows, topology_edge_rows, topology_entity_rows = ([], [], [])
        registry_rows.append(
            {
                "project_key": project_key,
                "topology_id": topology_id,
                "topology_name": _text(detail_data.get("name"))
                or topology_ref["topology_name"],
                "topology_type": _safe_int(detail_data.get("type")) or 0,
                "object_type_code": _safe_int(detail_data.get("object")),
                "group_id": _safe_int(detail_data.get("group_id"))
                or topology_ref.get("group_id"),
                "root_shape": root_shape,
                "source_updated_time": _text(detail_data.get("updated_time")),
                "data_options_json": _dump_json_text(detail_data.get("data_options")),
                "dimension_config_json": _dump_json_text(
                    detail_data.get("dimension_config")
                ),
                "refreshed_at": refreshed_at,
                "is_active": 1,
            }
        )
        node_rows.extend(topology_node_rows)
        edge_rows.extend(topology_edge_rows)
        entity_rows.extend(topology_entity_rows)
    with Session(sql_engine()) as session:
        if requested_topology_ids:
            session.execute(
                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
            )
            session.add_all(TopologyGroup(**row) for row in group_rows)
            session.execute(
                delete(TopologyEntityIndex).where(
                    TopologyEntityIndex.project_key == project_key,
                    TopologyEntityIndex.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyEdge).where(
                    TopologyEdge.project_key == project_key,
                    TopologyEdge.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyNode).where(
                    TopologyNode.project_key == project_key,
                    TopologyNode.topology_id.in_(requested_topology_ids),
                )
            )
            session.execute(
                delete(TopologyRegistry).where(
                    TopologyRegistry.project_key == project_key,
                    TopologyRegistry.topology_id.in_(requested_topology_ids),
                )
            )
        else:
            session.execute(
                delete(TopologyEntityIndex).where(
                    TopologyEntityIndex.project_key == project_key
                )
            )
            session.execute(
                delete(TopologyEdge).where(TopologyEdge.project_key == project_key)
            )
            session.execute(
                delete(TopologyNode).where(TopologyNode.project_key == project_key)
            )
            session.execute(
                delete(TopologyRegistry).where(
                    TopologyRegistry.project_key == project_key
                )
            )
            session.execute(
                delete(TopologyGroup).where(TopologyGroup.project_key == project_key)
            )
            session.add_all(TopologyGroup(**row) for row in group_rows)
        session.add_all(TopologyRegistry(**row) for row in registry_rows)
        session.add_all(TopologyNode(**row) for row in node_rows)
        session.add_all(TopologyEdge(**row) for row in edge_rows)
        session.add_all(TopologyEntityIndex(**row) for row in entity_rows)
        session.commit()
    result = {
        "project_key": project_key,
        "refreshed_group_count": len(group_rows),
        "refreshed_topology_count": len(registry_rows),
        "refreshed_node_count": len(node_rows),
        "refreshed_edge_count": len(edge_rows),
        "refreshed_entity_index_count": len(entity_rows),
        "topology_ids": [row["topology_id"] for row in registry_rows],
        "refreshed_at": refreshed_at,
    }
    if not registry_rows:
        result["mcp_note"] = (
            f"Project '{project_key}' currently has no topology data in upstream config."
        )
    return result
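
# Read-side helpers over the cached tables.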
def _load_group_rows(session: Session, project_key: str) -> list[TopologyGroup]:
    return list(
        session.scalars(
            select(TopologyGroup)
            .where(TopologyGroup.project_key == project_key)
            .order_by(
                TopologyGroup.level.asc(),
                TopologyGroup.sort_index.asc(),
                TopologyGroup.group_id.asc(),
            )
        )
    )


def _load_registry_rows(session: Session, project_key: str) -> list[TopologyRegistry]:
    return list(
        session.scalars(
            select(TopologyRegistry)
            .where(TopologyRegistry.project_key == project_key)
            .order_by(
                TopologyRegistry.topology_name.asc(), TopologyRegistry.topology_id.asc()
            )
        )
    )


def _has_topology_cache(session: Session, project_key: str) -> bool:
    return bool(_load_registry_rows(session, project_key))


def _require_topology_cache(session: Session, project_key: str) -> None:
    if _has_topology_cache(session, project_key):
        return
    raise ValueError(
        f"Project '{project_key}' has no cached topologies. Refresh it via "
        f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
        "already succeeded, the upstream project likely has no topology data."
    )
def list_topology_groups(project_key: str) -> dict[str, Any]:
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        if not _has_topology_cache(session, project_key):
            return {
                "project_key": project_key,
                "groups": [],
                "total": 0,
                "mcp_note": (
                    f"Project '{project_key}' has no cached topologies. Refresh it via "
                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
                    "already succeeded, the upstream project likely has no topology data."
                ),
            }
        group_rows = _load_group_rows(session, project_key)
        children_by_parent: dict[int | None, list[dict[str, Any]]] = defaultdict(list)
        node_by_group_id: dict[int, dict[str, Any]] = {}
        for group_row in group_rows:
            payload = {
                "group_id": group_row.group_id,
                "group_name": group_row.group_name,
                "parent_group_id": group_row.parent_group_id,
                "level": group_row.level,
                "group_path_text": group_row.group_path_text,
                "children": [],
            }
            node_by_group_id[group_row.group_id] = payload
            children_by_parent[group_row.parent_group_id].append(payload)
        for group_payload in node_by_group_id.values():
            group_payload["children"] = children_by_parent.get(
                group_payload["group_id"], []
            )
        return {
            "project_key": project_key,
            "groups": children_by_parent.get(None, []),
            "total": len(group_rows),
        }
def _group_path_map(group_rows: list[TopologyGroup]) -> dict[int, str]:
    return {group_row.group_id: group_row.group_path_text for group_row in group_rows}


def _collect_descendant_group_ids(
    group_rows: list[TopologyGroup], group_id: int
) -> set[int]:
    group_children: dict[int | None, list[int]] = defaultdict(list)
    for row in group_rows:
        group_children[row.parent_group_id].append(row.group_id)
    result: set[int] = set()
    queue: deque[int] = deque([group_id])
    while queue:
        current_group_id = queue.popleft()
        if current_group_id in result:
            continue
        result.add(current_group_id)
        queue.extend(group_children.get(current_group_id, []))
    return result
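
# List cached topologies, optionally restricted to a group subtree and/or an
# object type code.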
def list_topologies(
    project_key: str,
    *,
    group_id: int | None = None,
    object_type_code: int | None = None,
) -> dict[str, Any]:
    project_key = _text(project_key)
    if not project_key:
        raise ValueError("project_key is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        if not _has_topology_cache(session, project_key):
            return {
                "project_key": project_key,
                "topologies": [],
                "total": 0,
                "mcp_note": (
                    f"Project '{project_key}' has no cached topologies. Refresh it via "
                    f"GET /topology/cache/refresh?project_key={project_key}. If the refresh "
                    "already succeeded, the upstream project likely has no topology data."
                ),
            }
        group_rows = _load_group_rows(session, project_key)
        registry_rows = _load_registry_rows(session, project_key)
        group_path_map = _group_path_map(group_rows)
        allowed_group_ids: set[int] | None = None
        if group_id is not None:
            allowed_group_ids = _collect_descendant_group_ids(group_rows, group_id)
        topologies: list[dict[str, Any]] = []
        for registry_row in registry_rows:
            if (
                allowed_group_ids is not None
                and registry_row.group_id not in allowed_group_ids
            ):
                continue
            if (
                object_type_code is not None
                and registry_row.object_type_code != object_type_code
            ):
                continue
            topologies.append(
                {
                    "topology_id": registry_row.topology_id,
                    "topology_name": registry_row.topology_name,
                    "topology_type": registry_row.topology_type,
                    "object_type_code": registry_row.object_type_code,
                    "group_id": registry_row.group_id,
                    "group_path_text": group_path_map.get(registry_row.group_id or -1, ""),
                    "root_shape": registry_row.root_shape,
                    "refreshed_at": registry_row.refreshed_at,
                }
            )
        return {
            "project_key": project_key,
            "topologies": topologies,
            "total": len(topologies),
        }
def _get_registry_or_error(
    session: Session, project_key: str, topology_id: int
) -> TopologyRegistry:
    registry_row = session.scalar(
        select(TopologyRegistry).where(
            TopologyRegistry.project_key == project_key,
            TopologyRegistry.topology_id == topology_id,
        )
    )
    if registry_row is None:
        raise ValueError(f"topology_id not found in cache: {topology_id}")
    return registry_row


def _get_node_or_error(
    session: Session, project_key: str, topology_id: int, node_id: str
) -> TopologyNode:
    node_row = session.scalar(
        select(TopologyNode).where(
            TopologyNode.project_key == project_key,
            TopologyNode.topology_id == topology_id,
            TopologyNode.node_id == node_id,
        )
    )
    if node_row is None:
        raise ValueError(f"node_id not found in cache: {node_id}")
    return node_row


def _resolve_root_node_id(
    node_map: dict[str, TopologyNode], parents_by_node: dict[str, list[str]]
) -> str:
    root_ids = [node_id for node_id in node_map if not parents_by_node.get(node_id)]
    if not root_ids:
        raise ValueError("root node not found in cache")
    root_ids.sort(
        key=lambda item: (
            node_map[item].level if node_map[item].level is not None else 10**9,
            node_map[item].path_text or node_map[item].node_name,
            item,
        )
    )
    return root_ids[0]


def _normalize_requested_node_id(
    raw_node_id: str,
    node_map: dict[str, TopologyNode],
    parents_by_node: dict[str, list[str]],
) -> str:
    normalized = _text(raw_node_id)
    if normalized.lower() != "root":
        return normalized
    return _resolve_root_node_id(node_map, parents_by_node)


def _node_payload(node_row: TopologyNode) -> dict[str, Any]:
    return {
        "node_id": node_row.node_id,
        "node_name": node_row.node_name,
        "level": node_row.level,
        "parent_node_id": node_row.parent_node_id,
        "refer_id": node_row.refer_id,
        "refer_level": node_row.refer_level,
        "is_virtual": bool(node_row.is_virtual),
        "path_text": node_row.path_text,
        "child_count": node_row.child_count,
    }
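
# Time-window helpers: runtime data is fetched for the 12 hours preceding the
# current hour and for the current day plus the six before it (days are
# floored to local midnight).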
def _floor_hour_ts(ts: int) -> int:
    return max(0, int(ts) - (int(ts) % 3600))


def _floor_day_ts(ts: int) -> int:
    current = datetime.fromtimestamp(int(ts)).astimezone()
    return int(
        current.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
    )


def _hourly_window_timestamps(base_ts: int) -> list[int]:
    current_hour_ts = _floor_hour_ts(base_ts)
    return [current_hour_ts - 3600 * offset for offset in range(1, 13)]


def _daily_window_timestamps(base_ts: int) -> list[int]:
    current_day_ts = _floor_day_ts(base_ts)
    return [current_day_ts - 86400 * offset for offset in range(0, 7)]


def _extract_topology_data_map(payload: Any) -> dict[str, Any]:
    if not isinstance(payload, dict):
        return {}
    data = payload.get("data")
    if not isinstance(data, dict):
        return {}
    return {str(node_id): node_values for node_id, node_values in data.items()}
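
# Fetch instant plus accumulated readings for every window timestamp
# concurrently on a bounded thread pool (accu_step 2 appears to select hourly
# accumulation and 3 daily, judging by how the window timestamps are used).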
def _fetch_topology_runtime_data(
    project_key: str, topology_id: int, *, base_ts: int | None = None
) -> dict[str, Any]:
    effective_base_ts = int(base_ts if base_ts is not None else _current_unix_ts())
    hourly_timestamps = _hourly_window_timestamps(effective_base_ts)
    daily_timestamps = _daily_window_timestamps(effective_base_ts)

    def fetch_instant() -> dict[str, Any]:
        return _extract_topology_data_map(
            api_get_topology_data(project_key, topology_id, display="instant")
        )

    def fetch_accu(accu_step: int, ts: int) -> tuple[int, dict[str, Any]]:
        payload = api_get_topology_data(
            project_key,
            topology_id,
            display="accu",
            accu_step=accu_step,
            ts=ts,
        )
        return ts, _extract_topology_data_map(payload)

    max_workers = max(1, min(8, 1 + len(hourly_timestamps) + len(daily_timestamps)))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        instant_future = executor.submit(fetch_instant)
        hourly_futures = [
            executor.submit(fetch_accu, 2, ts_value) for ts_value in hourly_timestamps
        ]
        daily_futures = [
            executor.submit(fetch_accu, 3, ts_value) for ts_value in daily_timestamps
        ]
        instant_map = instant_future.result()
        hourly_series = [future.result() for future in hourly_futures]
        daily_series = [future.result() for future in daily_futures]
    return {
        "base_ts": effective_base_ts,
        "instant": instant_map,
        "hourly": [
            {"ts": ts_value, "data_map": data_map}
            for ts_value, data_map in hourly_series
        ],
        "daily": [
            {"ts": ts_value, "data_map": data_map}
            for ts_value, data_map in daily_series
        ],
        "data_window": {
            "hourly_ts": hourly_timestamps,
            "daily_ts": daily_timestamps,
        },
    }
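
# Merge the fetched runtime bundle into node payloads, keyed by node_id.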
def _attach_runtime_data_to_node(
    node_payload: dict[str, Any], runtime_bundle: dict[str, Any]
) -> dict[str, Any]:
    node_id = str(node_payload.get("node_id") or "").strip()
    instant_map = runtime_bundle.get("instant") or {}
    hourly_series = runtime_bundle.get("hourly") or []
    daily_series = runtime_bundle.get("daily") or []
    node_payload["data"] = {
        "instant": instant_map.get(node_id),
        "accu": {
            "hourly": [
                {
                    "ts": item["ts"],
                    "values": item["data_map"].get(node_id),
                }
                for item in hourly_series
            ],
            "daily": [
                {
                    "ts": item["ts"],
                    "values": item["data_map"].get(node_id),
                }
                for item in daily_series
            ],
        },
    }
    return node_payload


def _attach_runtime_data_to_nodes(
    nodes: list[dict[str, Any]], runtime_bundle: dict[str, Any]
) -> list[dict[str, Any]]:
    return [
        _attach_runtime_data_to_node(node_payload, runtime_bundle)
        for node_payload in nodes
    ]
  1261. def _load_node_map(
  1262. session: Session, project_key: str, topology_id: int
  1263. ) -> dict[str, TopologyNode]:
  1264. rows = session.scalars(
  1265. select(TopologyNode).where(
  1266. TopologyNode.project_key == project_key,
  1267. TopologyNode.topology_id == topology_id,
  1268. )
  1269. )
  1270. return {row.node_id: row for row in rows}


def _load_adjacency(
    session: Session, project_key: str, topology_id: int
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Build parent and child adjacency lists from the cached edges.

    Edge order (sort_index, then id) is preserved in both mappings.
    """
    edges = session.scalars(
        select(TopologyEdge)
        .where(
            TopologyEdge.project_key == project_key,
            TopologyEdge.topology_id == topology_id,
        )
        .order_by(TopologyEdge.sort_index.asc(), TopologyEdge.id.asc())
    )
    parents_by_node: dict[str, list[str]] = defaultdict(list)
    children_by_node: dict[str, list[str]] = defaultdict(list)
    for edge in edges:
        parents_by_node[edge.target_node_id].append(edge.source_node_id)
        children_by_node[edge.source_node_id].append(edge.target_node_id)
    return parents_by_node, children_by_node
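
# For edges a -> b and a -> c (in sort order), _load_adjacency returns:
#     parents_by_node == {"b": ["a"], "c": ["a"]}
#     children_by_node == {"a": ["b", "c"]}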


def _dedupe_preserve_order(node_ids: list[str]) -> list[str]:
    result: list[str] = []
    seen: set[str] = set()
    for node_id in node_ids:
        if node_id in seen:
            continue
        seen.add(node_id)
        result.append(node_id)
    return result
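
# Worked example: _dedupe_preserve_order(["b", "a", "b", "c", "a"]) returns
# ["b", "a", "c"]; the first occurrence of each id wins.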


def _node_list_payload(
    node_map: dict[str, TopologyNode], node_ids: list[str]
) -> list[dict[str, Any]]:
    payload: list[dict[str, Any]] = []
    for node_id in _dedupe_preserve_order(node_ids):
        node_row = node_map.get(node_id)
        if node_row is None:
            # Skip ids with no cached node row (e.g. dangling edges).
            continue
        payload.append(_node_payload(node_row))
    return payload


def _collect_ancestors(
    node_map: dict[str, TopologyNode],
    parents_by_node: dict[str, list[str]],
    node_id: str,
    depth_limit: int,
) -> list[dict[str, Any]]:
    """BFS upward from node_id, recording each ancestor's minimum distance."""
    if depth_limit <= 0:
        return []
    visited: dict[str, int] = {}
    queue: deque[tuple[str, int]] = deque(
        (parent_id, 1) for parent_id in parents_by_node.get(node_id, [])
    )
    while queue:
        current_node_id, distance = queue.popleft()
        if distance > depth_limit:
            continue
        previous_distance = visited.get(current_node_id)
        if previous_distance is not None and previous_distance <= distance:
            continue
        visited[current_node_id] = distance
        for parent_id in parents_by_node.get(current_node_id, []):
            queue.append((parent_id, distance + 1))
    # Farthest ancestors first (root-ward order), then path text, then id.
    # Ids with no cached node row are skipped defensively, mirroring
    # _node_list_payload, so a dangling edge cannot raise a KeyError here.
    ordered_ids = sorted(
        (candidate for candidate in visited if candidate in node_map),
        key=lambda item: (
            -visited[item],
            node_map[item].path_text or node_map[item].node_name,
            item,
        ),
    )
    result: list[dict[str, Any]] = []
    for current_node_id in ordered_ids:
        node_payload = _node_payload(node_map[current_node_id])
        node_payload["distance"] = visited[current_node_id]
        result.append(node_payload)
    return result


def _collect_descendants(
    node_map: dict[str, TopologyNode],
    children_by_node: dict[str, list[str]],
    node_id: str,
    depth_limit: int,
) -> list[dict[str, Any]]:
    """BFS downward from node_id, recording each descendant's minimum distance."""
    if depth_limit <= 0:
        return []
    visited: dict[str, int] = {}
    queue: deque[tuple[str, int]] = deque(
        (child_id, 1) for child_id in children_by_node.get(node_id, [])
    )
    while queue:
        current_node_id, distance = queue.popleft()
        if distance > depth_limit:
            continue
        previous_distance = visited.get(current_node_id)
        if previous_distance is not None and previous_distance <= distance:
            continue
        visited[current_node_id] = distance
        for child_id in children_by_node.get(current_node_id, []):
            queue.append((child_id, distance + 1))
    # Nearest descendants first, then path text, then id; skip ids with no
    # cached node row, as in _collect_ancestors.
    ordered_ids = sorted(
        (candidate for candidate in visited if candidate in node_map),
        key=lambda item: (
            visited[item],
            node_map[item].path_text or node_map[item].node_name,
            item,
        ),
    )
    result: list[dict[str, Any]] = []
    for current_node_id in ordered_ids:
        node_payload = _node_payload(node_map[current_node_id])
        node_payload["distance"] = visited[current_node_id]
        result.append(node_payload)
    return result
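
# For a chain root -> a -> b -> c, _collect_descendants(node_map, children_by_node,
# "root", depth_limit=2) returns payloads for a (distance 1) and b (distance 2);
# c sits beyond the depth limit and is skipped.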


def _topology_metadata_payload(
    registry_row: TopologyRegistry, group_path_text: str
) -> dict[str, Any]:
    data_options = _load_json_text(registry_row.data_options_json)
    return {
        "topology_id": registry_row.topology_id,
        "topology_name": registry_row.topology_name,
        "topology_type": registry_row.topology_type,
        "object_type_code": registry_row.object_type_code,
        "group_id": registry_row.group_id,
        "group_path_text": group_path_text,
        "root_shape": registry_row.root_shape,
        "data_options": data_options,
        "metric_definitions": _build_metric_definitions(data_options),
        "dimension_config": _load_json_text(registry_row.dimension_config_json),
    }


def get_topology_node(
    project_key: str,
    topology_id: int,
    node_id: str = "root",
    *,
    include_siblings: bool = True,
    include_children: bool = True,
) -> dict[str, Any]:
    """Return one node enriched with runtime data, plus its parents, children, and siblings."""
    project_key = _text(project_key)
    node_id = _text(node_id)
    if not project_key:
        raise ValueError("project_key is required")
    if not node_id:
        raise ValueError("node_id is required")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        _require_topology_cache(session, project_key)
        group_rows = _load_group_rows(session, project_key)
        group_path_map = _group_path_map(group_rows)
        registry_row = _get_registry_or_error(session, project_key, topology_id)
        node_map = _load_node_map(session, project_key, topology_id)
        parents_by_node, children_by_node = _load_adjacency(
            session, project_key, topology_id
        )
        resolved_node_id = _normalize_requested_node_id(
            node_id, node_map, parents_by_node
        )
        node_row = _get_node_or_error(
            session, project_key, topology_id, resolved_node_id
        )
        runtime_bundle = _fetch_topology_runtime_data(project_key, topology_id)
        parent_ids = parents_by_node.get(resolved_node_id, [])
        child_ids = children_by_node.get(resolved_node_id, []) if include_children else []
        sibling_ids: list[str] = []
        if include_siblings and len(parent_ids) == 1:
            # Siblings are only well-defined for single-parent nodes.
            sibling_ids = [
                candidate
                for candidate in children_by_node.get(parent_ids[0], [])
                if candidate != resolved_node_id
            ]
        return {
            "data_window": runtime_bundle["data_window"],
            "topology": _topology_metadata_payload(
                registry_row, group_path_map.get(registry_row.group_id or -1, "")
            ),
            "node": _attach_runtime_data_to_node(_node_payload(node_row), runtime_bundle),
            "parents": _attach_runtime_data_to_nodes(
                _node_list_payload(node_map, parent_ids), runtime_bundle
            ),
            "children": _attach_runtime_data_to_nodes(
                _node_list_payload(node_map, child_ids), runtime_bundle
            ),
            "siblings": _attach_runtime_data_to_nodes(
                _node_list_payload(node_map, sibling_ids), runtime_bundle
            ),
        }
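
# Illustrative call (hypothetical project key and topology id; real values are
# deployment-specific):
#
#     bundle = get_topology_node("acme-prod", 42, include_siblings=False)
#     print(bundle["node"]["data"]["instant"])
#     for child in bundle["children"]:
#         print(child["node_id"], child["data"]["accu"]["hourly"])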


def find_topology_context(
    project_key: str,
    entity_type: str,
    entity_id: int,
    *,
    topology_id: int | None = None,
    include_siblings: bool = True,
    ancestor_depth: int = 5,
    descendant_depth: int = 2,
) -> dict[str, Any]:
    """Find every topology position indexed for an entity and return its structural context."""
    project_key = _text(project_key)
    normalized_entity_type = _normalize_entity_type(entity_type)
    if not project_key:
        raise ValueError("project_key is required")
    if entity_id <= 0:
        raise ValueError("entity_id must be a positive integer")
    ensure_topology_cache_tables()
    with Session(sql_engine()) as session:
        _require_topology_cache(session, project_key)
        group_rows = _load_group_rows(session, project_key)
        group_path_map = _group_path_map(group_rows)
        query = select(TopologyEntityIndex).where(
            TopologyEntityIndex.project_key == project_key,
            TopologyEntityIndex.entity_type == normalized_entity_type,
            TopologyEntityIndex.entity_id == entity_id,
        )
        if topology_id is not None:
            query = query.where(TopologyEntityIndex.topology_id == topology_id)
        index_rows = list(
            session.scalars(
                query.order_by(
                    TopologyEntityIndex.depth.desc(),
                    TopologyEntityIndex.topology_id.asc(),
                )
            )
        )
        matches: list[dict[str, Any]] = []
        # Per-topology caches so each registry row, node map, and adjacency
        # pair is loaded at most once across all index matches.
        topology_node_maps: dict[int, dict[str, TopologyNode]] = {}
        topology_adjacency: dict[
            int, tuple[dict[str, list[str]], dict[str, list[str]]]
        ] = {}
        topology_registry_rows: dict[int, TopologyRegistry] = {}
        for index_row in index_rows:
            current_topology_id = index_row.topology_id
            if current_topology_id not in topology_registry_rows:
                topology_registry_rows[current_topology_id] = _get_registry_or_error(
                    session, project_key, current_topology_id
                )
                topology_node_maps[current_topology_id] = _load_node_map(
                    session, project_key, current_topology_id
                )
                topology_adjacency[current_topology_id] = _load_adjacency(
                    session, project_key, current_topology_id
                )
            node_map = topology_node_maps[current_topology_id]
            node_row = node_map.get(index_row.node_id)
            if node_row is None:
                continue
            parents_by_node, children_by_node = topology_adjacency[current_topology_id]
            parent_ids = parents_by_node.get(index_row.node_id, [])
            child_ids = children_by_node.get(index_row.node_id, [])
            sibling_ids: list[str] = []
            if include_siblings and len(parent_ids) == 1:
                sibling_ids = [
                    candidate
                    for candidate in children_by_node.get(parent_ids[0], [])
                    if candidate != index_row.node_id
                ]
            matches.append(
                {
                    "topology": _topology_metadata_payload(
                        topology_registry_rows[current_topology_id],
                        group_path_map.get(
                            topology_registry_rows[current_topology_id].group_id or -1,
                            "",
                        ),
                    ),
                    "self": _node_payload(node_row),
                    "parents": _node_list_payload(node_map, parent_ids),
                    "children": _node_list_payload(node_map, child_ids),
                    "ancestors": _collect_ancestors(
                        node_map,
                        parents_by_node,
                        index_row.node_id,
                        max(ancestor_depth, 0),
                    ),
                    "descendants": _collect_descendants(
                        node_map,
                        children_by_node,
                        index_row.node_id,
                        max(descendant_depth, 0),
                    ),
                    "siblings": _node_list_payload(node_map, sibling_ids),
                }
            )
        return {
            "query": {
                "project_key": project_key,
                "entity_type": normalized_entity_type,
                "entity_id": entity_id,
                "topology_id": topology_id,
            },
            "matches": matches,
            "total_matches": len(matches),
        }
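
# Illustrative call (hypothetical entity type and id; valid entity types
# depend on _normalize_entity_type):
#
#     result = find_topology_context("acme-prod", "host", 1001, ancestor_depth=3)
#     print(result["total_matches"])
#     for match in result["matches"]:
#         print(match["topology"]["topology_name"], match["self"].get("node_id"))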