"""Application entrypoint.""" import logging import sys from app_config import load_config from constants import DEFAULT_BATCH_SIZE from db import create_connection from http_value_provider import HttpValueProvider from logging_config import setup_logging from modbus_context import ReadonlyHoldingRegisterContext from modbus_server import run_modbus_server from point_loader import check_point_exists, load_points, validate_address_overlaps, validate_data_types from register_store import RegisterStore, initialize_register_store from value_refresh import ValueRefreshWorker logger = logging.getLogger(__name__) def main(config_path: str = "config.yaml") -> int: config = load_config(config_path) setup_logging(config.logging.dir, config.logging.retention_days, config.logging.level) logger.info("正在启动Modbus Server") logger.info("日志系统初始化完成") logger.info("配置文件加载完成") logger.info( "运行配置: 数据库=%s:%s/%s, Modbus监听=%s:%s, 刷新周期=%s秒, 批量大小=%s", config.db.host, config.db.port, config.db.database, config.modbus.host, config.modbus.port, config.modbus.interval, DEFAULT_BATCH_SIZE, ) try: points = _load_and_validate_points(config) except Exception: logger.exception("初始化失败") return 1 store = RegisterStore() initialize_register_store(points, store) logger.info("寄存器存储初始化完成,%s", store.describe()) provider = HttpValueProvider(config.http_provider.url, config.http_provider.timeout_seconds) logger.info("实时值Provider初始化完成,类型=http") worker = ValueRefreshWorker(points, provider, store, config.modbus.interval) logger.info("开始请求初始化实时值") try: worker.refresh_once(initial=True) except Exception: logger.exception("初始化实时值失败") return 1 worker.start() context = ReadonlyHoldingRegisterContext(store) logger.info("上下文初始化完成(Modbus)") try: run_modbus_server(context, config.modbus.host, config.modbus.port) except Exception: logger.exception("Modbus TCP服务启动或运行失败") return 1 return 0 def _load_and_validate_points(config) -> list: conn = create_connection(config.db) try: logger.info("数据库连接成功") logger.info("开始从modbus_server_point加载全部点位") points = load_points(conn) logger.info("点位加载完成,数量=%s", len(points)) if not points: logger.warning("数据表modbus_server_point没有点位,将启动空Modbus Server") logger.info("开始校验点位data_type") type_errors = validate_data_types(points) if type_errors: for error in type_errors: logger.error("点位data_type非法: %s", error) raise RuntimeError("点位data_type校验失败") logger.info("开始校验Modbus地址重叠") overlap_errors = validate_address_overlaps(points) if overlap_errors: for error in overlap_errors: logger.error(error) raise RuntimeError("Modbus地址重叠校验失败") logger.info("开始校验pt_point点位是否存在,批量大小=%s", DEFAULT_BATCH_SIZE) missing_point_ids = check_point_exists(conn, [point.point_id for point in points]) if missing_point_ids: logger.error("数据表pt_point中缺失以下point_id: %s", missing_point_ids) raise RuntimeError("pt_point点位存在性校验失败") return points finally: conn.close() def cli() -> None: raise SystemExit(main(sys.argv[1] if len(sys.argv) > 1 else "config.yaml")) if __name__ == "__main__": cli()