| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- """Refresh realtime values into the register store."""
- import logging
- import threading
- import time
- from modbus_codec import encode_registers
- logger = logging.getLogger(__name__)
class ValueRefreshWorker(threading.Thread):
    """Daemon thread that periodically copies provider values into the register store.

    Every cycle the configured points are fetched from ``provider`` in batches,
    each returned value is encoded with ``encode_registers`` and the resulting
    registers are written through ``store.write_internal``.
    """

    def __init__(self, points, provider, store, interval_seconds: int, batch_size: int):
        """Store the collaborators and the refresh settings.

        Args:
            points: Re-iterable collection of point objects. The worker reads
                ``point_id``, ``data_type``, ``slave_id`` and ``address``
                from each of them.
            provider: Object exposing ``fetch_values(point_ids)``; its result
                is used as a mapping keyed by point id.
            store: Object exposing
                ``write_internal(slave_id, address, registers)``.
            interval_seconds: Pause between two refresh cycles, in seconds.
            batch_size: Maximum number of point ids per ``fetch_values`` call.

        Raises:
            ValueError: If ``batch_size`` is not positive or
                ``interval_seconds`` is negative.
        """
        # Fail fast on settings that would otherwise break the worker later:
        # a zero step makes range() raise on every cycle, a negative step
        # yields no batches at all (a silent no-op refresh), and a negative
        # delay makes time.sleep() raise outside the try block in run().
        if batch_size <= 0:
            raise ValueError(f"batch_size必须大于0,当前值={batch_size}")
        if interval_seconds < 0:
            raise ValueError(f"interval_seconds不能为负数,当前值={interval_seconds}")

        super().__init__(name="value-refresh-worker", daemon=True)
        self.points = points
        self.provider = provider
        self.store = store
        self.interval_seconds = interval_seconds
        self.batch_size = batch_size

    def run(self) -> None:
        """Run periodic refresh cycles forever, logging (not raising) failures."""
        logger.info("实时值刷新线程已启动,刷新周期=%s秒", self.interval_seconds)
        while True:
            try:
                self.refresh_once(initial=False)
            except Exception:
                # Keep the thread alive: a failed cycle is logged and the
                # next one is attempted after the usual pause.
                logger.exception("实时值刷新失败")
            time.sleep(self.interval_seconds)

    def refresh_once(self, initial: bool) -> None:
        """Fetch all points once, batch by batch, and write them to the store.

        Args:
            initial: ``True`` for the strict start-up load, ``False`` for a
                periodic refresh. In strict mode a failed fetch is re-raised
                and missing values raise ``RuntimeError``; in periodic mode a
                failed batch is skipped and a missing value leaves the point
                unwritten.

        Raises:
            RuntimeError: In strict mode, after a batch has been processed,
                if any of its point ids was absent from the fetched values.
            Exception: In strict mode, whatever ``fetch_values`` raised.
                Errors from ``encode_registers`` or ``write_internal``
                propagate in both modes.
        """
        start_time = time.perf_counter()
        point_by_id = {point.point_id: point for point in self.points}
        point_ids = list(point_by_id)
        total_batches = 0
        failed_batches = 0
        try:
            for start in range(0, len(point_ids), self.batch_size):
                total_batches += 1
                batch = point_ids[start:start + self.batch_size]
                try:
                    values = self.provider.fetch_values(batch)
                except Exception:
                    failed_batches += 1
                    if initial:
                        logger.exception("初始化实时值请求失败,起始序号=%d,数量=%d", start, len(batch))
                        raise
                    logger.exception("周期刷新实时值请求失败,跳过当前批次,起始序号=%d,数量=%d", start, len(batch))
                    continue

                # Reset per batch: the strict-mode check below reports only
                # the ids missing from the batch that was just processed.
                missing_point_ids = []
                for point_id in batch:
                    if point_id not in values:
                        if initial:
                            logger.error("初始化实时值缺失,point_id=%s", point_id)
                            missing_point_ids.append(point_id)
                        else:
                            logger.warning("周期刷新实时值缺失,point_id=%s,保持旧值", point_id)
                        continue
                    point = point_by_id[point_id]
                    registers = encode_registers(values[point_id], point.data_type)
                    self.store.write_internal(point.slave_id, point.address, registers)

                if missing_point_ids:
                    raise RuntimeError(f"初始化实时值缺失,数量={len(missing_point_ids)}")
        finally:
            # The summary is emitted on success and on failure alike.
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            phase = "初始化" if initial else "周期刷新"
            logger.info(
                "%s实时值HTTP请求完成,点位总数=%d,批次数=%d,失败批次数=%d,耗时=%.2fms",
                phase,
                len(point_ids),
                total_batches,
                failed_batches,
                elapsed_ms,
            )
|