# point_io.py — helpers for reading/writing point time-series data over an HTTP point API.
  1. from datetime import datetime
  2. import time
  3. try:
  4. from workflowlib import requests
  5. except:
  6. import requests
  7. import os
  8. import json
  9. import numpy as np
  10. import pandas as pd
  11. class PointReader:
  12. def __init__(
  13. self,
  14. point_ids: list,
  15. dt_begin : datetime,
  16. dt_end : datetime,
  17. url : str
  18. ) -> None:
  19. self.point_ids = point_ids
  20. self.dt_begin = dt_begin
  21. self.dt_end = dt_end
  22. self.url = url
  23. def read_current(self):
  24. post_data = {'point_ids':self.point_ids}
  25. try:
  26. res = requests.post(url=self.url,data=json.dumps(post_data)).json()
  27. res_state = res['state']
  28. except:
  29. print('url',self.url)
  30. print('res',res)
  31. if res_state != 0:
  32. print('post_data',post_data)
  33. if len(res['data']) != 0:
  34. point_id = [_['point_id'] for _ in res['data']]
  35. point_value = [_['value'] for _ in res['data']]
  36. point_ts = [_['ts'] for _ in res['data']]
  37. data = pd.DataFrame(
  38. data = [point_value],
  39. columns = point_id,
  40. index = [point_ts[0]]
  41. )
  42. data.index = pd.to_datetime(data.index,unit='s',utc=True).tz_convert('Asia/Shanghai')
  43. data = data.tz_localize(tz=None)
  44. data.index = data.index.floor('min')
  45. else:
  46. data = pd.DataFrame(
  47. data = np.zeros(shape=[1,len(self.point_ids)]) * np.nan,
  48. columns = self.point_ids,
  49. index = [datetime.now()]
  50. )
  51. data.index = data.index.floor('min')
  52. # 当点位不存在时(包括没入库),此时接口返回的res中,不包含该point_id对应的数据
  53. # 因此用所有id进行reindex
  54. data = data.reindex(columns=self.point_ids)
  55. return data
  56. def _read(self,dt_begin=None,dt_end=None) -> pd.DataFrame:
  57. dt_begin = self.dt_begin if dt_begin is None else dt_begin
  58. dt_begin = dt_begin.replace(second=0,microsecond=0)
  59. dt_end = self.dt_end if dt_end is None else dt_end
  60. dt_end = dt_end.replace(second=0,microsecond=0)
  61. if dt_begin > dt_end:
  62. raise Exception('开始时间晚于起始时间')
  63. ts_begin = round(dt_begin.timestamp())
  64. ts_end = round(dt_end.timestamp())
  65. post_data = {
  66. "point_ids": self.point_ids,
  67. "begin" : ts_begin,
  68. "end" : ts_end,
  69. "interval" : 1,
  70. "type" : 3,
  71. }
  72. try:
  73. res = requests.post(url=self.url, data=json.dumps(post_data))
  74. time.sleep(0.1)
  75. res_state = res.json()['state']
  76. except:
  77. print('post_data',post_data)
  78. print('url',self.url)
  79. print('res',res)
  80. raise Exception(res.json())
  81. if res_state != 0:
  82. print('post_data',post_data)
  83. raise Exception(res.json())
  84. point_df = []
  85. for point_info in res.json()['data']:
  86. point_id = point_info['point_id']
  87. point_data = point_info['data']
  88. if (point_data is None) or (len(point_data) == 0):
  89. point_df_index = pd.Index(
  90. data = pd.date_range(start=dt_begin, end=dt_end, freq='1min').to_pydatetime().tolist(),
  91. name = 'ts'
  92. )
  93. df = pd.DataFrame({point_id:np.nan},index=point_df_index)
  94. else:
  95. df = pd.DataFrame(point_data).rename(columns={'value':point_id}).set_index('ts')
  96. df.index = pd.to_datetime(df.index,unit='s',utc=True).tz_convert('Asia/Shanghai')
  97. df = df.tz_localize(tz=None)
  98. point_df.append(df)
  99. check_df_missing(point_df,post_data,res)
  100. data = pd.concat(point_df,axis=1).reindex(self.point_ids,axis=1)
  101. return data
  102. def read_interval(self) -> pd.DataFrame:
  103. interval = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
  104. interval += [self.dt_end]
  105. data = []
  106. for idx in range(len(interval)):
  107. if idx == len(interval)-1:
  108. continue
  109. start = interval[idx]
  110. end = interval[idx+1]
  111. finish_pct = round((idx+1)/(len(interval)-1) * 100,2)
  112. print(f'获取第{idx}段数据({finish_pct}%),开始时间:{interval[idx]},结束时间:{interval[idx+1]}')
  113. data.append( self._read(dt_begin=start,dt_end=end) )
  114. data = pd.concat(data,axis=0)
  115. data = data.loc[~data.index.duplicated(keep='last'),:].sort_index()
  116. return data
  117. def read_int_interval(self,freq='H') -> pd.DataFrame:
  118. if freq == 'H':
  119. start = self.dt_begin.replace(minute=0)
  120. end = self.dt_end.replace(minute=0)
  121. elif freq == 'D':
  122. start = self.dt_begin.replace(hour=0,minute=0)
  123. end = self.dt_end.replace(hour=0,minute=0)
  124. start = start.replace(second=0,microsecond=0)
  125. end = end.replace(second=0,microsecond=0)
  126. int_interval = pd.date_range(start=start, end=end, freq=freq).to_pydatetime()
  127. if len(int_interval) == 0:
  128. raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')
  129. # data = [self._read(dt_begin=dt,dt_end=dt) for dt in int_interval]
  130. data = []
  131. for idx,dt in enumerate(int_interval):
  132. data.append(self._read(dt_begin=dt,dt_end=dt))
  133. finish_pct = round((idx+1)/(len(int_interval)) * 100,2)
  134. print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')
  135. data = pd.concat(data,axis=0)
  136. data = data.loc[~data.index.duplicated(keep='last'),:].sort_index()
  137. return data
  138. def check_df_missing(point_df,post_data,res) -> None:
  139. is_df_nan = [df.isna().all().all() for df in point_df]
  140. is_all_df_nan = all(is_df_nan)
  141. is_any_df_nan = any(is_df_nan)
  142. if is_all_df_nan or is_any_df_nan:
  143. print('post_data',post_data)
  144. # print('res.json',res.json())
  145. if is_all_df_nan:
  146. print('【所有点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
  147. if is_any_df_nan:
  148. print('【部分点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
  149. class PointWriter:
  150. def __init__(self) -> None:
  151. self.url = f'{os.environ.get("DATA_UPLOAD_URL")}ai/addpointdatum'
  152. def ai_add_point_data(
  153. self,
  154. point_id: str,
  155. ts : list,
  156. value : list
  157. ):
  158. if len(point_id) == 0:
  159. print('无数据写入')
  160. return
  161. data = []
  162. for ts_i,value_i in zip(ts,value):
  163. ts_i = int(ts_i.timestamp())
  164. value_i = str(value_i)
  165. data.append({'ts':ts_i,'value':value_i})
  166. post_data = [{'point_id':point_id,'data':data}]
  167. post_data = json.dumps(post_data)
  168. resp = requests.post(
  169. url = self.url,
  170. headers = {'Content-Type': 'application/json; charset=UTF-8'},
  171. data = post_data
  172. )
  173. result = resp.json()
  174. state = result['state']
  175. if state == 0:
  176. print(f"\nput {point_id} data success!")
  177. else:
  178. print(result)
  179. print(post_data)
  180. raise Exception('ai_add_point_data failed')
  181. return result