http_value_provider.py 749 B

1234567891011121314151617181920212223
  1. """HTTP realtime value provider."""
  2. import requests
  3. from value_provider import ValueProvider
  4. class HttpValueProvider(ValueProvider):
  5. def __init__(self, url: str, timeout_seconds: int):
  6. self.url = url
  7. self.timeout_seconds = timeout_seconds
  8. def fetch_values(self, point_ids: list[str]) -> dict[str, object]:
  9. response = requests.post(
  10. self.url,
  11. json={"point_ids": point_ids},
  12. timeout=self.timeout_seconds,
  13. )
  14. response.raise_for_status()
  15. payload = response.json()
  16. if payload.get("state") != 0:
  17. raise RuntimeError(f"realtime api failed: {payload}")
  18. return {item["point_id"]: item.get("value") for item in payload.get("data", [])}