point_reader.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. from datetime import datetime
  2. import time
  3. try:
  4. from workflowlib import requests
  5. except:
  6. import requests
  7. import json
  8. import pandas as pd
  9. class PointReader:
  10. def __init__(
  11. self,
  12. point_ids: list,
  13. dt_begin : datetime,
  14. dt_end : datetime,
  15. url : str
  16. ) -> None:
  17. self.point_ids = point_ids
  18. self.dt_begin = dt_begin
  19. self.dt_end = dt_end
  20. self.url = url
  21. def _read(self,dt_begin=None,dt_end=None) -> pd.DataFrame:
  22. dt_begin = self.dt_begin if dt_begin is None else dt_begin
  23. dt_begin = dt_begin.replace(second=0,microsecond=0)
  24. dt_end = self.dt_end if dt_end is None else dt_end
  25. dt_end = dt_end.replace(second=0,microsecond=0)
  26. if dt_begin > dt_end:
  27. raise Exception('开始时间晚于起始时间')
  28. ts_begin = round(dt_begin.timestamp())
  29. ts_end = round(dt_end.timestamp())
  30. post_data = {
  31. "point_ids": self.point_ids,
  32. "begin" : ts_begin,
  33. "end" : ts_end,
  34. "interval" : 1,
  35. "type" : 3,
  36. }
  37. try:
  38. res = requests.post(url=self.url, data=json.dumps(post_data))
  39. time.sleep(0.1)
  40. res_state = res.json()['state']
  41. except:
  42. print('post_data',post_data)
  43. print('url',self.url)
  44. print('res',res)
  45. print('json',res.json())
  46. raise Exception(res.json())
  47. if res_state != 0:
  48. print('post_data',post_data)
  49. raise Exception(res.json())
  50. point_df = []
  51. for point_info in res.json()['data']:
  52. point_id = point_info['point_id']
  53. point_data = point_info['data']
  54. if (point_data is None) or (len(point_data) == 0):
  55. print('post_data',post_data)
  56. print('res.json',res.json())
  57. print('未获取到对应的点位数据, 检查时间是否正确')
  58. continue
  59. df = pd.DataFrame(point_data).rename(columns={'value':point_id}).set_index('ts')
  60. point_df.append(df)
  61. if len(point_df) == 0:
  62. print('post_data',post_data)
  63. print('res.json',res.json())
  64. print('所有点位的数据均未获取到, 需检查点位或接口')
  65. return None
  66. data = pd.concat(point_df,axis=1)
  67. data.index = pd.to_datetime(data.index,unit='s',utc=True).tz_convert('Asia/Shanghai')
  68. data = data.tz_localize(tz=None)
  69. return data
  70. def read_interval(self) -> pd.DataFrame:
  71. interval = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
  72. interval += [self.dt_end]
  73. data = []
  74. for idx in range(len(interval)):
  75. if idx == len(interval)-1:
  76. continue
  77. start = interval[idx]
  78. end = interval[idx+1]
  79. finish_pct = round((idx+1)/(len(interval)-1) * 100,2)
  80. print(f'获取第{idx}段数据({finish_pct}%),开始时间:{interval[idx]},结束时间:{interval[idx+1]}')
  81. data.append( self._read(dt_begin=start,dt_end=end) )
  82. data = pd.concat(data,axis=0)
  83. data = data.loc[~data.index.duplicated(keep='last'),:].sort_index()
  84. return data
  85. def read_int_interval(self,freq='H') -> pd.DataFrame:
  86. if freq == 'H':
  87. start = self.dt_begin.replace(minute=0)
  88. end = self.dt_end.replace(minute=0)
  89. elif freq == 'D':
  90. start = self.dt_begin.replace(hour=0,minute=0)
  91. end = self.dt_end.replace(hour=0,minute=0)
  92. start = start.replace(second=0,microsecond=0)
  93. end = end.replace(second=0,microsecond=0)
  94. int_interval = pd.date_range(start=start, end=end, freq=freq).to_pydatetime()
  95. if len(int_interval) == 0:
  96. raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')
  97. # data = [self._read(dt_begin=dt,dt_end=dt) for dt in int_interval]
  98. data = []
  99. for idx,dt in enumerate(int_interval):
  100. data.append(self._read(dt_begin=dt,dt_end=dt))
  101. finish_pct = round((idx+1)/(len(int_interval)) * 100,2)
  102. print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')
  103. data = pd.concat(data,axis=0)
  104. data = data.loc[~data.index.duplicated(keep='last'),:].sort_index()
  105. return data