| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- from __future__ import annotations
- import os
- from typing import Any
- from sqlalchemy import Integer, JSON, String, create_engine, select, text
- from sqlalchemy.engine import Engine
- from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
class Base(DeclarativeBase):
    """Declarative base shared by the ORM models defined in this module."""
class SysConfig(Base):
    """ORM model for one row of the ``sys_config`` table.

    Each row pairs a unique string key with a JSON value.
    """

    __tablename__ = "sys_config"

    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
    )
    # Configuration key: at most 128 characters, unique, never NULL.
    key: Mapped[str] = mapped_column(
        String(128),
        unique=True,
        nullable=False,
    )
    # Configuration payload stored in a JSON column; never NULL.
    value: Mapped[Any] = mapped_column(
        JSON,
        nullable=False,
    )
# Module-level engine cache used by sql_engine(): the URL the cached engine
# was built for, and the engine itself. Both start out empty.
_ENGINE_URL: str | None = None
_ENGINE: Engine | None = None
def database_url() -> str:
    """Return the database URL taken from the ``DATABASE_URL`` environment variable.

    Surrounding whitespace is stripped before validation.

    Returns:
        The stripped, non-empty URL string.

    Raises:
        ValueError: If ``DATABASE_URL`` is unset or blank, or if its scheme
            names the SQLite dialect, with or without a ``+driver`` suffix.
    """
    raw_value = str(os.getenv("DATABASE_URL") or "").strip()
    if not raw_value:
        raise ValueError("DATABASE_URL is required and must point to PostgreSQL")

    # SQLAlchemy URLs use the scheme "dialect" or "dialect+driver", so a plain
    # startswith("sqlite:") check would let e.g. "sqlite+pysqlite:///x.db"
    # through. Compare the dialect part only, case-insensitively.
    scheme, colon, _rest = raw_value.partition(":")
    dialect = scheme.partition("+")[0]
    if colon and dialect.lower() == "sqlite":
        raise ValueError("SQLite is not supported; use PostgreSQL DATABASE_URL")
    return raw_value
def sql_engine() -> Engine:
    """Return the cached SQLAlchemy engine for the current ``DATABASE_URL``.

    The engine is created on first use and reused while the URL returned by
    ``database_url()`` stays the same. If the URL changes, a new engine is
    built, the previous one is disposed, and the cache is updated.

    Returns:
        The engine bound to the current database URL.

    Raises:
        ValueError: Propagated from ``database_url()`` when the URL is missing
            or rejected.
    """
    global _ENGINE, _ENGINE_URL
    effective_url = database_url()
    if _ENGINE is not None and _ENGINE_URL == effective_url:
        return _ENGINE

    # Build the replacement first so the cache is left untouched if
    # create_engine() raises.
    new_engine = create_engine(
        effective_url,
        future=True,
        pool_pre_ping=True,
        pool_recycle=1800,
    )

    # Release the connection pool of the engine being replaced; otherwise its
    # pooled connections stay open after the cache stops referencing it.
    if _ENGINE is not None:
        _ENGINE.dispose()

    _ENGINE = new_engine
    _ENGINE_URL = effective_url
    return new_engine
def check_database_connection() -> None:
    """Open a connection on the shared engine and run ``SELECT 1``.

    Returns nothing; any error raised while resolving the engine, connecting,
    or executing the statement propagates to the caller.
    """
    probe = text("SELECT 1")
    with sql_engine().connect() as conn:
        conn.execute(probe)
def read_sys_config_value(config_key: str) -> Any | None:
    """Fetch the stored value for ``config_key`` from ``sys_config``.

    Before querying, ``Base.metadata.create_all`` is called with
    ``checkfirst=True`` on the shared engine.

    Args:
        config_key: Key of the configuration row to look up.

    Returns:
        The scalar result of selecting ``SysConfig.value`` for the key;
        ``None`` when the query yields no row.

    Raises:
        ValueError: Wraps any exception raised while obtaining the engine,
            creating tables, or running the query.
    """
    try:
        db = sql_engine()
        Base.metadata.create_all(db, checkfirst=True)
        lookup = select(SysConfig.value).where(SysConfig.key == config_key)
        with Session(db) as db_session:
            return db_session.scalar(lookup)
    except Exception as exc:
        raise ValueError(f"failed to read sys_config key={config_key}: {exc}") from exc
def write_sys_config_value(config_key: str, value: Any) -> None:
    """Store ``value`` under ``config_key`` in ``sys_config``.

    Before writing, ``Base.metadata.create_all`` is called with
    ``checkfirst=True`` on the shared engine. An existing row for the key has
    its value replaced; otherwise a new row is added. The session is then
    committed.

    Args:
        config_key: Key of the configuration row to create or update.
        value: Payload to store in the row's JSON column.

    Raises:
        ValueError: Wraps any exception raised while obtaining the engine,
            creating tables, querying, or committing.
    """
    try:
        db = sql_engine()
        Base.metadata.create_all(db, checkfirst=True)
        lookup = select(SysConfig).where(SysConfig.key == config_key)
        with Session(db) as db_session:
            existing = db_session.scalar(lookup)
            if existing is not None:
                existing.value = value
            else:
                db_session.add(SysConfig(key=config_key, value=value))
            db_session.commit()
    except Exception as exc:
        raise ValueError(f"failed to write sys_config key={config_key}: {exc}") from exc
|