from __future__ import annotations import logging import os from typing import Any import uvicorn from fastmcp import FastMCP from .auth import load_projects_config from .collector_api import ( connect_device as api_connect_device, create_modbus_device as api_create_modbus_device, create_modbus_point as api_create_modbus_point, disconnect_device as api_disconnect_device, edit_modbus_device as api_edit_modbus_device, edit_modbus_point as api_edit_modbus_point, list_device_points as api_list_device_points, list_devices as api_list_devices, ) from .db import check_database_connection from .gateway_api import modbus_point_collect_test as api_modbus_point_collect_test SERVER_INSTRUCTIONS = ( "Data collector tools. Use project.list first to choose a project_key. " "base_url is used for login and gateway test APIs. data_collector_base_url " "is required for collector management APIs and never falls back to base_url. " "For Modbus, gateway word_byte_order values ABCD/BADC/CDAB/DCBA must be " "mapped to collector byte_order/word_order before creating a device." ) mcp = FastMCP("data-collector-mcp", instructions=SERVER_INSTRUCTIONS) logger = logging.getLogger(__name__) @mcp.tool( name="project.list", title="Project List", description="List enabled 汇采 projects available to this MCP service. Call this first to choose project_key.", tags={"project", "list"}, ) def project_list() -> dict[str, Any]: projects = load_projects_config() result = [ { "project_key": item["project_key"], "project_name": item["project_name"], } for item in projects if item["enabled"] ] result.sort(key=lambda item: item["project_key"]) return {"projects": result, "total": len(result)} @mcp.tool( name="modbus.point_collect_test", description=( "通过采集网关读取 Modbus TCP 点位并转换为业务值。调用 " "{base_url}/api/dc-gateway/modbus/read_points。function_code: " "1=Read Coils/线圈,2=Read Discrete Inputs/离散输入," "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。" "word_byte_order 可选 ABCD、BADC、CDAB、DCBA。若读取成功后要创建汇采设备," "映射为 byte_order/word_order: ABCD=>1/1, BADC=>2/1, CDAB=>1/2, DCBA=>2/2。" "响应透传上游 JSON,code=0 表示业务成功。" ), ) def modbus_point_collect_test( project_key: str, ip: str, port: int, slave_id: int, points: list[dict[str, Any]], device_type: str = "ModbusTCP", word_byte_order: str = "ABCD", address_base: int = 0, ) -> dict[str, Any]: return api_modbus_point_collect_test( project_key, ip=ip, port=port, slave_id=slave_id, points=points, device_type=device_type, word_byte_order=word_byte_order, address_base=address_base, ) @mcp.tool( name="collector.modbus_device_create", description=( "汇采-创建 Modbus 设备。调用 {data_collector_base_url}/api/collector/device," "需要登录后使用 Authorization。创建设备必须传 payload.device_type 协议类型、" "payload.ip IP 地址、payload.port 端口号、payload.name 名称、payload.slave_id、" "payload.word_order 字顺序、payload.byte_order 字节顺序、payload.address_base。" "payload.address_base 会转换为汇采接口的 address_offset。" "payload 会补齐默认值: type=modbus, timeout=3, is_persistent=true, group_id=0, " "alarm_interval=90, collect_interval=5, retry_times=0。" "注意 byte_order/word_order 是汇采枚举,不是网关 word_byte_order。" "byte_order: 1=Big Endian, 2=Small Endian。word_order: 1=Big Endian, 2=Small Endian。" "device_type: 1=TCP, 2=RTU, 3=UDP, 4=RTU OVER TCP, 5=RTU OVER UDP。" "采集网关 word_byte_order 映射: ABCD=>1/1, BADC=>2/1, CDAB=>1/2, DCBA=>2/2。" "响应透传上游 JSON,state=0 表示业务成功。" ), ) def collector_modbus_device_create( project_key: str, payload: dict[str, Any], ) -> dict[str, Any]: return api_create_modbus_device(project_key, payload) @mcp.tool( name="collector.modbus_device_edit", description=( "汇采-编辑 Modbus 设备。调用 " "{data_collector_base_url}/api/collector/modbus/device/edit。" "这是 Modbus 专用旧编辑接口,必须传 ori_id 原设备 id、name、slave_id、" "word_order、byte_order、device_type 连接类型。TCP/UDP 类设备还必须传 ip 和 port;" "RTU 设备必须传 serial_port。" "编辑前设备不能处于已连接状态;若已连接,请先调用 collector.device_disconnect。" "该接口是全量更新语义,未传字段可能被默认值覆盖。" "默认参数: ip='', port=0, serial_port='', timeout=3, is_persistent=true, " "baud_rate=0, data_bit=0, parity=0, stop_bit=0, mode=0, address_offset=0, " "retry_times=0, device_group_id=0, alarm_interval=90, collect_interval=5。" "连接类型: 1=TCP, 2=RTU, 3=UDP, 4=RTU OVER TCP, 5=RTU OVER UDP。" "byte_order: 1=Big Endian, 2=Small Endian;word_order: 1=Big Endian, 2=Small Endian。" "响应透传上游 JSON,state=0 表示业务成功。" ), ) def collector_modbus_device_edit( project_key: str, ori_id: int, name: str, device_type: int, slave_id: int, byte_order: int, word_order: int, ip: str = "", port: int = 0, serial_port: str = "", timeout: int = 3, is_persistent: bool = True, baud_rate: int = 0, data_bit: int = 0, parity: int = 0, stop_bit: int = 0, mode: int = 0, address_offset: int = 0, retry_times: int = 0, device_group_id: int = 0, alarm_interval: int = 90, collect_interval: int = 5, ) -> dict[str, Any]: return api_edit_modbus_device( project_key, { "ori_id": ori_id, "name": name, "device_type": device_type, "ip": ip, "port": port, "slave_id": slave_id, "byte_order": byte_order, "word_order": word_order, "serial_port": serial_port, "timeout": timeout, "is_persistent": is_persistent, "baud_rate": baud_rate, "data_bit": data_bit, "parity": parity, "stop_bit": stop_bit, "mode": mode, "address_offset": address_offset, "retry_times": retry_times, "device_group_id": device_group_id, "alarm_interval": alarm_interval, "collect_interval": collect_interval, }, ) @mcp.tool( name="collector.modbus_point_create", description=( "汇采-创建 Modbus 采集点位。调用 " "{data_collector_base_url}/api/collector/modbus/point/add_collect_point。" "创建点位必须传 payload.name 名称、payload.address 寄存器地址、" "payload.type 数据类型,以及 payload.func_code 或 payload.register_type 寄存器类型。" "payload 会补齐默认值: point_id='', scale_ratio=1, value_offset=0, group_id=0, " "invalid_values='', valid_range_start=null, valid_range_end=null, bit=0。" "func_code: 1=Read Coils/线圈,2=Read Discrete Inputs/离散输入," "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。" "register_type 可用 coil、discrete_input、holding_register、input_register。" "数据类型应使用汇采类型: bool, int16, uint16, int32, uint32, int64, uint64, float32, float64。" "常见点表类型映射: BOOL=>bool, SHORT=>int16, WORD=>uint16, LONG=>int32, " "DWORD=>uint32, FLOAT/REAL=>float32, DOUBLE=>float64, LONGLONG=>int64, QWORD=>uint64。" "响应透传上游 JSON,state=0 表示业务成功。" ), ) def collector_modbus_point_create( project_key: str, payload: dict[str, Any], ) -> dict[str, Any]: return api_create_modbus_point(project_key, payload) @mcp.tool( name="collector.modbus_point_edit", description=( "汇采-编辑 Modbus 采集点位。调用 " "{data_collector_base_url}/api/collector/modbus/point/edit_collect_point。" "必须传 ori_id 原点位 id、name 名称、address 寄存器地址、data_type 数据类型," "以及 func_code 或 register_type 寄存器类型。" "ori_id 对应 collector.device_points 返回的 data.point[].id。" "编辑点位不会迁移所属设备。" "该接口是全量更新语义,未传字段可能被默认值覆盖。" "默认参数: point_id='', scale_ratio=1, value_offset=0, group_id=0, " "invalid_values='', valid_range_start=null, valid_range_end=null, bit=0。" "func_code: 1=Read Coils/线圈,2=Read Discrete Inputs/离散输入," "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。" "register_type 可用 coil、discrete_input、holding_register、input_register。" "数据类型应使用汇采类型: bool, int16, uint16, int32, uint32, int64, uint64, float32, float64。" "常见点表类型映射: BOOL=>bool, SHORT=>int16, WORD=>uint16, LONG=>int32, " "DWORD=>uint32, FLOAT/REAL=>float32, DOUBLE=>float64, LONGLONG=>int64, QWORD=>uint64。" "响应透传上游 JSON,state=0 表示业务成功。" ), ) def collector_modbus_point_edit( project_key: str, ori_id: int, name: str, address: int, data_type: str, func_code: int = 0, register_type: str = "", point_id: str = "", scale_ratio: float = 1, value_offset: float = 0, group_id: int = 0, invalid_values: str = "", valid_range_start: float | None = None, valid_range_end: float | None = None, bit: int = 0, describe: str = "", ) -> dict[str, Any]: payload: dict[str, Any] = { "ori_id": ori_id, "name": name, "address": address, "type": data_type, "point_id": point_id, "scale_ratio": scale_ratio, "value_offset": value_offset, "group_id": group_id, "invalid_values": invalid_values, "valid_range_start": valid_range_start, "valid_range_end": valid_range_end, "bit": bit, "describe": describe, } if func_code: payload["func_code"] = func_code else: payload["register_type"] = register_type return api_edit_modbus_point(project_key, payload) @mcp.tool( name="collector.device_list", description=( "汇采-查询设备列表。调用 {data_collector_base_url}/api/collector/device。" "用于查看设备树、设备分组、设备连接状态、设备采集状态和点位数量;" "不返回点位明细、点位采集状态或点位当前值。" "如果目标是查看某个设备下点位的采集状态和值,请使用 collector.device_points。" "num_points 默认 false;只有需要统计设备或点位分组下的点位数量时才传 true。" "响应透传上游 JSON,state=0 表示业务成功。" ), ) def collector_device_list(project_key: str, num_points: bool = False) -> dict[str, Any]: return api_list_devices(project_key, num_points=num_points) @mcp.tool( name="collector.device_connect", description=( "汇采-连接设备。调用 " "{data_collector_base_url}/api/collector/common/device/set_connect_status," "请求 status=2。用于让指定设备进入已连接状态;连接成功不代表正在采集," "采集状态请看响应 data.running_status 或后续查询设备/点位状态。" "device_id 是设备列表中的设备 id;device_type 默认 modbus,其他设备类型可传 " "s7、bacnet、ethernet-ip、opc-ua、opc-da、snmp、iec104。" "响应透传上游 JSON,state=0 表示业务成功。常见状态:" "data.status 1=未连接、2=已连接、3=连接异常;" "data.running_status 0=未采集、1=采集中、2=采集异常。" ), ) def collector_device_connect( project_key: str, device_id: int, device_type: str = "modbus", ) -> dict[str, Any]: return api_connect_device(project_key, device_id=device_id, device_type=device_type) @mcp.tool( name="collector.device_disconnect", description=( "汇采-断开设备。调用 " "{data_collector_base_url}/api/collector/common/device/set_connect_status," "请求 status=1。用于停止指定设备连接/采集相关状态,使设备回到未连接或空闲状态。" "device_id 是设备列表中的设备 id;device_type 默认 modbus,其他设备类型可传 " "s7、bacnet、ethernet-ip、opc-ua、opc-da、snmp、iec104。" "响应透传上游 JSON,state=0 表示业务成功。常见状态:" "data.status 1=未连接、2=已连接、3=连接异常;" "data.running_status 0=未采集、1=采集中、2=采集异常。" ), ) def collector_device_disconnect( project_key: str, device_id: int, device_type: str = "modbus", ) -> dict[str, Any]: return api_disconnect_device(project_key, device_id=device_id, device_type=device_type) @mcp.tool( name="collector.device_points", description=( "汇采-查询设备点位列表。调用 " "{data_collector_base_url}/api/collector/common/device/get_collect_point。" "主要用于查看某个设备下点位的采集状态和值:Modbus 返回 data.point[].status " "和 data.point[].present_value,status 0=未采集、1=采集正常、2=采集异常;" "present_value 是当前内存中的点位最新值。group_id 默认 0 表示查询全部点位," "传具体点位分组 id 时只返回该分组下点位。device_type 默认 modbus,其他设备类型可传 " "s7、bacnet、ethernet-ip、opc-ua、opc-da、snmp、iec104。" "响应透传上游 JSON,state=0 表示业务成功。Modbus 点位常见字段包括 id、point_id、" "name、address、type、device_id、status、function_code、present_value、scale_ratio、" "value_offset、group_id、bit、update_time。" ), ) def collector_device_points( project_key: str, device_id: int, device_type: str = "modbus", group_id: int = 0, ) -> dict[str, Any]: return api_list_device_points( project_key, device_id=device_id, device_type=device_type, group_id=group_id, ) def build_mcp_http_app(path: str | None = None): effective_path = str(path or os.getenv("MCP_PATH", "/mcp")).strip() or "/mcp" return mcp.http_app(path=effective_path, transport="http", stateless_http=True) def main() -> None: logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO")) try: check_database_connection() except Exception: logger.exception("Database startup check failed") raise SystemExit(1) host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0" port = int(os.getenv("MCP_PORT", "8501")) path = os.getenv("MCP_PATH", "/mcp").strip() or "/mcp" app = build_mcp_http_app(path) print(f"Using FastMCP app '{mcp.name}' on http://{host}:{port}{path}") uvicorn.run( app, host=host, port=port, lifespan="on", timeout_graceful_shutdown=2, ws="websockets-sansio", )