config_reader.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from typing import Union
  2. from datetime import datetime
  3. import re
  4. import numpy as np
  5. import pandas as pd
  6. class ConfigReader:
  7. def __init__(self,path):
  8. self.path = path
  9. self.equp = pd.read_excel(self.path,sheet_name='设备')
  10. self.point = pd.read_excel(self.path,sheet_name='点位')
  11. self.app = pd.read_excel(self.path,sheet_name='程序')
  12. @property
  13. def all_equp_names(self):
  14. return self.equp.loc[lambda dt:dt.启用==True].loc[:,'编号'].to_list()
  15. @property
  16. def all_equp_names_total(self):
  17. return self.equp.loc[:,'编号'].to_list()
  18. @property
  19. def all_equp_names_short(self):
  20. return self.equp.loc[lambda dt:dt.启用==True].loc[:,'名称'].to_list()
  21. def get_equp_info(self,equp_name,key:str,info_type):
  22. self.check_equp_is_exist(equp_name)
  23. equp_info = self.equp.loc[self.equp.loc[:,'编号']==equp_name,'名称':].to_dict(orient='list')
  24. equp_info = {k:v[0] for k,v in equp_info.items()}
  25. info_value = equp_info[key]
  26. info_value = convert_info_type(info_value,info_type)
  27. return info_value
  28. def get_equp_point(
  29. self,
  30. equp_name : str,
  31. equp_class: Union[str,list]=None
  32. ) -> dict:
  33. equp_type = self.get_equp_info(equp_name,'设备类型',info_type='str')
  34. # 点位类型
  35. # A: 模型输入
  36. # B: 模型输出
  37. self.check_equp_is_exist(equp_name)
  38. equp_class = self.point.类型.to_list() if equp_class is None else equp_class
  39. equp_class = [equp_class] if isinstance(equp_class,str) else equp_class
  40. point_info = (
  41. self.point
  42. .loc[self.point.类型.isin(equp_class)]
  43. .loc[lambda dt:dt.设备类型.str.split(',',expand=False).apply(lambda lst: equp_type in lst)]
  44. )
  45. point_map = dict(zip(point_info.编号.to_list(),point_info.点位.to_list()))
  46. for point_name,point_id in point_map.items():
  47. if not isinstance(point_id,str) and np.isnan(point_id):
  48. raise Exception(f'点位{point_name}的点位ID缺失')
  49. # 根据配置情况更新全局点位
  50. all_adjust_point = self.point.loc[:,'点位':].columns.to_list()
  51. for each_adj in all_adjust_point:
  52. all_equp = re.findall(r'\[(.*?)\]',each_adj)
  53. if equp_name not in all_equp:
  54. continue
  55. for point_name in point_map.keys():
  56. new_point_id = (
  57. point_info
  58. .loc[self.point.编号==point_name,each_adj].iat[0]
  59. )
  60. if isinstance(new_point_id,str):
  61. point_map[point_name] = new_point_id
  62. # 根据房间数量重新调整点位
  63. if 'C' in equp_class:
  64. num_room = self.get_equp_info(equp_name,'房间数量',info_type='int')
  65. for point_name in point_map.copy().keys():
  66. match = re.match(r'room_(\d+)_',point_name)
  67. if match:
  68. room_id = int(match.group(1))
  69. if room_id > num_room:
  70. point_map.pop(point_name)
  71. if not self.get_equp_info(equp_name,'有后表冷','bool'):
  72. if 'coil_3_Val' in point_map:
  73. point_map['coil_3_Val'] = 0
  74. return point_map
  75. def get_point_info(self,equp_name,point_name) -> dict:
  76. self.check_equp_is_exist(equp_name)
  77. equp_type = self.get_equp_info(equp_name,'设备类型',info_type='str')
  78. point_info = (
  79. self.point
  80. .loc[lambda dt:dt.设备类型.str.split(',',expand=False).apply(lambda lst: equp_type in lst)]
  81. .loc[lambda dt:dt.编号==point_name]
  82. )
  83. if len(point_info) == 0:
  84. raise Exception(f'未找到{equp_name}的{point_name}信息')
  85. elif len(point_info) > 1:
  86. raise Exception(f'{equp_name}下存在多个{point_name}信息')
  87. else:
  88. point_info = point_info.to_dict(orient='list')
  89. point_info = {k:v[0] for k,v in point_info.items() if k not in self.all_equp_names_total}
  90. return point_info
  91. def get_app_info(
  92. self,
  93. equp_name: str,
  94. app_type : str,
  95. key : str,
  96. info_type: str = None
  97. ) -> str:
  98. value = self.app.loc[lambda dt:(dt.程序类型==app_type) & (dt.项目==key)]
  99. if value.shape[0] != 1:
  100. raise Exception(f'程序类型({app_type})下不存在唯一的的项目({key})')
  101. info_value = value.loc[:,'全局配置'].iat[0]
  102. # 更新全局配置
  103. if equp_name in value.columns:
  104. equp_value = value.loc[:,equp_name].iat[0]
  105. if isinstance(equp_value,(str,float)) and not np.isnan(equp_value):
  106. info_value = equp_value
  107. # 调整数据类型
  108. info_value = convert_info_type(info_value,info_type)
  109. return info_value
  110. def check_equp_is_exist(self,equp_name):
  111. if equp_name not in self.all_equp_names:
  112. raise Exception(f"设备{equp_name}不存在")
  113. def convert_info_type(info_value,info_type):
  114. if info_type == 'int':
  115. info_value = int(info_value)
  116. elif info_type == 'float':
  117. info_value = float(info_value)
  118. elif info_type == 'bool':
  119. if np.isnan(info_value):
  120. info_value = False
  121. else:
  122. info_value = bool(info_value)
  123. elif info_type == 'datetime':
  124. if info_value == 'NOW':
  125. info_value = datetime.now()
  126. elif not isinstance(info_value,datetime):
  127. info_value = datetime.strptime(info_value,'%Y/%m/%d %H:%M:%S')
  128. elif info_type == 'str':
  129. info_value = str(info_value)
  130. elif info_type is None:
  131. pass
  132. else:
  133. raise ValueError(f"{info_type} type error!")
  134. return info_value