"""Compare Modbus TCP point values with realtime HTTP API values.

Run this while the Modbus server is already running, or pass --self-start to
launch an in-process Modbus server before comparing.
"""
import argparse
import socket
import sys
import threading
import time
from pathlib import Path

from pymodbus.client import ModbusTcpClient

# Make the project root importable before pulling in the local modules below.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from app_config import load_config  # noqa: E402
from constants import DEFAULT_BATCH_SIZE  # noqa: E402
from db import create_connection  # noqa: E402
from http_value_provider import HttpValueProvider  # noqa: E402
from modbus_context import ReadonlyHoldingRegisterContext  # noqa: E402
from modbus_codec import encode_registers  # noqa: E402
from modbus_server import run_modbus_server  # noqa: E402
from point_loader import load_points  # noqa: E402
from point_model import ModbusPoint  # noqa: E402
from register_store import RegisterStore, initialize_register_store  # noqa: E402
from value_refresh import ValueRefreshWorker  # noqa: E402
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the comparison script.

    Returns:
        The namespace produced by ``argparse`` with the attributes ``config``,
        ``modbus_host``, ``modbus_port``, ``timeout``, ``limit``, ``point_id``,
        ``max_details`` and ``self_start``.
    """
    parser = argparse.ArgumentParser(
        description="Read points by pymodbus, read the same points by HTTP API, then compare registers."
    )
    parser.add_argument("--config", default="config.yaml", help="config file path")
    parser.add_argument(
        "--modbus-host",
        help="Modbus TCP host. Defaults to config host, 0.0.0.0 becomes 127.0.0.1",
    )
    parser.add_argument(
        "--modbus-port",
        type=int,
        help="Modbus TCP port. Defaults to config port, or 15020 with --self-start",
    )
    parser.add_argument("--timeout", type=float, default=3, help="Modbus client timeout seconds")
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="limit point count after filtering; 0 means no limit",
    )
    # Each occurrence is appended as-is; comma splitting happens later in
    # _filter_points.
    parser.add_argument(
        "--point-id",
        action="append",
        default=[],
        help="point_id to compare; can be repeated or comma-separated",
    )
    parser.add_argument(
        "--max-details",
        type=int,
        default=20,
        help="max mismatch/error details to print",
    )
    parser.add_argument(
        "--self-start",
        action="store_true",
        help="start an in-process Modbus server from DB points before comparing",
    )
    return parser.parse_args()
def main() -> int:
    """Run one Modbus-vs-HTTP comparison and return the process exit code.

    Points are loaded from the database and filtered by the CLI options. With
    ``--self-start`` a single HTTP snapshot is taken first; it seeds the
    embedded Modbus server and is reused as the comparison baseline. Without
    it, Modbus is read first and the HTTP values are fetched afterwards.

    Returns:
        2 when no points remain after filtering, otherwise the value returned
        by ``_compare``.
    """
    args = parse_args()
    config = load_config(args.config)
    points = _load_points(config)
    points = _filter_points(points, args.point_id, args.limit)
    if not points:
        print("没有可比较的点位")
        return 2

    host = args.modbus_host or _client_host(config.modbus.host)
    port = args.modbus_port or (15020 if args.self_start else config.modbus.port)
    provider = HttpValueProvider(config.http_provider.url, config.http_provider.timeout_seconds)
    point_ids = [point.point_id for point in points]

    if args.self_start:
        # The embedded server ignores the configured host: it uses the
        # explicit --modbus-host or falls back to loopback.
        host = args.modbus_host or "127.0.0.1"
        print(f"开始读取HTTP实时值接口快照,url={config.http_provider.url}, 点位数={len(points)}")
        http_values = _read_http_values(provider, point_ids)
        _start_embedded_modbus_server(host, port, points, http_values, config.modbus.interval, args.timeout)
    else:
        # Fetched after the Modbus read below.
        http_values = None

    print(f"开始读取Modbus点位,host={host}, port={port}, 点位数={len(points)}")
    modbus_results = _read_modbus_values(host, port, args.timeout, points)

    if http_values is None:
        print(f"开始读取HTTP实时值接口,url={config.http_provider.url}, 点位数={len(points)}")
        http_values = _read_http_values(provider, point_ids)

    return _compare(points, modbus_results, http_values, args.max_details)
def _load_points(config) -> list[ModbusPoint]:
    """Load all points from the database described by ``config.db``.

    The connection is opened only for this call and is closed even when
    ``load_points`` raises.
    """
    conn = create_connection(config.db)
    try:
        return load_points(conn)
    finally:
        conn.close()
def _filter_points(points: list[ModbusPoint], raw_point_ids: list[str], limit: int) -> list[ModbusPoint]:
    """Restrict ``points`` to the requested ids and cap the result length.

    Args:
        points: Candidate points, in the order they should be kept.
        raw_point_ids: Raw ``--point-id`` values; each item may hold several
            comma-separated ids. Surrounding whitespace and empty pieces are
            ignored.
        limit: Maximum number of points to keep after the id filter; values
            of 0 or below disable the cap.

    Returns:
        The filtered points in their original order. When no usable id is
        given, the id filter is skipped and only ``limit`` applies.
    """
    point_ids = {
        point_id.strip()
        for item in raw_point_ids
        for point_id in item.split(",")
        if point_id.strip()
    }
    if point_ids:
        points = [point for point in points if point.point_id in point_ids]
    if limit > 0:
        points = points[:limit]
    return points
def _client_host(host: str) -> str:
    """Return the address a client should connect to for a configured host.

    An empty host and the wildcard addresses ``0.0.0.0`` and ``::`` are
    replaced by ``127.0.0.1``; any other value is returned unchanged.
    """
    return "127.0.0.1" if host in {"", "0.0.0.0", "::"} else host
def _start_embedded_modbus_server(
    host: str,
    port: int,
    points: list[ModbusPoint],
    http_values: dict[str, object],
    interval_seconds: int,
    timeout_seconds: float,
) -> None:
    """Start an in-process Modbus server seeded from an HTTP value snapshot.

    Args:
        host: Address the server is started on and probed at.
        port: TCP port the server is started on and probed at.
        points: Points used to initialise the register store.
        http_values: Snapshot of HTTP values keyed by point id, used for the
            single initial refresh of the store.
        interval_seconds: Refresh interval handed to ``ValueRefreshWorker``;
            only ``refresh_once`` is called here.
        timeout_seconds: How long to wait for the port to accept connections.

    Raises:
        RuntimeError: From ``_wait_for_tcp`` when the port does not accept a
            connection within ``timeout_seconds``.
    """
    store = RegisterStore()
    initialize_register_store(points, store)
    # Fill the store once from the snapshot rather than from the live API.
    snapshot = SnapshotProvider(http_values)
    ValueRefreshWorker(points, snapshot, store, interval_seconds).refresh_once(initial=True)

    context = ReadonlyHoldingRegisterContext(store)
    # Daemon thread: the server must not keep the script alive once main() returns.
    thread = threading.Thread(
        target=run_modbus_server,
        args=(context, host, port),
        name="embedded-modbus-server",
        daemon=True,
    )
    thread.start()
    _wait_for_tcp(host, port, timeout_seconds)
    print(f"已启动内置Modbus Server,host={host}, port={port}, {store.describe()}")
class SnapshotProvider:
    """Value provider that answers from a fixed, pre-fetched mapping."""

    def __init__(self, values: dict[str, object]):
        """Keep a reference to ``values`` (point id -> value); it is not copied."""
        self.values = values

    def fetch_values(self, point_ids: list[str]) -> dict[str, object]:
        """Return the snapshot entries for ``point_ids``.

        Ids that are absent from the snapshot are left out of the result
        instead of raising.
        """
        return {point_id: self.values[point_id] for point_id in point_ids if point_id in self.values}
def _wait_for_tcp(host: str, port: int, timeout_seconds: float) -> None:
    """Block until ``host:port`` accepts a TCP connection.

    A connection is attempted repeatedly (0.2 s connect timeout, 0.1 s pause
    after each failure) until one succeeds or ``timeout_seconds`` have passed.
    The probe connection is closed immediately after it succeeds.

    Raises:
        RuntimeError: When no connection succeeded before the deadline; the
            message includes the last ``OSError`` seen, or ``None`` if no
            attempt was made.
    """
    deadline = time.monotonic() + timeout_seconds
    last_error = None
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=0.2):
                return
        except OSError as exc:
            last_error = exc
            time.sleep(0.1)
    raise RuntimeError(f"等待内置Modbus Server启动超时: {host}:{port}, last_error={last_error}")
def _read_modbus_values(host: str, port: int, timeout: float, points: list[ModbusPoint]) -> dict[str, list[int] | str]:
    """Read the holding registers of every point over one Modbus TCP connection.

    Args:
        host: Modbus TCP server host.
        port: Modbus TCP server port.
        timeout: Client timeout in seconds.
        points: Points to read; ``address``, ``register_count`` and
            ``slave_id`` of each point define the request.

    Returns:
        A mapping from point id to the list of register values, or to the
        string form of the response when the server answered with an error.

    Raises:
        RuntimeError: When the initial connection cannot be established.
        Exceptions raised by the client during a read are not caught here.
    """
    client = ModbusTcpClient(host, port=port, timeout=timeout)
    if not client.connect():
        raise RuntimeError(f"Modbus连接失败: {host}:{port}")
    try:
        results: dict[str, list[int] | str] = {}
        for point in points:
            response = client.read_holding_registers(
                point.address,
                count=point.register_count,
                device_id=point.slave_id,
            )
            if response.isError():
                # Keep the error text so _compare can report it per point.
                results[point.point_id] = str(response)
            else:
                results[point.point_id] = list(response.registers)
        return results
    finally:
        client.close()
def _read_http_values(provider: HttpValueProvider, point_ids: list[str]) -> dict[str, object]:
    """Fetch values for ``point_ids`` in batches of ``DEFAULT_BATCH_SIZE``.

    Returns:
        The merged results of all ``provider.fetch_values`` calls; ids the
        provider does not return are simply absent from the mapping.
    """
    values: dict[str, object] = {}
    for start in range(0, len(point_ids), DEFAULT_BATCH_SIZE):
        batch = point_ids[start:start + DEFAULT_BATCH_SIZE]
        values.update(provider.fetch_values(batch))
    return values
def _compare(
    points: list[ModbusPoint],
    modbus_results: dict[str, list[int] | str],
    http_values: dict[str, object],
    max_details: int,
) -> int:
    """Compare Modbus registers with the registers expected from HTTP values.

    Each point lands in exactly one bucket: Modbus read failure, HTTP value
    missing, mismatch (including HTTP values that cannot be encoded) or match.
    A summary and up to ``max_details`` entries per bucket are printed.

    Args:
        points: Points to compare.
        modbus_results: Per point id, the register list read over Modbus or
            an error string.
        http_values: Per point id, the value returned by the HTTP API.
        max_details: Maximum number of detail lines printed per bucket.

    Returns:
        0 when every point matched, 1 otherwise.
    """
    matched = 0
    modbus_errors = []
    http_missing = []
    mismatches = []

    for point in points:
        modbus_value = modbus_results.get(point.point_id)
        if not isinstance(modbus_value, list):
            # Either an error string from the read or no result at all (None).
            modbus_errors.append((point, modbus_value))
            continue
        if point.point_id not in http_values:
            http_missing.append(point)
            continue

        http_value = http_values[point.point_id]
        try:
            expected_registers = encode_registers(http_value, point.data_type)
        except Exception as exc:
            # Deliberately broad: any encoding failure is reported as a
            # mismatch for this point instead of aborting the whole run.
            mismatches.append((point, modbus_value, f"HTTP值编码失败: {exc}", http_value))
            continue

        if modbus_value == expected_registers:
            matched += 1
        else:
            mismatches.append((point, modbus_value, expected_registers, http_value))

    print(
        "比较完成: "
        f"点位总数={len(points)}, 一致={matched}, 不一致={len(mismatches)}, "
        f"Modbus读取失败={len(modbus_errors)}, HTTP缺失={len(http_missing)}"
    )
    # Guard on a non-empty list: with no points, 0 == 0 would otherwise
    # print the "everything failed" diagnosis.
    if points and len(modbus_errors) == len(points):
        print(
            "诊断: 所有点位都 Modbus 读取失败,本次没有完成有效比较。"
            "请确认连接的是当前启动的 Modbus Server,并检查服务端日志里的寄存器存储初始化范围和非法地址日志。"
        )
    _print_modbus_errors(modbus_errors, max_details)
    _print_http_missing(http_missing, max_details)
    _print_mismatches(mismatches, max_details)
    return 0 if not modbus_errors and not http_missing and not mismatches else 1
def _point_label(point: ModbusPoint) -> str:
    """Return the one-line description of ``point`` used in report output."""
    return (
        f"point_id={point.point_id}, name={point.name}, data_type={point.data_type}, "
        f"slave_id={point.slave_id}, address={point.address}"
    )
def _print_modbus_errors(errors, max_details: int) -> None:
    """Print up to ``max_details`` Modbus read failures and a remainder count.

    Args:
        errors: ``(point, error)`` pairs as collected by ``_compare``.
        max_details: Maximum number of entries to print; a negative value is
            treated as 0 so it can neither slice from the end of the list
            nor inflate the remainder count.
    """
    shown = max(max_details, 0)
    for point, error in errors[:shown]:
        print(f"Modbus读取失败: {_point_label(point)}, error={error}")
    hidden = len(errors) - shown
    if hidden > 0:
        print(f"Modbus读取失败还有 {hidden} 条未显示")
def _print_http_missing(points: list[ModbusPoint], max_details: int) -> None:
    """Print up to ``max_details`` points that have no HTTP value.

    Args:
        points: Points for which the HTTP API returned nothing.
        max_details: Maximum number of entries to print; a negative value is
            treated as 0 so it can neither slice from the end of the list
            nor inflate the remainder count.
    """
    shown = max(max_details, 0)
    for point in points[:shown]:
        print(f"HTTP缺失: {_point_label(point)}")
    hidden = len(points) - shown
    if hidden > 0:
        print(f"HTTP缺失还有 {hidden} 条未显示")
def _print_mismatches(mismatches, max_details: int) -> None:
    """Print up to ``max_details`` mismatches and a remainder count.

    Args:
        mismatches: ``(point, modbus_value, expected_registers, http_value)``
            tuples as collected by ``_compare``; ``expected_registers`` holds
            an error message instead when the HTTP value could not be encoded.
        max_details: Maximum number of entries to print; a negative value is
            treated as 0 so it can neither slice from the end of the list
            nor inflate the remainder count.
    """
    shown = max(max_details, 0)
    for point, modbus_value, expected_registers, http_value in mismatches[:shown]:
        print(
            f"不一致: {_point_label(point)}, "
            f"modbus_registers={modbus_value}, http_value={http_value}, expected_registers={expected_registers}"
        )
    hidden = len(mismatches) - shown
    if hidden > 0:
        print(f"不一致还有 {hidden} 条未显示")
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())