from __future__ import annotations import os from typing import Any from fastmcp import FastMCP from starlette.requests import Request from starlette.responses import JSONResponse, Response from .auth import load_projects_config from .config_api import ( list_device_types as api_list_device_types, list_locations as api_list_locations, list_meter_types as api_list_meter_types, list_system_tree as api_list_system_tree, list_systems as api_list_systems, search_devices as api_search_devices, search_meters as api_search_meters, search_points as api_search_points, ) from .topology_cache import ( find_topology_context, get_topology_node, list_topologies, list_topology_groups, refresh_topology_cache, ) mcp = FastMCP("instrument-config") @mcp.tool( name="project.list", title="Project List", description="List enabled projects configured in sys_config for instrument-config tools. 在执行其他工具前,询问用户要查询哪一个项目,根据用户的选择查询对应项目。", tags={"project", "list"}, ) def project_list() -> dict[str, Any]: projects = load_projects_config() result: list[dict[str, Any]] = [] for item in projects: if not item["enabled"]: continue result.append( { "project_key": item["project_key"], "project_name": item["project_name"], } ) result.sort(key=lambda item: item["project_key"]) return { "projects": result, "total": len(result), } def _append_next_page_hint(payload: Any, page_num: int) -> Any: if not isinstance(payload, dict): return payload data = payload.get("data") if not isinstance(data, dict): return payload total_page = data.get("total_page") if isinstance(total_page, int) and total_page > page_num: payload.setdefault( "mcp_note", f"Current result is page {page_num}. If the target was not found, continue with page_num={page_num + 1}.", ) return payload def _parse_bool_query(raw_value: str | None) -> bool: text = str(raw_value or "").strip().lower() if text in {"1", "true", "yes", "y", "on"}: return True if text in {"0", "false", "no", "n", "off", ""}: return False raise ValueError(f"invalid boolean query value: {raw_value}") def _parse_int_list_query(values: list[str]) -> list[int]: result: list[int] = [] for item in values: text = str(item or "").strip() if not text: continue result.append(int(text)) return result @mcp.custom_route("/topology/cache/refresh", methods=["GET"], include_in_schema=False) async def refresh_topology_cache_route(request: Request) -> Response: try: project_key = str(request.query_params.get("project_key") or "").strip() if not project_key: raise ValueError("project_key is required") topology_ids = _parse_int_list_query( request.query_params.getlist("topology_id") ) force = _parse_bool_query(request.query_params.get("force")) del force # Manual refresh always rebuilds the requested cache scope. payload = refresh_topology_cache(project_key, topology_ids=topology_ids or None) return JSONResponse(payload) except Exception as exc: return JSONResponse({"error": str(exc)}, status_code=400) @mcp.tool() def list_locations( project_key: str, keyword: str = "", page_size: int = 100, page_num: int = 1 ) -> Any: """List location data from the config API.""" payload = api_list_locations( project_key, keyword=keyword, page_size=page_size, page_num=page_num ) return _append_next_page_hint(payload, page_num) @mcp.tool() def list_system_tree(project_key: str) -> Any: """Get the full system tree from the config API.""" return api_list_system_tree(project_key) @mcp.tool() def list_systems( project_key: str, page_size: int = 100, page_num: int = 1, system_type_id: int = 0, show_below: bool = True, ) -> Any: """List systems by system type.""" payload = api_list_systems( project_key, page_size=page_size, page_num=page_num, system_type_id=system_type_id, show_below=show_below, ) return _append_next_page_hint(payload, page_num) @mcp.tool() def list_device_types(project_key: str) -> Any: """List all device types.""" return api_list_device_types(project_key) @mcp.tool() def list_meter_types(project_key: str) -> Any: """List all meter types.""" return api_list_meter_types(project_key) @mcp.tool() def search_devices( project_key: str, page_size: int = 100, page_num: int = 1, keyword: str = "", location_id: int = 0, show_below: bool = True, system_ids: list[int] | None = None, device_type_ids: list[int] | None = None, ) -> Any: """Search devices with id-based filters.""" payload = api_search_devices( project_key, page_size=page_size, page_num=page_num, keyword=keyword, location_id=location_id, show_below=show_below, system_ids=system_ids, device_type_ids=device_type_ids, ) return _append_next_page_hint(payload, page_num) @mcp.tool() def search_meters( project_key: str, page_size: int = 100, page_num: int = 1, keyword: str = "", location_id: int = 0, show_below: bool = True, meter_type_id: int = 0, measurement_location_ids: list[int] | None = None, measurement_system_ids: list[int] | None = None, measurement_device_type_ids: list[int] | None = None, status: int | None = None, ) -> Any: """Search meters with id-based filters.""" payload = api_search_meters( project_key, page_size=page_size, page_num=page_num, keyword=keyword, location_id=location_id, show_below=show_below, meter_type_id=meter_type_id, measurement_location_ids=measurement_location_ids, measurement_system_ids=measurement_system_ids, measurement_device_type_ids=measurement_device_type_ids, status=status, ) return _append_next_page_hint(payload, page_num) @mcp.tool() def search_points( project_key: str, id: int, page_size: int = 100, page_num: int = 1 ) -> Any: """Search points under a meter by meter id.""" payload = api_search_points( project_key, id=id, page_size=page_size, page_num=page_num ) return _append_next_page_hint(payload, page_num) @mcp.tool(name="topology.group_list") def topology_group_list(project_key: str) -> dict[str, Any]: """List cached topology groups as a tree.""" return list_topology_groups(project_key) @mcp.tool(name="topology.list") def topology_list( project_key: str, group_id: int | None = None, object_type_code: int | None = None ) -> dict[str, Any]: """List cached topologies, optionally filtered by group or object type.""" return list_topologies( project_key, group_id=group_id, object_type_code=object_type_code ) @mcp.tool(name="topology.get_node") def topology_get_node( project_key: str, topology_id: int, node_id: str, include_siblings: bool = True, include_children: bool = True, ) -> dict[str, Any]: """Get one cached topology node with its immediate neighborhood.""" return get_topology_node( project_key, topology_id, node_id, include_siblings=include_siblings, include_children=include_children, ) @mcp.tool(name="topology.find_context") def topology_find_context( project_key: str, entity_type: str, entity_id: int, topology_id: int | None = None, include_siblings: bool = True, ancestor_depth: int = 5, descendant_depth: int = 2, ) -> dict[str, Any]: """Find cached topology context for a device or meter.""" return find_topology_context( project_key, entity_type, entity_id, topology_id=topology_id, include_siblings=include_siblings, ancestor_depth=ancestor_depth, descendant_depth=descendant_depth, ) def main() -> None: host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0" port = int(os.getenv("MCP_PORT", "8500")) path = os.getenv("MCP_PATH", "/mcp").strip() or "/mcp" mcp.run(transport="http", host=host, port=port, path=path) if __name__ == "__main__": main()