from datetime import datetime
import json
import time

try:
    # Prefer the requests-compatible module shipped with workflowlib when it is
    # importable (presumably the hosting platform's wrapper -- confirm);
    # otherwise fall back to the regular `requests` package. Only a missing
    # module should trigger the fallback, hence ImportError rather than a bare
    # except that would also hide genuine errors raised inside workflowlib.
    from workflowlib import requests
except ImportError:
    import requests
import pandas as pd


class PointReader:
    """Read time-series values for a set of point ids from an HTTP endpoint.

    Each request POSTs a JSON body with the point ids and a [begin, end] window
    as Unix timestamps in seconds. Results come back as DataFrames indexed by
    naive Asia/Shanghai wall-clock time, with one column per point id.
    """

    def __init__(
        self,
        point_ids: list,
        dt_begin: datetime,
        dt_end: datetime,
        url: str,
    ) -> None:
        """Store the query parameters.

        Args:
            point_ids: Point identifiers, sent to the endpoint as ``point_ids``.
            dt_begin: Default start of the query range.
            dt_end: Default end of the query range.
            url: Endpoint that accepts the POSTed JSON query.
        """
        self.point_ids = point_ids
        self.dt_begin = dt_begin
        self.dt_end = dt_end
        self.url = url

    def _read(self, dt_begin=None, dt_end=None) -> pd.DataFrame:
        """Fetch a single [dt_begin, dt_end] window from the endpoint.

        Both bounds default to the instance range and are truncated to the
        minute before being sent.

        The response is expected to be a JSON object with:
        - ``state``: must be 0 for success.
        - ``data``: a list of ``{"point_id": ..., "data": [...]}`` items, where
          each record carries ``ts`` (epoch seconds) and ``value``.

        Returns:
            A DataFrame with one column per point id that returned data, indexed
            by naive Asia/Shanghai time. Returns None when no point returned any
            data.

        Raises:
            Exception: if dt_begin is later than dt_end, if the request or
                JSON decoding fails (chained to the original error), or if
                ``state`` is not 0.
        """
        dt_begin = (self.dt_begin if dt_begin is None else dt_begin).replace(second=0, microsecond=0)
        dt_end = (self.dt_end if dt_end is None else dt_end).replace(second=0, microsecond=0)
        if dt_begin > dt_end:
            raise Exception('开始时间晚于结束时间')

        # NOTE(review): datetime.timestamp() interprets naive datetimes in the
        # host's local timezone, whereas the returned index is Shanghai wall time.
        # The two agree only if the host runs in Asia/Shanghai or the inputs are
        # tz-aware -- confirm.
        post_data = {
            "point_ids": self.point_ids,
            "begin": round(dt_begin.timestamp()),
            "end": round(dt_end.timestamp()),
            "interval": 1,  # meaning defined by the service -- TODO confirm
            "type": 3,      # meaning defined by the service -- TODO confirm
        }

        res = None       # stays None if requests.post itself raises
        res_json = None  # stays None if the body cannot be decoded
        try:
            res = requests.post(url=self.url, data=json.dumps(post_data))
            # Brief pause after every call; presumably throttling for the
            # service -- confirm.
            time.sleep(0.1)
            res_json = res.json()
            res_state = res_json['state']
        except Exception as err:
            print('post_data', post_data)
            print('url', self.url)
            print('res', res)
            # Report the decoded body when we have one, otherwise the error.
            detail = res_json if res_json is not None else err
            print('json', detail)
            raise Exception(detail) from err

        if res_state != 0:
            print('post_data', post_data)
            raise Exception(res_json)

        frames = []
        # A missing or null 'data' falls through to the "no data at all" path
        # below instead of crashing.
        for point_info in res_json.get('data') or []:
            point_id = point_info['point_id']
            point_data = point_info['data']
            if not point_data:
                print('post_data', post_data)
                print('res.json', res_json)
                print('未获取到对应的点位数据, 检查时间是否正确')
                continue
            frame = pd.DataFrame(point_data).rename(columns={'value': point_id}).set_index('ts')
            frames.append(frame)

        if not frames:
            print('post_data', post_data)
            print('res.json', res_json)
            print('所有点位的数据均未获取到, 需检查点位或接口')
            return None

        data = pd.concat(frames, axis=1)
        # Epoch seconds -> aware UTC -> Shanghai time -> drop tz info
        # (naive local wall time).
        data.index = pd.to_datetime(data.index, unit='s', utc=True).tz_convert('Asia/Shanghai')
        data = data.tz_localize(tz=None)
        return data

    @staticmethod
    def _merge_chunks(chunks) -> pd.DataFrame:
        """Stack per-request frames, de-duplicate timestamps, and sort by time.

        When a timestamp appears in several chunks, the row from the later chunk
        wins. None entries (requests that returned no data) are dropped by
        pd.concat. If every entry is None, pandas raises ValueError.
        """
        data = pd.concat(chunks, axis=0)
        return data.loc[~data.index.duplicated(keep='last'), :].sort_index()

    def read_interval(self) -> pd.DataFrame:
        """Read the whole instance range in consecutive windows of at most one day.

        Consecutive windows share a boundary timestamp. The duplicate rows this
        produces are removed when the chunks are merged.

        Raises:
            ValueError: if dt_begin is later than dt_end.
        """
        if self.dt_begin > self.dt_end:
            raise ValueError(f'开始时间晚于结束时间(start:{self.dt_begin},end:{self.dt_end})')

        bounds = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
        # Close the last window at dt_end. Skip this when the daily range already
        # ends exactly there, to avoid a redundant zero-width request. A
        # single-point range (dt_begin == dt_end) still needs the second bound.
        if len(bounds) < 2 or bounds[-1] != self.dt_end:
            bounds.append(self.dt_end)

        windows = list(zip(bounds[:-1], bounds[1:]))
        chunks = []
        for idx, (start, end) in enumerate(windows):
            finish_pct = round((idx + 1) / len(windows) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),开始时间:{start},结束时间:{end}')
            chunks.append(self._read(dt_begin=start, dt_end=end))
        return self._merge_chunks(chunks)

    def read_int_interval(self, freq='H') -> pd.DataFrame:
        """Read one sample at every whole hour or whole day in the instance range.

        The range start and end are first floored to the hour (``freq`` 'H' or
        'h') or to midnight (``freq`` 'D'). Each resulting point is fetched
        with a zero-width request.

        Raises:
            ValueError: if ``freq`` is not one of 'H', 'h', 'D'.
            Exception: if the floored range contains no points.
        """
        if freq in ('H', 'h'):
            start = self.dt_begin.replace(minute=0, second=0, microsecond=0)
            end = self.dt_end.replace(minute=0, second=0, microsecond=0)
        elif freq == 'D':
            start = self.dt_begin.replace(hour=0, minute=0, second=0, microsecond=0)
            end = self.dt_end.replace(hour=0, minute=0, second=0, microsecond=0)
        else:
            raise ValueError(f"unsupported freq {freq!r}; expected 'H' or 'D'")

        points = pd.date_range(start=start, end=end, freq=freq).to_pydatetime()
        if len(points) == 0:
            raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')

        chunks = []
        for idx, dt in enumerate(points):
            chunks.append(self._read(dt_begin=dt, dt_end=dt))
            finish_pct = round((idx + 1) / len(points) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')
        return self._merge_chunks(chunks)