import pandas as pd


class ConfigInfo:
    """Helper around a component configuration dict.

    The configuration is expected to provide the port lists ``_PORTS_IN`` /
    ``_PORTS_OUT`` (each entry carrying a ``point_id``) and the group lists
    ``_PORTS_IN_GROUP`` / ``_PORTS_OUT_GROUP`` (each entry carrying ``name``,
    ``start`` and ``end``, where ``start:end`` is used as a slice into the
    corresponding port list).
    """

    def __init__(self, config) -> None:
        self.config = config
        self.n_input = len(self.get_io_id('in'))
        self.n_output = len(self.get_io_id('out'))

    @staticmethod
    def _port_key(io, suffix: str = '') -> str:
        """Map ``'in'`` / ``'out'`` to the matching config key plus *suffix*.

        Raises
        ------
        Exception
            If *io* is neither ``'in'`` nor ``'out'``.
        """
        if io == 'in':
            base = '_PORTS_IN'
        elif io == 'out':
            base = '_PORTS_OUT'
        else:
            raise Exception('WRONG io')
        return base + suffix

    def get_io_id(self, io='in') -> list:
        """Return the ``point_id`` of every input or output port, in order.

        Parameters
        ----------
        io : str
            ``'in'`` for input ports, ``'out'`` for output ports.

        Raises
        ------
        Exception
            If *io* is neither ``'in'`` nor ``'out'``.
        """
        return [point['point_id'] for point in self.config[self._port_key(io)]]

    # NOTE: the parameter name ``type`` shadows the builtin, but it is part of
    # the public signature (callers pass ``type='point_id'``), so it is kept.
    def get_io_group_info(self, io='in', type: str = 'data', data: list = None) -> dict:
        """Slice port data or port ids according to the configured groups.

        Parameters
        ----------
        io : str
            ``'in'`` or ``'out'``; selects which group list is used.
        type : str
            ``'data'`` slices *data*; ``'point_id'`` slices the port id list.
        data : list, optional
            Sequence aligned with the ports; required when ``type='data'``.

        Returns
        -------
        dict
            ``{group_name: slice}``, e.g. ``{'G1': [DF1, DF2], 'G2': [DF3, DF4]}``
            for ``type='data'`` or ``{'G1': [point_1, point_2], ...}`` for
            ``type='point_id'``.

        Raises
        ------
        Exception
            If *io* is invalid, or (when at least one group is configured)
            if *type* is invalid or *data* is missing for ``type='data'``.
        """
        # Resolve the key first so an invalid ``io`` fails with the same
        # explicit error as ``get_io_id`` instead of an UnboundLocalError.
        io_key = self._port_key(io, '_GROUP')

        # The port id list does not depend on the group: compute it once.
        point_ids = self.get_io_id(io=io) if type == 'point_id' else None

        group_info = {}
        for group in self.config[io_key]:
            name = group['name']
            start_idx = group['start']
            end_idx = group['end']
            if type == 'data':
                if data is None:
                    raise Exception('当type为data时,必须输入data参数')
                info = data[start_idx:end_idx]
            elif type == 'point_id':
                info = point_ids[start_idx:end_idx]
            else:
                raise Exception('WRONG type')
            group_info[name] = info
        return group_info

    def rename_df(self, dfs: list, io='in') -> list:
        """Rename the first column of each DataFrame to its port's ``point_id``.

        Items of *dfs* that are not DataFrames (or whose ``point_id`` is not a
        string) are passed through unchanged. For DataFrames only the first
        column is kept.

        Raises
        ------
        Exception
            If *dfs* and the port list have different lengths.
        """
        point_id = self.get_io_id(io=io)
        if len(dfs) != len(point_id):
            raise Exception(f'数据长度有误,point_id:{point_id},dfs:{dfs}')

        result = []
        for p, df in zip(point_id, dfs):
            if isinstance(df, pd.DataFrame) and isinstance(p, str):
                result.append(df.iloc[:, [0]].set_axis([p], axis=1))
            else:
                result.append(df)
        return result

    def split_df_by_groupinfo(
        self,
        data_map: dict,
        allow_data_map_is_subset: bool = False,
        allow_data_map_miss_group: list = None
    ) -> list:
        """
        Split grouped output data into a flat, per-port list, following the
        output groups and ports of the component configuration.

        Parameters
        ----------
        data_map : dict
            Group name mapped to its data, either a DataFrame whose columns
            are port ids or a dict keyed by port id, e.g.
            ``{group1: DataFrame, group2: {portA: float, portB: bool}}``.
        allow_data_map_is_subset : bool
            When True, configured groups missing from *data_map* (or mapped
            to None) are skipped instead of raising.
        allow_data_map_miss_group : list, optional
            Group names that may be missing from *data_map* even when
            *allow_data_map_is_subset* is False.

        Returns
        -------
        list
            One entry per output port of every group that was present: a
            single-column DataFrame for DataFrame groups, the stored value
            for dict groups.

        Raises
        ------
        Exception
            A configured group is missing from *data_map* and is not allowed
            to be missing.
        Exception
            A port id of the group is not found in the group's data.
        TypeError
            The data of a group is neither a DataFrame nor a dict.
        """
        output_groupinfo = self.get_io_group_info('out', type='point_id')
        split_data = []
        for group_name, points in output_groupinfo.items():
            data = data_map.get(group_name)
            if data is None:
                may_skip = (
                    isinstance(allow_data_map_miss_group, list)
                    and group_name in allow_data_map_miss_group
                ) or allow_data_map_is_subset
                if may_skip:
                    continue
                raise Exception(f'组件输出的分组名称{group_name}有误,分组:{list(data_map.keys())}')

            for p in points:
                if isinstance(data, pd.DataFrame):
                    if p not in data.columns:
                        raise Exception(f'组件输出的桩名称有误,未找到{p},桩:{data.columns.to_list()}')
                    p_data = data.loc[:, [p]]
                elif isinstance(data, dict):
                    if p not in data:
                        raise Exception(f'组件输出的桩名称有误,未找到{p},桩:{list(data.keys())}')
                    p_data = data[p]
                else:
                    # Previously this fell through with ``p_data`` unbound (or
                    # stale from the previous port); fail loudly instead.
                    raise TypeError(
                        f'unsupported data type for group {group_name}: '
                        f'{data.__class__.__name__} (expected DataFrame or dict)'
                    )
                split_data.append(p_data)
        return split_data

    def split_df(self, method: str, df: pd.DataFrame, by_group=None) -> list:
        """Split *df* into one single-column DataFrame per output port.

        Parameters
        ----------
        method : str
            ``'idx'`` splits by column position (the column count must equal
            the number of ports); ``'id'`` selects columns named after the
            port ids.
        df : pd.DataFrame
            Data to split.
        by_group : optional
            Name of an output group; when given, only that group's ports are
            used instead of all output ports.

        Raises
        ------
        Exception
            If the column count does not match (``'idx'``), a port id column
            is missing (``'id'``), or *method* is not recognised.
        """
        if by_group is not None:
            output_point_id = self.get_io_group_info(io='out', type='point_id')[by_group]
            output_n = len(output_point_id)
        else:
            output_point_id = self.get_io_id(io='out')
            output_n = self.n_output

        if method == 'idx':
            if df.shape[1] != output_n:
                raise Exception(f'输出数据的个数不等于原数据中的列数,以下是原数据中包含的列:{df.columns.to_list()}')
            return [df.iloc[:, [idx]] for idx in range(output_n)]

        if method == 'id':
            df_list = []
            for point_id in output_point_id:
                if point_id not in df.columns:
                    raise Exception(f'数据中没有{point_id},以下是数据中包含的列:{df.columns.to_list()}')
                df_list.append(df.loc[:, [point_id]])
            return df_list

        # An unknown method used to return an empty list silently, hiding
        # typos in the caller; report it like the other argument errors.
        raise Exception('WRONG method')

    def get_property(self, key: str, default=None):
        """Get a property of the component configuration.

        Returns *default* when *key* is absent or its value is ``None`` or
        the empty string.
        """
        if key not in self.config:
            return default
        value = self.config[key]
        if value is None or value == '':
            return default
        return value

    def check_property_exist(self, property_name: dict):
        """Ensure every key of *property_name* exists in the configuration.

        Parameters
        ----------
        property_name : dict
            Config key mapped to the display name used in the error message.

        Raises
        ------
        Exception
            If a key is missing from the configuration.
        """
        for param, param_name in property_name.items():
            try:
                self.config[param]
            except KeyError as err:
                raise Exception(f'组件缺少自定义参数:{param_name}') from err

    def check_io_equal(self):
        """Return True when input and output port ids are identical lists."""
        return self.get_io_id('in') == self.get_io_id('out')


if __name__ == '__main__':
    # Demo 1: split grouped output data into per-port frames.
    config = {
        '_PORTS_IN': [],
        '_PORTS_OUT': [
            {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': ''}], 'name': '', 'type': 'DF', 'static': True, 'point_id': 'a'},
            {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': 'P_ND2_Tdb'}], 'name': 'ND2_室外温度', 'type': 'DF', 'static': True, 'point_id': 'b'},
            {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': ''}], 'name': '', 'point_id': 'c'},
            {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': ''}], 'name': '', 'point_id': 'd'}
        ],
        '_PORTS_IN_GROUP': [],
        '_PORTS_OUT_GROUP': [
            {'id': '1696909849014', 'end': 2, 'name': 'X', 'start': 0, 'static': True},
            {'id': '1696909849434', 'end': 4, 'name': 'Y', 'start': 2, 'static': True}
        ],
        '_CODE': None,
        '_DEVICE_CODE': None
    }
    config_info = ConfigInfo(config)
    df1 = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
    df2 = pd.DataFrame({'c': [7, 8, 9], 'd': [10, 11, 12]})
    # The parameter is named ``data_map`` (the old ``df_map`` keyword raised
    # a TypeError).
    res1 = config_info.split_df_by_groupinfo(data_map={'X': df1, 'Y': df2})
    print('res1', res1)

    # Demo 2: rename input frames after their port ids.
    config = {
        '_PORTS_OUT': [],
        '_PORTS_IN': [
            {'point_id': 'a'},
            {'point_id': 'b'},
        ],
        '_PORTS_IN_GROUP': [],
        '_PORTS_OUT_GROUP': [],
        '_CODE': None,
        '_DEVICE_CODE': None
    }
    df1 = pd.DataFrame({'x': [1, 2, 3]})
    df2 = pd.DataFrame({'y': [1, 2, 3]})
    config_info = ConfigInfo(config)
    res2 = config_info.rename_df([df1, df2])
    print('res2', res2)