http_client.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. from __future__ import annotations
  2. import os
  3. from typing import Any
  4. import requests
  5. def request_timeout_seconds() -> int:
  6. try:
  7. return max(1, int(str(os.getenv("UPSTREAM_REQUEST_TIMEOUT", "60")).strip()))
  8. except (TypeError, ValueError):
  9. return 60
  10. def request_json(
  11. method: str,
  12. url: str,
  13. authorization: str | None = None,
  14. *,
  15. json_payload: dict[str, Any] | None = None,
  16. ) -> Any:
  17. headers: dict[str, str] = {}
  18. token = str(authorization or "").strip()
  19. if token:
  20. headers["Authorization"] = token
  21. response = requests.request(
  22. method=method,
  23. url=url,
  24. headers=headers,
  25. json=json_payload,
  26. timeout=request_timeout_seconds(),
  27. )
  28. try:
  29. payload = response.json()
  30. except ValueError as exc:
  31. preview = response.text[:300]
  32. raise ValueError(
  33. "upstream returned non-JSON response, "
  34. f"status={response.status_code}, body={preview}"
  35. ) from exc
  36. if response.status_code >= 400:
  37. raise ValueError(f"upstream HTTP {response.status_code}: {payload}")
  38. return payload