db.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from __future__ import annotations
  2. import os
  3. from typing import Any
  4. from sqlalchemy import Integer, JSON, String, create_engine, select
  5. from sqlalchemy.engine import Engine
  6. from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
  7. class Base(DeclarativeBase):
  8. pass
  9. class SysConfig(Base):
  10. __tablename__ = "sys_config"
  11. id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
  12. key: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
  13. value: Mapped[Any] = mapped_column(JSON, nullable=False)
  14. _ENGINE: Engine | None = None
  15. _ENGINE_URL: str | None = None
  16. def database_url() -> str:
  17. raw_value = str(os.getenv("DATABASE_URL") or "").strip()
  18. if not raw_value:
  19. raise ValueError("DATABASE_URL is required and must point to PostgreSQL")
  20. if raw_value.startswith("sqlite:"):
  21. raise ValueError("SQLite is not supported; use PostgreSQL DATABASE_URL")
  22. return raw_value
  23. def sql_engine() -> Engine:
  24. global _ENGINE, _ENGINE_URL
  25. effective_url = database_url()
  26. if _ENGINE is not None and _ENGINE_URL == effective_url:
  27. return _ENGINE
  28. _ENGINE = create_engine(
  29. effective_url,
  30. future=True,
  31. pool_pre_ping=True,
  32. pool_recycle=1800,
  33. )
  34. _ENGINE_URL = effective_url
  35. return _ENGINE
  36. def read_sys_config_value(config_key: str) -> Any | None:
  37. try:
  38. engine = sql_engine()
  39. Base.metadata.create_all(engine, checkfirst=True)
  40. with Session(engine) as session:
  41. return session.scalar(select(SysConfig.value).where(SysConfig.key == config_key))
  42. except Exception as exc:
  43. raise ValueError(f"failed to read sys_config key={config_key}: {exc}") from exc
  44. def write_sys_config_value(config_key: str, value: Any) -> None:
  45. try:
  46. engine = sql_engine()
  47. Base.metadata.create_all(engine, checkfirst=True)
  48. with Session(engine) as session:
  49. row = session.scalar(select(SysConfig).where(SysConfig.key == config_key))
  50. if row is None:
  51. session.add(SysConfig(key=config_key, value=value))
  52. else:
  53. row.value = value
  54. session.commit()
  55. except Exception as exc:
  56. raise ValueError(f"failed to write sys_config key={config_key}: {exc}") from exc