| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- from __future__ import annotations
- import logging
- import os
- from typing import Any
- import uvicorn
- from fastmcp import FastMCP
- from .auth import load_projects_config
- from .collector_api import (
- connect_device as api_connect_device,
- create_modbus_device as api_create_modbus_device,
- create_modbus_point as api_create_modbus_point,
- disconnect_device as api_disconnect_device,
- edit_modbus_device as api_edit_modbus_device,
- edit_modbus_point as api_edit_modbus_point,
- list_device_points as api_list_device_points,
- list_devices as api_list_devices,
- )
- from .db import check_database_connection
- from .gateway_api import modbus_point_collect_test as api_modbus_point_collect_test
- SERVER_INSTRUCTIONS = (
- "Data collector tools. Use project.list first to choose a project_key. "
- "base_url is used for login and gateway test APIs. data_collector_base_url "
- "is required for collector management APIs and never falls back to base_url. "
- "For Modbus, gateway word_byte_order values ABCD/BADC/CDAB/DCBA must be "
- "mapped to collector byte_order/word_order before creating a device."
- )
- mcp = FastMCP("data-collector-mcp", instructions=SERVER_INSTRUCTIONS)
- logger = logging.getLogger(__name__)
- @mcp.tool(
- name="project.list",
- title="Project List",
- description="List enabled 汇采 projects available to this MCP service. Call this first to choose project_key.",
- tags={"project", "list"},
- )
- def project_list() -> dict[str, Any]:
- projects = load_projects_config()
- result = [
- {
- "project_key": item["project_key"],
- "project_name": item["project_name"],
- }
- for item in projects
- if item["enabled"]
- ]
- result.sort(key=lambda item: item["project_key"])
- return {"projects": result, "total": len(result)}
- @mcp.tool(
- name="modbus.point_collect_test",
- description=(
- "通过采集网关读取 Modbus TCP 点位并转换为业务值。调用 "
- "{base_url}/api/dc-gateway/modbus/read_points。function_code: "
- "1=Read Coils/线圈,2=Read Discrete Inputs/离散输入,"
- "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。"
- "word_byte_order 可选 ABCD、BADC、CDAB、DCBA。若读取成功后要创建汇采设备,"
- "映射为 byte_order/word_order: ABCD=>1/1, BADC=>2/1, CDAB=>1/2, DCBA=>2/2。"
- "响应透传上游 JSON,code=0 表示业务成功。"
- ),
- )
- def modbus_point_collect_test(
- project_key: str,
- ip: str,
- port: int,
- slave_id: int,
- points: list[dict[str, Any]],
- device_type: str = "ModbusTCP",
- word_byte_order: str = "ABCD",
- address_base: int = 0,
- ) -> dict[str, Any]:
- return api_modbus_point_collect_test(
- project_key,
- ip=ip,
- port=port,
- slave_id=slave_id,
- points=points,
- device_type=device_type,
- word_byte_order=word_byte_order,
- address_base=address_base,
- )
- @mcp.tool(
- name="collector.modbus_device_create",
- description=(
- "汇采-创建 Modbus 设备。调用 {data_collector_base_url}/api/collector/device,"
- "需要登录后使用 Authorization。创建设备必须传 payload.device_type 协议类型、"
- "payload.ip IP 地址、payload.port 端口号、payload.name 名称、payload.slave_id、"
- "payload.word_order 字顺序、payload.byte_order 字节顺序、payload.address_base。"
- "payload.address_base 会转换为汇采接口的 address_offset。"
- "payload 会补齐默认值: type=modbus, timeout=3, is_persistent=true, group_id=0, "
- "alarm_interval=90, collect_interval=5, retry_times=0。"
- "注意 byte_order/word_order 是汇采枚举,不是网关 word_byte_order。"
- "byte_order: 1=Big Endian, 2=Small Endian。word_order: 1=Big Endian, 2=Small Endian。"
- "device_type: 1=TCP, 2=RTU, 3=UDP, 4=RTU OVER TCP, 5=RTU OVER UDP。"
- "采集网关 word_byte_order 映射: ABCD=>1/1, BADC=>2/1, CDAB=>1/2, DCBA=>2/2。"
- "响应透传上游 JSON,state=0 表示业务成功。"
- ),
- )
- def collector_modbus_device_create(
- project_key: str,
- payload: dict[str, Any],
- ) -> dict[str, Any]:
- return api_create_modbus_device(project_key, payload)
- @mcp.tool(
- name="collector.modbus_device_edit",
- description=(
- "汇采-编辑 Modbus 设备。调用 "
- "{data_collector_base_url}/api/collector/modbus/device/edit。"
- "这是 Modbus 专用旧编辑接口,必须传 ori_id 原设备 id、name、slave_id、"
- "word_order、byte_order、device_type 连接类型。TCP/UDP 类设备还必须传 ip 和 port;"
- "RTU 设备必须传 serial_port。"
- "编辑前设备不能处于已连接状态;若已连接,请先调用 collector.device_disconnect。"
- "该接口是全量更新语义,未传字段可能被默认值覆盖。"
- "默认参数: ip='', port=0, serial_port='', timeout=3, is_persistent=true, "
- "baud_rate=0, data_bit=0, parity=0, stop_bit=0, mode=0, address_offset=0, "
- "retry_times=0, device_group_id=0, alarm_interval=90, collect_interval=5。"
- "连接类型: 1=TCP, 2=RTU, 3=UDP, 4=RTU OVER TCP, 5=RTU OVER UDP。"
- "byte_order: 1=Big Endian, 2=Small Endian;word_order: 1=Big Endian, 2=Small Endian。"
- "响应透传上游 JSON,state=0 表示业务成功。"
- ),
- )
- def collector_modbus_device_edit(
- project_key: str,
- ori_id: int,
- name: str,
- device_type: int,
- slave_id: int,
- byte_order: int,
- word_order: int,
- ip: str = "",
- port: int = 0,
- serial_port: str = "",
- timeout: int = 3,
- is_persistent: bool = True,
- baud_rate: int = 0,
- data_bit: int = 0,
- parity: int = 0,
- stop_bit: int = 0,
- mode: int = 0,
- address_offset: int = 0,
- retry_times: int = 0,
- device_group_id: int = 0,
- alarm_interval: int = 90,
- collect_interval: int = 5,
- ) -> dict[str, Any]:
- return api_edit_modbus_device(
- project_key,
- {
- "ori_id": ori_id,
- "name": name,
- "device_type": device_type,
- "ip": ip,
- "port": port,
- "slave_id": slave_id,
- "byte_order": byte_order,
- "word_order": word_order,
- "serial_port": serial_port,
- "timeout": timeout,
- "is_persistent": is_persistent,
- "baud_rate": baud_rate,
- "data_bit": data_bit,
- "parity": parity,
- "stop_bit": stop_bit,
- "mode": mode,
- "address_offset": address_offset,
- "retry_times": retry_times,
- "device_group_id": device_group_id,
- "alarm_interval": alarm_interval,
- "collect_interval": collect_interval,
- },
- )
- @mcp.tool(
- name="collector.modbus_point_create",
- description=(
- "汇采-创建 Modbus 采集点位。调用 "
- "{data_collector_base_url}/api/collector/modbus/point/add_collect_point。"
- "创建点位必须传 payload.name 名称、payload.address 寄存器地址、"
- "payload.type 数据类型,以及 payload.func_code 或 payload.register_type 寄存器类型。"
- "payload 会补齐默认值: point_id='', scale_ratio=1, value_offset=0, group_id=0, "
- "invalid_values='', valid_range_start=null, valid_range_end=null, bit=0。"
- "func_code: 1=Read Coils/线圈,2=Read Discrete Inputs/离散输入,"
- "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。"
- "register_type 可用 coil、discrete_input、holding_register、input_register。"
- "数据类型应使用汇采类型: bool, int16, uint16, int32, uint32, int64, uint64, float32, float64。"
- "常见点表类型映射: BOOL=>bool, SHORT=>int16, WORD=>uint16, LONG=>int32, "
- "DWORD=>uint32, FLOAT/REAL=>float32, DOUBLE=>float64, LONGLONG=>int64, QWORD=>uint64。"
- "响应透传上游 JSON,state=0 表示业务成功。"
- ),
- )
- def collector_modbus_point_create(
- project_key: str,
- payload: dict[str, Any],
- ) -> dict[str, Any]:
- return api_create_modbus_point(project_key, payload)
- @mcp.tool(
- name="collector.modbus_point_edit",
- description=(
- "汇采-编辑 Modbus 采集点位。调用 "
- "{data_collector_base_url}/api/collector/modbus/point/edit_collect_point。"
- "必须传 ori_id 原点位 id、name 名称、address 寄存器地址、data_type 数据类型,"
- "以及 func_code 或 register_type 寄存器类型。"
- "ori_id 对应 collector.device_points 返回的 data.point[].id。"
- "编辑点位不会迁移所属设备。"
- "该接口是全量更新语义,未传字段可能被默认值覆盖。"
- "默认参数: point_id='', scale_ratio=1, value_offset=0, group_id=0, "
- "invalid_values='', valid_range_start=null, valid_range_end=null, bit=0。"
- "func_code: 1=Read Coils/线圈,2=Read Discrete Inputs/离散输入,"
- "3=Read Holding Registers/保持寄存器,4=Read Input Registers/输入寄存器。"
- "register_type 可用 coil、discrete_input、holding_register、input_register。"
- "数据类型应使用汇采类型: bool, int16, uint16, int32, uint32, int64, uint64, float32, float64。"
- "常见点表类型映射: BOOL=>bool, SHORT=>int16, WORD=>uint16, LONG=>int32, "
- "DWORD=>uint32, FLOAT/REAL=>float32, DOUBLE=>float64, LONGLONG=>int64, QWORD=>uint64。"
- "响应透传上游 JSON,state=0 表示业务成功。"
- ),
- )
- def collector_modbus_point_edit(
- project_key: str,
- ori_id: int,
- name: str,
- address: int,
- data_type: str,
- func_code: int = 0,
- register_type: str = "",
- point_id: str = "",
- scale_ratio: float = 1,
- value_offset: float = 0,
- group_id: int = 0,
- invalid_values: str = "",
- valid_range_start: float | None = None,
- valid_range_end: float | None = None,
- bit: int = 0,
- describe: str = "",
- ) -> dict[str, Any]:
- payload: dict[str, Any] = {
- "ori_id": ori_id,
- "name": name,
- "address": address,
- "type": data_type,
- "point_id": point_id,
- "scale_ratio": scale_ratio,
- "value_offset": value_offset,
- "group_id": group_id,
- "invalid_values": invalid_values,
- "valid_range_start": valid_range_start,
- "valid_range_end": valid_range_end,
- "bit": bit,
- "describe": describe,
- }
- if func_code:
- payload["func_code"] = func_code
- else:
- payload["register_type"] = register_type
- return api_edit_modbus_point(project_key, payload)
- @mcp.tool(
- name="collector.device_list",
- description=(
- "汇采-查询设备列表。调用 {data_collector_base_url}/api/collector/device。"
- "用于查看设备树、设备分组、设备连接状态、设备采集状态和点位数量;"
- "不返回点位明细、点位采集状态或点位当前值。"
- "如果目标是查看某个设备下点位的采集状态和值,请使用 collector.device_points。"
- "num_points 默认 false;只有需要统计设备或点位分组下的点位数量时才传 true。"
- "响应透传上游 JSON,state=0 表示业务成功。"
- ),
- )
- def collector_device_list(project_key: str, num_points: bool = False) -> dict[str, Any]:
- return api_list_devices(project_key, num_points=num_points)
- @mcp.tool(
- name="collector.device_connect",
- description=(
- "汇采-连接设备。调用 "
- "{data_collector_base_url}/api/collector/common/device/set_connect_status,"
- "请求 status=2。用于让指定设备进入已连接状态;连接成功不代表正在采集,"
- "采集状态请看响应 data.running_status 或后续查询设备/点位状态。"
- "device_id 是设备列表中的设备 id;device_type 默认 modbus,其他设备类型可传 "
- "s7、bacnet、ethernet-ip、opc-ua、opc-da、snmp、iec104。"
- "响应透传上游 JSON,state=0 表示业务成功。常见状态:"
- "data.status 1=未连接、2=已连接、3=连接异常;"
- "data.running_status 0=未采集、1=采集中、2=采集异常。"
- ),
- )
- def collector_device_connect(
- project_key: str,
- device_id: int,
- device_type: str = "modbus",
- ) -> dict[str, Any]:
- return api_connect_device(project_key, device_id=device_id, device_type=device_type)
- @mcp.tool(
- name="collector.device_disconnect",
- description=(
- "汇采-断开设备。调用 "
- "{data_collector_base_url}/api/collector/common/device/set_connect_status,"
- "请求 status=1。用于停止指定设备连接/采集相关状态,使设备回到未连接或空闲状态。"
- "device_id 是设备列表中的设备 id;device_type 默认 modbus,其他设备类型可传 "
- "s7、bacnet、ethernet-ip、opc-ua、opc-da、snmp、iec104。"
- "响应透传上游 JSON,state=0 表示业务成功。常见状态:"
- "data.status 1=未连接、2=已连接、3=连接异常;"
- "data.running_status 0=未采集、1=采集中、2=采集异常。"
- ),
- )
- def collector_device_disconnect(
- project_key: str,
- device_id: int,
- device_type: str = "modbus",
- ) -> dict[str, Any]:
- return api_disconnect_device(project_key, device_id=device_id, device_type=device_type)
- @mcp.tool(
- name="collector.device_points",
- description=(
- "汇采-查询设备点位列表。调用 "
- "{data_collector_base_url}/api/collector/common/device/get_collect_point。"
- "主要用于查看某个设备下点位的采集状态和值:Modbus 返回 data.point[].status "
- "和 data.point[].present_value,status 0=未采集、1=采集正常、2=采集异常;"
- "present_value 是当前内存中的点位最新值。group_id 默认 0 表示查询全部点位,"
- "传具体点位分组 id 时只返回该分组下点位。device_type 默认 modbus,其他设备类型可传 "
- "s7、bacnet、ethernet-ip、opc-ua、opc-da、snmp、iec104。"
- "响应透传上游 JSON,state=0 表示业务成功。Modbus 点位常见字段包括 id、point_id、"
- "name、address、type、device_id、status、function_code、present_value、scale_ratio、"
- "value_offset、group_id、bit、update_time。"
- ),
- )
- def collector_device_points(
- project_key: str,
- device_id: int,
- device_type: str = "modbus",
- group_id: int = 0,
- ) -> dict[str, Any]:
- return api_list_device_points(
- project_key,
- device_id=device_id,
- device_type=device_type,
- group_id=group_id,
- )
- def build_mcp_http_app(path: str | None = None):
- effective_path = str(path or os.getenv("MCP_PATH", "/mcp")).strip() or "/mcp"
- return mcp.http_app(path=effective_path, transport="http", stateless_http=True)
- def main() -> None:
- logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
- try:
- check_database_connection()
- except Exception:
- logger.exception("Database startup check failed")
- raise SystemExit(1)
- host = os.getenv("MCP_HOST", "0.0.0.0").strip() or "0.0.0.0"
- port = int(os.getenv("MCP_PORT", "8501"))
- path = os.getenv("MCP_PATH", "/mcp").strip() or "/mcp"
- app = build_mcp_http_app(path)
- print(f"Using FastMCP app '{mcp.name}' on http://{host}:{port}{path}")
- uvicorn.run(
- app,
- host=host,
- port=port,
- lifespan="on",
- timeout_graceful_shutdown=2,
- ws="websockets-sansio",
- )
|