| 1234567891011121314151617181920212223 |
- """HTTP realtime value provider."""
- import requests
- from value_provider import ValueProvider
- class HttpValueProvider(ValueProvider):
- def __init__(self, url: str, timeout_seconds: int):
- self.url = url
- self.timeout_seconds = timeout_seconds
- def fetch_values(self, point_ids: list[str]) -> dict[str, object]:
- response = requests.post(
- self.url,
- json={"point_ids": point_ids},
- timeout=self.timeout_seconds,
- )
- response.raise_for_status()
- payload = response.json()
- if payload.get("state") != 0:
- raise RuntimeError(f"realtime api failed: {payload}")
- return {item["point_id"]: item.get("value") for item in payload.get("data", [])}
|