"""Project configuration lookup and auth-token caching backed by sys_config.

Project definitions are read from the ``PROJECTS_CONFIG_KEY`` entry and
per-project login tokens are cached under keys that start with
``PROJECT_TOKEN_CONFIG_KEY_PREFIX``.
"""
from __future__ import annotations

import time
from datetime import datetime, timezone
from typing import Any

from .db import read_sys_config_value, write_sys_config_value
from .http_client import request_json

PROJECTS_CONFIG_KEY = "mcp_data_collector_projects"
PROJECT_TOKEN_CONFIG_KEY_PREFIX = "mcp_data_collector_token_"
PROJECT_AUTH_LOGIN_PATH = "/api/ai/auth/password_login"

# Lifetime (seconds) assumed for a token when the login response carries no
# usable expiry.
DEFAULT_TOKEN_TTL_SECONDS = 3600


def _to_iso_utc(timestamp: float) -> str:
    """Format a Unix timestamp as an ISO-8601 UTC string ending in ``Z``."""
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return moment.isoformat().replace("+00:00", "Z")


def _normalize_project_key(raw_value: Any) -> str:
    """Return *raw_value* as a stripped string.

    Raises:
        ValueError: If the value is falsy or blank after stripping.
    """
    project_key = str(raw_value or "").strip()
    if not project_key:
        raise ValueError("project_key is required")
    return project_key


def _normalize_base_url(raw_value: Any, field_name: str) -> str:
    """Return *raw_value* stripped of whitespace and trailing slashes.

    Args:
        raw_value: The configured URL (any type; falsy values count as empty).
        field_name: Name used in the error message.

    Raises:
        ValueError: If nothing is left after normalisation.
    """
    base_url = str(raw_value or "").strip().rstrip("/")
    if not base_url:
        raise ValueError(f"{field_name} is required")
    return base_url


def _coerce_bool(raw_value: Any, default: bool) -> bool:
    """Interpret *raw_value* as a boolean, falling back to *default*.

    ``None`` and unrecognised text yield *default*; real booleans pass
    through; otherwise the lower-cased string form is matched against the
    accepted true/false spellings.
    """
    if raw_value is None:
        return default
    if isinstance(raw_value, bool):
        return raw_value
    text = str(raw_value).strip().lower()
    if text in {"1", "true", "yes", "y", "on"}:
        return True
    if text in {"0", "false", "no", "n", "off"}:
        return False
    return default


def _safe_int(raw_value: Any, field_name: str) -> int:
    """Convert *raw_value* to ``int``, reporting failures as ``ValueError``.

    Accepts integers, integer strings, and floats (or float strings) that
    have no fractional part, e.g. ``1700000000.0``. The login code in this
    module already tolerates a float-formatted ``errcode`` ("0.0"), so other
    numeric fields may arrive the same way.

    Args:
        raw_value: The value to convert.
        field_name: Name used in the error message.

    Raises:
        ValueError: If the value is not an integral number.
    """
    try:
        text = str(raw_value).strip()
        try:
            return int(text)
        except ValueError:
            # Not an integer literal: accept only a float with no fractional
            # part. NaN and infinities report is_integer() == False.
            number = float(text)
            if not number.is_integer():
                raise
            return int(number)
    except Exception as exc:
        # Any conversion failure is surfaced uniformly as ValueError.
        raise ValueError(f"invalid {field_name}: {raw_value}") from exc


def _project_token_config_key(project_key: str) -> str:
    """Return the sys_config key under which a project's token is cached."""
    return f"{PROJECT_TOKEN_CONFIG_KEY_PREFIX}{project_key}"


def load_projects_config() -> list[dict[str, Any]]:
    """Load and normalise every project entry from sys_config.

    Non-dict items in the stored list are skipped. When several entries share
    a ``project_key`` the last one wins. ``project_name`` defaults to the
    project key, and ``enabled`` defaults to ``True``.

    Returns:
        One normalised dict per distinct project key, with the keys
        ``project_key``, ``project_name``, ``base_url``,
        ``data_collector_base_url``, ``username``, ``password`` and
        ``enabled``.

    Raises:
        ValueError: If the config value is missing, is not a list, contains
            an entry without a project key or base URLs, or yields no
            entries at all.
    """
    raw_value = read_sys_config_value(PROJECTS_CONFIG_KEY)
    if raw_value is None:
        raise ValueError(f"missing sys_config key: {PROJECTS_CONFIG_KEY}")
    if not isinstance(raw_value, list):
        raise ValueError(f"sys_config {PROJECTS_CONFIG_KEY} must be a JSON array")

    latest_by_key: dict[str, dict[str, Any]] = {}
    for item in raw_value:
        if not isinstance(item, dict):
            continue
        project_key = _normalize_project_key(item.get("project_key"))
        project_name = str(item.get("project_name") or project_key).strip() or project_key
        base_url = _normalize_base_url(item.get("base_url"), "base_url")
        data_collector_base_url = _normalize_base_url(
            item.get("data_collector_base_url"),
            "data_collector_base_url",
        )
        username = str(item.get("username") or "").strip()
        password = str(item.get("password") or "").strip()
        enabled = _coerce_bool(item.get("enabled"), True)
        # Later duplicates overwrite earlier ones.
        latest_by_key[project_key] = {
            "project_key": project_key,
            "project_name": project_name,
            "base_url": base_url,
            "data_collector_base_url": data_collector_base_url,
            "username": username,
            "password": password,
            "enabled": enabled,
        }

    projects = list(latest_by_key.values())
    if not projects:
        raise ValueError(f"{PROJECTS_CONFIG_KEY} has no valid project entries")
    return projects


def find_project_config(project_key: str) -> dict[str, Any]:
    """Return the usable configuration for *project_key*.

    Raises:
        ValueError: If the key is blank, not configured, disabled, or lacks
            a username or password (plus anything ``load_projects_config``
            raises).
    """
    expected = _normalize_project_key(project_key)
    for item in load_projects_config():
        if item["project_key"] != expected:
            continue
        if not item["enabled"]:
            raise ValueError(f"project '{expected}' is disabled")
        if not item["username"]:
            raise ValueError(f"project '{expected}' missing username")
        if not item["password"]:
            raise ValueError(f"project '{expected}' missing password")
        return item
    raise ValueError(f"project_key not found: {expected}")


def _read_project_token_cache(project_key: str) -> dict[str, Any] | None:
    """Return the cached token record for a project, or ``None``.

    Anything stored under the cache key that is not a dict is treated as
    absent.
    """
    raw_value = read_sys_config_value(_project_token_config_key(project_key))
    if not isinstance(raw_value, dict):
        return None
    return raw_value


def _write_project_token_cache(project_key: str, *, auth_token: str, expire_at: int) -> None:
    """Store a token record (token, expiry, write time) for a project."""
    write_sys_config_value(
        _project_token_config_key(project_key),
        {
            "auth_token": auth_token,
            "expire_at": expire_at,
            "updated_at": _to_iso_utc(time.time()),
        },
    )


def _login_project(project_cfg: dict[str, Any]) -> tuple[str, int]:
    """Log in with the project's credentials and return ``(token, expire_at)``.

    ``expire_at`` is compared against the current Unix time in seconds. When
    the response has no ``token_expire_time``, or the reported value is not
    in the future, ``DEFAULT_TOKEN_TTL_SECONDS`` from now is used instead.

    Args:
        project_cfg: Must provide ``base_url``, ``username`` and ``password``.

    Raises:
        ValueError: If the response is not a dict, reports a non-zero
            ``errcode``, lacks a token, or carries a non-integral
            ``token_expire_time``.
    """
    payload = request_json(
        "POST",
        f"{project_cfg['base_url']}{PROJECT_AUTH_LOGIN_PATH}",
        json_payload={
            "username": project_cfg["username"],
            "password": project_cfg["password"],
        },
    )
    if not isinstance(payload, dict):
        raise ValueError("login failed: invalid response payload")

    errcode = payload.get("errcode")
    # Success is errcode 0, which may also arrive as the float 0.0.
    if str(errcode) not in {"0", "0.0"}:
        message = str(payload.get("msg") or payload.get("message") or "").strip()
        raise ValueError(f"login failed: {message or payload}")

    auth_token = str(payload.get("token") or "").strip()
    if not auth_token:
        raise ValueError("login failed: missing token")

    # Read the clock once so the comparison and the fallback agree.
    now = int(time.time())
    fallback_expire_at = now + DEFAULT_TOKEN_TTL_SECONDS
    expire_raw = payload.get("token_expire_time")
    if expire_raw is None:
        return auth_token, fallback_expire_at

    expire_at = _safe_int(expire_raw, "token_expire_time")
    if expire_at <= now:
        expire_at = fallback_expire_at
    return auth_token, expire_at


def resolve_project_token(project_cfg: dict[str, Any]) -> str:
    """Return a valid auth token for the project, logging in if necessary.

    A cached token is reused while the current Unix time is strictly before
    its ``expire_at``. Otherwise a fresh login is performed and its result is
    written back to the cache.

    Args:
        project_cfg: Must provide ``project_key`` plus the fields
            ``_login_project`` needs.

    Raises:
        ValueError: If a required login fails.
    """
    project_key = project_cfg["project_key"]
    cached = _read_project_token_cache(project_key)
    if cached is not None:
        token = str(cached.get("auth_token") or "").strip()
        expire_raw = cached.get("expire_at")
        if token and expire_raw is not None:
            try:
                cached_expire_at = _safe_int(expire_raw, "expire_at")
            except ValueError:
                # An unreadable expiry means the cache is unusable: log in again.
                cached_expire_at = None
            if cached_expire_at is not None and int(time.time()) < cached_expire_at:
                return token

    auth_token, expire_at = _login_project(project_cfg)
    _write_project_token_cache(project_key, auth_token=auth_token, expire_at=expire_at)
    return auth_token