| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- from datetime import datetime
- import time
- try:
- from workflowlib import requests
- except:
- import requests
- import os
- import json
- import numpy as np
- import pandas as pd
class PointReader:
    """Fetch point values from an HTTP endpoint as pandas DataFrames.

    Every request is a JSON ``POST`` to ``url``.  Returned frames have one
    column per entry of ``point_ids`` (in that order); their index is built
    from the epoch-second timestamps in the reply, rendered as
    ``Asia/Shanghai`` wall-clock time with the timezone information dropped.

    Args:
        point_ids: Identifiers of the points to read.
        dt_begin: Default start of the time range.
        dt_end: Default end of the time range.
        url: Endpoint that receives the POST requests.
    """

    def __init__(
        self,
        point_ids: list,
        dt_begin: datetime,
        dt_end: datetime,
        url: str
    ) -> None:
        self.point_ids = point_ids
        self.dt_begin = dt_begin
        self.dt_end = dt_end
        self.url = url

    def _post(self, post_data: dict):
        """POST ``post_data`` as JSON and decode the reply.

        Returns:
            A ``(response, payload, state)`` tuple: the raw response object,
            its decoded JSON body and the body's ``'state'`` field.

        Raises:
            Exception: Whatever the request, the JSON decoding or the
                ``'state'`` lookup raised; the request context is printed
                first and the original exception is re-raised unchanged.
        """
        response = None
        try:
            response = requests.post(url=self.url, data=json.dumps(post_data))
            payload = response.json()
            state = payload['state']
        except Exception:
            # ``response`` is still None when the POST itself failed, so the
            # diagnostics below can never mask the real error.
            print('post_data', post_data)
            print('url', self.url)
            print('res', response)
            raise
        return response, payload, state

    def read_current(self):
        """Return a one-row frame holding the latest value of every point.

        A non-zero ``state`` in the reply is only reported (the request body
        is printed); the reply is still processed.  When the reply carries no
        records, the row is all-NaN and stamped with the current local time.
        The index is floored to the minute in both cases.
        """
        post_data = {'point_ids': self.point_ids}
        _, res, res_state = self._post(post_data)

        if res_state != 0:
            print('post_data', post_data)

        # A missing or null 'data' field is handled like an empty one.
        records = res.get('data') or []
        if records:
            point_id = [rec['point_id'] for rec in records]
            point_value = [rec['value'] for rec in records]
            # The single row is stamped with the first record's timestamp.
            data = pd.DataFrame(
                data=[point_value],
                columns=point_id,
                index=[records[0]['ts']],
            )
            data.index = pd.to_datetime(data.index, unit='s', utc=True).tz_convert('Asia/Shanghai')
            data = data.tz_localize(tz=None)
            data.index = data.index.floor('min')
        else:
            data = pd.DataFrame(
                data=np.full((1, len(self.point_ids)), np.nan),
                columns=self.point_ids,
                index=[datetime.now()],
            )
            data.index = data.index.floor('min')

        # When a point does not exist (including not yet stored), the reply
        # contains no entry for that point_id, so reindex with every
        # requested id to keep the column set stable.
        data = data.reindex(columns=self.point_ids)

        return data

    def _read(self, dt_begin=None, dt_end=None) -> pd.DataFrame:
        """Read all points between ``dt_begin`` and ``dt_end`` in one request.

        Both bounds default to the values given at construction and are
        truncated to the minute.  Naive datetimes are converted to epoch
        seconds with ``datetime.timestamp()``, i.e. interpreted in the host's
        local timezone.
        NOTE(review): replies are rendered in Asia/Shanghai, so the host is
        presumably expected to run in that timezone - confirm.

        Returns:
            Frame indexed by timestamp with one column per requested point.
            A point whose reply entry has no samples gets a NaN column over
            the minute range; if the reply has no entries at all, the whole
            frame is NaN over that range.

        Raises:
            ValueError: If the start is later than the end.
            Exception: If the request fails or the service reports a
                non-zero ``state``.
        """
        dt_begin = self.dt_begin if dt_begin is None else dt_begin
        dt_begin = dt_begin.replace(second=0, microsecond=0)
        dt_end = self.dt_end if dt_end is None else dt_end
        dt_end = dt_end.replace(second=0, microsecond=0)

        if dt_begin > dt_end:
            raise ValueError('开始时间晚于结束时间')

        post_data = {
            "point_ids": self.point_ids,
            "begin": round(dt_begin.timestamp()),
            "end": round(dt_end.timestamp()),
            # 'interval' and 'type' are passed through as fixed values; their
            # meaning is defined by the service - TODO confirm.
            "interval": 1,
            "type": 3,
        }

        res, payload, res_state = self._post(post_data)
        # Short pause after every request, as in the original code
        # (presumably to throttle successive calls - confirm).
        time.sleep(0.1)

        if res_state != 0:
            print('post_data', post_data)
            raise Exception(payload)

        def minute_index():
            # One entry per minute over the requested range, named like the
            # index of a populated reply.
            return pd.Index(
                data=pd.date_range(start=dt_begin, end=dt_end, freq='1min').to_pydatetime().tolist(),
                name='ts',
            )

        point_df = []
        for point_info in payload.get('data') or []:
            point_id = point_info['point_id']
            point_data = point_info['data']

            if (point_data is None) or (len(point_data) == 0):
                df = pd.DataFrame({point_id: np.nan}, index=minute_index())
            else:
                df = pd.DataFrame(point_data).rename(columns={'value': point_id}).set_index('ts')
                df.index = pd.to_datetime(df.index, unit='s', utc=True).tz_convert('Asia/Shanghai')
                df = df.tz_localize(tz=None)

            point_df.append(df)

        check_df_missing(point_df, post_data, res)

        if not point_df:
            # pd.concat refuses an empty list; report "nothing found" the
            # same way read_current does, as NaN for every requested point.
            return pd.DataFrame(index=minute_index(), columns=self.point_ids, dtype=float)

        data = pd.concat(point_df, axis=1).reindex(self.point_ids, axis=1)

        return data

    def read_interval(self) -> pd.DataFrame:
        """Read the configured range in consecutive segments of up to one day.

        Segment boundaries are ``dt_begin``, ``dt_begin`` + 1 day, ... and
        finally ``dt_end``.  Adjacent segments share their boundary
        timestamp; duplicated rows are dropped (keeping the last) and the
        result is sorted by index.  Progress is printed per segment.

        Raises:
            ValueError: If ``dt_begin`` is later than ``dt_end``.
        """
        if self.dt_begin > self.dt_end:
            # Otherwise no segment is produced and pd.concat fails with an
            # unrelated "No objects to concatenate" error.
            raise ValueError('开始时间晚于结束时间')

        interval = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
        interval.append(self.dt_end)
        segment_count = len(interval) - 1

        data = []
        for idx, (start, end) in enumerate(zip(interval, interval[1:])):
            finish_pct = round((idx + 1) / segment_count * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),开始时间:{start},结束时间:{end}')

            data.append(self._read(dt_begin=start, dt_end=end))
        data = pd.concat(data, axis=0)
        data = data.loc[~data.index.duplicated(keep='last'), :].sort_index()

        return data

    def read_int_interval(self, freq='H') -> pd.DataFrame:
        """Read only the samples located at whole hours or whole days.

        ``dt_begin`` and ``dt_end`` are truncated to the hour (``'H'``) or to
        midnight (``'D'``); one single-timestamp request is issued for every
        step between them.  Progress is printed per request.

        Args:
            freq: ``'H'`` (or ``'h'``) for hourly steps, ``'D'`` (or ``'d'``)
                for daily steps.

        Raises:
            ValueError: If ``freq`` is not one of the supported values.
            Exception: If the truncated range contains no timestamp.
        """
        if freq in ('H', 'h'):
            start = self.dt_begin.replace(minute=0)
            end = self.dt_end.replace(minute=0)
            # A Timedelta step avoids the 'H' alias, which recent pandas
            # releases deprecate in favour of 'h'.
            step = pd.Timedelta(hours=1)
        elif freq in ('D', 'd'):
            start = self.dt_begin.replace(hour=0, minute=0)
            end = self.dt_end.replace(hour=0, minute=0)
            step = 'D'
        else:
            raise ValueError(f"freq must be 'H' or 'D', got {freq!r}")

        start = start.replace(second=0, microsecond=0)
        end = end.replace(second=0, microsecond=0)
        int_interval = pd.date_range(start=start, end=end, freq=step).to_pydatetime()

        if len(int_interval) == 0:
            raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')

        data = []
        for idx, dt in enumerate(int_interval):
            data.append(self._read(dt_begin=dt, dt_end=dt))
            finish_pct = round((idx + 1) / (len(int_interval)) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')

        data = pd.concat(data, axis=0)
        data = data.loc[~data.index.duplicated(keep='last'), :].sort_index()

        return data
-
def check_df_missing(point_df, post_data, res) -> None:
    """Print a diagnostic when per-point frames contain no data at all.

    A frame counts as missing when every one of its cells is NaN.  Nothing is
    printed when no frame is missing; otherwise the request body is printed,
    followed by exactly one message saying whether all points or only some
    of them are missing.  An empty ``point_df`` is reported as "all points".

    Args:
        point_df: List of per-point DataFrames.
        post_data: Request body, printed for diagnosis.
        res: Response of the request; accepted for interface compatibility
            and not used.
    """
    is_df_nan = [df.isna().all().all() for df in point_df]
    is_all_df_nan = all(is_df_nan)
    is_any_df_nan = any(is_df_nan)

    if not (is_all_df_nan or is_any_df_nan):
        return

    print('post_data', post_data)
    if is_all_df_nan:
        print('【所有点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
    else:
        # Reached only when some, but not all, frames are empty; the
        # "all points" case must not also print this message.
        print('【部分点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
class PointWriter:
    """Upload time-series samples of a point via an HTTP JSON ``POST``.

    The target URL is the ``DATA_UPLOAD_URL`` environment variable followed
    by ``ai/addpointdatum``.  The two parts are concatenated as-is, so the
    variable is presumably expected to end with ``/`` - confirm.
    """

    def __init__(self) -> None:
        base_url = os.environ.get("DATA_UPLOAD_URL")
        # Formatting a missing variable would silently produce the bogus URL
        # 'Noneai/addpointdatum'; keep None so the problem is reported
        # clearly when a write is attempted.
        self.url = None if base_url is None else f'{base_url}ai/addpointdatum'

    def ai_add_point_data(
        self,
        point_id: str,
        ts: list,
        value: list
    ):
        """Write ``value`` samples taken at ``ts`` for ``point_id``.

        Args:
            point_id: Identifier of the point; an empty one makes the call a
                no-op that prints a notice and returns ``None``.
            ts: Datetime-like objects exposing ``timestamp()``; sent as whole
                epoch seconds.
            value: Samples paired positionally with ``ts``; each is sent as
                its ``str()`` form.

        Returns:
            The decoded JSON reply when its ``state`` is 0, else ``None`` for
            the no-op case.

        Raises:
            ValueError: If no upload URL is configured.
            Exception: If the service replies with a non-zero ``state``.
        """
        if len(point_id) == 0:
            print('无数据写入')
            return None

        if not self.url:
            raise ValueError(
                'environment variable DATA_UPLOAD_URL is not set; cannot upload point data'
            )

        data = [
            {'ts': int(ts_i.timestamp()), 'value': str(value_i)}
            for ts_i, value_i in zip(ts, value)
        ]
        post_data = json.dumps([{'point_id': point_id, 'data': data}])

        resp = requests.post(
            url=self.url,
            headers={'Content-Type': 'application/json; charset=UTF-8'},
            data=post_data,
        )

        result = resp.json()
        if result['state'] != 0:
            # Print both sides of the exchange before failing.
            print(result)
            print(post_data)
            raise Exception('ai_add_point_data failed')

        print(f"\nput {point_id} data success!")
        return result
|