from datetime import datetime
import time
try:
    # Prefer the project-bundled requests wrapper; fall back to plain requests.
    from workflowlib import requests
except ImportError:
    import requests
import os
import json

import numpy as np
import pandas as pd


class PointReader:
    """Read point (tag) time-series data from an HTTP endpoint.

    The endpoint is POSTed a JSON payload and is expected to answer with
    ``{'state': 0, 'data': [...]}``; any non-zero state is treated as an
    error.  Timestamps returned by the service are epoch seconds and are
    converted to naive Asia/Shanghai local time, floored to the minute.
    """

    def __init__(
        self,
        point_ids: list,
        dt_begin: datetime,
        dt_end: datetime,
        url: str,
    ) -> None:
        # point_ids : ids of the points to query
        # dt_begin / dt_end : default time window used by _read()
        # url : endpoint that serves the point data
        self.point_ids = point_ids
        self.dt_begin = dt_begin
        self.dt_end = dt_end
        self.url = url

    def read_current(self) -> pd.DataFrame:
        """Fetch the latest value of every point.

        Returns a one-row DataFrame indexed by the minute-floored data
        timestamp, one column per requested point id.  Points the service
        does not know about come back as NaN columns.

        Raises whatever the HTTP/JSON layer raises on failure (previously
        the bare ``except`` fell through and crashed with NameError).
        """
        post_data = {'point_ids': self.point_ids}
        res = None  # pre-bind so the except handler can print it safely
        try:
            res = requests.post(url=self.url, data=json.dumps(post_data)).json()
            res_state = res['state']
        except Exception:
            # Log context, then re-raise: nothing downstream can work
            # without a decoded response.
            print('url', self.url)
            print('res', res)
            raise
        if res_state != 0:
            # Non-zero state: log the request for debugging but keep the
            # original best-effort behaviour of parsing whatever came back.
            print('post_data', post_data)
        if len(res['data']) != 0:
            point_id = [_['point_id'] for _ in res['data']]
            point_value = [_['value'] for _ in res['data']]
            point_ts = [_['ts'] for _ in res['data']]
            data = pd.DataFrame(
                data=[point_value],
                columns=point_id,
                index=[point_ts[0]],  # all points share one snapshot ts
            )
            # Epoch seconds -> naive Asia/Shanghai local time, minute precision.
            data.index = (
                pd.to_datetime(data.index, unit='s', utc=True)
                .tz_convert('Asia/Shanghai')
            )
            data = data.tz_localize(tz=None)
            data.index = data.index.floor('min')
        else:
            # No data at all: return a single all-NaN row stamped "now".
            data = pd.DataFrame(
                data=np.full((1, len(self.point_ids)), np.nan),
                columns=self.point_ids,
                index=[datetime.now()],
            )
            data.index = data.index.floor('min')
        # When a point does not exist (including never ingested), the
        # response omits it entirely, so reindex with the full id list to
        # surface NaN columns for the missing ones.
        data = data.reindex(columns=self.point_ids)
        return data

    def _read(self, dt_begin=None, dt_end=None) -> pd.DataFrame:
        """Fetch 1-minute history for all points over [dt_begin, dt_end].

        Falls back to ``self.dt_begin`` / ``self.dt_end`` when bounds are
        not given; bounds are truncated to whole minutes.  Raises when the
        window is inverted, the request fails, or state != 0.
        """
        dt_begin = self.dt_begin if dt_begin is None else dt_begin
        dt_begin = dt_begin.replace(second=0, microsecond=0)
        dt_end = self.dt_end if dt_end is None else dt_end
        dt_end = dt_end.replace(second=0, microsecond=0)
        if dt_begin > dt_end:
            raise Exception('开始时间晚于起始时间')
        ts_begin = round(dt_begin.timestamp())
        ts_end = round(dt_end.timestamp())
        post_data = {
            "point_ids": self.point_ids,
            "begin": ts_begin,
            "end": ts_end,
            "interval": 1,  # 1-minute resolution
            "type": 3,
        }
        res = None  # pre-bind so failure paths below cannot NameError
        try:
            res = requests.post(url=self.url, data=json.dumps(post_data))
            time.sleep(0.1)  # crude rate limit between consecutive calls
            res_state = res.json()['state']
        except Exception as err:
            print('post_data', post_data)
            print('url', self.url)
            print('res', res)
            if res is None:
                # The HTTP call itself failed; there is no body to decode.
                raise
            raise Exception(res.json()) from err
        if res_state != 0:
            print('post_data', post_data)
            raise Exception(res.json())
        point_df = []
        for point_info in res.json()['data']:
            point_id = point_info['point_id']
            point_data = point_info['data']
            if (point_data is None) or (len(point_data) == 0):
                # No samples for this point: emit an all-NaN minute grid so
                # the later concat still aligns on a full index.
                point_df_index = pd.Index(
                    data=pd.date_range(start=dt_begin, end=dt_end, freq='1min')
                    .to_pydatetime()
                    .tolist(),
                    name='ts',
                )
                df = pd.DataFrame({point_id: np.nan}, index=point_df_index)
            else:
                df = (
                    pd.DataFrame(point_data)
                    .rename(columns={'value': point_id})
                    .set_index('ts')
                )
                # Epoch seconds -> naive Asia/Shanghai local time.
                df.index = (
                    pd.to_datetime(df.index, unit='s', utc=True)
                    .tz_convert('Asia/Shanghai')
                )
                df = df.tz_localize(tz=None)
            point_df.append(df)
        check_df_missing(point_df, post_data, res)
        data = pd.concat(point_df, axis=1).reindex(self.point_ids, axis=1)
        return data

    def read_interval(self) -> pd.DataFrame:
        """Fetch [dt_begin, dt_end] in 1-day chunks and concatenate."""
        interval = (
            pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D')
            .to_pydatetime()
            .tolist()
        )
        interval += [self.dt_end]  # make sure the final partial day is covered
        n_segments = len(interval) - 1
        data = []
        for idx in range(n_segments):
            start = interval[idx]
            end = interval[idx + 1]
            finish_pct = round((idx + 1) / n_segments * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),开始时间:{interval[idx]},结束时间:{interval[idx+1]}')
            data.append(self._read(dt_begin=start, dt_end=end))
        data = pd.concat(data, axis=0)
        # Chunk boundaries overlap by one minute; keep the newest sample.
        data = data.loc[~data.index.duplicated(keep='last'), :].sort_index()
        return data

    def read_int_interval(self, freq='H') -> pd.DataFrame:
        """Fetch single samples at whole-hour ('H') or whole-day ('D') marks."""
        if freq == 'H':
            start = self.dt_begin.replace(minute=0)
            end = self.dt_end.replace(minute=0)
        elif freq == 'D':
            start = self.dt_begin.replace(hour=0, minute=0)
            end = self.dt_end.replace(hour=0, minute=0)
        else:
            # Previously an unknown freq crashed later with NameError on
            # the unbound `start`; fail early and explicitly instead.
            raise ValueError(f"unsupported freq: {freq!r} (expected 'H' or 'D')")
        start = start.replace(second=0, microsecond=0)
        end = end.replace(second=0, microsecond=0)
        int_interval = pd.date_range(start=start, end=end, freq=freq).to_pydatetime()
        if len(int_interval) == 0:
            raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')
        data = []
        for idx, dt in enumerate(int_interval):
            # A zero-length window reads exactly one sample at `dt`.
            data.append(self._read(dt_begin=dt, dt_end=dt))
            finish_pct = round((idx + 1) / len(int_interval) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')
        data = pd.concat(data, axis=0)
        data = data.loc[~data.index.duplicated(keep='last'), :].sort_index()
        return data


def check_df_missing(point_df, post_data, res) -> None:
    """Warn (via print) when some or all per-point frames are entirely NaN.

    point_df  : list of single-column DataFrames, one per requested point
    post_data : request payload, echoed for debugging
    res       : raw response object (currently only kept for the commented
                debug print below)
    """
    is_df_nan = [df.isna().all().all() for df in point_df]
    is_all_df_nan = all(is_df_nan)
    is_any_df_nan = any(is_df_nan)
    if is_all_df_nan or is_any_df_nan:
        print('post_data', post_data)
        # print('res.json', res.json())
        if is_all_df_nan:
            print('【所有点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
        else:
            # Previously both messages printed when everything was NaN,
            # which was contradictory; now only one message is emitted.
            print('【部分点位】的【所有时间段】数据均未获取到, 需检查点位或接口')


class PointWriter:
    """Upload point data to the AI ingestion endpoint."""

    def __init__(self) -> None:
        # NOTE(review): if DATA_UPLOAD_URL is unset this silently builds
        # the URL 'Noneai/addpointdatum' -- confirm the deployment always
        # provides the variable.
        self.url = f'{os.environ.get("DATA_UPLOAD_URL")}ai/addpointdatum'

    def ai_add_point_data(
        self,
        point_id: str,
        ts: list,
        value: list,
    ):
        """POST one point's samples to the ingestion endpoint.

        ts entries must expose ``.timestamp()`` (datetime-like); values
        are stringified.  Returns the decoded service response, or None
        when `point_id` is empty; raises when the service state != 0.
        """
        if len(point_id) == 0:
            print('无数据写入')
            return
        data = [
            {'ts': int(ts_i.timestamp()), 'value': str(value_i)}
            for ts_i, value_i in zip(ts, value)
        ]
        post_data = json.dumps([{'point_id': point_id, 'data': data}])
        resp = requests.post(
            url=self.url,
            headers={'Content-Type': 'application/json; charset=UTF-8'},
            data=post_data,
        )
        result = resp.json()
        state = result['state']
        if state == 0:
            print(f"\nput {point_id} data success!")
        else:
            print(result)
            print(post_data)
            raise Exception('ai_add_point_data failed')
        return result