compare_modbus_http_client.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. """Compare Modbus TCP point values with realtime HTTP API values.
  2. Run this while the Modbus server is already running.
  3. """
  4. import argparse
  5. import socket
  6. import sys
  7. import threading
  8. import time
  9. from pathlib import Path
  10. from pymodbus.client import ModbusTcpClient
  11. sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
  12. from app_config import load_config # noqa: E402
  13. from db import create_connection # noqa: E402
  14. from http_value_provider import HttpValueProvider # noqa: E402
  15. from modbus_context import ReadonlyHoldingRegisterContext # noqa: E402
  16. from modbus_codec import encode_registers # noqa: E402
  17. from modbus_server import run_modbus_server # noqa: E402
  18. from point_loader import load_points # noqa: E402
  19. from point_model import ModbusPoint # noqa: E402
  20. from register_store import RegisterStore, initialize_register_store # noqa: E402
  21. from value_refresh import ValueRefreshWorker # noqa: E402
  22. def parse_args() -> argparse.Namespace:
  23. parser = argparse.ArgumentParser(
  24. description="Read points by pymodbus, read the same points by HTTP API, then compare registers."
  25. )
  26. parser.add_argument("--config", default="config.yaml", help="config file path")
  27. parser.add_argument("--modbus-host", help="Modbus TCP host. Defaults to config host, 0.0.0.0 becomes 127.0.0.1")
  28. parser.add_argument("--modbus-port", type=int, help="Modbus TCP port. Defaults to config port, or 15020 with --self-start")
  29. parser.add_argument("--timeout", type=float, default=3, help="Modbus client timeout seconds")
  30. parser.add_argument("--limit", type=int, default=0, help="limit point count after filtering; 0 means no limit")
  31. parser.add_argument("--point-id", action="append", default=[], help="point_id to compare; can be repeated or comma-separated")
  32. parser.add_argument("--max-details", type=int, default=20, help="max mismatch/error details to print")
  33. parser.add_argument("--self-start", action="store_true", help="start an in-process Modbus server from DB points before comparing")
  34. return parser.parse_args()
  35. def main() -> int:
  36. args = parse_args()
  37. config = load_config(args.config)
  38. points = _load_points(config)
  39. points = _filter_points(points, args.point_id, args.limit)
  40. if not points:
  41. print("没有可比较的点位")
  42. return 2
  43. host = args.modbus_host or _client_host(config.modbus.host)
  44. port = args.modbus_port or (15020 if args.self_start else config.modbus.port)
  45. provider = HttpValueProvider(config.http_provider.url, config.http_provider.timeout_seconds)
  46. if args.self_start:
  47. host = args.modbus_host or "127.0.0.1"
  48. print(f"开始读取HTTP实时值接口快照,url={config.http_provider.url}, 点位数={len(points)}")
  49. http_values = _read_http_values(provider, [point.point_id for point in points], config.http_provider.batch_size)
  50. _start_embedded_modbus_server(
  51. host,
  52. port,
  53. points,
  54. http_values,
  55. config.http_provider.interval,
  56. config.http_provider.batch_size,
  57. args.timeout,
  58. )
  59. else:
  60. http_values = None
  61. print(f"开始读取Modbus点位,host={host}, port={port}, 点位数={len(points)}")
  62. modbus_results = _read_modbus_values(host, port, args.timeout, points)
  63. if http_values is None:
  64. print(f"开始读取HTTP实时值接口,url={config.http_provider.url}, 点位数={len(points)}")
  65. http_values = _read_http_values(provider, [point.point_id for point in points], config.http_provider.batch_size)
  66. return _compare(points, modbus_results, http_values, args.max_details)
  67. def _load_points(config) -> list[ModbusPoint]:
  68. conn = create_connection(config.db)
  69. try:
  70. return load_points(conn)
  71. finally:
  72. conn.close()
  73. def _filter_points(points: list[ModbusPoint], raw_point_ids: list[str], limit: int) -> list[ModbusPoint]:
  74. point_ids = {
  75. point_id.strip()
  76. for item in raw_point_ids
  77. for point_id in item.split(",")
  78. if point_id.strip()
  79. }
  80. if point_ids:
  81. points = [point for point in points if point.point_id in point_ids]
  82. if limit > 0:
  83. points = points[:limit]
  84. return points
  85. def _client_host(host: str) -> str:
  86. return "127.0.0.1" if host in {"", "0.0.0.0", "::"} else host
  87. def _start_embedded_modbus_server(
  88. host: str,
  89. port: int,
  90. points: list[ModbusPoint],
  91. http_values: dict[str, object],
  92. interval_seconds: int,
  93. batch_size: int,
  94. timeout_seconds: float,
  95. ) -> None:
  96. store = RegisterStore()
  97. initialize_register_store(points, store)
  98. ValueRefreshWorker(points, SnapshotProvider(http_values), store, interval_seconds, batch_size).refresh_once(initial=True)
  99. context = ReadonlyHoldingRegisterContext(store)
  100. thread = threading.Thread(
  101. target=run_modbus_server,
  102. args=(context, host, port),
  103. name="embedded-modbus-server",
  104. daemon=True,
  105. )
  106. thread.start()
  107. _wait_for_tcp(host, port, timeout_seconds)
  108. print(f"已启动内置Modbus Server,host={host}, port={port}, {store.describe()}")
  109. class SnapshotProvider:
  110. def __init__(self, values: dict[str, object]):
  111. self.values = values
  112. def fetch_values(self, point_ids: list[str]) -> dict[str, object]:
  113. return {point_id: self.values[point_id] for point_id in point_ids if point_id in self.values}
  114. def _wait_for_tcp(host: str, port: int, timeout_seconds: float) -> None:
  115. deadline = time.monotonic() + timeout_seconds
  116. last_error = None
  117. while time.monotonic() < deadline:
  118. try:
  119. with socket.create_connection((host, port), timeout=0.2):
  120. return
  121. except OSError as exc:
  122. last_error = exc
  123. time.sleep(0.1)
  124. raise RuntimeError(f"等待内置Modbus Server启动超时: {host}:{port}, last_error={last_error}")
  125. def _read_modbus_values(host: str, port: int, timeout: float, points: list[ModbusPoint]) -> dict[str, list[int] | str]:
  126. client = ModbusTcpClient(host, port=port, timeout=timeout)
  127. if not client.connect():
  128. raise RuntimeError(f"Modbus连接失败: {host}:{port}")
  129. try:
  130. results: dict[str, list[int] | str] = {}
  131. for point in points:
  132. response = client.read_holding_registers(
  133. point.address,
  134. count=point.register_count,
  135. device_id=point.slave_id,
  136. )
  137. if response.isError():
  138. results[point.point_id] = str(response)
  139. else:
  140. results[point.point_id] = list(response.registers)
  141. return results
  142. finally:
  143. client.close()
  144. def _read_http_values(provider: HttpValueProvider, point_ids: list[str], batch_size: int) -> dict[str, object]:
  145. values: dict[str, object] = {}
  146. for start in range(0, len(point_ids), batch_size):
  147. batch = point_ids[start:start + batch_size]
  148. values.update(provider.fetch_values(batch))
  149. return values
  150. def _compare(
  151. points: list[ModbusPoint],
  152. modbus_results: dict[str, list[int] | str],
  153. http_values: dict[str, object],
  154. max_details: int,
  155. ) -> int:
  156. matched = 0
  157. modbus_errors = []
  158. http_missing = []
  159. mismatches = []
  160. for point in points:
  161. modbus_value = modbus_results.get(point.point_id)
  162. if not isinstance(modbus_value, list):
  163. modbus_errors.append((point, modbus_value))
  164. continue
  165. if point.point_id not in http_values:
  166. http_missing.append(point)
  167. continue
  168. http_value = http_values[point.point_id]
  169. try:
  170. expected_registers = encode_registers(http_value, point.data_type)
  171. except Exception as exc:
  172. mismatches.append((point, modbus_value, f"HTTP值编码失败: {exc}", http_value))
  173. continue
  174. if modbus_value == expected_registers:
  175. matched += 1
  176. else:
  177. mismatches.append((point, modbus_value, expected_registers, http_value))
  178. print(
  179. "比较完成: "
  180. f"点位总数={len(points)}, 一致={matched}, 不一致={len(mismatches)}, "
  181. f"Modbus读取失败={len(modbus_errors)}, HTTP缺失={len(http_missing)}"
  182. )
  183. if len(modbus_errors) == len(points):
  184. print(
  185. "诊断: 所有点位都 Modbus 读取失败,本次没有完成有效比较。"
  186. "请确认连接的是当前启动的 Modbus Server,并检查服务端日志里的寄存器存储初始化范围和非法地址日志。"
  187. )
  188. _print_modbus_errors(modbus_errors, max_details)
  189. _print_http_missing(http_missing, max_details)
  190. _print_mismatches(mismatches, max_details)
  191. return 0 if not modbus_errors and not http_missing and not mismatches else 1
  192. def _point_label(point: ModbusPoint) -> str:
  193. return (
  194. f"point_id={point.point_id}, name={point.name}, data_type={point.data_type}, "
  195. f"slave_id={point.slave_id}, address={point.address}"
  196. )
  197. def _print_modbus_errors(errors, max_details: int) -> None:
  198. for point, error in errors[:max_details]:
  199. print(f"Modbus读取失败: {_point_label(point)}, error={error}")
  200. if len(errors) > max_details:
  201. print(f"Modbus读取失败还有 {len(errors) - max_details} 条未显示")
  202. def _print_http_missing(points: list[ModbusPoint], max_details: int) -> None:
  203. for point in points[:max_details]:
  204. print(f"HTTP缺失: {_point_label(point)}")
  205. if len(points) > max_details:
  206. print(f"HTTP缺失还有 {len(points) - max_details} 条未显示")
  207. def _print_mismatches(mismatches, max_details: int) -> None:
  208. for point, modbus_value, expected_registers, http_value in mismatches[:max_details]:
  209. print(
  210. f"不一致: {_point_label(point)}, "
  211. f"modbus_registers={modbus_value}, http_value={http_value}, expected_registers={expected_registers}"
  212. )
  213. if len(mismatches) > max_details:
  214. print(f"不一致还有 {len(mismatches) - max_details} 条未显示")
  215. if __name__ == "__main__":
  216. raise SystemExit(main())