from typing import Union
from datetime import datetime
import re

import numpy as np
import pandas as pd


class ConfigReader:
    """Read device, point and program configuration from an Excel workbook.

    The workbook at ``path`` must contain three sheets named ``设备``
    (devices), ``点位`` (points) and ``程序`` (programs). Each sheet is loaded
    once into a DataFrame when the reader is constructed.
    """

    def __init__(self, path):
        self.path = path
        self.equp = pd.read_excel(self.path, sheet_name='设备')
        self.point = pd.read_excel(self.path, sheet_name='点位')
        self.app = pd.read_excel(self.path, sheet_name='程序')

    @property
    def all_equp_names(self):
        """Return the ``编号`` values of all devices whose ``启用`` flag is True."""
        # Element-wise comparison on purpose: NaN / non-bool cells must not match.
        enabled = self.equp.loc[lambda dt: dt.启用 == True]  # noqa: E712
        return enabled.loc[:, '编号'].to_list()

    @property
    def all_equp_names_total(self):
        """Return the ``编号`` values of every device, enabled or not."""
        return self.equp.loc[:, '编号'].to_list()

    @property
    def all_equp_names_short(self):
        """Return the ``名称`` values of all devices whose ``启用`` flag is True."""
        enabled = self.equp.loc[lambda dt: dt.启用 == True]  # noqa: E712
        return enabled.loc[:, '名称'].to_list()

    @staticmethod
    def _filter_by_equp_type(frame, equp_type):
        """Keep the rows of ``frame`` whose comma-separated ``设备类型`` lists ``equp_type``."""
        return frame.loc[
            lambda dt: dt.设备类型.str.split(',', expand=False).apply(
                lambda lst: equp_type in lst
            )
        ]

    def get_equp_info(self, equp_name, key: str, info_type):
        """Return one attribute of an enabled device, converted to ``info_type``.

        Args:
            equp_name: Device ``编号``; must be an enabled device.
            key: Column name to read (any column from ``名称`` onwards).
            info_type: Target type name passed to ``convert_info_type``.

        Raises:
            Exception: If the device is not among the enabled devices.
            KeyError: If ``key`` is not a column at or after ``名称``.
        """
        self.check_equp_is_exist(equp_name)
        is_target = self.equp.loc[:, '编号'] == equp_name
        columns = self.equp.loc[is_target, '名称':].to_dict(orient='list')
        # Use the first matching row for every column.
        equp_info = {name: values[0] for name, values in columns.items()}
        return convert_info_type(equp_info[key], info_type)

    def get_equp_point(
        self,
        equp_name: str,
        equp_class: Union[str, list] = None
    ) -> dict:
        """Build the ``{point 编号: point ID}`` mapping for a device.

        Point classes (``类型`` column), as documented by the original author:
            A: model input
            B: model output

        Args:
            equp_name: Device ``编号``; must be an enabled device.
            equp_class: One point class or a list of classes to include.
                ``None`` selects every class present in the points sheet.

        Returns:
            Mapping from point ``编号`` to its point ID. IDs come from the
            ``点位`` column and are replaced by device-specific overrides where
            a later column whose header contains ``[<equp_name>]`` holds a
            string for that point.

        Raises:
            Exception: If the device is not enabled, or a selected point has a
                missing (NaN) ID in the ``点位`` column.
        """
        # get_equp_info already validates that the device exists and is enabled.
        equp_type = self.get_equp_info(equp_name, '设备类型', info_type='str')

        if equp_class is None:
            equp_class = self.point.类型.to_list()
        elif isinstance(equp_class, str):
            equp_class = [equp_class]

        # Filter by class first, then by device type (same order as before).
        point_info = self._filter_by_equp_type(
            self.point.loc[self.point.类型.isin(equp_class)],
            equp_type,
        )
        point_map = dict(zip(point_info.编号.to_list(), point_info.点位.to_list()))

        for point_name, point_id in point_map.items():
            if not isinstance(point_id, str) and np.isnan(point_id):
                raise Exception(f'点位{point_name}的点位ID缺失')

        # Apply device-specific overrides on top of the global point IDs.
        for each_adj in self.point.loc[:, '点位':].columns.to_list():
            if equp_name not in re.findall(r'\[(.*?)\]', each_adj):
                continue
            for point_name in point_map:
                # Build the mask from point_info itself so its index always
                # matches the frame being indexed.
                new_point_id = (
                    point_info
                    .loc[point_info.编号 == point_name, each_adj].iat[0]
                )
                if isinstance(new_point_id, str):
                    point_map[point_name] = new_point_id

        # Drop room points whose room number exceeds the device's room count.
        if 'C' in equp_class:
            num_room = self.get_equp_info(equp_name, '房间数量', info_type='int')
            for point_name in list(point_map):
                match = re.match(r'room_(\d+)_', point_name)
                if match and int(match.group(1)) > num_room:
                    point_map.pop(point_name)

        # NOTE(review): the source this was reconstructed from had lost its
        # indentation, so it is unclear whether this check belongs inside the
        # "'C' in equp_class" branch above. It is kept at function level here;
        # confirm against the intended behaviour.
        if not self.get_equp_info(equp_name, '有后表冷', 'bool'):
            if 'coil_3_Val' in point_map:
                point_map['coil_3_Val'] = 0

        return point_map

    def get_point_info(self, equp_name, point_name) -> dict:
        """Return the points-sheet row for ``point_name`` as a dict.

        Only rows whose ``设备类型`` lists the device's type are considered.
        Columns whose header equals a device ``编号`` are left out of the result.

        Raises:
            Exception: If the device is not enabled, or if zero or more than
                one matching row exists.
        """
        self.check_equp_is_exist(equp_name)
        equp_type = self.get_equp_info(equp_name, '设备类型', info_type='str')
        point_info = (
            self._filter_by_equp_type(self.point, equp_type)
            .loc[lambda dt: dt.编号 == point_name]
        )

        if len(point_info) == 0:
            raise Exception(f'未找到{equp_name}的{point_name}信息')
        if len(point_info) > 1:
            raise Exception(f'{equp_name}下存在多个{point_name}信息')

        # Evaluate the property once instead of once per column.
        equp_columns = set(self.all_equp_names_total)
        return {
            name: values[0]
            for name, values in point_info.to_dict(orient='list').items()
            if name not in equp_columns
        }

    def get_app_info(
        self,
        equp_name: str,
        app_type: str,
        key: str,
        info_type: str = None
    ):
        """Return a program setting, preferring a device-specific override.

        The row is selected by ``程序类型 == app_type`` and ``项目 == key``. The
        value in ``全局配置`` is used unless a column named ``equp_name`` exists
        and holds a string or a non-NaN float for that row.

        Args:
            equp_name: Device ``编号`` whose override column is consulted.
            app_type: Value to match in the ``程序类型`` column.
            key: Value to match in the ``项目`` column.
            info_type: Target type name passed to ``convert_info_type``;
                ``None`` returns the raw cell value.

        Raises:
            Exception: If the (app_type, key) pair does not match exactly one row.
        """
        value = self.app.loc[lambda dt: (dt.程序类型 == app_type) & (dt.项目 == key)]
        if value.shape[0] != 1:
            raise Exception(f'程序类型({app_type})下不存在唯一的的项目({key})')
        info_value = value.loc[:, '全局配置'].iat[0]

        # Device-specific override of the global setting.
        if equp_name in value.columns:
            equp_value = value.loc[:, equp_name].iat[0]
            # np.isnan only accepts numbers, so it must never see a string.
            is_text = isinstance(equp_value, str)
            is_number = isinstance(equp_value, float) and not np.isnan(equp_value)
            if is_text or is_number:
                info_value = equp_value

        return convert_info_type(info_value, info_type)

    def check_equp_is_exist(self, equp_name):
        """Raise ``Exception`` unless ``equp_name`` is an enabled device."""
        if equp_name not in self.all_equp_names:
            raise Exception(f"设备{equp_name}不存在")


def convert_info_type(info_value, info_type):
    """Convert a raw configuration cell to the requested type.

    Args:
        info_value: Raw value read from the workbook.
        info_type: One of ``'int'``, ``'float'``, ``'bool'``, ``'datetime'``,
            ``'str'`` or ``None`` (return the value unchanged).

    Returns:
        The converted value. For ``'bool'`` a missing value (NaN/None) is
        False. For ``'datetime'`` the string ``'NOW'`` becomes the current
        time, datetime instances pass through, and anything else is parsed
        with the format ``%Y/%m/%d %H:%M:%S``.

    Raises:
        ValueError: If ``info_type`` is not one of the supported names.
    """
    if info_type is None:
        return info_value
    if info_type == 'int':
        return int(info_value)
    if info_type == 'float':
        return float(info_value)
    if info_type == 'bool':
        # pd.isna handles strings and None, where np.isnan raises TypeError.
        return False if pd.isna(info_value) else bool(info_value)
    if info_type == 'datetime':
        if info_value == 'NOW':
            return datetime.now()
        if isinstance(info_value, datetime):
            return info_value
        return datetime.strptime(info_value, '%Y/%m/%d %H:%M:%S')
    if info_type == 'str':
        return str(info_value)
    raise ValueError(f"{info_type} type error!")