# Data Collector MCP Design ## 目标 实现一个精简、可运行、可扩展的 MCP 服务 `data-collector-mcp`,用于通过 MCP tools 调用采集测试网关和汇采 HTTP 接口。 首期支持 Modbus,后续需要扩展支持 S7、Rockwell、OPC UA 等协议。 首期不包含前端、异步任务和调用日志。 ## 服务形态 - MCP 服务名:`data-collector-mcp` - 默认监听地址:`0.0.0.0` - 默认监听端口:`8080` - 默认 MCP 路径:`/mcp` - 默认 MCP HTTP endpoint:`http://127.0.0.1:8080/mcp` `8080` 是 HTTP 服务监听端口,MCP endpoint 挂载在该 HTTP 服务的 `/mcp` 路径上。 ## 技术栈 - Python 3.11 - FastMCP - Uvicorn - Requests - SQLAlchemy - PostgreSQL - psycopg2-binary ## 项目结构 ```text data-collector-mcp/ pyproject.toml Dockerfile README.md docs/ data-collector-mcp-design.md data_collector_mcp/ __init__.py __main__.py server.py auth.py db.py http_client.py gateway_api.py collector_api.py protocols/ __init__.py base.py modbus.py tests/ test_auth.py test_gateway_api.py test_collector_api.py ``` ## 数据库设计 不使用 SQLite。服务通过 `DATABASE_URL` 连接 PostgreSQL。 数据库名建议为: ```text data_collector_mcp ``` 如果手动创建 PostgreSQL 数据库,由于名称包含 `-`,需要使用双引号: ```sql CREATE DATABASE "data_collector_mcp"; ``` 表名:`sys_config` 表结构与 `D:\Projects\project-data-mcp` 中的 `sys_config` 保持一致: ```text id integer primary key key varchar(128) unique not null value json/jsonb not null ``` 如果需要手动建表,推荐使用 PostgreSQL `IDENTITY` 写法: ```sql CREATE TABLE IF NOT EXISTS public.sys_config ( id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, key character varying(128) NOT NULL UNIQUE, value json NOT NULL ); ``` 不要直接使用下面这种 SQL,除非已经提前创建了 `sys_config_id_seq`: ```sql id integer NOT NULL DEFAULT nextval('sys_config_id_seq'::regclass) ``` 否则 PostgreSQL 会报错: ```text relation "sys_config_id_seq" does not exist ``` 配置 key: ```text mcp_data_collector_projects mcp_data_collector_token_ ``` ## 项目配置 `sys_config.key = mcp_data_collector_projects` 的 value 为项目数组。 示例: ```json [ { "project_key": "dev-01", "project_name": "DEV开发环境", "base_url": "http://127.0.0.1:8000", "data_collector_base_url": "http://127.0.0.1:32080", "username": "admin", "password": "123456", "enabled": true } ] ``` 字段说明: | 字段 | 必填 | 含义 | |---|---:|---| | `project_key` | 是 | 项目唯一标识,MCP tools 通过该字段选择项目。 | | `project_name` | 否 | 项目展示名称,缺省时可使用 `project_key`。 | | `base_url` | 是 | 基础地址,用于登录接口和采集测试网关接口。 | | `data_collector_base_url` | 是 | 汇采管理接口基础地址,用于创建设备、创建点位、查询设备列表。缺失时直接报错。 | | `username` | 是 | 登录用户名。 | | `password` | 是 | 登录密码。 | | `enabled` | 否 | 是否启用,默认 `true`。 | ## 地址约定 登录接口使用 `base_url`: ```text POST {base_url}/api/ai/auth/password_login ``` Modbus 点位采集测试使用 `base_url`: ```text POST {base_url}/api/dc-gateway/modbus/read_points ``` 汇采创建设备使用 `data_collector_base_url`: ```text POST {data_collector_base_url}/api/collector/device ``` 汇采创建 Modbus 点位使用 `data_collector_base_url`: ```text POST {data_collector_base_url}/api/collector/modbus/point/add_collect_point ``` 汇采设备列表使用 `data_collector_base_url`: ```text GET {data_collector_base_url}/api/collector/device?num_points=false ``` `data_collector_base_url` 缺失时必须直接报错,不回退到 `base_url`,避免将设备或点位创建请求误发到采集测试网关。 ## 鉴权设计 登录请求: ```http POST {base_url}/api/ai/auth/password_login Content-Type: application/json ``` 请求体: ```json { "username": "admin", "password": "123456" } ``` 成功响应示例: ```json { "errcode": 0, "msg": "成功", "token": "TOKEN", "token_expire_time": 1781583584 } ``` token 缓存到 `sys_config`: ```text mcp_data_collector_token_ ``` 缓存值示例: ```json { "auth_token": "TOKEN", "expire_at": 1781583584, "updated_at": "2026-06-16T00:00:00Z" } ``` 调用汇采管理接口时,HTTP header 带: ```http Authorization: TOKEN ``` 采集测试网关接口是否需要 token 以后可扩展;首期按接口文档直接请求。 ## MCP Tools ### `project.list` 查询启用项目列表。调用其他工具前应先调用该工具选择 `project_key`。 返回示例: ```json { "projects": [ { "project_key": "dev-01", "project_name": "DEV开发环境" } ], "total": 1 } ``` ### `modbus.point_collect_test` 调用采集测试网关读取 Modbus 点位并转换为业务值。 上游接口: ```text POST {base_url}/api/dc-gateway/modbus/read_points ``` 参数建议: | 参数 | 类型 | 默认值 | 说明 | |---|---|---|---| | `project_key` | string | 无 | 项目标识。 | | `ip` | string | 无 | Modbus TCP 设备 IP,不是网关地址。 | | `port` | integer | 无 | Modbus TCP 设备端口。 | | `slave_id` | integer | 无 | Modbus 从站 ID。 | | `device_type` | string | `ModbusTCP` | 当前只支持 `ModbusTCP`。 | | `word_byte_order` | string | `ABCD` | 网关接口使用的字节序和字序组合。 | | `address_base` | integer | `0` | 地址偏移。协议地址为 `address + address_base`。 | | `points` | object[] | 无 | 点位定义列表。 | `word_byte_order` 取值: | 值 | 含义 | |---|---| | `ABCD` | 寄存器内字节不反转,寄存器顺序不反转。 | | `BADC` | 每个寄存器内两个字节反转,寄存器顺序不反转。 | | `CDAB` | 寄存器顺序反转,寄存器内字节不反转。 | | `DCBA` | 每个寄存器内字节反转,同时寄存器顺序反转。 | 当 AI 使用采集测试网关读取点位成功后,如果需要继续创建汇采 Modbus 设备,必须将采集网关的 `word_byte_order` 映射为汇采创建设备接口的 `byte_order` 和 `word_order`。 映射关系: | 采集网关 `word_byte_order` | 汇采 `byte_order` | 汇采 `word_order` | 含义 | |---|---:|---:|---| | `ABCD` | `1` | `1` | 大端字节序,大端字序。 | | `BADC` | `2` | `1` | 小端字节序,大端字序。 | | `CDAB` | `1` | `2` | 大端字节序,小端字序。 | | `DCBA` | `2` | `2` | 小端字节序,小端字序。 | 该映射主要影响 `int32`、`uint32`、`float32`、`int64`、`uint64`、`float64` 等多寄存器数据类型。对于 `int16`、`uint16` 等单寄存器数据,`word_order` 通常不影响结果;对于线圈和离散输入布尔点位,字节序和字序通常不影响结果。 `points[].function_code` 含义: | 值 | 含义 | |---:|---| | `1` | Read Coils,线圈。 | | `2` | Read Discrete Inputs,离散输入。 | | `3` | Read Holding Registers,保持寄存器。 | | `4` | Read Input Registers,输入寄存器。 | `points[].type` 支持: ```text bool, int16, uint16, int32, uint32, int64, uint64, float32, float64 ``` 请求示例: ```json { "project_key": "dev-01", "ip": "192.168.75.240", "port": 505, "slave_id": 1, "word_byte_order": "ABCD", "address_base": 0, "points": [ { "function_code": 3, "address": 7, "type": "int16" }, { "function_code": 3, "address": 10, "type": "bool", "bit": 2 } ] } ``` 响应透传上游返回。业务成功按响应体 `code == 0` 判断。 ### `collector.modbus_device_create` 调用汇采接口创建 Modbus 设备。 上游接口: ```text POST {data_collector_base_url}/api/collector/device ``` 该接口使用汇采设备创建参数,特别注意: - `byte_order` 和 `word_order` 是汇采创建设备接口的枚举值。 - 它们不是采集测试网关 `modbus.point_collect_test` 中的 `word_byte_order`。 - 两套参数不能混用。如果先通过采集网关用 `word_byte_order` 测试成功,再创建汇采设备,需要按映射表转换为 `byte_order` 和 `word_order`。 `byte_order`: | 值 | 含义 | |---:|---| | `1` | Big Endian,大端字节序。 | | `2` | Small Endian,小端字节序。 | `word_order`: | 值 | 含义 | |---:|---| | `1` | Big Endian,大端字序。 | | `2` | Small Endian,小端字序。 | 采集网关 `word_byte_order` 到汇采设备枚举的映射: | 采集网关 `word_byte_order` | 汇采 `byte_order` | 汇采 `word_order` | |---|---:|---:| | `ABCD` | `1` | `1` | | `BADC` | `2` | `1` | | `CDAB` | `1` | `2` | | `DCBA` | `2` | `2` | `device_type`: | 值 | 含义 | |---:|---| | `1` | TCP。 | | `2` | RTU。 | | `3` | UDP。 | | `4` | RTU OVER TCP。 | | `5` | RTU OVER UDP。 | MCP 工具可以接收一个 `payload`,服务端校验必填项、补齐默认值后转发。 必填项: - `device_type`:协议类型,`1=TCP`、`2=RTU`、`3=UDP`、`4=RTU OVER TCP`、`5=RTU OVER UDP`。 - `ip`:IP 地址。 - `port`:端口号。 - `name`:设备名称。 - `slave_id`:Modbus 从站 ID。 - `word_order`:字顺序。 - `byte_order`:字节顺序。 - `address_base`:地址基准;转发给汇采接口时转换为 `address_offset`。 默认值: ```json { "type": "modbus", "timeout": 3, "is_persistent": true, "group_id": 0, "alarm_interval": 90, "collect_interval": 5, "retry_times": 0 } ``` 请求示例: ```json { "project_key": "dev-01", "payload": { "name": "modbus_tcp_1", "device_type": 1, "ip": "127.0.0.1", "port": 5502, "slave_id": 1, "byte_order": 1, "word_order": 1, "address_base": 0 } } ``` 实际转发给汇采的请求体: ```json { "name": "modbus_tcp_1", "type": "modbus", "device_type": 1, "ip": "127.0.0.1", "port": 5502, "slave_id": 1, "timeout": 3, "byte_order": 1, "word_order": 1, "is_persistent": true, "group_id": 0, "alarm_interval": 90, "collect_interval": 5, "address_offset": 0, "retry_times": 0 } ``` 响应透传上游返回。业务成功按响应体 `state == 0` 判断。 ### `collector.modbus_point_create` 调用汇采接口创建 Modbus 采集点位。 上游接口: ```text POST {data_collector_base_url}/api/collector/modbus/point/add_collect_point ``` MCP 工具接收 `payload`,服务端做少量默认值补齐后转发。 创建点位时必须传入以下字段: | 字段 | 说明 | |---|---| | `payload.name` | 点位名称。 | | `payload.address` | 寄存器地址。 | | `payload.type` | 数据类型。 | | `payload.func_code` 或 `payload.register_type` | 寄存器类型。 | 如果传入 `payload.register_type`,服务会自动转换为汇采接口要求的 `payload.func_code`。 `payload.register_type` 映射: | `register_type` | `func_code` | 含义 | |---|---:|---| | `coil` | `1` | Read Coils,线圈。 | | `discrete_input` | `2` | Read Discrete Inputs,离散输入。 | | `holding_register` | `3` | Read Holding Registers,保持寄存器。 | | `input_register` | `4` | Read Input Registers,输入寄存器。 | `payload.func_code` 含义: | 值 | 含义 | |---:|---| | `1` | Read Coils,线圈。 | | `2` | Read Discrete Inputs,离散输入。 | | `3` | Read Holding Registers,保持寄存器。 | | `4` | Read Input Registers,输入寄存器。 | `payload.type` 支持: ```text bool, int16, uint16, int32, uint32, int64, uint64, float32, float64 ``` 为避免 AI 按点表原始类型传错,服务会自动转换常见点表类型别名: | 点表类型 | 汇采 `type` | 说明 | |---|---|---| | `BOOL` / `BOOLEAN` | `bool` | 布尔值。 | | `SHORT` | `int16` | 16 位有符号整数。 | | `WORD` | `uint16` | 16 位无符号整数。 | | `LONG` | `int32` | 32 位有符号整数。 | | `DWORD` | `uint32` | 32 位无符号整数。 | | `FLOAT` / `REAL` | `float32` | 32 位浮点数。 | | `DOUBLE` | `float64` | 64 位浮点数。 | | `LONGLONG` | `int64` | 64 位有符号整数。 | | `QWORD` | `uint64` | 64 位无符号整数。 | 默认值: ```json { "scale_ratio": 1, "value_offset": 0, "group_id": 0, "invalid_values": "", "valid_range_start": null, "valid_range_end": null, "bit": 0 } ``` 请求示例: ```json { "project_key": "dev-01", "payload": { "device_id": 1, "name": "holding_register_uint16", "point_id": "HR_UINT16", "describe": "保持寄存器 uint16 示例", "register_type": "holding_register", "address": 10, "type": "WORD" } } ``` 实际转发给汇采的请求体: ```json { "device_id": 1, "name": "holding_register_uint16", "point_id": "HR_UINT16", "describe": "保持寄存器 uint16 示例", "func_code": 3, "address": 10, "type": "uint16", "scale_ratio": 1, "value_offset": 0, "group_id": 0, "invalid_values": "", "valid_range_start": null, "valid_range_end": null, "bit": 0 } ``` 响应透传上游返回。业务成功按响应体 `state == 0` 判断。 ### `collector.device_list` 查询汇采设备列表。 上游接口: ```text GET {data_collector_base_url}/api/collector/device?num_points=false ``` 参数: | 参数 | 类型 | 默认值 | 说明 | |---|---|---|---| | `project_key` | string | 无 | 项目标识。 | | `num_points` | boolean | `false` | 是否统计设备或点位分组下的点位数量。只有需要统计时才传 `true`。 | 响应透传上游返回。业务成功按响应体 `state == 0` 判断。 ## 上游响应处理 首期创建类工具和查询类工具均透传上游 JSON 响应,不做复杂业务包装。 HTTP 层错误或非 JSON 响应应抛出明确异常: - HTTP 请求失败或超时:提示上游服务不可达或请求超时。 - HTTP 返回非 JSON:提示状态码和响应体预览。 - HTTP 状态码大于等于 400:提示 HTTP 状态码和响应 JSON。 业务成功判断规则保留在工具描述和 README 中: - 采集测试网关:`code == 0` 表示成功。 - 汇采接口:`state == 0` 表示成功。 ## 扩展性设计 为后续支持 S7、Rockwell、OPC UA,首期内部采用协议模块结构,但 MCP tools 仍保持协议专用命名。 不建议首期只暴露一个泛化工具,例如: ```text collector.device_create(protocol, payload) ``` 原因是不同协议的地址、数据类型、连接参数差异较大,泛化工具容易让 AI 混淆参数。 推荐方式: - MCP 层暴露协议专用 tools,工具描述明确协议规则。 - 内部实现复用 HTTP 请求、鉴权、项目配置、默认值合并等通用逻辑。 - 后续新增协议时新增协议模块和专用 MCP tools。 协议模块建议定义: ```python class ProtocolSpec: protocol: str create_device_path: str create_point_path: str point_test_path: str | None device_defaults: dict point_defaults: dict ``` Modbus 协议模块首期包含: ```text protocol = "modbus" create_device_path = "/api/collector/device" create_point_path = "/api/collector/modbus/point/add_collect_point" point_test_path = "/api/dc-gateway/modbus/read_points" ``` 后续扩展示例: ```text protocols/s7.py protocols/rockwell.py protocols/opcua.py ``` 对应 tools 示例: ```text s7.point_collect_test collector.s7_device_create collector.s7_point_create rockwell.point_collect_test collector.rockwell_device_create collector.rockwell_point_create opcua.point_collect_test collector.opcua_device_create collector.opcua_point_create ``` ## 环境变量 | 环境变量 | 默认值 | 说明 | |---|---|---| | `DATABASE_URL` | 无 | PostgreSQL 连接字符串,必填。 | | `MCP_HOST` | `0.0.0.0` | HTTP 服务监听地址。 | | `MCP_PORT` | `8080` | HTTP 服务监听端口。 | | `MCP_PATH` | `/mcp` | MCP endpoint 路径。 | | `UPSTREAM_REQUEST_TIMEOUT` | `60` | 上游 HTTP 请求超时时间,单位秒。 | `DATABASE_URL` 示例: ```text postgresql+psycopg2://user:password@host:5432/data_collector_mcp ``` ## Dockerfile 要求 Docker 镜像需要: - 基于 Python 3.11。 - 安装项目依赖。 - 暴露端口 `8080`。 - 默认启动 `python -m data_collector_mcp`。 运行时通过环境变量传入: ```text DATABASE_URL MCP_HOST MCP_PORT MCP_PATH UPSTREAM_REQUEST_TIMEOUT ``` ## 首期测试范围 单元测试使用 mock,不依赖真实上游服务。 需要覆盖: - 项目配置读取。 - 缺失 `data_collector_base_url` 时报错。 - 登录 token 缓存命中。 - 登录 token 过期后重新登录。 - `modbus.point_collect_test` 请求 URL、请求体和透传响应。 - `collector.modbus_device_create` 默认值合并、请求 URL、Authorization header 和透传响应。 - `collector.modbus_point_create` 默认值合并、请求 URL、Authorization header 和透传响应。 - `collector.device_list` 默认 `num_points=false`。 ## 非目标 首期不实现: - 前端页面。 - 异步任务。 - 调用日志。 - 点位历史数据。 - 拓扑查询。 - 自动发现设备。 - 自动解析 Excel 点表。 - 自动将采集测试结果转换为汇采点位创建请求。