| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- from __future__ import annotations
- import os
- from typing import Annotated
- from typing import Any
- from fastmcp import FastMCP
- from pydantic import Field
- from starlette.requests import Request
- from starlette.responses import JSONResponse, Response
- from .auth import load_projects_config
- from .config_api import (
- list_device_types as api_list_device_types,
- list_locations as api_list_locations,
- list_meter_types as api_list_meter_types,
- list_system_tree as api_list_system_tree,
- list_systems as api_list_systems,
- search_devices as api_search_devices,
- search_meters as api_search_meters,
- search_points as api_search_points,
- )
- from .topology_cache import (
- find_topology_context,
- get_topology_node,
- list_topologies,
- list_topology_groups,
- refresh_topology_cache,
- )
- mcp = FastMCP("instrument-config")
- @mcp.tool(
- name="project.list",
- title="Project List",
- description="List enabled projects configured in sys_config for instrument-config tools. 在执行其他工具前,询问用户要查询哪一个项目,根据用户的选择查询对应项目。",
- tags={"project", "list"},
- )
- def project_list() -> dict[str, Any]:
- projects = load_projects_config()
- result: list[dict[str, Any]] = []
- for item in projects:
- if not item["enabled"]:
- continue
- result.append(
- {
- "project_key": item["project_key"],
- "project_name": item["project_name"],
- }
- )
- result.sort(key=lambda item: item["project_key"])
- return {
- "projects": result,
- "total": len(result),
- }
- def _append_next_page_hint(payload: Any, page_num: int) -> Any:
- if not isinstance(payload, dict):
- return payload
- data = payload.get("data")
- if not isinstance(data, dict):
- return payload
- total_page = data.get("total_page")
- if isinstance(total_page, int) and total_page > page_num:
- payload.setdefault(
- "mcp_note",
- f"Current result is page {page_num}. If the target was not found, continue with page_num={page_num + 1}.",
- )
- return payload
- def _parse_bool_query(raw_value: str | None) -> bool:
- text = str(raw_value or "").strip().lower()
- if text in {"1", "true", "yes", "y", "on"}:
- return True
- if text in {"0", "false", "no", "n", "off", ""}:
- return False
- raise ValueError(f"invalid boolean query value: {raw_value}")
- def _parse_int_list_query(values: list[str]) -> list[int]:
- result: list[int] = []
- for item in values:
- text = str(item or "").strip()
- if not text:
- continue
- result.append(int(text))
- return result
- @mcp.custom_route("/topology/cache/refresh", methods=["GET"], include_in_schema=False)
- async def refresh_topology_cache_route(request: Request) -> Response:
- try:
- project_key = str(request.query_params.get("project_key") or "").strip()
- if not project_key:
- raise ValueError("project_key is required")
- topology_ids = _parse_int_list_query(
- request.query_params.getlist("topology_id")
- )
- force = _parse_bool_query(request.query_params.get("force"))
- del force # Manual refresh always rebuilds the requested cache scope.
- payload = refresh_topology_cache(project_key, topology_ids=topology_ids or None)
- return JSONResponse(payload)
- except Exception as exc:
- return JSONResponse({"error": str(exc)}, status_code=400)
- @mcp.tool()
- def list_locations(
- project_key: str, keyword: str = "", page_size: int = 100, page_num: int = 1
- ) -> Any:
- """List location data from the config API."""
- payload = api_list_locations(
- project_key, keyword=keyword, page_size=page_size, page_num=page_num
- )
- return _append_next_page_hint(payload, page_num)
- @mcp.tool()
- def list_system_tree(project_key: str) -> Any:
- """Get the full system tree from the config API."""
- return api_list_system_tree(project_key)
- @mcp.tool()
- def list_systems(
- project_key: str,
- page_size: int = 100,
- page_num: int = 1,
- system_type_id: int = 0,
- show_below: bool = True,
- ) -> Any:
- """List systems by system type."""
- payload = api_list_systems(
- project_key,
- page_size=page_size,
- page_num=page_num,
- system_type_id=system_type_id,
- show_below=show_below,
- )
- return _append_next_page_hint(payload, page_num)
- @mcp.tool()
- def list_device_types(project_key: str) -> Any:
- """List all device types."""
- return api_list_device_types(project_key)
- @mcp.tool()
- def list_meter_types(project_key: str) -> Any:
- """List all meter types."""
- return api_list_meter_types(project_key)
- @mcp.tool()
- def search_devices(
- project_key: str,
- page_size: int = 100,
- page_num: int = 1,
- keyword: str = "",
- location_id: int = 0,
- show_below: bool = True,
- system_ids: list[int] | None = None,
- device_type_ids: list[int] | None = None,
- ) -> Any:
- """Search devices with id-based filters."""
- payload = api_search_devices(
- project_key,
- page_size=page_size,
- page_num=page_num,
- keyword=keyword,
- location_id=location_id,
- show_below=show_below,
- system_ids=system_ids,
- device_type_ids=device_type_ids,
- )
- return _append_next_page_hint(payload, page_num)
- @mcp.tool()
- def search_meters(
- project_key: str,
- page_size: int = 100,
- page_num: int = 1,
- keyword: str = "",
- location_id: int = 0,
- show_below: bool = True,
- meter_type_id: int = 0,
- measurement_location_ids: list[int] | None = None,
- measurement_system_ids: list[int] | None = None,
- measurement_device_type_ids: list[int] | None = None,
- status: int | None = None,
- ) -> Any:
- """Search meters with id-based filters."""
- payload = api_search_meters(
- project_key,
- page_size=page_size,
- page_num=page_num,
- keyword=keyword,
- location_id=location_id,
- show_below=show_below,
- meter_type_id=meter_type_id,
- measurement_location_ids=measurement_location_ids,
- measurement_system_ids=measurement_system_ids,
- measurement_device_type_ids=measurement_device_type_ids,
- status=status,
- )
- return _append_next_page_hint(payload, page_num)
- @mcp.tool()
- def search_points(
- project_key: str, id: int, page_size: int = 100, page_num: int = 1
- ) -> Any:
- """Search points under a meter by meter id."""
- payload = api_search_points(
- project_key, id=id, page_size=page_size, page_num=page_num
- )
- return _append_next_page_hint(payload, page_num)
- @mcp.tool(name="topology.group_list")
- def topology_group_list(project_key: str) -> dict[str, Any]:
- """List topology groups to help locate device and meter relationships."""
- return list_topology_groups(project_key)
- @mcp.tool(name="topology.list")
- def topology_list(
- project_key: str, group_id: int | None = None, object_type_code: int | None = None
- ) -> dict[str, Any]:
- """List topologies for checking device and meter hierarchy relationships."""
- return list_topologies(
- project_key, group_id=group_id, object_type_code=object_type_code
- )
- @mcp.tool(name="topology.get_node")
- def topology_get_node(
- project_key: str,
- topology_id: int,
- node_id: Annotated[
- str,
- Field(
- description=(
- "Node ID in the target topology. Use 'root' to resolve the "
- "topology root node automatically."
- )
- ),
- ] = "root",
- include_siblings: bool = True,
- include_children: bool = True,
- ) -> dict[str, Any]:
- """Get one topology node and its direct parent-child relationships."""
- return get_topology_node(
- project_key,
- topology_id,
- node_id,
- include_siblings=include_siblings,
- include_children=include_children,
- )
- @mcp.tool(name="topology.find_context")
- def topology_find_context(
- project_key: str,
- entity_type: Annotated[
- str,
- Field(
- description=(
- "Entity type for topology relationship lookup. Only 'meter' or "
- "'device' are accepted."
- )
- ),
- ],
- entity_id: int,
- topology_id: int | None = None,
- include_siblings: bool = True,
- ancestor_depth: int = 5,
- descendant_depth: int = 2,
- ) -> dict[str, Any]:
- """Find topology context for a meter or device, including hierarchy relationships."""
- return find_topology_context(
- project_key,
- entity_type,
- entity_id,
- topology_id=topology_id,
- include_siblings=include_siblings,
- ancestor_depth=ancestor_depth,
- descendant_depth=descendant_depth,
- )
- def main() -> None:
- host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0"
- port = int(os.getenv("MCP_PORT", "8500"))
- path = os.getenv("MCP_PATH", "/mcp").strip() or "/mcp"
- mcp.run(transport="http", host=host, port=port, path=path, stateless_http=True)
- if __name__ == "__main__":
- main()
|