config_reader.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. from typing import Union
  2. import pandas as pd
  3. class ConfigReader:
  4. def __init__(self,path):
  5. self.path = path
  6. self.equp = pd.read_excel(self.path,sheet_name='设备')
  7. self.point = pd.read_excel(self.path,sheet_name='点位')
  8. @property
  9. def all_equp_names(self):
  10. return self.equp.loc[:,'编号'].to_list()
  11. def get_equp_info(self,equp_name) -> dict:
  12. self.check_equp_is_exist(equp_name)
  13. equp_info = self.equp.loc[self.equp.loc[:,'编号']==equp_name,'名称':].to_dict(orient='list')
  14. equp_info = {k:v[0] for k,v in equp_info.items()}
  15. return equp_info
  16. def get_equp_point(self,equp_name,equp_class:Union[str,list]=None) -> dict:
  17. # 点位类型
  18. # A: 模型输入
  19. # B: 模型输出
  20. self.check_equp_is_exist(equp_name)
  21. equp_class = self.point.类型.to_list() if equp_class is None else equp_class
  22. equp_class = [equp_class] if isinstance(equp_class,str) else equp_class
  23. point_info = self.point.loc[self.point.类型.isin(equp_class)]
  24. point_map = dict(zip(point_info.编号.to_list(),point_info.点位.to_list()))
  25. # 根据配置情况更新全局点位
  26. if equp_name in self.point.columns:
  27. for point_name in point_map.keys():
  28. new_point_id = self.point.loc[self.point.编号==point_name,equp_name].iat[0]
  29. if isinstance(new_point_id,str):
  30. point_map[point_name] = new_point_id
  31. return point_map
  32. def check_equp_is_exist(self,equp_name):
  33. if equp_name not in self.all_equp_names:
  34. raise Exception(f"设备{equp_name}不存在")