point_reader.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from datetime import datetime
  2. import time
  3. try:
  4. from workflowlib import requests
  5. except:
  6. import requests
  7. import json
  8. import numpy as np
  9. import pandas as pd
  10. class PointReader:
  11. def __init__(
  12. self,
  13. point_ids: list,
  14. dt_begin : datetime,
  15. dt_end : datetime,
  16. url : str
  17. ) -> None:
  18. self.point_ids = point_ids
  19. self.dt_begin = dt_begin
  20. self.dt_end = dt_end
  21. self.url = url
  22. def read_current(self):
  23. post_data = {'point_ids':self.point_ids}
  24. try:
  25. res = requests.post(url=self.url,data=json.dumps(post_data)).json()
  26. res_state = res['state']
  27. except:
  28. print('url',self.url)
  29. print('res',res)
  30. if res_state != 0:
  31. print('post_data',post_data)
  32. if len(res['data']) != 0:
  33. point_id = [_['point_id'] for _ in res['data']]
  34. point_value = [_['value'] for _ in res['data']]
  35. point_ts = [_['ts'] for _ in res['data']]
  36. data = pd.DataFrame(
  37. data = [point_value],
  38. columns = point_id,
  39. index = [point_ts[0]]
  40. )
  41. data.index = pd.to_datetime(data.index,unit='s',utc=True).tz_convert('Asia/Shanghai')
  42. data = data.tz_localize(tz=None)
  43. data.index = data.index.floor('min')
  44. else:
  45. data = pd.DataFrame(
  46. data = np.zeros(shape=[1,len(self.point_ids)]) * np.nan,
  47. columns = self.point_ids,
  48. index = [datetime.now()]
  49. )
  50. data.index = data.index.floor('min')
  51. # 当点位不存在时(包括没入库),此时接口返回的res中,不包含该point_id对应的数据
  52. # 因此用所有id进行reindex
  53. data = data.reindex(columns=self.point_ids)
  54. return data
  55. def _read(self,dt_begin=None,dt_end=None) -> pd.DataFrame:
  56. dt_begin = self.dt_begin if dt_begin is None else dt_begin
  57. dt_begin = dt_begin.replace(second=0,microsecond=0)
  58. dt_end = self.dt_end if dt_end is None else dt_end
  59. dt_end = dt_end.replace(second=0,microsecond=0)
  60. if dt_begin > dt_end:
  61. raise Exception('开始时间晚于起始时间')
  62. ts_begin = round(dt_begin.timestamp())
  63. ts_end = round(dt_end.timestamp())
  64. post_data = {
  65. "point_ids": self.point_ids,
  66. "begin" : ts_begin,
  67. "end" : ts_end,
  68. "interval" : 1,
  69. "type" : 3,
  70. }
  71. try:
  72. res = requests.post(url=self.url, data=json.dumps(post_data))
  73. time.sleep(0.1)
  74. res_state = res.json()['state']
  75. except:
  76. print('post_data',post_data)
  77. print('url',self.url)
  78. print('res',res)
  79. raise Exception(res.json())
  80. if res_state != 0:
  81. print('post_data',post_data)
  82. raise Exception(res.json())
  83. point_df = []
  84. for point_info in res.json()['data']:
  85. point_id = point_info['point_id']
  86. point_data = point_info['data']
  87. if (point_data is None) or (len(point_data) == 0):
  88. point_df_index = pd.Index(
  89. data = pd.date_range(start=dt_begin, end=dt_end, freq='1min').to_pydatetime().tolist(),
  90. name = 'ts'
  91. )
  92. df = pd.DataFrame({point_id:np.nan},index=point_df_index)
  93. else:
  94. df = pd.DataFrame(point_data).rename(columns={'value':point_id}).set_index('ts')
  95. df.index = pd.to_datetime(df.index,unit='s',utc=True).tz_convert('Asia/Shanghai')
  96. df = df.tz_localize(tz=None)
  97. point_df.append(df)
  98. check_df_missing(point_df,post_data,res)
  99. data = pd.concat(point_df,axis=1).reindex(self.point_ids,axis=1)
  100. return data
  101. def read_interval(self) -> pd.DataFrame:
  102. interval = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
  103. interval += [self.dt_end]
  104. data = []
  105. for idx in range(len(interval)):
  106. if idx == len(interval)-1:
  107. continue
  108. start = interval[idx]
  109. end = interval[idx+1]
  110. finish_pct = round((idx+1)/(len(interval)-1) * 100,2)
  111. print(f'获取第{idx}段数据({finish_pct}%),开始时间:{interval[idx]},结束时间:{interval[idx+1]}')
  112. data.append( self._read(dt_begin=start,dt_end=end) )
  113. data = pd.concat(data,axis=0)
  114. data = data.loc[~data.index.duplicated(keep='last'),:].sort_index()
  115. return data
  116. def read_int_interval(self,freq='H') -> pd.DataFrame:
  117. if freq == 'H':
  118. start = self.dt_begin.replace(minute=0)
  119. end = self.dt_end.replace(minute=0)
  120. elif freq == 'D':
  121. start = self.dt_begin.replace(hour=0,minute=0)
  122. end = self.dt_end.replace(hour=0,minute=0)
  123. start = start.replace(second=0,microsecond=0)
  124. end = end.replace(second=0,microsecond=0)
  125. int_interval = pd.date_range(start=start, end=end, freq=freq).to_pydatetime()
  126. if len(int_interval) == 0:
  127. raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')
  128. # data = [self._read(dt_begin=dt,dt_end=dt) for dt in int_interval]
  129. data = []
  130. for idx,dt in enumerate(int_interval):
  131. data.append(self._read(dt_begin=dt,dt_end=dt))
  132. finish_pct = round((idx+1)/(len(int_interval)) * 100,2)
  133. print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')
  134. data = pd.concat(data,axis=0)
  135. data = data.loc[~data.index.duplicated(keep='last'),:].sort_index()
  136. return data
  137. def check_df_missing(point_df,post_data,res) -> None:
  138. is_df_nan = [df.isna().all().all() for df in point_df]
  139. is_all_df_nan = all(is_df_nan)
  140. is_any_df_nan = any(is_df_nan)
  141. if is_all_df_nan or is_any_df_nan:
  142. print('post_data',post_data)
  143. # print('res.json',res.json())
  144. if is_all_df_nan:
  145. print('【所有点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
  146. if is_any_df_nan:
  147. print('【部分点位】的【所有时间段】数据均未获取到, 需检查点位或接口')