data-collector-mcp-design.md 17 KB

Data Collector MCP Design

目标

实现一个精简、可运行、可扩展的 MCP 服务 data-collector-mcp,用于通过 MCP tools 调用采集测试网关和汇采 HTTP 接口。

首期支持 Modbus,后续需要扩展支持 S7、Rockwell、OPC UA 等协议。

首期不包含前端、异步任务和调用日志。

服务形态

  • MCP 服务名:data-collector-mcp
  • 默认监听地址:0.0.0.0
  • 默认监听端口:8080
  • 默认 MCP 路径:/mcp
  • 默认 MCP HTTP endpoint:http://127.0.0.1:8080/mcp

8080 是 HTTP 服务监听端口,MCP endpoint 挂载在该 HTTP 服务的 /mcp 路径上。

技术栈

  • Python 3.11
  • FastMCP
  • Uvicorn
  • Requests
  • SQLAlchemy
  • PostgreSQL
  • psycopg2-binary

项目结构

data-collector-mcp/
  pyproject.toml
  Dockerfile
  README.md
  docs/
    data-collector-mcp-design.md
  data_collector_mcp/
    __init__.py
    __main__.py
    server.py
    auth.py
    db.py
    http_client.py
    gateway_api.py
    collector_api.py
    protocols/
      __init__.py
      base.py
      modbus.py
  tests/
    test_auth.py
    test_gateway_api.py
    test_collector_api.py

数据库设计

不使用 SQLite。服务通过 DATABASE_URL 连接 PostgreSQL。

数据库名建议为:

data_collector_mcp

如果手动创建 PostgreSQL 数据库,由于名称包含 -,需要使用双引号:

CREATE DATABASE "data_collector_mcp";

表名:sys_config

表结构与 D:\Projects\project-data-mcp 中的 sys_config 保持一致:

id integer primary key
key varchar(128) unique not null
value json/jsonb not null

如果需要手动建表,推荐使用 PostgreSQL IDENTITY 写法:

CREATE TABLE IF NOT EXISTS public.sys_config (
    id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    key character varying(128) NOT NULL UNIQUE,
    value json NOT NULL
);

不要直接使用下面这种 SQL,除非已经提前创建了 sys_config_id_seq

id integer NOT NULL DEFAULT nextval('sys_config_id_seq'::regclass)

否则 PostgreSQL 会报错:

relation "sys_config_id_seq" does not exist

配置 key:

mcp_data_collector_projects
mcp_data_collector_token_<project_key>

项目配置

sys_config.key = mcp_data_collector_projects 的 value 为项目数组。

示例:

[
  {
    "project_key": "dev-01",
    "project_name": "DEV开发环境",
    "base_url": "http://127.0.0.1:8000",
    "data_collector_base_url": "http://127.0.0.1:32080",
    "username": "admin",
    "password": "123456",
    "enabled": true
  }
]

字段说明:

字段 必填 含义
project_key 项目唯一标识,MCP tools 通过该字段选择项目。
project_name 项目展示名称,缺省时可使用 project_key
base_url 基础地址,用于登录接口和采集测试网关接口。
data_collector_base_url 汇采管理接口基础地址,用于创建设备、创建点位、查询设备列表。缺失时直接报错。
username 登录用户名。
password 登录密码。
enabled 是否启用,默认 true

地址约定

登录接口使用 base_url

POST {base_url}/api/ai/auth/password_login

Modbus 点位采集测试使用 base_url

POST {base_url}/api/dc-gateway/modbus/read_points

汇采创建设备使用 data_collector_base_url

POST {data_collector_base_url}/api/collector/device

汇采创建 Modbus 点位使用 data_collector_base_url

POST {data_collector_base_url}/api/collector/modbus/point/add_collect_point

汇采设备列表使用 data_collector_base_url

GET {data_collector_base_url}/api/collector/device?num_points=false

data_collector_base_url 缺失时必须直接报错,不回退到 base_url,避免将设备或点位创建请求误发到采集测试网关。

鉴权设计

登录请求:

POST {base_url}/api/ai/auth/password_login
Content-Type: application/json

请求体:

{
  "username": "admin",
  "password": "123456"
}

成功响应示例:

{
  "errcode": 0,
  "msg": "成功",
  "token": "TOKEN",
  "token_expire_time": 1781583584
}

token 缓存到 sys_config

mcp_data_collector_token_<project_key>

缓存值示例:

{
  "auth_token": "TOKEN",
  "expire_at": 1781583584,
  "updated_at": "2026-06-16T00:00:00Z"
}

调用汇采管理接口时,HTTP header 带:

Authorization: TOKEN

采集测试网关接口是否需要 token 以后可扩展;首期按接口文档直接请求。

MCP Tools

project.list

查询启用项目列表。调用其他工具前应先调用该工具选择 project_key

返回示例:

{
  "projects": [
    {
      "project_key": "dev-01",
      "project_name": "DEV开发环境"
    }
  ],
  "total": 1
}

modbus.point_collect_test

调用采集测试网关读取 Modbus 点位并转换为业务值。

上游接口:

POST {base_url}/api/dc-gateway/modbus/read_points

参数建议:

参数 类型 默认值 说明
project_key string 项目标识。
ip string Modbus TCP 设备 IP,不是网关地址。
port integer Modbus TCP 设备端口。
slave_id integer Modbus 从站 ID。
device_type string ModbusTCP 当前只支持 ModbusTCP
word_byte_order string ABCD 网关接口使用的字节序和字序组合。
address_base integer 0 地址偏移。协议地址为 address + address_base
points object[] 点位定义列表。

word_byte_order 取值:

含义
ABCD 寄存器内字节不反转,寄存器顺序不反转。
BADC 每个寄存器内两个字节反转,寄存器顺序不反转。
CDAB 寄存器顺序反转,寄存器内字节不反转。
DCBA 每个寄存器内字节反转,同时寄存器顺序反转。

当 AI 使用采集测试网关读取点位成功后,如果需要继续创建汇采 Modbus 设备,必须将采集网关的 word_byte_order 映射为汇采创建设备接口的 byte_orderword_order

映射关系:

采集网关 word_byte_order 汇采 byte_order 汇采 word_order 含义
ABCD 1 1 大端字节序,大端字序。
BADC 2 1 小端字节序,大端字序。
CDAB 1 2 大端字节序,小端字序。
DCBA 2 2 小端字节序,小端字序。

该映射主要影响 int32uint32float32int64uint64float64 等多寄存器数据类型。对于 int16uint16 等单寄存器数据,word_order 通常不影响结果;对于线圈和离散输入布尔点位,字节序和字序通常不影响结果。

points[].function_code 含义:

含义
1 Read Coils,线圈。
2 Read Discrete Inputs,离散输入。
3 Read Holding Registers,保持寄存器。
4 Read Input Registers,输入寄存器。

points[].type 支持:

bool, int16, uint16, int32, uint32, int64, uint64, float32, float64

请求示例:

{
  "project_key": "dev-01",
  "ip": "192.168.75.240",
  "port": 505,
  "slave_id": 1,
  "word_byte_order": "ABCD",
  "address_base": 0,
  "points": [
    {
      "function_code": 3,
      "address": 7,
      "type": "int16"
    },
    {
      "function_code": 3,
      "address": 10,
      "type": "bool",
      "bit": 2
    }
  ]
}

响应透传上游返回。业务成功按响应体 code == 0 判断。

collector.modbus_device_create

调用汇采接口创建 Modbus 设备。

上游接口:

POST {data_collector_base_url}/api/collector/device

该接口使用汇采设备创建参数,特别注意:

  • byte_orderword_order 是汇采创建设备接口的枚举值。
  • 它们不是采集测试网关 modbus.point_collect_test 中的 word_byte_order
  • 两套参数不能混用。如果先通过采集网关用 word_byte_order 测试成功,再创建汇采设备,需要按映射表转换为 byte_orderword_order

byte_order

含义
1 Big Endian,大端字节序。
2 Small Endian,小端字节序。

word_order

含义
1 Big Endian,大端字序。
2 Small Endian,小端字序。

采集网关 word_byte_order 到汇采设备枚举的映射:

采集网关 word_byte_order 汇采 byte_order 汇采 word_order
ABCD 1 1
BADC 2 1
CDAB 1 2
DCBA 2 2

device_type

含义
1 TCP。
2 RTU。
3 UDP。
4 RTU OVER TCP。
5 RTU OVER UDP。

MCP 工具可以接收一个 payload,服务端校验必填项、补齐默认值后转发。

必填项:

  • device_type:协议类型,1=TCP2=RTU3=UDP4=RTU OVER TCP5=RTU OVER UDP
  • ip:IP 地址。
  • port:端口号。
  • name:设备名称。
  • slave_id:Modbus 从站 ID。
  • word_order:字顺序。
  • byte_order:字节顺序。
  • address_base:地址基准;转发给汇采接口时转换为 address_offset

默认值:

{
  "type": "modbus",
  "timeout": 3,
  "is_persistent": true,
  "group_id": 0,
  "alarm_interval": 90,
  "collect_interval": 5,
  "retry_times": 0
}

请求示例:

{
  "project_key": "dev-01",
  "payload": {
    "name": "modbus_tcp_1",
    "device_type": 1,
    "ip": "127.0.0.1",
    "port": 5502,
    "slave_id": 1,
    "byte_order": 1,
    "word_order": 1,
    "address_base": 0
  }
}

实际转发给汇采的请求体:

{
  "name": "modbus_tcp_1",
  "type": "modbus",
  "device_type": 1,
  "ip": "127.0.0.1",
  "port": 5502,
  "slave_id": 1,
  "timeout": 3,
  "byte_order": 1,
  "word_order": 1,
  "is_persistent": true,
  "group_id": 0,
  "alarm_interval": 90,
  "collect_interval": 5,
  "address_offset": 0,
  "retry_times": 0
}

响应透传上游返回。业务成功按响应体 state == 0 判断。

collector.modbus_point_create

调用汇采接口创建 Modbus 采集点位。

上游接口:

POST {data_collector_base_url}/api/collector/modbus/point/add_collect_point

MCP 工具接收 payload,服务端做少量默认值补齐后转发。

创建点位时必须传入以下字段:

字段 说明
payload.name 点位名称。
payload.address 寄存器地址。
payload.type 数据类型。
payload.func_codepayload.register_type 寄存器类型。

如果传入 payload.register_type,服务会自动转换为汇采接口要求的 payload.func_code

payload.register_type 映射:

register_type func_code 含义
coil 1 Read Coils,线圈。
discrete_input 2 Read Discrete Inputs,离散输入。
holding_register 3 Read Holding Registers,保持寄存器。
input_register 4 Read Input Registers,输入寄存器。

payload.func_code 含义:

含义
1 Read Coils,线圈。
2 Read Discrete Inputs,离散输入。
3 Read Holding Registers,保持寄存器。
4 Read Input Registers,输入寄存器。

payload.type 支持:

bool, int16, uint16, int32, uint32, int64, uint64, float32, float64

为避免 AI 按点表原始类型传错,服务会自动转换常见点表类型别名:

点表类型 汇采 type 说明
BOOL / BOOLEAN bool 布尔值。
SHORT int16 16 位有符号整数。
WORD uint16 16 位无符号整数。
LONG int32 32 位有符号整数。
DWORD uint32 32 位无符号整数。
FLOAT / REAL float32 32 位浮点数。
DOUBLE float64 64 位浮点数。
LONGLONG int64 64 位有符号整数。
QWORD uint64 64 位无符号整数。

默认值:

{
  "scale_ratio": 1,
  "value_offset": 0,
  "group_id": 0,
  "invalid_values": "",
  "valid_range_start": null,
  "valid_range_end": null,
  "bit": 0
}

请求示例:

{
  "project_key": "dev-01",
  "payload": {
    "device_id": 1,
    "name": "holding_register_uint16",
    "point_id": "HR_UINT16",
    "describe": "保持寄存器 uint16 示例",
    "register_type": "holding_register",
    "address": 10,
    "type": "WORD"
  }
}

实际转发给汇采的请求体:

{
  "device_id": 1,
  "name": "holding_register_uint16",
  "point_id": "HR_UINT16",
  "describe": "保持寄存器 uint16 示例",
  "func_code": 3,
  "address": 10,
  "type": "uint16",
  "scale_ratio": 1,
  "value_offset": 0,
  "group_id": 0,
  "invalid_values": "",
  "valid_range_start": null,
  "valid_range_end": null,
  "bit": 0
}

响应透传上游返回。业务成功按响应体 state == 0 判断。

collector.device_list

查询汇采设备列表。

上游接口:

GET {data_collector_base_url}/api/collector/device?num_points=false

参数:

参数 类型 默认值 说明
project_key string 项目标识。
num_points boolean false 是否统计设备或点位分组下的点位数量。只有需要统计时才传 true

响应透传上游返回。业务成功按响应体 state == 0 判断。

上游响应处理

首期创建类工具和查询类工具均透传上游 JSON 响应,不做复杂业务包装。

HTTP 层错误或非 JSON 响应应抛出明确异常:

  • HTTP 请求失败或超时:提示上游服务不可达或请求超时。
  • HTTP 返回非 JSON:提示状态码和响应体预览。
  • HTTP 状态码大于等于 400:提示 HTTP 状态码和响应 JSON。

业务成功判断规则保留在工具描述和 README 中:

  • 采集测试网关:code == 0 表示成功。
  • 汇采接口:state == 0 表示成功。

扩展性设计

为后续支持 S7、Rockwell、OPC UA,首期内部采用协议模块结构,但 MCP tools 仍保持协议专用命名。

不建议首期只暴露一个泛化工具,例如:

collector.device_create(protocol, payload)

原因是不同协议的地址、数据类型、连接参数差异较大,泛化工具容易让 AI 混淆参数。

推荐方式:

  • MCP 层暴露协议专用 tools,工具描述明确协议规则。
  • 内部实现复用 HTTP 请求、鉴权、项目配置、默认值合并等通用逻辑。
  • 后续新增协议时新增协议模块和专用 MCP tools。

协议模块建议定义:

class ProtocolSpec:
    protocol: str
    create_device_path: str
    create_point_path: str
    point_test_path: str | None
    device_defaults: dict
    point_defaults: dict

Modbus 协议模块首期包含:

protocol = "modbus"
create_device_path = "/api/collector/device"
create_point_path = "/api/collector/modbus/point/add_collect_point"
point_test_path = "/api/dc-gateway/modbus/read_points"

后续扩展示例:

protocols/s7.py
protocols/rockwell.py
protocols/opcua.py

对应 tools 示例:

s7.point_collect_test
collector.s7_device_create
collector.s7_point_create
rockwell.point_collect_test
collector.rockwell_device_create
collector.rockwell_point_create
opcua.point_collect_test
collector.opcua_device_create
collector.opcua_point_create

环境变量

环境变量 默认值 说明
DATABASE_URL PostgreSQL 连接字符串,必填。
MCP_HOST 0.0.0.0 HTTP 服务监听地址。
MCP_PORT 8080 HTTP 服务监听端口。
MCP_PATH /mcp MCP endpoint 路径。
UPSTREAM_REQUEST_TIMEOUT 60 上游 HTTP 请求超时时间,单位秒。

DATABASE_URL 示例:

postgresql+psycopg2://user:password@host:5432/data_collector_mcp

Dockerfile 要求

Docker 镜像需要:

  • 基于 Python 3.11。
  • 安装项目依赖。
  • 暴露端口 8080
  • 默认启动 python -m data_collector_mcp

运行时通过环境变量传入:

DATABASE_URL
MCP_HOST
MCP_PORT
MCP_PATH
UPSTREAM_REQUEST_TIMEOUT

首期测试范围

单元测试使用 mock,不依赖真实上游服务。

需要覆盖:

  • 项目配置读取。
  • 缺失 data_collector_base_url 时报错。
  • 登录 token 缓存命中。
  • 登录 token 过期后重新登录。
  • modbus.point_collect_test 请求 URL、请求体和透传响应。
  • collector.modbus_device_create 默认值合并、请求 URL、Authorization header 和透传响应。
  • collector.modbus_point_create 默认值合并、请求 URL、Authorization header 和透传响应。
  • collector.device_list 默认 num_points=false

非目标

首期不实现:

  • 前端页面。
  • 异步任务。
  • 调用日志。
  • 点位历史数据。
  • 拓扑查询。
  • 自动发现设备。
  • 自动解析 Excel 点表。
  • 自动将采集测试结果转换为汇采点位创建请求。