value_refresh.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. """Refresh realtime values into the register store."""
  2. import logging
  3. import threading
  4. import time
  5. from modbus_codec import encode_registers
  6. logger = logging.getLogger(__name__)
  7. class ValueRefreshWorker(threading.Thread):
  8. def __init__(self, points, provider, store, interval_seconds: int, batch_size: int):
  9. super().__init__(name="value-refresh-worker", daemon=True)
  10. self.points = points
  11. self.provider = provider
  12. self.store = store
  13. self.interval_seconds = interval_seconds
  14. self.batch_size = batch_size
  15. def run(self) -> None:
  16. logger.info("实时值刷新线程已启动,刷新周期=%s秒", self.interval_seconds)
  17. while True:
  18. try:
  19. self.refresh_once(initial=False)
  20. except Exception:
  21. logger.exception("实时值刷新失败")
  22. time.sleep(self.interval_seconds)
  23. def refresh_once(self, initial: bool) -> None:
  24. start_time = time.perf_counter()
  25. point_by_id = {point.point_id: point for point in self.points}
  26. point_ids = list(point_by_id)
  27. total_batches = 0
  28. failed_batches = 0
  29. try:
  30. for start in range(0, len(point_ids), self.batch_size):
  31. total_batches += 1
  32. batch = point_ids[start:start + self.batch_size]
  33. try:
  34. values = self.provider.fetch_values(batch)
  35. except Exception:
  36. failed_batches += 1
  37. if not initial:
  38. logger.exception("周期刷新实时值请求失败,跳过当前批次,起始序号=%d,数量=%d", start, len(batch))
  39. continue
  40. logger.exception("初始化实时值请求失败,起始序号=%d,数量=%d", start, len(batch))
  41. raise
  42. missing_point_ids = []
  43. for point_id in batch:
  44. point = point_by_id[point_id]
  45. if point_id not in values:
  46. if initial:
  47. logger.error("初始化实时值缺失,point_id=%s", point_id)
  48. missing_point_ids.append(point_id)
  49. continue
  50. else:
  51. logger.warning("周期刷新实时值缺失,point_id=%s,保持旧值", point_id)
  52. continue
  53. else:
  54. value = values[point_id]
  55. registers = encode_registers(value, point.data_type)
  56. self.store.write_internal(point.slave_id, point.address, registers)
  57. if missing_point_ids:
  58. raise RuntimeError(f"初始化实时值缺失,数量={len(missing_point_ids)}")
  59. finally:
  60. elapsed_ms = (time.perf_counter() - start_time) * 1000
  61. phase = "初始化" if initial else "周期刷新"
  62. logger.info(
  63. "%s实时值HTTP请求完成,点位总数=%d,批次数=%d,失败批次数=%d,耗时=%.2fms",
  64. phase,
  65. len(point_ids),
  66. total_batches,
  67. failed_batches,
  68. elapsed_ms,
  69. )