from __future__ import annotations import os import time from datetime import datetime, timezone from typing import Any import requests from .db import read_sys_config_value, write_sys_config_value PROJECTS_CONFIG_KEY = "mcp_project_data_projects" PROJECT_TOKEN_CONFIG_KEY_PREFIX = "mcp_project_data_project_token:" PROJECT_AUTH_LOGIN_PATH = "/api/ai/auth/password_login" def _to_iso_utc(timestamp: float) -> str: return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() def _request_timeout_seconds() -> int: return max(1, int(os.getenv("UPSTREAM_REQUEST_TIMEOUT", "60"))) def _safe_int(raw_value: Any, field_name: str) -> int: try: return int(str(raw_value).strip()) except Exception as exc: raise ValueError(f"invalid {field_name}: {raw_value}") from exc def _normalize_project_key(raw_value: Any) -> str: project_key = str(raw_value or "").strip() if not project_key: raise ValueError("project_key is required") return project_key def _coerce_bool(raw_value: Any, default: bool) -> bool: if raw_value is None: return default if isinstance(raw_value, bool): return raw_value text = str(raw_value).strip().lower() if text in {"1", "true", "yes", "y", "on"}: return True if text in {"0", "false", "no", "n", "off"}: return False return default def _request_json( method: str, url: str, authorization: str | None = None, *, json_payload: dict | None = None, ) -> Any: headers: dict[str, str] = {} token_text = str(authorization or "").strip() if token_text: headers["Authorization"] = token_text response = requests.request( method=method, url=url, headers=headers, json=json_payload, timeout=_request_timeout_seconds(), ) try: payload = response.json() except ValueError as exc: text_preview = response.text[:300] raise ValueError( "upstream returned non-JSON response, " f"status={response.status_code}, body={text_preview}" ) from exc if response.status_code >= 400: raise ValueError(f"upstream HTTP {response.status_code}: {payload}") return payload def _project_token_config_key(project_key: str) -> str: return f"{PROJECT_TOKEN_CONFIG_KEY_PREFIX}{project_key}" def _load_projects_config() -> list[dict[str, Any]]: raw_value = read_sys_config_value(PROJECTS_CONFIG_KEY) if raw_value is None: raise ValueError("missing sys_config key: mcp_project_data_projects") if not isinstance(raw_value, list): raise ValueError("sys_config mcp_project_data_projects must be a JSON array") normalized: list[dict[str, Any]] = [] latest_by_key: dict[str, dict[str, Any]] = {} for item in raw_value: if not isinstance(item, dict): continue project_key = _normalize_project_key(item.get("project_key")) project_name = str(item.get("project_name") or project_key).strip() or project_key base_url = str(item.get("base_url") or "").strip().rstrip("/") username = str(item.get("username") or "").strip() password = str(item.get("password") or "").strip() enabled = _coerce_bool(item.get("enabled"), True) if not base_url: raise ValueError(f"project '{project_key}' missing base_url") latest_by_key[project_key] = { "project_key": project_key, "project_name": project_name, "base_url": base_url, "username": username, "password": password, "enabled": enabled, } normalized.extend(latest_by_key.values()) if not normalized: raise ValueError("mcp_project_data_projects has no valid project entries") return normalized def load_projects_config() -> list[dict[str, Any]]: return _load_projects_config() def find_project_config(project_key: str) -> dict[str, Any]: expected = _normalize_project_key(project_key) for item in _load_projects_config(): if item["project_key"] != expected: continue if not item["enabled"]: raise ValueError(f"project '{expected}' is disabled") if not item["username"]: raise ValueError(f"project '{expected}' missing username") if not item["password"]: raise ValueError(f"project '{expected}' missing password") return item raise ValueError(f"project_key not found: {expected}") def _read_project_token_cache(project_key: str) -> dict[str, Any] | None: raw_value = read_sys_config_value(_project_token_config_key(project_key)) if not isinstance(raw_value, dict): return None return raw_value def _write_project_token_cache(project_key: str, *, auth_token: str, expire_at: int) -> None: write_sys_config_value( _project_token_config_key(project_key), { "auth_token": auth_token, "expire_at": expire_at, "updated_at": _to_iso_utc(time.time()), }, ) def _login_project(project_cfg: dict[str, Any]) -> tuple[str, int]: payload = _request_json( "POST", f"{project_cfg['base_url']}{PROJECT_AUTH_LOGIN_PATH}", authorization=None, json_payload={ "username": project_cfg["username"], "password": project_cfg["password"], }, ) if not isinstance(payload, dict): raise ValueError("login failed: invalid response payload") errcode = payload.get("errcode") if str(errcode) not in {"0", "0.0"}: message = str(payload.get("msg") or payload.get("message") or "").strip() raise ValueError(f"login failed: {message or payload}") auth_token = str(payload.get("token") or "").strip() if not auth_token: raise ValueError("login failed: missing token") expire_raw = payload.get("token_expire_time") if expire_raw is None: expire_at = int(time.time()) + 3600 else: expire_at = _safe_int(expire_raw, "token_expire_time") if expire_at <= int(time.time()): expire_at = int(time.time()) + 3600 return auth_token, expire_at def resolve_project_token(project_cfg: dict[str, Any]) -> str: project_key = project_cfg["project_key"] cached = _read_project_token_cache(project_key) if isinstance(cached, dict): token_text = str(cached.get("auth_token") or "").strip() expire_raw = cached.get("expire_at") if token_text and expire_raw is not None: try: expire_at = int(str(expire_raw).strip()) if int(time.time()) < expire_at: return token_text except (TypeError, ValueError): pass auth_token, expire_at = _login_project(project_cfg) _write_project_token_cache(project_key, auth_token=auth_token, expire_at=expire_at) return auth_token