config_reader.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. from typing import Union
  2. from datetime import datetime
  3. import pandas as pd
  4. class ConfigReader:
  5. def __init__(self,path):
  6. self.path = path
  7. self.equp = pd.read_excel(self.path,sheet_name='设备')
  8. self.point = pd.read_excel(self.path,sheet_name='点位')
  9. self.app = pd.read_excel(self.path,sheet_name='程序')
  10. self.meta = pd.read_excel(self.path,sheet_name='元数据')
  11. self.meta = {self.meta.loc[:,'Key'].iat[i]:self.meta.loc[:,'Value'].iat[i] for i in range(len(self.meta))}
  12. @property
  13. def all_equp_names(self):
  14. return self.equp.loc[:,'编号'].to_list()
  15. def get_equp_info(self,equp_name,key:str,info_type):
  16. self.check_equp_is_exist(equp_name)
  17. equp_info = self.equp.loc[self.equp.loc[:,'编号']==equp_name,'名称':].to_dict(orient='list')
  18. equp_info = {k:v[0] for k,v in equp_info.items()}
  19. info_value = equp_info[key]
  20. info_value = convert_info_type(info_value,info_type)
  21. return info_value
  22. def get_equp_point(self,equp_name,equp_class:Union[str,list]=None) -> dict:
  23. # 点位类型
  24. # A: 模型输入
  25. # B: 模型输出
  26. self.check_equp_is_exist(equp_name)
  27. equp_class = self.point.类型.to_list() if equp_class is None else equp_class
  28. equp_class = [equp_class] if isinstance(equp_class,str) else equp_class
  29. point_info = self.point.loc[self.point.类型.isin(equp_class)]
  30. point_map = dict(zip(point_info.编号.to_list(),point_info.点位.to_list()))
  31. # 根据配置情况更新全局点位
  32. if equp_name in self.point.columns:
  33. for point_name in point_map.keys():
  34. new_point_id = self.point.loc[self.point.编号==point_name,equp_name].iat[0]
  35. if isinstance(new_point_id,str):
  36. point_map[point_name] = new_point_id
  37. return point_map
  38. def get_app_info(
  39. self,
  40. equp_name: str,
  41. app_type : str,
  42. key : str,
  43. info_type: str = None
  44. ) -> str:
  45. value = self.app.loc[lambda dt:(dt.程序类型==app_type) & (dt.项目==key)]
  46. if value.shape[0] != 1:
  47. raise Exception(f'程序类型({app_type})下不存在唯一的的项目({key})')
  48. info_value = value.loc[:,'全局配置'].iat[0]
  49. # 更新全局配置
  50. if equp_name in value.columns:
  51. equp_value = value.loc[:,equp_name].iat[0]
  52. if isinstance(equp_value,str):
  53. info_value = equp_value
  54. # 调整数据类型
  55. info_value = convert_info_type(info_value,info_type)
  56. return info_value
  57. def check_equp_is_exist(self,equp_name):
  58. if equp_name not in self.all_equp_names:
  59. raise Exception(f"设备{equp_name}不存在")
  60. def convert_info_type(info_value,info_type):
  61. if info_type == 'int':
  62. info_value = int(info_value)
  63. elif info_type == 'float':
  64. info_value = float(info_value)
  65. elif info_type == 'bool':
  66. info_value = bool(info_value)
  67. elif info_type == 'datetime':
  68. if not isinstance(info_value,datetime):
  69. info_value = datetime.strptime(info_value,'%Y/%m/%d %H:%M:%S')
  70. elif info_type == 'str':
  71. info_value = str(info_value)
  72. elif info_type is None:
  73. pass
  74. else:
  75. raise ValueError(f"{info_type} type error!")
  76. return info_value