| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- from __future__ import annotations
- import os
- from typing import Any
- import uvicorn
- from fastmcp import FastMCP
- from .auth import load_projects_config
- from .collector_api import (
- create_modbus_device as api_create_modbus_device,
- create_modbus_point as api_create_modbus_point,
- list_devices as api_list_devices,
- )
- from .gateway_api import modbus_point_collect_test as api_modbus_point_collect_test
# Instructions text handed to FastMCP when the server object is created.
# It tells clients to pick a project first and explains which base URL each
# API family uses and how gateway byte orders relate to collector settings.
SERVER_INSTRUCTIONS = (
    "Data collector tools. Use project.list first to choose a project_key. "
    "base_url is used for login and gateway test APIs. data_collector_base_url "
    "is required for collector management APIs and never falls back to base_url. "
    "For Modbus, gateway word_byte_order values ABCD/BADC/CDAB/DCBA must be "
    "mapped to collector byte_order/word_order before creating a device."
)

# Single module-level server instance; every @mcp.tool below registers on it.
mcp = FastMCP(
    "data-collector-mcp",
    instructions=SERVER_INSTRUCTIONS,
)
@mcp.tool(
    name="project.list",
    title="Project List",
    description="List enabled 汇采 projects available to this MCP service. Call this first to choose project_key.",
    tags={"project", "list"},
)
def project_list() -> dict[str, Any]:
    """Return the enabled projects from the loaded project configuration.

    Returns:
        A dict with ``projects`` (a list of ``project_key`` /
        ``project_name`` pairs ordered by ``project_key``) and ``total``
        (the number of listed projects).
    """
    enabled_projects: list[dict[str, Any]] = []
    for entry in load_projects_config():
        # Entries whose "enabled" value is falsy are not exposed.
        if not entry["enabled"]:
            continue
        enabled_projects.append(
            {
                "project_key": entry["project_key"],
                "project_name": entry["project_name"],
            }
        )

    ordered = sorted(enabled_projects, key=lambda project: project["project_key"])
    return {"projects": ordered, "total": len(ordered)}
@mcp.tool(
    name="modbus.point_collect_test",
    description=(
        "通过采集网关读取 Modbus TCP 点位并转换为业务值。调用 "
        "{base_url}/api/dc-gateway/modbus/read_points。function_code: "
        "1=Read Coils/线圈,2=Read Discrete Inputs/离散输入,"
        "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。"
        "word_byte_order 可选 ABCD、BADC、CDAB、DCBA。若读取成功后要创建汇采设备,"
        "映射为 byte_order/word_order: ABCD=>1/1, BADC=>2/1, CDAB=>1/2, DCBA=>2/2。"
        "响应透传上游 JSON,code=0 表示业务成功。"
    ),
)
def modbus_point_collect_test(
    project_key: str,
    ip: str,
    port: int,
    slave_id: int,
    points: list[dict[str, Any]],
    device_type: str = "ModbusTCP",
    word_byte_order: str = "ABCD",
    address_base: int = 0,
) -> dict[str, Any]:
    """Forward a Modbus point read test to the gateway API helper.

    The project key is passed positionally and every other argument is
    passed through unchanged as a keyword argument.

    Returns:
        Whatever the gateway API helper returns.
    """
    read_request: dict[str, Any] = {
        "ip": ip,
        "port": port,
        "slave_id": slave_id,
        "points": points,
        "device_type": device_type,
        "word_byte_order": word_byte_order,
        "address_base": address_base,
    }
    return api_modbus_point_collect_test(project_key, **read_request)
@mcp.tool(
    name="collector.modbus_device_create",
    description=(
        "汇采-创建 Modbus 设备。调用 {data_collector_base_url}/api/collector/device,"
        "需要登录后使用 Authorization。创建设备必须传 payload.device_type 协议类型、"
        "payload.ip IP 地址、payload.port 端口号、payload.name 名称、payload.slave_id、"
        "payload.word_order 字顺序、payload.byte_order 字节顺序、payload.address_base。"
        "payload.address_base 会转换为汇采接口的 address_offset。"
        "payload 会补齐默认值: type=modbus, timeout=3, is_persistent=true, group_id=0, "
        "alarm_interval=90, collect_interval=5, retry_times=0。"
        "注意 byte_order/word_order 是汇采枚举,不是网关 word_byte_order。"
        "byte_order: 1=Big Endian, 2=Small Endian。word_order: 1=Big Endian, 2=Small Endian。"
        "device_type: 1=TCP, 2=RTU, 3=UDP, 4=RTU OVER TCP, 5=RTU OVER UDP。"
        "采集网关 word_byte_order 映射: ABCD=>1/1, BADC=>2/1, CDAB=>1/2, DCBA=>2/2。"
        "响应透传上游 JSON,state=0 表示业务成功。"
    ),
)
def collector_modbus_device_create(
    project_key: str,
    payload: dict[str, Any],
) -> dict[str, Any]:
    """Create a Modbus device by delegating to the collector API helper.

    Args:
        project_key: Key of the project the request is made for.
        payload: Device fields, handed to the helper as-is.

    Returns:
        Whatever the collector API helper returns.
    """
    upstream_response = api_create_modbus_device(project_key, payload)
    return upstream_response
@mcp.tool(
    name="collector.modbus_point_create",
    description=(
        "汇采-创建 Modbus 采集点位。调用 "
        "{data_collector_base_url}/api/collector/modbus/point/add_collect_point。"
        "创建点位必须传 payload.name 名称、payload.address 寄存器地址、"
        "payload.type 数据类型,以及 payload.func_code 或 payload.register_type 寄存器类型。"
        "payload 会补齐默认值: scale_ratio=1, value_offset=0, group_id=0, "
        "invalid_values='', valid_range_start=null, valid_range_end=null, bit=0。"
        "func_code: 1=Read Coils/线圈,2=Read Discrete Inputs/离散输入,"
        "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。"
        "register_type 可用 coil、discrete_input、holding_register、input_register。"
        "数据类型应使用汇采类型: bool, int16, uint16, int32, uint32, int64, uint64, float32, float64。"
        "常见点表类型映射: BOOL=>bool, SHORT=>int16, WORD=>uint16, LONG=>int32, "
        "DWORD=>uint32, FLOAT/REAL=>float32, DOUBLE=>float64, LONGLONG=>int64, QWORD=>uint64。"
        "响应透传上游 JSON,state=0 表示业务成功。"
    ),
)
def collector_modbus_point_create(
    project_key: str,
    payload: dict[str, Any],
) -> dict[str, Any]:
    """Create a Modbus collect point by delegating to the collector API helper.

    Args:
        project_key: Key of the project the request is made for.
        payload: Point fields, handed to the helper as-is.

    Returns:
        Whatever the collector API helper returns.
    """
    upstream_response = api_create_modbus_point(project_key, payload)
    return upstream_response
@mcp.tool(
    name="collector.device_list",
    description=(
        "汇采-查询设备列表。调用 {data_collector_base_url}/api/collector/device。"
        "num_points 默认 false;只有需要统计设备或点位分组下的点位数量时才传 true。"
        "响应透传上游 JSON,state=0 表示业务成功。"
    ),
)
def collector_device_list(
    project_key: str,
    num_points: bool = False,
) -> dict[str, Any]:
    """List collector devices by delegating to the collector API helper.

    Args:
        project_key: Key of the project the request is made for.
        num_points: Forwarded to the helper as the ``num_points`` keyword.

    Returns:
        Whatever the collector API helper returns.
    """
    device_listing = api_list_devices(project_key, num_points=num_points)
    return device_listing
def build_mcp_http_app(path: str | None = None):
    """Build the HTTP app for the module-level ``mcp`` server.

    Args:
        path: Mount path for the MCP endpoint. When falsy, the ``MCP_PATH``
            environment variable is used, defaulting to ``"/mcp"``.

    Returns:
        The object returned by ``mcp.http_app`` for the resolved path.
    """
    candidate = path if path else os.getenv("MCP_PATH", "/mcp")
    mount_path = str(candidate).strip()
    if not mount_path:
        # A value that is blank after stripping falls back to the default.
        mount_path = "/mcp"
    return mcp.http_app(path=mount_path, transport="http", stateless_http=True)
def main() -> None:
    """Serve the MCP HTTP app with uvicorn, configured from the environment.

    Reads ``MCP_HOST`` (default ``"0.0.0.0"``), ``MCP_PORT`` (default
    ``8501``) and ``MCP_PATH`` (default ``"/mcp"``). A value that is unset or
    blank after stripping falls back to its default for all three settings.

    Raises:
        ValueError: If ``MCP_PORT`` is set to a non-integer value.
    """
    host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0"

    # Treat a blank MCP_PORT the same way host and path treat blank values,
    # instead of failing inside int("") with an unhelpful message.
    raw_port = os.getenv("MCP_PORT", "8501").strip() or "8501"
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(
            f"MCP_PORT must be an integer, got {raw_port!r}"
        ) from None

    path = os.getenv("MCP_PATH", "/mcp").strip() or "/mcp"
    app = build_mcp_http_app(path)
    print(f"Using FastMCP app '{mcp.name}' on http://{host}:{port}{path}")
    uvicorn.run(
        app,
        host=host,
        port=port,
        lifespan="on",
        timeout_graceful_shutdown=2,
        ws="websockets-sansio",
    )
|