| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- from __future__ import annotations
- import os
- from typing import Any
- import requests
def request_timeout_seconds() -> int:
    """Return the upstream request timeout, in whole seconds.

    The value is read from the ``UPSTREAM_REQUEST_TIMEOUT`` environment
    variable. Surrounding whitespace is ignored. If the variable is unset,
    the default of 60 is used; if its content cannot be parsed as an
    integer, 60 is returned as well. Parsed values below 1 are raised to 1.
    """
    fallback_seconds = 60
    raw_setting = os.getenv("UPSTREAM_REQUEST_TIMEOUT", "60")

    # Keep the try body to the single conversion that can fail.
    try:
        parsed_seconds = int(str(raw_setting).strip())
    except (TypeError, ValueError):
        return fallback_seconds

    # Clamp zero and negative settings up to one second.
    if parsed_seconds < 1:
        return 1
    return parsed_seconds
def request_json(
    method: str,
    url: str,
    authorization: str | None = None,
    *,
    json_payload: dict[str, Any] | None = None,
) -> Any:
    """Send an HTTP request and return the decoded JSON body of the reply.

    Args:
        method: HTTP method, passed through to ``requests.request``.
        url: Target URL, passed through to ``requests.request``.
        authorization: Value for the ``Authorization`` header. It is
            stripped of surrounding whitespace; when it is ``None`` or
            blank, no ``Authorization`` header is sent.
        json_payload: Optional body, passed as the ``json`` argument of
            ``requests.request``.

    Returns:
        Whatever ``response.json()`` yields for the reply.

    Raises:
        ValueError: If decoding the body raises ``ValueError`` (the message
            carries the status code and the first 300 characters of the
            body), or if the status code is 400 or above (the message
            carries the status code and the decoded payload).

    Exceptions raised by ``requests.request`` itself are not caught here.
    """
    auth_value = str(authorization or "").strip()
    # Only attach the header when there is something to send.
    request_headers: dict[str, str] = (
        {"Authorization": auth_value} if auth_value else {}
    )

    upstream = requests.request(
        method=method,
        url=url,
        headers=request_headers,
        json=json_payload,
        timeout=request_timeout_seconds(),
    )

    # Decode before looking at the status, so that a non-JSON error page is
    # reported with a body preview rather than a decoded payload.
    try:
        decoded = upstream.json()
    except ValueError as exc:
        snippet = upstream.text[:300]
        message = (
            "upstream returned non-JSON response, "
            f"status={upstream.status_code}, body={snippet}"
        )
        raise ValueError(message) from exc

    if upstream.status_code < 400:
        return decoded
    raise ValueError(f"upstream HTTP {upstream.status_code}: {decoded}")
|