from __future__ import annotations import os from typing import Annotated from typing import Any from fastmcp import FastMCP from pydantic import Field from starlette.requests import Request from starlette.responses import JSONResponse, Response from . import __version__ from .auth import load_projects_config from .config_api import ( list_device_types as api_list_device_types, list_locations as api_list_locations, list_meter_types as api_list_meter_types, list_system_tree as api_list_system_tree, list_systems as api_list_systems, search_devices as api_search_devices, search_meters as api_search_meters, search_points as api_search_points, ) from .topology_cache import ( find_topology_context, get_topology_group_config, get_topology_node, list_topologies, list_topology_groups, refresh_topology_cache, ) mcp = FastMCP("instrument-config") @mcp.tool(name="version.get") def get_version() -> dict[str, str]: """Get the MCP project version.""" return { "name": "instrument-config-mcp", "version": __version__, } @mcp.tool( name="project.list", title="Project List", description="List enabled projects configured in sys_config for instrument-config tools. 在执行其他工具前,询问用户要查询哪一个项目,根据用户的选择查询对应项目。", tags={"project", "list"}, ) def project_list() -> dict[str, Any]: projects = load_projects_config() result: list[dict[str, Any]] = [] for item in projects: if not item["enabled"]: continue result.append( { "project_key": item["project_key"], "project_name": item["project_name"], } ) result.sort(key=lambda item: item["project_key"]) return { "projects": result, "total": len(result), } def _append_next_page_hint(payload: Any, page_num: int) -> Any: if not isinstance(payload, dict): return payload data = payload.get("data") if not isinstance(data, dict): return payload total_page = data.get("total_page") if isinstance(total_page, int) and total_page > page_num: payload.setdefault( "mcp_note", f"Current result is page {page_num}. If the target was not found, continue with page_num={page_num + 1}.", ) return payload def _parse_bool_query(raw_value: str | None) -> bool: text = str(raw_value or "").strip().lower() if text in {"1", "true", "yes", "y", "on"}: return True if text in {"0", "false", "no", "n", "off", ""}: return False raise ValueError(f"invalid boolean query value: {raw_value}") def _parse_int_list_query(values: list[str]) -> list[int]: result: list[int] = [] for item in values: text = str(item or "").strip() if not text: continue result.append(int(text)) return result @mcp.custom_route("/topology/cache/refresh", methods=["GET"], include_in_schema=False) async def refresh_topology_cache_route(request: Request) -> Response: try: project_key = str(request.query_params.get("project_key") or "").strip() if not project_key: raise ValueError("project_key is required") topology_ids = _parse_int_list_query( request.query_params.getlist("topology_id") ) force = _parse_bool_query(request.query_params.get("force")) del force # Manual refresh always rebuilds the requested cache scope. payload = refresh_topology_cache(project_key, topology_ids=topology_ids or None) return JSONResponse(payload) except Exception as exc: return JSONResponse({"error": str(exc)}, status_code=400) @mcp.tool() def list_locations( project_key: str, keyword: str = "", page_size: int = 100, page_num: int = 1 ) -> Any: """List location data from the config API.""" payload = api_list_locations( project_key, keyword=keyword, page_size=page_size, page_num=page_num ) return _append_next_page_hint(payload, page_num) @mcp.tool() def list_system_tree(project_key: str) -> Any: """Get the full system tree from the config API.""" return api_list_system_tree(project_key) @mcp.tool() def list_systems( project_key: str, page_size: int = 100, page_num: int = 1, system_type_id: int = 0, show_below: bool = True, ) -> Any: """List systems by system type.""" payload = api_list_systems( project_key, page_size=page_size, page_num=page_num, system_type_id=system_type_id, show_below=show_below, ) return _append_next_page_hint(payload, page_num) @mcp.tool() def list_device_types(project_key: str) -> Any: """List all device types.""" return api_list_device_types(project_key) @mcp.tool() def list_meter_types(project_key: str) -> Any: """List all meter types.""" return api_list_meter_types(project_key) @mcp.tool() def search_devices( project_key: str, page_size: int = 100, page_num: int = 1, keyword: str = "", location_id: int = 0, show_below: bool = True, system_ids: list[int] | None = None, device_type_ids: list[int] | None = None, ) -> Any: """Search devices with id-based filters.""" payload = api_search_devices( project_key, page_size=page_size, page_num=page_num, keyword=keyword, location_id=location_id, show_below=show_below, system_ids=system_ids, device_type_ids=device_type_ids, ) return _append_next_page_hint(payload, page_num) @mcp.tool() def search_meters( project_key: str, page_size: int = 100, page_num: int = 1, keyword: str = "", location_id: int = 0, show_below: bool = True, meter_type_id: int = 0, measurement_location_ids: list[int] | None = None, measurement_system_ids: list[int] | None = None, measurement_device_type_ids: list[int] | None = None, status: int | None = None, ) -> Any: """Search meters with id-based filters.""" payload = api_search_meters( project_key, page_size=page_size, page_num=page_num, keyword=keyword, location_id=location_id, show_below=show_below, meter_type_id=meter_type_id, measurement_location_ids=measurement_location_ids, measurement_system_ids=measurement_system_ids, measurement_device_type_ids=measurement_device_type_ids, status=status, ) return _append_next_page_hint(payload, page_num) @mcp.tool() def search_points( project_key: str, id: int, page_size: int = 100, page_num: int = 1 ) -> Any: """Search points under a meter by meter id.""" payload = api_search_points( project_key, id=id, page_size=page_size, page_num=page_num ) return _append_next_page_hint(payload, page_num) @mcp.tool(name="topology.group_list") def topology_group_list(project_key: str) -> dict[str, Any]: """List topology groups to help locate device and meter relationships.""" return list_topology_groups(project_key) @mcp.tool(name="topology.list") def topology_list( project_key: str, group_id: int | None = None, object_type_code: int | None = None ) -> dict[str, Any]: """List topologies for checking device and meter hierarchy relationships.""" return list_topologies( project_key, group_id=group_id, object_type_code=object_type_code ) @mcp.tool(name="topology.get_node") def topology_get_node( project_key: str, topology_id: int, node_id: Annotated[ str, Field( description=( "Node ID in the target topology. Use 'root' to resolve the " "topology root node automatically." ) ), ] = "root", include_siblings: bool = True, include_children: bool = True, ) -> dict[str, Any]: """Get one topology node and its direct parent-child relationships.""" return get_topology_node( project_key, topology_id, node_id, include_siblings=include_siblings, include_children=include_children, ) @mcp.tool(name="topology.get_group_config") def topology_get_group_config(project_key: str, topology_id: int) -> dict[str, Any]: """Get grouping and filter configuration derived from topology dimension_config.""" return get_topology_group_config(project_key, topology_id) @mcp.tool(name="topology.find_context") def topology_find_context( project_key: str, entity_type: Annotated[ str, Field( description=( "Entity type for topology relationship lookup. Only 'meter' or " "'device' are accepted." ) ), ], entity_id: int, topology_id: int | None = None, include_siblings: bool = True, ancestor_depth: int = 5, descendant_depth: int = 2, ) -> dict[str, Any]: """Find topology context for a meter or device, including hierarchy relationships.""" return find_topology_context( project_key, entity_type, entity_id, topology_id=topology_id, include_siblings=include_siblings, ancestor_depth=ancestor_depth, descendant_depth=descendant_depth, ) def main() -> None: host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0" port = int(os.getenv("MCP_PORT", "8500")) path = os.getenv("MCP_PATH", "/mcp").strip() or "/mcp" mcp.run(transport="http", host=host, port=port, path=path, stateless_http=True) if __name__ == "__main__": main()