| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- from datetime import datetime
- import time
- try:
- from workflowlib import requests
- except:
- import requests
- import json
- import pandas as pd
class PointReader:
    """Fetch time-series values for a set of points from an HTTP query API.

    Each query POSTs a JSON body ``{"point_ids", "begin", "end", "interval": 1,
    "type": 3}`` (``begin``/``end`` as integer epoch seconds) to ``url``.
    The reply is expected to be JSON with a ``state`` field (0 = success) and a
    ``data`` list whose items carry ``point_id`` and ``data``. Each item's
    ``data`` is turned into a DataFrame with ``ts`` (epoch seconds) and
    ``value`` columns.
    The meaning of ``interval``/``type`` is defined by the server, not here.
    """

    def __init__(
        self,
        point_ids: list,
        dt_begin: datetime,
        dt_end: datetime,
        url: str,
    ) -> None:
        """Store the query parameters.

        Args:
            point_ids: Point identifiers, sent verbatim as ``point_ids``.
            dt_begin: Default start of the query window.
            dt_end: Default end of the query window.
            url: Endpoint that receives the POST requests.
        """
        self.point_ids = point_ids
        self.dt_begin = dt_begin
        self.dt_end = dt_end
        self.url = url

    def _post(self, post_data: dict) -> dict:
        """POST ``post_data`` as JSON text and return the decoded, successful reply.

        Raises:
            Exception: if the request fails, the reply is not JSON, it has no
                ``state`` field, or ``state`` is non-zero.
        """
        res = None
        try:
            res = requests.post(url=self.url, data=json.dumps(post_data))
            # Presumably throttles the many back-to-back calls issued by the
            # read_* loops below.
            time.sleep(0.1)
            payload = res.json()
            res_state = payload['state']
        except Exception as err:
            # Do not call res.json() again here: it may be exactly what failed,
            # and `res` is still None if the POST itself raised.
            print('post_data', post_data)
            print('url', self.url)
            print('res', res)
            if res is not None:
                print('text', getattr(res, 'text', None))
            raise Exception(f'接口请求失败或返回格式异常: {err!r}') from err

        if res_state != 0:
            print('post_data', post_data)
            raise Exception(payload)
        return payload

    def _read(self, dt_begin=None, dt_end=None) -> "pd.DataFrame | None":
        """Query one window and return a wide frame with one column per point.

        Args:
            dt_begin: Window start; defaults to ``self.dt_begin``.
            dt_end: Window end; defaults to ``self.dt_end``.
                Both are truncated to the whole minute before querying.

        Returns:
            DataFrame indexed by naive Asia/Shanghai wall-clock datetimes, with one
            column per point id that returned data. Returns ``None`` if no point
            returned any data.

        Raises:
            Exception: if the window start is after its end, or the request fails.
        """
        dt_begin = (self.dt_begin if dt_begin is None else dt_begin).replace(second=0, microsecond=0)
        dt_end = (self.dt_end if dt_end is None else dt_end).replace(second=0, microsecond=0)

        if dt_begin > dt_end:
            raise Exception('开始时间晚于结束时间')

        # NOTE(review): .timestamp() on a naive datetime uses the host's local
        # timezone, while the reply is converted to Asia/Shanghai below. The two
        # only agree if the host runs in UTC+8 (or inputs are tz-aware) — confirm.
        post_data = {
            "point_ids": self.point_ids,
            "begin": round(dt_begin.timestamp()),
            "end": round(dt_end.timestamp()),
            "interval": 1,
            "type": 3,
        }
        payload = self._post(post_data)

        frames = []
        for point_info in payload['data']:
            point_id = point_info['point_id']
            point_data = point_info['data']
            if not point_data:  # None or empty: nothing for this point
                print('post_data', post_data)
                print('res.json', payload)
                print('未获取到对应的点位数据, 检查时间是否正确')
                continue
            frames.append(
                pd.DataFrame(point_data).rename(columns={'value': point_id}).set_index('ts')
            )

        if not frames:
            print('post_data', post_data)
            print('res.json', payload)
            print('所有点位的数据均未获取到, 需检查点位或接口')
            return None

        data = pd.concat(frames, axis=1)
        # 'ts' is epoch seconds: render it as Shanghai wall-clock time, then drop the tz.
        data.index = pd.to_datetime(data.index, unit='s', utc=True).tz_convert('Asia/Shanghai')
        return data.tz_localize(tz=None)

    def read_interval(self) -> pd.DataFrame:
        """Read ``[self.dt_begin, self.dt_end]`` in consecutive one-day chunks.

        Adjacent chunks share their boundary instant. Duplicate timestamps keep
        the later chunk's row, and the result is sorted by time.

        Raises:
            ValueError: from ``pd.concat`` if there is no chunk
                (``dt_begin > dt_end``) or every chunk returned ``None``.
        """
        bounds = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
        # Close the trailing partial day. Skip this when the day grid already ends
        # exactly on dt_end (that would add a redundant zero-width request), but
        # still add it for a single-point window so dt_begin == dt_end is queried once.
        if len(bounds) < 2 or bounds[-1] != self.dt_end:
            bounds.append(self.dt_end)

        segments = list(zip(bounds[:-1], bounds[1:]))
        data = []
        for idx, (start, end) in enumerate(segments):
            finish_pct = round((idx + 1) / len(segments) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),开始时间:{start},结束时间:{end}')
            data.append(self._read(dt_begin=start, dt_end=end))

        data = pd.concat(data, axis=0)
        return data.loc[~data.index.duplicated(keep='last'), :].sort_index()

    def read_int_interval(self, freq='H') -> pd.DataFrame:
        """Read single instants on a whole-hour or whole-day grid.

        The window bounds are first truncated down to the grid: to the hour for
        ``'H'``, or to midnight for ``'D'``. Each grid point is then queried as a
        zero-width window.

        Args:
            freq: ``'H'`` (hourly) or ``'D'`` (daily); lowercase is also accepted.

        Raises:
            ValueError: if ``freq`` is not one of the supported values.
            Exception: if the truncated range contains no grid point.
        """
        if freq in ('H', 'h'):
            # 'h' is accepted by old pandas and is the non-deprecated hourly alias in new pandas.
            truncate, pd_freq = {'minute': 0}, 'h'
        elif freq in ('D', 'd'):
            truncate, pd_freq = {'hour': 0, 'minute': 0}, 'D'
        else:
            raise ValueError(f'不支持的freq:{freq}, 仅支持 "H" 或 "D"')

        start = self.dt_begin.replace(second=0, microsecond=0, **truncate)
        end = self.dt_end.replace(second=0, microsecond=0, **truncate)
        int_interval = pd.date_range(start=start, end=end, freq=pd_freq).to_pydatetime()

        if len(int_interval) == 0:
            raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')

        data = []
        for idx, dt in enumerate(int_interval):
            data.append(self._read(dt_begin=dt, dt_end=dt))
            finish_pct = round((idx + 1) / len(int_interval) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')

        data = pd.concat(data, axis=0)
        return data.loc[~data.index.duplicated(keep='last'), :].sort_index()
|