Browse Source

feat(_data): 添加缓存数据处理功能

- 新增 cache_data.py 文件,实现缓存数据的读取、更新和管理
- 新增 main.py 文件,包含数据加载、处理和更新缓存的主流程
- 新增 config_info.py 文件,提供配置信息解析功能
- 新增 data_cleaner.py 文件,实现数据清洗功能

这些文件共同构成了数据处理模块,支持从缓存或数据库获取数据,进行数据清洗和处理,并更新缓存。
zhangshenhao 7 months ago
parent
commit
3cfcae84fb

+ 100 - 0
_data/cache_data.py

@@ -0,0 +1,100 @@
+from datetime import datetime
+from typing import Union
+import os 
+
+import pandas as pd 
+    
class CacheData:
    """Pickle-backed cache of point time-series data.

    The cache file at ``PATH`` holds one DataFrame whose index is timestamps
    and whose columns are point ids. This class loads it (if present) and
    answers whether it can satisfy a request for ``point_ids`` over
    ``time_start``..``time_end`` at interval ``int_time``.
    """

    def __init__(
        self,
        PATH,
        point_ids : list,
        time_start: datetime,
        time_end  : datetime,
        int_time  : str,
    ) -> None:

        self.PATH = PATH

        # Load the whole cached frame if the file exists; None = no cache yet.
        if self.PATH.exists():
            self.all_cache_data: pd.DataFrame = pd.read_pickle(self.PATH)
        else:
            self.all_cache_data = None

        self.point_ids  = point_ids
        self.time_start = time_start
        self.time_end   = time_end
        # 'M' is the project's shorthand for minute data; pandas wants 'min'.
        self.int_time   = 'min' if int_time == 'M' else int_time
        self.dt_range   = pd.date_range(start=self.time_start, end=self.time_end, freq=self.int_time)

        # Requested window projected onto the cache: with NaN rows kept
        # (need_cache_data) and with NaN rows dropped (exist_cache_data).
        self.need_cache_data  = self._read_cache(drop_na=False)
        self.exist_cache_data = self._read_cache(drop_na=True)

    def _read_cache(self, drop_na: bool) -> Union[pd.DataFrame, None]:
        """Reindex the cached frame onto the requested time range.

        Returns None when there is no usable cache. The ``shape[0] == 1``
        guard treats a single-row cache as unusable — presumably a degenerate
        write; TODO confirm.
        """
        if self.all_cache_data is None or self.all_cache_data.shape[0] == 1:
            return None

        # Keeps every column of all_cache_data; missing timestamps become NaN.
        dt_range_index = pd.Index(self.dt_range, name='ts')
        cache_data     = self.all_cache_data.reindex(index=dt_range_index)

        if drop_na:
            cache_data = cache_data.dropna()

        return cache_data

    def is_missing_point(self) -> bool:
        """Return True when the cache lacks any of the requested point ids."""
        # exist_cache_data is None both when there is no cache file and when
        # the cache is degenerate (single row) — either way points are missing.
        # (Checking all_cache_data alone crashed on the degenerate case.)
        if self.exist_cache_data is None:
            return True

        missing_point = pd.Index(self.point_ids).difference(self.exist_cache_data.columns)
        return len(missing_point) > 0

    def is_missing_time(self) -> bool:
        """Return True when the cache lacks any timestamp of the range."""
        if self.exist_cache_data is None:
            return True

        missing_time = self.dt_range.difference(self.exist_cache_data.index)
        return len(missing_time) > 0

    def get_missing_dt(self) -> tuple:
        """Return (dt_max, dt_min) bounding the fully-missing rows, else None.

        Only meaningful when times are missing but every point exists;
        callers unpack the result as ``dt_end, dt_start``.
        """
        if self.is_missing_point() or not self.is_missing_time():
            return None

        # Timestamps where every requested point is NaN at once.
        missing_index = (
            self.need_cache_data
            .loc[:, self.point_ids]
            .loc[lambda dt: dt.isna().all(axis=1), :]
            .index
        )
        dt_max = missing_index.max().to_pydatetime()
        dt_min = missing_index.min().to_pydatetime()

        return dt_max, dt_min

    def update_cache(self, new_data: pd.DataFrame) -> None:
        """Merge ``new_data`` into the cache file, new values winning."""
        if self.all_cache_data is not None:
            # Union of timestamps and columns, then overwrite overlapping
            # cells with the freshly loaded values.
            old_total_cache_data = self.all_cache_data.copy(deep=True)
            new_index            = old_total_cache_data.index.append(new_data.index).drop_duplicates()
            new_columns          = old_total_cache_data.columns.append(new_data.columns).drop_duplicates()
            new_total_cache_data = old_total_cache_data.reindex(index=new_index, columns=new_columns).sort_index()
            new_total_cache_data.update(new_data)
            pd.to_pickle(new_total_cache_data, self.PATH)
        else:
            pd.to_pickle(new_data, self.PATH)

    @classmethod
    def clean_cache(cls, PATH) -> None:
        """Delete the cache file at ``PATH``; a missing file is not an error."""
        try:
            os.remove(PATH)
        except OSError:
            # Narrow catch: only filesystem errors are expected/ignorable
            # (the original bare except hid every failure).
            pass

+ 241 - 0
_data/main.py

@@ -0,0 +1,241 @@
+import os
+from datetime import datetime,timedelta
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+
+from .._utils.point_io import PointReader
+from .._utils.config_info import ConfigInfo
+from .._utils.data_summary import summary_dataframe
+from .cache_data import CacheData
+
def main(set_time_start=None, set_time_end=None, config=None):
    """Component entry point: resolve config/time window, load point data
    (from cache or DB), and split it into one output per port."""
    cfg = ConfigInfo(config)

    int_time    = cfg.get_property('int_time', default='M')
    from_cache  = cfg.get_property('from_cache', default=False)
    clean_cache = cfg.get_property('clean_cache', default=False)
    na_behave   = cfg.get_property('na_behave', default='Exception')
    output_id   = cfg.get_io_id(io='out')
    time_start, time_end = get_time(cfg, set_time_start, set_time_end, int_time)

    point_ids = [port['point_id'] for port in config['_PORTS_OUT']]

    # _CODE in the component config occasionally comes back as None.
    code_name = config.get('_CODE', 'DEFAULT')
    if code_name is None:
        code_name = 'DEFAULT'
    PATH = Path(f'/mnt/workflow_data/{code_name}_File.pkl')

    # Second-level data uses the real-value endpoint; everything else uses
    # the (possibly overridden) history endpoint.
    if int_time == 'S':
        url = 'http://basedataportal-svc:8080/data/get_points_real_value'
    else:
        url = cfg.get_property('url', 'http://basedataportal-svc:8080/data/getpointsdata')

    # Drop the cache file before loading when requested.
    if clean_cache:
        CacheData.clean_cache(PATH=PATH)

    data   = get_data(point_ids, time_start, time_end, int_time, url, from_cache, PATH)
    result = get_result(data, point_ids, output_id, na_behave)

    # A single output is returned bare, not wrapped in a list.
    return result[0] if len(result) == 1 else result
+
+
def parse_str_time(str_time):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive datetime."""
    time_format = '%Y-%m-%d %H:%M:%S'
    return datetime.strptime(str_time, time_format)
+
def get_result(data, point_ids, output_id, na_behave):
    """Split ``data`` into one single-column DataFrame per requested point.

    Parameters
    ----------
    data : pd.DataFrame
        Loaded point data, columns are point ids.
    point_ids : list
        Point ids expected by the output ports, in port order.
    output_id : list
        Output port ids; an empty list short-circuits to ``[None]``.
    na_behave : str
        'Exception' raises on any missing value; 'Zero' fills with 0;
        anything else raises.

    Raises
    ------
    Exception
        When a point has missing data and na_behave == 'Exception',
        or when na_behave is unrecognised.
    """
    if len(output_id) == 0:
        return [None]

    result = []
    for col in point_ids:
        if col in data.columns:
            df = data.loc[:, [col]]
        else:
            # Point absent from the fetched data: build an all-NaN column so
            # the na_behave policy below decides the outcome.
            # (np.ones(shape=data.index) raised TypeError — an Index is not a
            # valid shape; use np.full over the row count instead.)
            df = pd.DataFrame({col: np.full(len(data.index), np.nan)}, index=data.index)

        if not df.isna().any().any():
            result.append(df)

        elif na_behave == 'Exception':
            raise Exception(f'{col}未获取到对应数据')

        elif na_behave == 'Zero':
            result.append(df.fillna(0))

        else:
            raise Exception('WRONG')

    # Print the data except in optimisation-recalc jobs.
    if 'OPTIM_CALC_LOG_ID' not in os.environ:
        print(result)

    return result
+    
+
def get_data(
    points_id : list,
    time_start: datetime,
    time_end  : datetime,
    int_time  : str,
    url       : str,
    from_cache: bool,
    PATH
) -> pd.DataFrame:
    """Load point data over [time_start, time_end], preferring the pickle cache.

    Strategy, in order: always hit the DB when ``from_cache`` is False; hit
    the DB when the cache lacks a requested point; fetch only the missing
    time span and merge when only timestamps are missing; otherwise serve
    pure cache. Whatever was loaded is written back to the cache at PATH.
    """
    cache_data = CacheData(PATH,points_id,time_start,time_end,int_time)
    
    if not from_cache:
        print('Data Source : DB (Set)')
        data = get_data_from_db(points_id,time_start,time_end,int_time,url)
        data = data.fillna(-9999)  # -9999 is the sentinel for "fetched but missing"
        
    elif cache_data.is_missing_point():
        print('Data Source : DB (Missing Point)')
        data = get_data_from_db(points_id,time_start,time_end,int_time,url)
        data = data.fillna(-9999)
    
    elif cache_data.is_missing_time():
        # get_missing_dt returns (max, min), i.e. the bounds of the
        # fully-missing span, unpacked here as end/start.
        dt_end,dt_start = cache_data.get_missing_dt()
        print(f'Data Source : DB+CACHE (Missing Time {dt_start}~{dt_end})')
        
        # De-duplicate timestamps (keep the latest row) on both sides and
        # align the cache to the columns the DB actually returned.
        data_db    = get_data_from_db(points_id,dt_start,dt_end,int_time,url)
        data_db    = data_db.loc[lambda dt:~dt.index.duplicated(keep='last'),data_db.columns.to_list()]
        data_cache = cache_data.need_cache_data
        data_cache = data_cache.loc[lambda dt:~dt.index.duplicated(keep='last'),data_db.columns.to_list()]

        # Overwrite cached cells in place with the freshly fetched values.
        data = data_cache.copy()
        data.update(data_db.fillna(value=-9999))
        data = data.loc[lambda dt:~dt.index.duplicated(keep='last'),:].sort_index()
        
    else:
        print('Data Source : CACHE')
        data = cache_data.need_cache_data
    
    # Persist whatever was loaded so the next run can reuse it.
    cache_data.update_cache(data)
    
    print(f'URL : {url}')
    print(f'Int : {int_time}')
    summary_dataframe(data,'点位加载数据')
    
    return data 
+
def get_data_from_db(
    points_id : list,
    time_start: datetime,
    time_end  : datetime,
    int_time  : str,
    url       : str
) -> pd.DataFrame:
    """Fetch point data from the data service, dispatching on the interval code.

    'M' reads the raw minute interval, 'H'/'D' read resampled intervals,
    'S' reads the current (real-time) values; anything else raises.
    """
    reader = PointReader(point_ids=points_id, dt_begin=time_start, dt_end=time_end, url=url)

    if int_time == 'M':
        return reader.read_interval()
    if int_time in ('H', 'D'):
        return reader.read_int_interval(freq=int_time)
    if int_time == 'S':
        return reader.read_current()

    raise Exception('WRONG PARAM')
+
def get_time(
    config_info:ConfigInfo,
    set_time_start,
    set_time_end,
    int_time:str
) -> tuple:
    """Resolve the (time_start, time_end) window for data loading.

    Priority: explicit datetimes passed by the caller, else the component
    configuration. The environment (simulator / monitor / optimisation
    recalc / realtime optimisation / scheduled job / debug) may then
    override the window entirely. Finally the bounds are snapped to the
    interval granularity.
    """
    if 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
        # Scheduled jobs take "now" from the scheduler's environment.
        NOW = parse_str_time(os.environ.get('JOB_SCHEDULE_CALC_TIME_NOW')) - timedelta(minutes=1)
    else:
        # Data for the current minute may be absent during its first seconds,
        # so shift "now" back one minute.
        NOW = datetime.now() - timedelta(minutes=1)
    
    # Start/end times set on the component (input beats configuration).
    if isinstance(set_time_start,datetime) and isinstance(set_time_end,datetime): 
        time_type         = '组件输入'
        config_time_end   = set_time_end
        config_time_start = set_time_start
    else:
        time_type        = '组件配置'
        
        # Configured end time (default NOW), optionally shifted back.
        config_time_end  = config_info.get_property('time_end',None)
        config_time_end  = parse_str_time(config_time_end) if config_time_end is not None else NOW
        time_end_shift   = config_info.get_property('time_end_shift',0)
        time_end_shift   = timedelta(minutes=time_end_shift)
        config_time_end -= time_end_shift
        
        # Start time: time_start1 is an absolute timestamp; time_start2 is a
        # lookback in minutes relative to the end; otherwise start == end.
        if config_info.get_property('time_start1',None) is not None:
            config_time_start = config_info.get_property('time_start1')
            config_time_start = parse_str_time(config_time_start)
            
        elif config_info.get_property('time_start2',None) is not None:
            config_time_start = config_info.get_property('time_start2')
            config_time_start = config_time_end - timedelta(minutes=config_time_start)
        
        else:
            config_time_start = config_time_end
        
    # Environment-specific override of the window.
    # Simulator environment: single instant at NOW.
    if 'SIMULATOR_JOB_ID' in os.environ: 
        time_type  = '仿真环境:' + time_type
        time_end   = NOW
        time_start = time_end
        
    # Monitoring environment: window from the monitor env vars, with one
    # extra hour of lead-in before the begin time.
    elif 'MONITOR_TYPE' in os.environ:
        time_type  = '监控环境:' + time_type
        monitor_begin = os.environ.get('MONITOR_BEGIN', None)
        monitor_end   = os.environ.get('MONITOR_END', None)
        time_end      = parse_str_time(monitor_end)
        time_start    = parse_str_time(monitor_begin) - timedelta(hours=1)
    
    # Optimisation recalculation: 10 minutes ending at OPTIM_TIME.
    elif 'OPTIM_CALC_LOG_ID' in os.environ:
        time_type  = '优化回算:' + time_type
        time_end   = parse_str_time(os.environ['OPTIM_TIME'])
        time_start = time_end - timedelta(minutes=10)
        print(f"os.environ['OPTIM_TIME']: {os.environ['OPTIM_TIME']}")
    
    # Realtime optimisation: use the configured window as-is.
    elif 'OPTIM_JOB_ID' in os.environ:
        time_type  = '实时优化:' + time_type
        time_end   = config_time_end
        time_start = config_time_start
    
    # Scheduled job: use the configured window as-is.
    elif 'JOB_SCHEDULE_CALC_TIME_NOW' in os.environ:
        time_type  = '任务计划:' + time_type
        time_end   = config_time_end
        time_start = config_time_start
    
    # Debug environment (WORKFLOW_DEBUG): use the configured window as-is.
    else:
        time_type  = '调试环境:' + time_type
        time_end   = config_time_end
        time_start = config_time_start
    
    # Snap the bounds to the interval granularity; second-level data always
    # uses the current instant.
    if int_time == 'H':
        time_start = time_start.replace(minute=0)
        time_end   = time_end.replace(minute=0)
    elif int_time == 'D':
        time_start = time_start.replace(hour=0,minute=0)
        time_end   = time_end.replace(hour=0,minute=0)
    elif int_time == 'S':
        time_start = datetime.now()
        time_end   = datetime.now()

    time_start = time_start.replace(second=0,microsecond=0)
    time_end   = time_end.replace(second=0,microsecond=0)
    
    print(f'time_type  : {time_type}')
    print(f'time_start : {time_start}')
    print(f'time_end   : {time_end}')
    
    return time_start,time_end

+ 295 - 0
_utils/config_info.py

@@ -0,0 +1,295 @@
+from typing import Union
+import pandas as pd
+
class ConfigInfo:
    """Helper around a workflow component's config dict.

    Exposes the input/output port ids (``_PORTS_IN``/``_PORTS_OUT``), their
    groupings (``_PORTS_IN_GROUP``/``_PORTS_OUT_GROUP``), custom properties,
    and utilities for splitting/renaming DataFrames per port.
    """

    def __init__(self,config) -> None:
        self.config   = config
        # Number of input/output ports, derived from the port lists.
        self.n_input  = len(self.get_io_id('in'))
        self.n_output = len(self.get_io_id('out'))
    
    def get_io_id(self,io = 'in') -> list:
        """Return the ordered list of point ids for the 'in' or 'out' ports."""
        if io == 'in':
            io_key = '_PORTS_IN'
        elif io == 'out':
            io_key = '_PORTS_OUT'
        else:
            raise Exception('WRONG io')
        
        input_id = [point['point_id'] for point in self.config[io_key]]
        
        return input_id
    
    def get_io_group_info(self,io='in',type:str='data',data:list=None) -> dict:
        """Map each port group name to its slice of data or point ids.

        Groups are defined by [start, end) index ranges over the flat port
        list. type='data' slices the caller-supplied ``data`` list;
        type='point_id' slices the port ids.
        """
        if io == 'in':
            io_key = '_PORTS_IN_GROUP'
        elif io == 'out':
            io_key = '_PORTS_OUT_GROUP'
        
        group_info = {}
        for group in self.config[io_key]:
            name           = group['name']
            start_idx      = group['start']
            end_idx        = group['end']
            
            if type == 'data':
                # group_info = {'G1':[DF1,DF2], 'G2':[DF3,DF4]}
                if data is None:
                    raise Exception('当type为data时,必须输入data参数')
                info = data[start_idx:end_idx]
            
            elif type == 'point_id':
                # group_info = {'G1':[point_1,point_2],'G2':[point_3,point_4]}
                point_id = self.get_io_id(io=io)
                info     = point_id[start_idx:end_idx]
            
            else:
                raise Exception('WRONG type')
            
            group_info[name] = info
        
        return group_info
    
    def get_data_by_group_and_name(self,data:list,name:str,group:str=None,
                                   allow_group_missing=False) -> list:
        """Pick the input datum named ``name`` out of one or more groups.

        ``group`` may be None (all groups), a single group name, or a list.
        DataFrame results keep only their first column, renamed to the group.
        """
        group_data = self.get_io_group_info(io='in',type='data',data=data)
        group_name = self.get_io_group_info(io='in',type='point_id')
        
        if group is None:
            group = list(group_name.keys())
        elif isinstance(group,str):
            group = [group]
        elif isinstance(group,list):
            pass
        else:
            raise Exception('WRONG')
        
        all_data = []
        for each_group in group:
            if each_group not in group_name:
                if allow_group_missing == False:
                    raise Exception(f'缺失{each_group}')
                else:
                    continue
            # Position of the named point inside the group decides which
            # datum is picked.
            data_idx   = group_name[each_group].index(name)
            data_i     = group_data[each_group][data_idx]
            
            if isinstance(data_i,pd.DataFrame):
                data_i = data_i.iloc[:,[0]].set_axis([each_group],axis=1)
            all_data.append(data_i)
            
        return all_data
    
    def get_io_info_by_pc(self,io='in',type:str='data',data:list=None,drop_groups:list=[]) -> dict:
        """
        Split each group's ports by position and regroup by point id.

        All (non-dropped) groups must list the same point ids in the same
        order; the result maps each point id to the per-group data (or group
        names, for type='point_id') at that position.
        """
        # NOTE(review): drop_groups uses a mutable default argument; it is
        # never mutated here, but a None default would be safer.

        if io == 'in':
            io_key = '_PORTS_IN_GROUP'
        elif io == 'out':
            io_key = '_PORTS_OUT_GROUP'

        group_pc_info_all = self.get_io_group_info(io=io,type='point_id',data=data)
        if drop_groups is None:
            group_pc_info = group_pc_info_all
        elif isinstance(drop_groups,list):
            group_pc_info = {k:v for k,v in group_pc_info_all.items() if k not in drop_groups}

        # Every remaining group must expose an identical point-id sequence.
        if not all([i==list(group_pc_info.values())[0] for i in group_pc_info.values()]):
            raise Exception('请确保所有分组中点位的编号是一致的!')
            
        pc_num = len(list(group_pc_info.values())[0])
        pc_info = {}
        for count in range(pc_num):
            info_list = []
            for group_info in self.config[io_key]:
                if group_info['name'] in drop_groups:
                    continue
                # Flat index of the count-th port of this group.
                idx         = group_info['start'] + count
                point_id    = self.get_io_id(io=io)[idx]
                if type == 'data':
                    data_info = data[idx]
                    info_list.append(data_info)
                elif type == 'point_id':
                    group_name = group_info['name']
                    info_list.append(group_name)
            pc_info[point_id] = info_list
        
        return pc_info

    def rename_df(self,dfs:list,io='in') -> list:
        """Rename each DataFrame's (first) column to its port's point id.

        Non-DataFrame entries (and non-string ids) pass through unchanged.
        """
        result = []
        point_id = self.get_io_id(io=io)
        
        if len(dfs) != len(point_id):
            raise Exception(f'数据长度有误,point_id:{point_id},dfs:{dfs}')
        
        for p,df in zip(point_id,dfs):
            if not isinstance(df,pd.DataFrame) or not isinstance(p,str):
                result.append(df)
            else:
                result.append(
                    df.iloc[:,[0]].set_axis([p],axis=1)
                )
        
        return result
    
    def split_df_by_groupinfo(
        self,
        data_map:dict,
        allow_data_map_is_subset:bool  = False,
        allow_data_map_miss_group:list = None
    ) -> list:
        """
        Emit per-port outputs from grouped data, per the output group config.

        Parameters
        ----------
        data_map : dict
            Group name -> data, e.g. {'G1': DataFrame, 'G2': {'pA': 1.0}}.
            A DataFrame is split by column name; a dict is indexed by key.
        allow_data_map_is_subset : bool
            When True, groups absent from data_map are silently skipped.
        allow_data_map_miss_group : list
            Specific group names that are allowed to be absent.

        Returns
        -------
        list
            One datum per output port, in port order.

        Raises
        ------
        Exception
            A group name is missing from data_map (and not allowed to be).
        Exception
            A port id is missing from its group's columns/keys.
        """
        output_groupinfo = self.get_io_group_info('out',type='point_id')
        
        split_data = []
        
        for group_name,points in output_groupinfo.items():
            data = data_map.get(group_name)
            
            if data is None:
                if isinstance(allow_data_map_miss_group,list) and group_name in allow_data_map_miss_group:
                    continue
                elif not allow_data_map_is_subset:
                    raise Exception(f'组件输出的分组名称{group_name}有误,分组:{list(data_map.keys())}')
                else:
                    continue
            
            for p in points:
                
                if isinstance(data,pd.DataFrame):
                    if p not in data.columns:
                        raise Exception(f'组件输出的桩名称有误,未找到{p},桩:{data.columns.to_list()}')
                    p_data = data.loc[:,[p]]
                
                elif isinstance(data,dict):
                    if p not in data.keys():
                        raise Exception(f'组件输出的桩名称有误,未找到{p},桩:{list(data.keys())}')
                    p_data = data[p]
                    
                split_data.append(p_data)
        
        return split_data
            
    
    def split_df(self,method:str,df:pd.DataFrame,by_group=None) -> list:
        """Split one wide DataFrame into single-column frames per output port.

        method='idx' splits positionally (column count must match the port
        count); method='id' selects columns by point id.
        """
        if by_group is not None:
            output_point_id = self.get_io_group_info(io='out',type='point_id')[by_group]
            output_n = len(output_point_id)
        else:
            output_point_id = self.get_io_id(io='out')
            output_n = self.n_output
        
        df_list = []
        
        if method == 'idx':
            
            if df.shape[1] != output_n:
                raise Exception(f'输出数据的个数不等于原数据中的列数,以下是原数据中包含的列:{df.columns.to_list()}')
            
            for idx in range(output_n):
                df_list.append(df.iloc[:,[idx]])    
            
        elif method == 'id':
            for point_id in output_point_id:
                if point_id not in df.columns:
                    raise Exception(f'数据中没有{point_id},以下是数据中包含的列:{df.columns.to_list()}')
                df_list.append(df.loc[:,[point_id]])
        
        # NOTE(review): an unrecognised method silently returns [] here.
        return df_list
    
    def get_property(self,key:str,default=None):
        """Return a custom config property; None/'' fall back to default."""
        if key not in self.config.keys():
            return default
        
        property = self.config[key]
        if property is None or property == '':
            return default
        
        return property
    
    def check_property_exist(self,property_name:dict):
        """Raise when any required custom parameter is absent from the config."""
        for param,param_name in property_name.items():
            try:
                self.config[param]
            except:
                raise Exception(f'组件缺少自定义参数:{param_name}')
    
    def check_io_equal(self):
        """Return True when input and output port ids are identical lists."""
        input_id = self.get_io_id('in')
        output_id = self.get_io_id('out')
        is_equal = input_id == output_id
        return is_equal
+    
+    
if __name__ == '__main__':
    #############################
    # Demo 1: split grouped output DataFrames back into per-port columns.
    config = {
        '_PORTS_IN': [], 
        '_PORTS_OUT': 
            [
                {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': ''}], 'name': '','type': 'DF','static': True, 'point_id': 'a'}, 
                {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': 'P_ND2_Tdb'}], 'name': 'ND2_室外温度', 'type': 'DF', 'static': True, 'point_id': 'b'}, 
                {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': ''}], 'name': '','point_id': 'c'}, 
                {'cols': [{'type': 'date', 'title': '时间'}, {'type': 'float', 'title': ''}], 'name': '', 'point_id': 'd'}
            ], 
        '_PORTS_IN_GROUP': [],
        '_PORTS_OUT_GROUP': 
            [
                {'id': '1696909849014', 'end': 2, 'name': 'X', 'start': 0, 'static': True}, 
                {'id': '1696909849434', 'end': 4, 'name': 'Y', 'start': 2, 'static': True}
            ], 
            '_CODE'       : None,
            '_DEVICE_CODE': None
    }
    config_info = ConfigInfo(config)
    
    df1 = pd.DataFrame({'a':[1,2,3],'b':[4,5,6]})
    df2 = pd.DataFrame({'c':[7,8,9],'d':[10,11,12]})
    
    # Fixed keyword: the parameter is data_map (df_map raised TypeError).
    res1 = config_info.split_df_by_groupinfo(data_map={'X':df1,'Y':df2})
    print('res1',res1)
    
    ###########################
    # Demo 2: rename input DataFrames to their port ids.
    config = {
        '_PORTS_OUT': [], 
        '_PORTS_IN': 
            [
                {'point_id': 'a'}, 
                {'point_id': 'b'}, 
            ], 
        '_PORTS_IN_GROUP': [],
        '_PORTS_OUT_GROUP': 
            [
            ], 
            '_CODE'       : None,
            '_DEVICE_CODE': None
    }
    df1 = pd.DataFrame({'x':[1,2,3]})
    df2 = pd.DataFrame({'y':[1,2,3]})
    config_info = ConfigInfo(config)
    res2 = config_info.rename_df([df1,df2])
    print('res2',res2)

+ 295 - 0
_utils/data_cleaner.py

@@ -0,0 +1,295 @@
+import warnings
+from typing import Union
+from datetime import datetime
+
+import numpy as np
+import pandas as pd 
+from statsmodels.formula.api import rlm
+from scipy.stats import iqr
+
+from .data_summary import summary_dataframe
+
class DataCleaner:
    """Chainable row-filtering pipeline for a DataFrame.

    Each ``rm_*`` method marks rows for removal by OR-ing a boolean mask into
    ``self.drop_index`` and returns ``self`` so calls can be chained;
    ``get_data`` materialises the surviving (or dropped) rows from the
    untouched ``raw_data``.
    """

    def __init__(self,data:pd.DataFrame,print_process=True) -> None:
        # raw_data is never modified; data is a working copy for rules/eval.
        self.raw_data      = data
        self.data          = data.copy()
        # Boolean mask over rows; True = marked for removal.
        self.drop_index    = np.array([False]*len(self.raw_data))
        self.print_process = print_process
        
        if self.print_process:
            summary_dataframe(df=self.raw_data,df_name='原始数据')
    
    def rm_na_and_inf(self):
        # Drop rows containing NaN or +/-inf in any column.
        is_na_data      = self.data.isna().any(axis=1).values
        is_inf_data     = np.any(np.isinf(self.data.values),axis=1)
        drop_index      = is_na_data | is_inf_data
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index,method='rm_na_and_inf')
        return self
    
    def rm_constant(
        self,
        window        :int  = 10,
        exclude_value :list = None,
        include_cols  :list = '__ALL__',
        include_by_re :bool = False,
        exclude_cols  :list = None
    ):
        # Drop rows where a column stayed constant over a rolling window
        # (rolling std == 0), except when the constant is in exclude_value.
        data              = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        drop_index_matrix = (data.rolling(window=window).std()==0)
        if exclude_value is not None:
            for each_value in exclude_value:
                keep_index_matrix = data.values == each_value
                drop_index_matrix[keep_index_matrix] = False
        drop_index        = drop_index_matrix.any(axis=1)
        self.drop_index   = self.drop_index | drop_index
        self._count_removed_data(index=drop_index,method='rm_constant',index_matrix=drop_index_matrix,var_name=data.columns)
        return self 

    def rm_rolling_fluct(
        self,
        window        :int             = 10,
        unit          :Union[str,None] = 'min',
        fun           :str             = 'ptp',
        thre          :float           = 0,
        include_cols  :list            = '__ALL__',
        include_by_re :bool            = False,
        exclude_cols  :list            = None
    ):
        # Drop rows whose centered rolling fluctuation exceeds thre:
        # fun='ptp' uses max-min, fun='pct' uses (max-min)/min.
        data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        
        # A time unit makes the window time-based (e.g. '10min').
        if unit is None:
            roll_window = window
        else:
            roll_window = str(window) + unit
        roll_data = data.rolling(window=roll_window,min_periods=1,center=True) 
        
        if fun == 'ptp':
            res = roll_data.max() - roll_data.min()
        elif fun == 'pct':
            res = (roll_data.max() - roll_data.min())/roll_data.min()
        drop_index_matrix = res>thre
        drop_index = drop_index_matrix.any(axis=1)
        self.drop_index = self.drop_index | drop_index
        self._count_removed_data(index=drop_index,method='rm_rolling_fluct',index_matrix=drop_index_matrix,var_name=data.columns)
        return self
    
    def rm_outlier_rolling_mean(
        self,
        window       :int    = 10,
        thre         :float  = 0.02,
        include_cols :list   = '__ALL__',
        include_by_re:bool   = False,
        exclude_cols :list   = None
    ):
        # Drop time-series outliers: rows deviating from the trailing
        # rolling mean by more than thre (relative).
        data            = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        data            = data.reset_index(drop=True)
        windows_mean    = data.rolling(window=window,min_periods=1).mean()
        drop_index      = (((data - windows_mean)/data).abs()>thre).any(axis=1).values
        # NOTE(review): reported label is 'rm_outlier_mean', not the method name.
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index,method='rm_outlier_mean')
        return self 
    
    def rm_diff(
        self,
        thre         : float,
        shift        : int    = 1, 
        include_cols : list   = '__ALL__',
        include_by_re: bool   = False,
        exclude_cols : list   = None
    ):
        # Drop rows with a step change: with shift == 1 the diff is
        # current row minus the previous row.
        data              = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        data_diff         = data.diff(periods=shift,axis=0)
        drop_index_matrix = data_diff.abs() > thre
        drop_index        = drop_index_matrix.any(axis=1).values
        self.drop_index   = drop_index | self.drop_index
        self._count_removed_data(index=drop_index,method='rm_diff',index_matrix=drop_index_matrix,var_name=data.columns)
        return self 
    
    def rm_zero(
        self,
        include_cols :list  = '__ALL__',
        include_by_re:bool  = False,
        exclude_cols :list  = None
    ):
        # Drop rows where any selected column is exactly zero.
        data            = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        drop_index      = (data==0).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index,method='rm_zero')
        return self
        
    
    def rm_negative(
        self,
        keep_zero     :bool = False,
        include_cols :list  = '__ALL__',
        include_by_re:bool  = False,
        exclude_cols :list  = None
    ):
        # Drop rows with negative values; keep_zero=True keeps exact zeros.
        data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        if keep_zero is True:
            drop_index = (data<0).any(axis=1).values
        else:
            drop_index = (data<=0).any(axis=1).values
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index,method='rm_negative')
        return self
    
    def rm_rule(self,remove_rule:str):
        # Drop rows matching a pandas eval() expression over the columns.
        data            = self.data.copy()
        drop_index      = np.array(data.eval(remove_rule))
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index,method=f'rm_rule({remove_rule})')
        return self

    def rm_regression_outlier(
        self,
        formula     : str,
        rm_resid_IQR: float                = 1.5,
        rm_dir      : str                  = 'both',
        exclude_rule: Union[str,list,None] = None,
        min_sample  : int                  = 30,
    ):
        """Drop rows whose robust-regression residual is an IQR outlier.

        Fits ``formula`` (statsmodels RLM) on the rows not yet dropped and
        not matched by ``exclude_rule``; rows whose residual falls outside
        Q1/Q3 by ``rm_resid_IQR``*IQR in direction ``rm_dir`` are dropped.
        Excluded rows keep their previous drop status.
        """
        #! Order-sensitive: depends on drop marks accumulated so far.
        RAW_INDEX    = np.arange(len(self.data))
        
        # Rows matching exclude_rule do not participate in the fit.
        if exclude_rule is None:
            exclude_rule = []
        if isinstance(exclude_rule,str):
            exclude_rule = [exclude_rule]
        exclued_index = np.array([False]*len(self.raw_data))
        for rule in exclude_rule:
            exclued_index = exclued_index | np.array(self.data.eval(rule))
        exclued_index      = pd.Series(data=exclued_index,index=RAW_INDEX)
        exclude_index_drop = pd.Series(self.drop_index,index=RAW_INDEX).loc[exclued_index.values]
        
        # Rows actually fed into the regression (not dropped, not excluded).
        data_clean   = self.data.assign(RAW_INDEX_=RAW_INDEX).loc[~(self.drop_index|exclued_index.values)]
        filter_index = data_clean.RAW_INDEX_.values
        
        # Too few samples for a meaningful fit: no-op.
        if len(data_clean) < min_sample:
            return self 
        
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            mod = rlm(formula,data=data_clean).fit(maxiter=500)
        resid      = np.array(mod.resid)
        IQR        = iqr(resid)
        if rm_dir == 'both':
            drop_index = (resid < (np.quantile(resid,q=0.25)-rm_resid_IQR*IQR)) | (resid > (np.quantile(resid,q=0.75)+rm_resid_IQR*IQR))
        elif rm_dir == 'lower':
            drop_index = resid < (np.quantile(resid,q=0.25)-rm_resid_IQR*IQR)
        elif rm_dir == 'upper':
            drop_index = resid > (np.quantile(resid,q=0.75)+rm_resid_IQR*IQR)
        else:
            raise ValueError('rm_dir must be one of "both","lower","upper"')
        
        # Scatter the fit-subset verdicts back onto the full row range,
        # re-applying preserved statuses for excluded rows.
        drop_index_incomplete = pd.Series(data=drop_index,index=filter_index).combine_first(exclude_index_drop)
        drop_index_complete   = drop_index_incomplete.reindex(RAW_INDEX).fillna(False).values
        self.drop_index       = drop_index_complete | self.drop_index
        self._count_removed_data(index=drop_index,method=f'rm_reg({formula})')
        return self
    
    def rm_date_range(self,start:datetime,end:datetime,col=None):
        # Drop rows whose timestamp (index, or column `col`) lies within
        # [start, end] inclusive.
        start = pd.Timestamp(start)
        end   = pd.Timestamp(end)
        if col is None:
            ts = pd.to_datetime(self.raw_data.index)
        else:
            ts = pd.to_datetime(self.raw_data.loc[:,col])
        drop_index = (ts>=start) & (ts<=end)
        self.drop_index = drop_index | self.drop_index
        self._count_removed_data(index=drop_index,method=f'rm_date_range({start}~{end})')
        return self
    
    def rm_outrange(
        self,
        method        :str   = 'quantile',
        upper         :float = 0.99,
        lower         :float = 0.01,
        include_cols  :list  = '__ALL__',
        include_by_re :bool  = False,
        exclude_cols  :list  = None
    ):
        # Drop rows outside [lower, upper] bounds; method='quantile' reads
        # the bounds as per-column quantiles, method='raw' as raw values.
        data = self._get_data_by_cols(include_cols,include_by_re,exclude_cols)
        if method == 'quantile':
            q_upper = np.quantile(data.values,q=upper,axis=0)
            q_lower = np.quantile(data.values,q=lower,axis=0)
        elif method == 'raw':
            q_upper = upper
            q_lower = lower
        else:
            raise Exception('WRONG method')
        
        drop_index_matrix = (data > q_upper) | (data < q_lower)
        drop_index        = drop_index_matrix.any(axis=1)
        self.drop_index   = self.drop_index | drop_index
        self._count_removed_data(index=drop_index,method='rm_outrange',index_matrix=drop_index_matrix,var_name=data.columns)
        
        return self
        
    
    def get_data(self,fill=None,get_drop=False) -> pd.DataFrame:
        """Return the surviving rows (or, with get_drop=True, the dropped ones).

        With ``fill`` set, returns the full frame with the selected rows
        replaced by ``fill`` instead of removed.
        """
        index = self.drop_index if not get_drop else ~self.drop_index
        if fill is None:
            # Keep the rows that were not marked for removal.
            result_data = self.raw_data.loc[~index,:]
        else:
            # Keep all rows; overwrite the marked ones with the fill value.
            result_data = self.raw_data.copy()
            result_data.loc[index,:] = fill 
        if self.print_process: 
            summary_dataframe(result_data,df_name='结果数据')
        return result_data
    
    def _get_data_by_cols(
        self,
        include_cols :list = '__ALL__',
        include_by_re:bool = False,
        exclude_cols :list = None,
    ) -> pd.DataFrame:
        """Select the working columns: by regex, by name(s), or all,
        minus any ``exclude_cols``."""
        data = self.data.copy()
        
        if include_by_re is True:
            # include_cols is treated as a regex over column names.
            if isinstance(include_cols,str):
                cols = data.loc[:,data.columns.str.contains(include_cols,regex=True)].columns
            else:
                raise Exception('WRONG')
            
        elif include_by_re is False:
            if include_cols == '__ALL__':
                cols = data.columns
            elif isinstance(include_cols,str):
                cols = [include_cols]
            elif isinstance(include_cols,list):
                cols = data.loc[:,include_cols].columns
            else:
                raise Exception('WRONG')
            
        if exclude_cols is not None:
            cols = cols.difference(other=exclude_cols)
        
        return data.loc[:,cols]
        
    
    def _count_removed_data(self,index,method,index_matrix=None,var_name=None):
        """Log how many rows ``method`` removed, optionally per column."""
        count = index.sum()
        pct   = round(count / len(index) * 100,2)
        if self.print_process:
            print(f'remove {count}({pct}%) by {method}')
        
        if index_matrix is not None and var_name is not None:
            # Per-column removal counts (only columns that removed anything).
            var_drop_count = np.sum(index_matrix,axis=0)
            for var,drop_count in zip(var_name,var_drop_count):
                if drop_count == 0:
                    continue
                if self.print_process:
                    print(f'{var}:{drop_count}')

+ 671 - 0
_utils/data_service.py

@@ -0,0 +1,671 @@
+import json
+import datetime
+import time
+import pandas as pd
+import traceback
+import sys
+import os
+from functools import partial
+from dateutil import tz
+
# Prefer the requests shim bundled with workflowlib; fall back to the plain
# requests package when running outside the workflow runtime.  Catch only
# ImportError so real errors raised at import time are not silently masked.
try:
    from workflowlib import requests
except ImportError:
    import requests
+
+
# Relative endpoint paths of the data service; joined with the base URLs
# taken from the environment (DATA_ROOT_URL / DATA_UPLOAD_URL) by the
# PointReader / PointWriter classes below.
urlcfg = {
    'getpointdata_url'      : "data/getpointdata",
    'getpointsdata_url'     : "data/getpointsdata",
    'getpointsdataforai_url': "data/getpointsdataforai",
    'getpointsruntime_url'  : "data/getpointsruntime",
    'getcurrdata_url'       : "ai/getcurrdata",
    'gethisdata_url'        : "ai/gethisdata",
    'putaidata_url'         : "ai/putaidata",
    'uploadaifile_url'      : "ai/uploadaifile",
    'addpointdatum_url'     : "ai/addpointdatum",
}
+
+
class PointReader:
    """Reads point time-series data from the data service, with retries."""

    # Service base URLs come from the environment; endpoint paths from urlcfg.
    root_url               = os.environ.get('DATA_ROOT_URL')
    upload_url             = os.environ.get('DATA_UPLOAD_URL')
    getpointdata_url       = urlcfg["getpointdata_url"]
    getpointsdata_url      = urlcfg["getpointsdata_url"]
    getpointsdataforai_url = urlcfg["getpointsdataforai_url"]
    getpointsruntime_url   = urlcfg["getpointsruntime_url"]
    getcurrdata_url        = urlcfg["getcurrdata_url"]
    gethisdata_url         = urlcfg["gethisdata_url"]
    putaidata_url          = urlcfg["putaidata_url"]
    uploadaifile_url       = urlcfg["uploadaifile_url"]
    addpointdatum_url      = urlcfg["addpointdatum_url"]

    # Converts an epoch timestamp into an Asia/Shanghai-aware datetime.
    dtfromts = partial(datetime.datetime.fromtimestamp, tz=tz.gettz('Asia/Shanghai'))
    # Max connection retries, and seconds to wait after a failed request.
    max_try    = 10
    post_sleep = 1
    
    def __init__(self,url=None) -> None:
        """Resolve the endpoint: an explicit *url* overrides the default
        DATA_ROOT_URL + getpointsdata path; raises if neither is available."""
        if url is not None:
            self.url = url
            print(f'使用临时url:{self.url}')
        elif self.root_url is None:
            raise Exception('未在环境变量中获取到 DATA_ROOT_URL')
        else:
            self.url = self.root_url + self.getpointsdata_url
    
    # Takes a list of point ids; returns data containing ts, point_id, value.
    def get_points_data(self,point_ids, from_time, to_time, interval=1, type_=3, ts2dt_col=None, return_type='dict'):
        """
        :param point_ids: list
        :param from_time: datetime start of the window
        :param to_time: datetime end of the window
        :param interval: int=1 sampling interval
        :param type_: =3 required by the backend service
        :param ts2dt_col: list columns whose timestamps are converted to datetime
        :param return_type: str in {'dict', 'df', 'dfcol'} default='dict' output layout
        'dict' returns {point_id: DataFrame} (raw layout)
        'df' adds a point_id column per point and concatenates by rows
        'dfcol' indexes by ts and concatenates by columns, renaming the
        'value' column to the point_id (only effective when a value exists)
        :return: DataFrame
        """
        post_data = {
            "point_ids": point_ids,
            "begin"    : round(from_time.timestamp()),
            "end"      : round(to_time.timestamp()),
            "interval" : interval,
            "type"     : type_,
        }
        rem_try = self.max_try
        while rem_try > 0:
            try:
                resp = requests.post(url=self.url, data=json.dumps(post_data),timeout=60)
                data = resp.json()['data']
                if data:
                    res = dict()
                    for point in data:
                        res[point['point_id']] = pd.DataFrame(point['data'])
                        if ts2dt_col is not None:
                            res[point['point_id']] = self.ts2dt(res[point['point_id']], ts2dt_col)
                        # res[point['point_id']].set_index(['ts'], inplace=True)
                    if return_type == 'dict':
                        return res
                    elif return_type == 'df':
                        for point in res.keys():
                            res[point]['point_id'] = point
                        return pd.concat(res.values(), axis=0)
                    elif return_type == 'dfcol':
                        res_df = pd.DataFrame()
                        for point_id, df_ in res.items():
                            res_df = pd.concat(
                                [res_df, df_.set_index('ts').rename(columns={'value': point_id})],
                                axis=1)
                        return res_df.reset_index()
                else:
                    # Empty payload: treated as a transient failure, retried.
                    rem_try -= 1
                    time.sleep(self.post_sleep)
            except Exception as e:
                self.error_print(sys._getframe().f_code.co_name)
                rem_try -= 1
                time.sleep(self.post_sleep)
        if rem_try == 0:
            # NOTE(review): returns None after exhausting all retries.
            print("\nget_points_data failed")

    # Internal helper: print details of the current exception.
    def error_print(self,func_name):
        print()
        print(f"{self.dtfromts(time.time())}:")
        print(f"function {func_name} error!")
        print(f"Exception Info:")
        e_type, e_value, e_traceback = sys.exc_info()
        print(e_type)
        print(e_value)
        traceback.print_tb(e_traceback)
        print()

    # Internal helper: convert the given timestamp columns to datetime.
    def ts2dt(self,df: pd.DataFrame, cols):
        for col in cols:
            df[col] = pd.Series(map(self.dtfromts, df[col]))
        return df
+
class PointWriter:
    """Uploads point data to the point database via the data service."""

    # Base upload URL from the environment; endpoint path from urlcfg.
    upload_url        = os.environ.get('DATA_UPLOAD_URL')
    addpointdatum_url = urlcfg.get('addpointdatum_url')
    # Timestamps in error logs are rendered in UTC+8 (Asia/Shanghai, no DST).
    # Fixes a latent AttributeError: error_print() called self.dtfromts,
    # which was never defined on this class (only on PointReader).
    dtfromts = partial(datetime.datetime.fromtimestamp,
                       tz=datetime.timezone(datetime.timedelta(hours=8)))
    # Max connection retries, and seconds to wait after a failed request.
    max_try    = 10
    post_sleep = 1

    def __init__(self,url=None) -> None:
        """Resolve the endpoint: an explicit *url* overrides the default
        DATA_UPLOAD_URL + addpointdatum path; raises if neither is available."""
        if url is not None:
            self.url = url
            print(f'使用临时url:{self.url}')
        elif self.upload_url is None:
            raise Exception('未在环境变量中获取到 DATA_UPLOAD_URL')
        else:
            self.url = self.upload_url + self.addpointdatum_url

    # Upload one datum to the point database.
    def ai_add_point_data(self, point_id, ts, value):
        """
        :param point_id: str point id, must already exist in the point库
        :param ts: datetime timestamp of the datum
        :param value: str point value
        :return: dict: service response; dict['state'] == 0 means success,
            None when all retries were exhausted
        """
        # Use the URL resolved in __init__ so a temporary url passed to the
        # constructor is honoured (previously this always rebuilt the
        # default URL, and crashed when DATA_UPLOAD_URL was unset).
        url = self.url
        post_data = [
            {
                "point_id": str(point_id),
                "data":[{"ts": int(ts.timestamp()), "value": str(value)}]
                }
            ]

        rem_try = self.max_try
        while rem_try > 0:
            try:
                resp = requests.post(url=url,headers = {'Content-Type': 'application/json; charset=UTF-8'}, data = json.dumps(post_data),timeout=60)
                try:
                    body = resp.json()
                finally:
                    # Close the response on every path; previously it was
                    # leaked when the upload succeeded (early return).
                    resp.close()
                if body['state'] == 0:
                    print(f"\nput {point_id} data success!")
                    return body
                print(f"strange resp: {body}")
                rem_try -= 1
                time.sleep(self.post_sleep)
            except Exception:
                self.error_print(sys._getframe().f_code.co_name)
                rem_try -= 1
                time.sleep(self.post_sleep)
        print("\nai_add_point_data failed")
        return None

    # Internal helper: print details of the current exception.
    def error_print(self,func_name):
        print()
        print(f"{self.dtfromts(time.time())}:")
        print(f"function {func_name} error!")
        print(f"Exception Info:")
        e_type, e_value, e_traceback = sys.exc_info()
        print(e_type)
        print(e_value)
        traceback.print_tb(e_traceback)
        print()
+        
+
+       
+# # 内部函数:将datetime转换成timestamp
+# def dt2ts(df: pd.DataFrame, cols):
+#     for col in cols:
+#         df[col] = pd.Series([dt.timestamp() for dt in df[col]])
+#     return df
+
+
+
+
+
+# # 传入一个point id,返回一个dataframe里面包含了ts,point_id,value
+# def get_point_data(point_id, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
+#     """
+#     :param point_id: string
+#     :param from_time: datetime 开始时间
+#     :param to_time: datetime 结束时间
+#     :param interval: int=1 时间间隔
+#     :param type_: =3 后端业务要求
+#     :param ts2dt_col: list timestamp需要转换为datetime的列名
+#     :return: DataFrame 包含时间
+#     """
+#     url = root_url + getpointdata_url
+#     post_data = {
+#         "point_id": point_id,
+#         "begin": round(from_time.timestamp()),
+#         "end": round(to_time.timestamp()),
+#         "interval": interval,
+#         "type": type_,
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             ts_data = resp.json()['data']
+#             if ts_data:
+#                 res = pd.DataFrame(ts_data)
+#                 if ts2dt_col is not None:
+#                     res = ts2dt(res, ts2dt_col)
+#                 # res.set_index(['ts'], inplace=True)
+#                 return res
+#             else:
+#                 rem_try -= 1
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nget_point_data failed")
+
+
+
+# # 传入包含多个point id的list,返回一个dataframe里面包含了ts,多个point_id求总后的value
+# def sum_points_by_ts(point_ids, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
+#     """
+#     :param point_ids: list
+#     :param from_time: datetime 开始时间
+#     :param to_time: datetime 结束时间
+#     :param interval: int=1 时间间隔
+#     :param type_: =3 后端业务要求
+#     :param ts2dt_col: list timestamp需要转换为datetime的列名
+#     :return: DataFrame
+#     """
+#     try:
+#         data_dict = get_points_data(point_ids, from_time, to_time, interval, type_, ts2dt_col)
+#         for point_id in data_dict.keys():
+#             data_dict[point_id].set_index(['ts'], inplace=True)
+#         return sum(_ for _ in data_dict.values()).reset_index()
+#     except Exception as e:
+#         error_print(sys._getframe().f_code.co_name)
+#         time.sleep(post_sleep)
+#         print("\nsum_points_by_ts failed")
+
+
+# # 传入包含多个point id的list,返回所有point_id最新的一条数据
+# def get_points_run_time(point_ids, ts2dt_col=['ts']):
+#     """
+#     :param point_ids: list
+#     :param ts2dt_col: list=['ts'] timestamp需要转换为datetime的列名
+#     :return: DataFrame
+#     """
+#     url = root_url + getpointsruntime_url
+#     post_data = {
+#         "point_ids": point_ids,
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             data = resp.json()['data']
+#             if data:
+#                 res = pd.DataFrame(data)
+#                 if ts2dt_col is not None:
+#                     res = ts2dt(res, ts2dt_col)
+#                 return res
+#             else:
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nget_points_run_time failed")
+
+
+# # 获取最新的AI数据
+# def ai_get_curr_data(
+#         model_id, model_version, algo, algo_version, module_id,
+#         ts2dt_col=None,
+# ):
+#     """
+#     :param model_id: str 模型编号
+#     :param model_version: str 模型版本
+#     :param algo: str 算法名称
+#     :param algo_version: str 算法版本
+#     :param module_id: str 模块编号
+#     :param ts2dt_col: list timestamp需要转换为datetime的列名
+#     :return: datetime, DataFrame 时间戳和数据
+#     """
+#     url = upload_url + getcurrdata_url
+#     post_data = {
+#         "model_id": model_id,
+#         "model_version": model_version,
+#         "algo": algo,
+#         "algo_version": algo_version,
+#         "module_id": module_id,
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             data_resp = resp.json()['data']
+#             if data_resp:
+#                 # res = dict()
+#                 # for row in data_resp:
+#                 #     res[row['ts']] = pd.DataFrame(json.loads(row['data']))
+#                 ts_dt = dtfromts(data_resp[0]['ts'])
+#                 res = pd.DataFrame(json.loads(data_resp[0]['data']))
+#                 if ts2dt_col is not None:
+#                     res = ts2dt(res, ts2dt_col)
+#                 return ts_dt, res
+#             else:
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_get_curr_data failed")
+
+
+# # 获取历史AI数据
+# def ai_get_his_data(
+#         model_id, model_version, algo, algo_version, module_id,
+#         from_time: datetime.datetime,
+#         to_time: datetime.datetime,
+#         ts2dt_col=None,
+#         return_type='df'
+# ):
+#     """
+#     :param model_id: str 模型编号
+#     :param model_version: str 模型版本
+#     :param algo: str 算法名称
+#     :param algo_version: str 算法版本
+#     :param module_id: str 模块编号
+#     :param from_time: datetime 开始时间
+#     :param to_time: datetime 结束时间
+#     :param ts2dt_col: list timestamp需要转换为datetime的列名
+#     :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构
+#     'dict' 返回 {ts: DataFrame} (原始结构)
+#     'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果
+#     :return: DataFrame里面包含了ts, value
+#     """
+#     url = upload_url + gethisdata_url
+#     post_data = {
+#         "model_id": model_id,
+#         "model_version": model_version,
+#         "algo": algo,
+#         "algo_version": algo_version,
+#         "module_id": module_id,
+#         "begin": round(from_time.timestamp()),
+#         "end": round(to_time.timestamp()),
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             data_resp = resp.json()['data']
+#             if data_resp:
+#                 res = dict()
+#                 for row in data_resp:
+#                     ts = dtfromts(row['ts'])
+#                     res[ts] = pd.DataFrame(json.loads(row['data']))
+#                     if ts2dt_col is not None:
+#                         res[ts] = ts2dt(res[ts], ts2dt_col)
+#                 if return_type == 'dict':
+#                     return res
+#                 elif return_type == 'df':
+#                     for ts in res.keys():
+#                         res[ts]['ts'] = ts
+#                     return pd.concat(res.values(), axis=0)
+#             else:
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_get_his_data failed")
+
+# def ai_get_his_data2(
+#         model_id, model_version, algo, algo_version, module_id,
+#         from_time: datetime.datetime,
+#         to_time: datetime.datetime,
+#         ts2dt_col=None,
+#         return_type='df'
+# ):
+#     """
+#     :param model_id: str 模型编号
+#     :param model_version: str 模型版本
+#     :param algo: str 算法名称
+#     :param algo_version: str 算法版本
+#     :param module_id: str 模块编号
+#     :param from_time: datetime 开始时间
+#     :param to_time: datetime 结束时间
+#     :param ts2dt_col: list timestamp需要转换为datetime的列名
+#     :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构
+#     'dict' 返回 {ts: DataFrame} (原始结构)
+#     'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果
+#     :return: DataFrame里面包含了ts, value
+#     """
+#     url = root_url + gethisdata_url
+#     post_data = {
+#         "model_id": model_id,
+#         "model_version": model_version,
+#         "algo": algo,
+#         "algo_version": algo_version,
+#         "module_id": module_id,
+#         "begin": round(from_time.timestamp()),
+#         "end": round(to_time.timestamp()),
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             data_resp = resp.json()['data']
+#             if data_resp:
+#                 res = dict()
+#                 for row in data_resp:
+#                     ts = dtfromts(row['ts'])
+#                     res[ts] = pd.DataFrame(json.loads(row['data']))
+#                     if ts2dt_col is not None:
+#                         res[ts] = ts2dt(res[ts], ts2dt_col)
+#                 if return_type == 'dict':
+#                     return res
+#                 elif return_type == 'df':
+#                     for ts in res.keys():
+#                         res[ts]['ts'] = ts
+#                     return pd.concat(res.values(), axis=0)
+#             else:
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_get_his_data failed")
+
+
+# # 将 DataFrame 格式数据转换为 jsonlike 的 list 格式数据
+# def df2jsonlike(df: pd.DataFrame):
+#     res = []
+#     for _, row in df.iterrows():
+#         res.append(row.to_dict())
+#     return res
+
+
+# # 将模型所预测的数据存入数据库
+# # 点位数据
+# def ai_put_ai_data(
+#         model_id, model_version, algo, algo_version, module_id,
+#         ts_dt: datetime.datetime,
+#         data_df: pd.DataFrame,
+#         dt2ts_col=None
+# ):
+#     """
+#     :param model_id: str 模型编号
+#     :param model_version: str 模型版本
+#     :param algo: str 算法名称
+#     :param algo_version: str 算法版本
+#     :param module_id: str 模块编号
+#     :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
+#     :param data_df: DataFrame 业务数据
+#     :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
+#     :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
+#     """
+#     url = upload_url + putaidata_url
+#     if dt2ts_col is not None:
+#         data_df = dt2ts(data_df, dt2ts_col)
+#     post_data = {
+#         "model_id": model_id,
+#         "model_version": model_version,
+#         "algo": algo,
+#         "algo_version": algo_version,
+#         "module_id": module_id,
+#         "ts": round(ts_dt.timestamp()),
+#         "data": df2jsonlike(data_df),
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             state = resp.json()['state']
+#             if state == 0:
+#                 print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!")
+#                 return resp.json()
+#             else:
+#                 print(f"strange resp: {resp.json()}")
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_put_ai_data failed")
+#         return None
+
+
+# # 将模型所预测的数据存入数据库
+# # 能耗基线数据
+# def ai_put_ai_data2(
+#         model_id, model_version, algo, algo_version, module_id,
+#         ts_dt: datetime.datetime,
+#         data_df: pd.DataFrame,
+#         dt2ts_col=None
+# ):
+#     """
+#     :param model_id: str 模型编号
+#     :param model_version: str 模型版本
+#     :param algo: str 算法名称
+#     :param algo_version: str 算法版本
+#     :param module_id: str 模块编号
+#     :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
+#     :param data_df: DataFrame 业务数据
+#     :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
+#     :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
+#     """
+#     url = root_url + putaidata_url
+#     if dt2ts_col is not None:
+#         data_df = dt2ts(data_df, dt2ts_col)
+#     post_data = {
+#         "model_id"     : model_id,
+#         "model_version": model_version,
+#         "algo"         : algo,
+#         "algo_version" : algo_version,
+#         "module_id"    : module_id,
+#         "ts"           : round(ts_dt.timestamp()),
+#         "data"         : df2jsonlike(data_df) if isinstance(data_df, pd.DataFrame) else data_df,
+#     }
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url, data=json.dumps(post_data))
+#             state = resp.json()['state']
+#             if state == 0:
+#                 print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!")
+#                 return resp.json()
+#             else:
+#                 print(f"strange resp: {resp.json()}")
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_put_ai_data failed")
+#         return None
+
+
+# # 通过接口上传文件
+# def ai_upload_ai_file(
+#         model_id, algo, algo_version, module_id,
+#         file
+# ):
+#     """
+#     :param model_id: str 模型编号
+#     :param model_version: str 模型版本
+#     :param algo: str 算法名称
+#     :param algo_version: str 算法版本
+#     :param module_id: str 模块编号
+#     :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
+#     :param data_df: DataFrame 业务数据
+#     :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
+#     :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
+#     """
+#     url = upload_url + uploadaifile_url
+#     post_data = {
+#         "model_id"    : str(model_id),
+#         "algo"        : str(algo),
+#         "algo_version": str(algo_version),
+#         "module_id"   : str(module_id),
+#         "file"        : file
+#     }
+
+#     multipart_encoder = MultipartEncoder(
+#         fields=post_data
+#     )
+
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url,headers={'Content-Type': multipart_encoder.content_type}, data=multipart_encoder)
+#             state = resp.json()['state']
+#             if state == 0:
+#                 print(f"\nput {model_id}_{algo}_{algo_version}_{module_id} data success!")
+#                 return resp.json()
+#             else:
+#                 print(f"strange resp: {resp.json()}")
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#             resp.close()
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_upload_ai_file failed")
+#         return None
+
+
+# # 上传数据至点位数据库
+# def ai_add_point_data(
+#         point_id, ts, value
+# ):
+#     """
+#     :param point_id: str 数据点位,需要预先在点位库中创建好
+#     :param timestamp: str 当前时刻的时间 datetime
+#     :param value: str 点位数值
+#     :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
+#     """
+#     url = root_url + addpointdatum_url
+#     post_data = [{"point_id": str(point_id),
+#                   "data":[{"ts": int(ts.timestamp()), 
+#                            "value": str(value)}]}]
+
+#     rem_try = max_try
+#     while rem_try > 0:
+#         try:
+#             resp = requests.post(url=url,headers = {'Content-Type': 'application/json; charset=UTF-8'}, data = json.dumps(post_data))
+#             state = resp.json()['state']
+#             if state == 0:
+#                 print(f"\nput {point_id} data success!")
+#                 return resp.json()
+#             else:
+#                 print(f"strange resp: {resp.json()}")
+#                 rem_try -= 1
+#                 time.sleep(post_sleep)
+#             resp.close()
+#         except Exception as e:
+#             error_print(sys._getframe().f_code.co_name)
+#             rem_try -= 1
+#             time.sleep(post_sleep)
+#     if rem_try == 0:
+#         print("\nai_add_point_data failed")
+#         return None
+
+

+ 11 - 0
_utils/data_summary.py

@@ -0,0 +1,11 @@
+import pandas as pd 
+
def summary_dataframe(df:pd.DataFrame,df_name:str):
    """Print a banner plus a full (untruncated) describe() summary of *df*."""
    display_opts = (
        'display.max_rows', None,
        'display.max_columns', None,
        'display.width', 500,
    )
    banner = '#' * 20 + f'   Data Summary : {df_name}   ' + '#' * 20
    with pd.option_context(*display_opts):
        print(banner)
        print(df.describe().round(2).T)
+
def print_dataframe(df:pd.DataFrame,df_name:str):
    """Print *df* in full (no row/column truncation), preceded by a banner."""
    banner = '#' * 20 + f'   Data : {df_name}   ' + '#' * 20
    with pd.option_context('display.max_rows', None,
                           'display.max_columns', None,
                           'display.width', 500):
        print(banner)
        print(df)

+ 190 - 0
_utils/datetime_func.py

@@ -0,0 +1,190 @@
+import datetime
+from dateutil import tz
+import time
+from functools import wraps
+import traceback
+import sys
+import threading
+
+
# Current time in the Shanghai timezone; delay_day shifts it backwards,
# which is used when replaying historical data locally.
def get_now(delay_day=0):
    """
    :param delay_day: int number of days to subtract (historical testing only)
    :return: datetime
    """
    shanghai_now = datetime.datetime.now(tz=tz.gettz('Asia/Shanghai'))
    return shanghai_now - datetime.timedelta(days=delay_day)
+
+
# decorator: run the wrapped task on rounded times at a fixed interval,
# logging and surviving any exception the task raises
def timed_exec(intv_timed: datetime.timedelta, round_mode, sleep=60):
    """
    The wrapped task's first parameter must be ``now``; after decoration the
    caller no longer passes it -- the decorator supplies the rounded time.
    :param intv_timed: timedelta interval between runs
    :param round_mode: str rounding mode for the time,
    one of {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
    :param sleep: int=60 seconds to sleep when the interval has not yet
    elapsed, or after an exception
    :return: function
    """
    def timed_exec_deco(func):
        @wraps(func)
        def timed_execute_wraps(*args, **kwargs):
            # Start one interval in the past so the first run fires at once.
            now = round_time(get_now(), round_mode) - intv_timed
            while True:
                try:
                    # Block until the next slot, round it, then run the task.
                    now = round_time(time_block(now, intv_timed, sleep=sleep), round_mode)
                    func(now, *args, **kwargs)
                except Exception as e:
                    # Log full exception details and keep the loop alive.
                    print()
                    print(f"{datetime.datetime.now()}:")
                    print(f"when run {now} task an error occur!")
                    print(f"Exception Info:")
                    e_type, e_value, e_traceback = sys.exc_info()
                    print(e_type)
                    print(e_value)
                    traceback.print_tb(e_traceback)
                    print()
                    time.sleep(sleep)
        return timed_execute_wraps
    return timed_exec_deco
+
+
# decorator: like timed_exec, but also passes the previous run's time to
# the task, so it can process exactly the elapsed window
def timed_exec_last_now(intv_timed: datetime.timedelta, round_mode, bg_last_now='auto', sleep=60):
    """
    The wrapped task's first two parameters must be ``now`` and ``last_now``
    (this run's time and the previous run's time); after decoration the
    caller no longer passes them -- the decorator supplies both.
    :param intv_timed: timedelta interval between runs
    :param round_mode: str rounding mode for the time,
    one of {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
    :param bg_last_now: datetime the last_now used for the very first run
    default='auto' derives it by subtracting intv_timed from now
    :param sleep: int=60 seconds to sleep when the interval has not yet
    elapsed, or after an exception
    :return: function
    """
    def timed_exec_deco(func):
        @wraps(func)
        def timed_execute_wraps(*args, **kwargs):
            if bg_last_now == 'auto':
                last_now = round_time(get_now(), round_mode) - intv_timed
            else:
                last_now = bg_last_now
            while True:
                try:
                    now = round_time(time_block(last_now, intv_timed, sleep=sleep), round_mode)
                    func(now, last_now, *args, **kwargs)
                    # Only advance last_now after a successful run, so a
                    # failed window is retried with the same bounds.
                    last_now = now
                except Exception as e:
                    # NOTE(review): if round_time/time_block raises before
                    # the first assignment to ``now``, the f-string below
                    # would itself hit an unbound name -- confirm intended.
                    print()
                    print(f"{datetime.datetime.now()}:")
                    print(f"when run {now} task an error occur!")
                    print(f"Exception Info:")
                    e_type, e_value, e_traceback = sys.exc_info()
                    print(e_type)
                    print(e_value)
                    traceback.print_tb(e_traceback)
                    print()
                    time.sleep(sleep)
        return timed_execute_wraps
    return timed_exec_deco
+
+
# decorator: like timed_exec, but additionally prints the current thread's
# identity when an exception occurs (for multi-threaded schedulers)
def timed_exec_multhd(intv_timed: datetime.timedelta, round_mode, sleep=60):
    """
    The wrapped task's first parameter must be ``now``; after decoration the
    caller no longer passes it -- the decorator supplies the rounded time.
    :param intv_timed: timedelta interval between runs
    :param round_mode: str rounding mode for the time,
    one of {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
    :param sleep: int=60 seconds to sleep when the interval has not yet
    elapsed, or after an exception
    :return: function
    """
    def timed_exec_deco(func):
        @wraps(func)
        def timed_execute_wraps(*args, **kwargs):
            # Start one interval in the past so the first run fires at once.
            now = round_time(get_now(), round_mode) - intv_timed
            while True:
                try:
                    now = round_time(time_block(now, intv_timed, sleep=sleep), round_mode)
                    func(now, *args, **kwargs)
                except Exception as e:
                    # NOTE(review): threading.currentThread() is a deprecated
                    # alias of threading.current_thread() -- confirm before
                    # upgrading the runtime.
                    thread_curr = threading.currentThread()
                    print()
                    print(f"{datetime.datetime.now()}:")
                    print(f"In {thread_curr.name} (target: {thread_curr._target}, args: {thread_curr._args})")
                    print(f"when run {now} task an error occur!")
                    print(f"Exception Info:")
                    e_type, e_value, e_traceback = sys.exc_info()
                    print(e_type)
                    print(e_value)
                    traceback.print_tb(e_traceback)
                    print()
                    time.sleep(sleep)
        return timed_execute_wraps
    return timed_exec_deco
+
+
# Floor a datetime according to the requested granularity.
def round_time(t_: datetime.datetime, lvl=None):
    """
    :param t_: datetime
    :param lvl: str=None rounding mode, currently one of
     {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
    :return: datetime or "No support"
    """
    if lvl is None:
        return t_
    # Dispatch to the matching round_<lvl> helper in this module, if any.
    rounder = globals().get("round_" + lvl)
    if rounder is None:
        return "No support"
    return rounder(t_)
+
+
def round_sec(t_: datetime.datetime):
    """Floor *t_* to the whole second (drop microseconds)."""
    return t_.replace(microsecond=0)
+
+
def round_min(t_: datetime.datetime):
    """Floor *t_* to the whole minute."""
    return t_.replace(second=0, microsecond=0)
+
+
def round_hou(t_: datetime.datetime):
    """Floor *t_* to the whole hour."""
    return t_.replace(minute=0, second=0, microsecond=0)
+
+
def round_day(t_: datetime.datetime):
    """Floor *t_* to midnight of the same day."""
    return t_.replace(hour=0, minute=0, second=0, microsecond=0)
+
+
def round_10min(t_: datetime.datetime):
    """Floor *t_* to the nearest earlier 10-minute boundary."""
    return t_.replace(minute=t_.minute - t_.minute % 10, second=0, microsecond=0)
+
+
def round_30min(t_: datetime.datetime):
    """Floor *t_* to the nearest earlier half-hour boundary (:00 or :30)."""
    floored_minute = 30 if t_.minute >= 30 else 0
    return t_.replace(minute=floored_minute, second=0, microsecond=0)
+
+
# Check whether at least timed_thre has elapsed between st and t_.
def time_check(t_, st, timed_thre):
    """
    :param t_: current time
    :param st: start time
    :param timed_thre: required gap between the two
    :return: bool True when t_ - st >= timed_thre
    """
    # Return the comparison directly instead of the redundant
    # if-True-else-False construct.
    return t_ - st >= timed_thre
+
+
# Block, polling, until the current time is at least timed_thre past t_,
# then return that current time.
def time_block(t_: datetime.datetime, timed_thre: datetime.timedelta, sleep=1):
    """
    :param t_: datetime reference time
    :param timed_thre: timedelta required gap between now and t_
    :param sleep: int=1 seconds slept between polls
    :return: datetime the first observed time satisfying the gap
    """
    while True:
        now = get_now()
        if now - t_ >= timed_thre:
            return now
        else:
            time.sleep(sleep)
+ 229 - 0
_utils/point_io.py

@@ -0,0 +1,229 @@
+from datetime import datetime
+import time
+
+try:
+    from workflowlib import requests
+except:
+    import requests
+
+import os
+import json
+import numpy as np
+import pandas as pd 
+
+
+
class PointReader:
    """Fetches point (sensor) time-series data over HTTP.

    Timestamps returned by the service are Unix seconds; they are converted
    to naive Asia/Shanghai local datetimes, floored to the minute.
    """

    def __init__(
        self,
        point_ids: list,
        dt_begin : datetime,
        dt_end   : datetime,
        url      : str
    ) -> None:
        self.point_ids = point_ids
        self.dt_begin  = dt_begin
        self.dt_end    = dt_end
        self.url       = url

    def read_current(self):
        """Read the latest value of every point as a one-row DataFrame."""
        post_data = {'point_ids': self.point_ids}
        try:
            res = requests.post(url=self.url, data=json.dumps(post_data)).json()
            res_state = res['state']
        except Exception:
            # Fix: the original swallowed the error and then crashed with a
            # confusing NameError on `res_state`; log context and re-raise.
            print('url', self.url)
            raise

        if res_state != 0:
            print('post_data', post_data)

        if len(res['data']) != 0:
            point_id    = [_['point_id'] for _ in res['data']]
            point_value = [_['value'] for _ in res['data']]
            point_ts    = [_['ts'] for _ in res['data']]

            data = pd.DataFrame(
                data    = [point_value],
                columns = point_id,
                index   = [point_ts[0]]
            )
            # Unix seconds -> naive local (Asia/Shanghai) time, minute floor.
            data.index = pd.to_datetime(data.index, unit='s', utc=True).tz_convert('Asia/Shanghai')
            data       = data.tz_localize(tz=None)
            data.index = data.index.floor('min')
        else:
            # Empty response: emit an all-NaN row stamped with "now".
            data = pd.DataFrame(
                data    = np.zeros(shape=[1, len(self.point_ids)]) * np.nan,
                columns = self.point_ids,
                index   = [datetime.now()]
            )
            data.index = data.index.floor('min')

        # When a point does not exist (or was never ingested) the response
        # omits that point_id entirely, so reindex with the full id list.
        data = data.reindex(columns=self.point_ids)

        return data

    def _read(self, dt_begin=None, dt_end=None) -> pd.DataFrame:
        """Fetch minute-resolution history for all points in [dt_begin, dt_end]."""
        dt_begin = self.dt_begin if dt_begin is None else dt_begin
        dt_begin = dt_begin.replace(second=0, microsecond=0)
        dt_end   = self.dt_end if dt_end is None else dt_end
        dt_end   = dt_end.replace(second=0, microsecond=0)

        if dt_begin > dt_end:
            raise Exception('开始时间晚于起始时间')

        ts_begin  = round(dt_begin.timestamp())
        ts_end    = round(dt_end.timestamp())

        post_data = {
            "point_ids": self.point_ids,
            "begin"    : ts_begin,
            "end"      : ts_end,
            "interval" : 1,
            "type"     : 3,
        }

        try:
            res = requests.post(url=self.url, data=json.dumps(post_data))
            time.sleep(0.1)  # throttle between service calls
            res_state = res.json()['state']
        except Exception:
            # Fix: `res` is unbound when the POST itself failed, so the
            # original `raise Exception(res.json())` raised NameError and
            # masked the real cause; log what is known and re-raise.
            print('post_data', post_data)
            print('url', self.url)
            raise

        if res_state != 0:
            print('post_data', post_data)
            raise Exception(res.json())

        point_df = []
        for point_info in res.json()['data']:
            point_id   = point_info['point_id']
            point_data = point_info['data']

            if (point_data is None) or (len(point_data) == 0):
                # No samples for this point: build an all-NaN minute grid so
                # the final concat still produces a column for it.
                point_df_index = pd.Index(
                    data = pd.date_range(start=dt_begin, end=dt_end, freq='1min').to_pydatetime().tolist(),
                    name = 'ts'
                )
                df = pd.DataFrame({point_id: np.nan}, index=point_df_index)
            else:
                df       = pd.DataFrame(point_data).rename(columns={'value': point_id}).set_index('ts')
                df.index = pd.to_datetime(df.index, unit='s', utc=True).tz_convert('Asia/Shanghai')
                df       = df.tz_localize(tz=None)

            point_df.append(df)

        check_df_missing(point_df, post_data, res)
        data = pd.concat(point_df, axis=1).reindex(self.point_ids, axis=1)

        return data

    def read_interval(self) -> pd.DataFrame:
        """Fetch [dt_begin, dt_end] in one-day chunks and stitch the results."""
        interval = pd.date_range(start=self.dt_begin, end=self.dt_end, freq='1D').to_pydatetime().tolist()
        interval += [self.dt_end]

        data  = []
        n_seg = len(interval) - 1  # fix: iterate segments directly, no skip/continue
        for idx in range(n_seg):
            start      = interval[idx]
            end        = interval[idx + 1]
            finish_pct = round((idx + 1) / n_seg * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),开始时间:{interval[idx]},结束时间:{interval[idx+1]}')

            data.append(self._read(dt_begin=start, dt_end=end))
        data = pd.concat(data, axis=0)
        # Chunk boundaries overlap by one minute; keep the newest sample.
        data = data.loc[~data.index.duplicated(keep='last'), :].sort_index()

        return data

    def read_int_interval(self, freq='H') -> pd.DataFrame:
        """Sample one minute at every whole hour ('H') or day ('D')."""
        if freq == 'H':
            start = self.dt_begin.replace(minute=0)
            end   = self.dt_end.replace(minute=0)
        elif freq == 'D':
            start = self.dt_begin.replace(hour=0, minute=0)
        
            end   = self.dt_end.replace(hour=0, minute=0)
        else:
            # Fix: an unknown freq previously fell through and crashed with
            # an UnboundLocalError on `start`.
            raise ValueError(f"freq must be 'H' or 'D', got {freq!r}")

        start        = start.replace(second=0, microsecond=0)
        end          = end.replace(second=0, microsecond=0)
        int_interval = pd.date_range(start=start, end=end, freq=freq).to_pydatetime()

        if len(int_interval) == 0:
            raise Exception(f'在指定的日期范围下没有获取到对应的时间点(start:{start},end:{end},freq:{freq})')

        data = []
        for idx, dt in enumerate(int_interval):
            data.append(self._read(dt_begin=dt, dt_end=dt))
            finish_pct = round((idx + 1) / (len(int_interval)) * 100, 2)
            print(f'获取第{idx}段数据({finish_pct}%),时间:{dt}')

        data = pd.concat(data, axis=0)
        data = data.loc[~data.index.duplicated(keep='last'), :].sort_index()

        return data
+    
def check_df_missing(point_df, post_data, res) -> None:
    """Log a warning when some or all per-point frames came back entirely NaN."""
    nan_flags      = [frame.isna().all().all() for frame in point_df]
    fully_missing  = all(nan_flags)
    partly_missing = any(nan_flags)
    if partly_missing or fully_missing:
        print('post_data', post_data)
        # print('res.json',res.json())
    if fully_missing:
        print('【所有点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
    if partly_missing:
        print('【部分点位】的【所有时间段】数据均未获取到, 需检查点位或接口')
+
+
class PointWriter:
    """Uploads computed point values to the AI data-upload endpoint."""

    def __init__(self) -> None:
        # Endpoint base comes from the environment at construction time.
        self.url = f'{os.environ.get("DATA_UPLOAD_URL")}ai/addpointdatum'

    def ai_add_point_data(
        self,
        point_id: str,
        ts      : list,
        value   : list
    ):
        """POST (ts, value) samples for one point; return the parsed response.

        Raises Exception when the service reports a non-zero state.
        """
        # NOTE(review): this guards on the *point_id string* being empty —
        # presumably an empty `ts`/`value` was intended; confirm with callers.
        if len(point_id) == 0:
            print('无数据写入')
            return

        samples = [
            {'ts': int(ts_i.timestamp()), 'value': str(value_i)}
            for ts_i, value_i in zip(ts, value)
        ]
        post_data = json.dumps([{'point_id': point_id, 'data': samples}])

        resp = requests.post(
            url     = self.url,
            headers = {'Content-Type': 'application/json; charset=UTF-8'},
            data    = post_data
        )

        result = resp.json()
        if result['state'] == 0:
            print(f"\nput {point_id} data success!")
        else:
            print(result)
            print(post_data)
            raise Exception('ai_add_point_data failed')

        return result

+ 46 - 0
_utils/wf_cache.py

@@ -0,0 +1,46 @@
+from datetime import datetime
+from workflowlib.utils import cache as workflow_cache
+
class WfCache:
    """Typed wrapper around the workflow string cache.

    The backing store only holds strings, so values are serialised on write
    and parsed back on read; ``None`` round-trips as the empty string.
    """

    # Wire format used for datetime values.
    datetime_fmt = "%Y-%m-%d %H:%M:%S"

    def __init__(self) -> None:
        return

    def convert_data_to_str(self, data):
        """Serialise *data* (str/int/float/datetime/None) to a cache string.

        Raises TypeError for unsupported types — the original fell through
        and crashed with an UnboundLocalError on `str_data` instead.
        """
        if isinstance(data, (str, int, float)):
            return str(data)
        if isinstance(data, datetime):
            return datetime.strftime(data, self.datetime_fmt)
        if data is None:
            return ''
        raise TypeError(f'unsupported cache value type: {type(data).__name__}')

    def convert_str_to_data(self, str_data, data_type):
        """Parse a cache string back as *data_type* ('int'/'float'/'str'/'datetime').

        '' and None both mean "no value" and return None. Raises ValueError
        for an unknown *data_type* — the original crashed with an
        UnboundLocalError on `data` instead.
        """
        if str_data == '' or str_data is None:
            return None

        if data_type == 'int':
            return int(str_data)
        if data_type == 'float':
            return float(str_data)
        if data_type == 'str':
            return str_data
        if data_type == 'datetime':
            return datetime.strptime(str_data, self.datetime_fmt)
        raise ValueError(f'unsupported data_type: {data_type!r}')

    def read(self, key, data_type):
        """Fetch *key* from the workflow cache and parse it as *data_type*."""
        str_data = workflow_cache.get(key)
        return self.convert_str_to_data(str_data, data_type)

    def write(self, key, data):
        """Serialise *data* and store it in the workflow cache under *key*."""
        workflow_cache.set(key, self.convert_data_to_str(data))

+ 134 - 0
_utils/wf_file_io.py

@@ -0,0 +1,134 @@
+import os 
+
+import numpy as np
+from typing import Union
+from datetime import datetime
+
+import pandas as pd 
+
+DIR_PATH = '/mnt/workflow_data'
+
def write_file(
    data      : Union[pd.DataFrame,dict,None],
    file_name : str,
    keep_n    : Union[int,None] = None,
    keep_index: bool = True,
) -> None:
    """Append DataFrame snapshot(s) to per-name CSV logs in long (melted) form.

    :param data: a single DataFrame, a dict of name -> DataFrame (non-frame
        values are skipped), or None (no-op)
    :param file_name: base name passed to ``get_file_path`` to build the path
    :param keep_n: when an int, retain only the *keep_n* most recent write
        timestamps already present in the file
    :param keep_index: when True, preserve the caller's index as a
        ``_raw_index`` column so ``read_file`` can restore it
    """
    
    def write(
        file_path: str,
        df       : pd.DataFrame,
        time     : datetime
    ):
        # Stash the caller's index as a regular column (or drop it).
        if keep_index:
            df = df.rename_axis(index='_raw_index').reset_index(drop=False)
        else:
            df = df.reset_index(drop=True)
        
        # Long format: one row per (row, column) cell, stamped with the write
        # time so successive snapshots can coexist in one file.
        current_df = (
            df
            .assign(_index=np.arange(len(df)))
            .melt(id_vars='_index',var_name='variable',value_name='values')
            .assign(write_ts=time)
        )
        
        if os.path.exists(file_path):
            exist_df = pd.read_csv(file_path)
            save_df  = pd.concat([exist_df,current_df],axis=0)
        else:
            save_df = current_df
        
        save_df.write_ts = pd.to_datetime(save_df.write_ts)
        
        if isinstance(keep_n,int):
            # Retention cutoff: the oldest among the keep_n newest write_ts.
            keep_time = save_df.write_ts.sort_values().drop_duplicates().nlargest(keep_n).nsmallest(1).iat[0]
            save_df   = save_df.loc[lambda dt:dt.write_ts>= keep_time]
            print(f'仅保留{keep_time}以后的数据')
        
        save_df.to_csv(file_path,index=False)
        print(f'文件已保存:{file_path}')
    
    # All frames from one call share the same write timestamp.
    now = datetime.now()
    
    if isinstance(data,pd.DataFrame):
        file_path = get_file_path(file_name=file_name,data_name=None)
        write(file_path=file_path,df=data,time=now)
    
    elif isinstance(data,dict):
        for df_name,df in data.items():
            if not isinstance(df,pd.DataFrame):
                continue
            file_path = get_file_path(file_name=file_name,data_name=df_name)
            write(file_path=file_path,df=df,time=now)
        
    elif data is None:
        return None
    
    else:
        raise Exception('Wrong Result Type')
+
def write_file_simple(data:pd.DataFrame,file_name:str,path_name=None,overwrite=False):
    """Append *data* to <path>/<file_name>.csv, or replace the file when *overwrite*."""
    target_dir = path_name or DIR_PATH
    target     = f'{target_dir}/{file_name}.csv'
    if overwrite or not os.path.exists(target):
        out = data
    else:
        out = pd.concat([pd.read_csv(target), data], axis=0)
    out.to_csv(target, index=False)
+
def read_file(
    file_name,
    data_name,
    file_path      = None,
    write_ts_start = None,
    write_ts_end   = None,
) -> pd.DataFrame:
    """Load a CSV written by ``write_file`` back into wide format.

    The result is indexed by (_raw_index, write_ts); the optional write_ts
    bounds restrict which snapshots are returned (defaults: full range).
    """
    if file_path is None:
        file_path = get_file_path(file_name=file_name, data_name=data_name)

    raw = pd.read_csv(file_path)
    raw['write_ts'] = pd.to_datetime(raw['write_ts'])

    # Un-melt: one column per variable again, indexed by write time.
    wide = (
        raw
        .pivot(index=['write_ts', '_index'], columns='variable', values='values')
        .reset_index(level=1, drop=True)
        .rename_axis(columns=None)
    )

    lo = wide.index.min() if write_ts_start is None else write_ts_start
    hi = wide.index.max() if write_ts_end is None else write_ts_end

    return (
        wide[lo:hi]
        .reset_index(drop=False)
        .set_index(['_raw_index', 'write_ts'])
    )
+
+
def get_file_path(file_name,data_name) -> str:
    """Build the CSV path for *file_name*, suffixed with the workflow type.

    With ``data_name=None`` a flat file path is returned; otherwise files are
    grouped under a per-file_name directory, created on demand.
    """
    wf_type = get_workflow_type()
    if data_name is None:
        return f'{DIR_PATH}/{file_name}_{wf_type}.csv'

    folder_name = f'{file_name}_{wf_type}'
    # exist_ok avoids the check-then-create race of the original
    # os.path.exists + os.makedirs sequence.
    os.makedirs(os.path.join(DIR_PATH, folder_name), exist_ok=True)
    return f'{DIR_PATH}/{folder_name}/{data_name}.csv'
+        
def get_workflow_type() -> str:
    """Infer the run-environment code from which workflow env vars are set.

    SIMULATOR_JOB_ID           simulation            -> FZ
    MONITOR_TYPE               monitoring (not mapped)
    OPTIM_CALC_LOG_ID          optimisation re-calc  -> HS
    OPTIM_JOB_ID               live optimisation     -> YH
    JOB_SCHEDULE_CALC_TIME_NOW scheduled job         -> RW
    WORKFLOW_DEBUG             debugging             -> TS

    When several variables are set, the last match in the table wins;
    with no match the sentinel 'UNKNOW' is returned.
    """
    env_to_code = {
        'SIMULATOR_JOB_ID'          : 'FZ',
        'OPTIM_CALC_LOG_ID'         : 'HS',
        'OPTIM_JOB_ID'              : 'YH',
        'JOB_SCHEDULE_CALC_TIME_NOW': 'RW',
        'WORKFLOW_DEBUG'            : 'TS',
    }
    wf_type = 'UNKNOW'
    for env_name, code in env_to_code.items():
        if env_name in os.environ:
            wf_type = code
    return wf_type

+ 5 - 2
components/_base_components.py

@@ -6,6 +6,7 @@ except:
 
 from ..model._base._base import BaseModel
 
+
 class BaseComponents(BaseModel):
     
     CONSTANT = {
@@ -17,8 +18,9 @@ class BaseComponents(BaseModel):
         'rho_air'  : 1.184,  # kg/m3 取了25度下的空气密度
     }
     
-    def __init__(self) -> None:
+    def __init__(self,name) -> None:
         super().__init__()
+        self.name = name
     
     def get_func_by_engine(engine:str) -> dict:
         if engine == 'pymc':
@@ -36,4 +38,5 @@ class BaseComponents(BaseModel):
             'WHERE' : WHERE,
             'GT'    : GT,
             'LT'    : LT,
-        }
+        }
+    

+ 21 - 33
components/coil_steam.py

@@ -8,8 +8,8 @@ from ._base_components import BaseComponents
 
 
 class SteamCoilFs(BaseComponents):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
@@ -24,19 +24,17 @@ class SteamCoilFs(BaseComponents):
         Fs = b1 * (ToutA - TinA) * FA + b2 * TinA + b3
         return {'Fs':Fs}
     
-    @classmethod
-    def prior(cls,name):
+    def prior(self):
         param = {
-            'b1': pm.HalfNormal(f'{name}_b1',sigma=10),
-            'b2': pm.Normal(f'{name}_b2',sigma=10),
-            'b3': pm.Normal(f'{name}_b3',sigma=10),
+            'b1': pm.HalfNormal(f'{self.name}_b1',sigma=10),
+            'b2': pm.Normal(f'{self.name}_b2',sigma=10),
+            'b3': pm.Normal(f'{self.name}_b3',sigma=10),
         }
         return param
 
-
 class SteamCoilFs2(BaseComponents):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(cls,TinA,ToutA,FA,param,engine):
@@ -60,43 +58,33 @@ class SteamCoilFs2(BaseComponents):
         )
         return {'FP':FP,'Fs':Fs}
 
-    @classmethod
-    def prior(cls,name):
+    def prior(self):
         param = {
-            'b1'          : pm.Normal(f'{name}_b1',mu=0.08,sigma=0.1,initval=0.08),
-            'b2'          : pm.Normal(f'{name}_b2',mu=3.65,sigma=0.02,initval=3.65),
-            'change_point': pm.Normal(f'{name}_change_point',mu=104,sigma=1,initval=104),
-            'a1'          : pm.Normal(f'{name}_a1',mu=17.5,sigma=1,initval=17.5),
-            'a2'          : pm.Normal(f'{name}_a2',mu=0.2,sigma=0.1,initval=0.2),
-            'a3'          : pm.Normal(f'{name}_a3',mu=1.5,sigma=0.1,initval=1.5),
+            'b1'          : pm.Normal(f'{self.name}_b1',mu=0.08,sigma=0.1,initval=0.08),
+            'b2'          : pm.Normal(f'{self.name}_b2',mu=3.65,sigma=0.02,initval=3.65),
+            'change_point': pm.Normal(f'{self.name}_change_point',mu=104,sigma=1,initval=104),
+            'a1'          : pm.Normal(f'{self.name}_a1',mu=17.5,sigma=1,initval=17.5),
+            'a2'          : pm.Normal(f'{self.name}_a2',mu=0.2,sigma=0.1,initval=0.2),
+            'a3'          : pm.Normal(f'{self.name}_a3',mu=1.5,sigma=0.1,initval=1.5),
         }
         return param
 
-class SteamCoilFs3(BaseComponents):
-    def __init__(self):
-        super().__init__()
+class SteamCoil(BaseComponents):
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
         cls,
         TinA,ToutA,FA,
-        HinA,HoutA,
         param,
         engine
     ):
-        b1   = param['b1']
-        b2   = param['b2']
-        b3   = param['b3']
-        Fs   = b1 * (ToutA - TinA) + b2 * ToutA + b3
+        Fs = (ToutA - TinA) * FA
         return {'Fs':Fs}
     
-    @classmethod
-    def prior(cls,name):
-        param = {
-            'b1': pm.HalfNormal(f'{name}_b1',sigma=10,initval=1),
-            'b2': pm.HalfNormal(f'{name}_b2',sigma=10,initval=1),
-            'b3': pm.Normal(f'{name}_b3',sigma=10,initval=0),
-        }
+    def prior(self):
+        param = {}
         return param
 
 

+ 20 - 21
components/coil_water.py

@@ -5,14 +5,14 @@ except:
     pass
 
 from ._base_components import BaseComponents
-from ..tools.enthalpy import get_Dew_from_HumRatio,get_Enthalpy_from_Tdb_and_HumRatio
+from ..tools.enthalpy import get_Dew_from_HumRatio
 from ..tools.enthalpy import get_HumRatio_from_Dew
 
 
 class CoolingCoil(BaseComponents):
     
-    def __init__(self, engine):
-        super().__init__(engine)
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
@@ -61,21 +61,20 @@ class CoolingCoil(BaseComponents):
         
         return {'ToutA':ToutA,'HoutA':HoutA,'DoutA':DoutA} 
 
-    @classmethod
-    def prior(cls,name) -> dict:
+    def prior(self) -> dict:
         param = {
-            'UA_dry'      : pm.HalfNormal(f'{name}_UA_dry',5,initval=10),
-            'UA_wet_ratio': pm.TruncatedNormal(f'{name}_UA_wet_ratio',mu=4,sigma=0.2,lower=1,upper=10,initval=4),
-            'Ts_adj'      : pm.HalfNormal(f'{name}_Ts_adj',5,initval=5),
-            'eta'         : pm.HalfNormal(f'{name}_eta',0.5,initval=0.5),
+            'UA_dry'      : pm.HalfNormal(f'{self.name}_UA_dry',5,initval=10),
+            'UA_wet_ratio': pm.TruncatedNormal(f'{self.name}_UA_wet_ratio',mu=4,sigma=0.2,lower=1,upper=10,initval=4),
+            'Ts_adj'      : pm.HalfNormal(f'{self.name}_Ts_adj',5,initval=5),
+            'eta'         : pm.HalfNormal(f'{self.name}_eta',0.5,initval=0.5),
         }
         return param
 
 
 class CoolingCoil2(BaseComponents):
     
-    def __init__(self, engine):
-        super().__init__(engine)
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
@@ -113,16 +112,16 @@ class CoolingCoil2(BaseComponents):
         HoutA    = get_HumRatio_from_Dew(DoutA,engine)
         
         return {'ToutA':ToutA,'HoutA':HoutA,'DoutA':DoutA,'FA':FA} 
-    
-    @classmethod
-    def prior(cls,name) -> dict:
+
+    def prior(self) -> dict:
         param = {
-            'beta1_T'    : pm.TruncatedNormal(f'{name}_beta1_T',mu=1,sigma=0.2,lower=0),
-            'beta2_T'    : pm.TruncatedNormal(f'{name}_beta2_T',mu=5,sigma=3,lower=0),
-            'beta3_T'    : pm.TruncatedNormal(f'{name}_beta3_T',mu=5,sigma=0.3,lower=0),
-            'beta1_D_adj': pm.TruncatedNormal(f'{name}_beta1_D_adj',mu=1,sigma=0.1,lower=0.1),
-            'beta2_D_adj': pm.TruncatedNormal(f'{name}_beta2_D_adj',mu=1,sigma=0.1,lower=0.1),
-            'beta3_D_adj': pm.TruncatedNormal(f'{name}_beta3_D_adj',mu=1,sigma=0.1,lower=0.1),
-            'beta4_D'    : pm.TruncatedNormal(f'{name}_beta4_D',mu=0.01,sigma=0.01,lower=0)
+            'beta1_T'    : pm.TruncatedNormal(f'{self.name}_beta1_T',mu=1,sigma=0.2,lower=0),
+            'beta2_T'    : pm.TruncatedNormal(f'{self.name}_beta2_T',mu=5,sigma=3,lower=0),
+            'beta3_T'    : pm.TruncatedNormal(f'{self.name}_beta3_T',mu=5,sigma=0.3,lower=0),
+            'beta1_D_adj': pm.TruncatedNormal(f'{self.name}_beta1_D_adj',mu=1,sigma=0.1,lower=0.1),
+            'beta2_D_adj': pm.TruncatedNormal(f'{self.name}_beta2_D_adj',mu=1,sigma=0.1,lower=0.1),
+            'beta3_D_adj': pm.TruncatedNormal(f'{self.name}_beta3_D_adj',mu=1,sigma=0.1,lower=0.1),
+            'beta4_D'    : pm.TruncatedNormal(f'{self.name}_beta4_D',mu=0.01,sigma=0.01,lower=0)
         }
         return param
+

+ 3 - 3
components/mixed.py

@@ -3,8 +3,8 @@ from ..tools.enthalpy import get_Dew_from_HumRatio
 
 class Mixed(BaseComponents):
     
-    def __init__(self):
-        super().__init__()
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
@@ -23,6 +23,6 @@ class Mixed(BaseComponents):
         return {'ToutA':ToutA,'HoutA':HoutA,'DoutA':DoutA,'FA':FA,'FM':FM}
     
     @classmethod
-    def prior(cls,name) -> dict:
+    def prior(cls) -> dict:
         param = {}
         return param

+ 21 - 22
components/wheel.py

@@ -8,10 +8,11 @@ from ._base_components import BaseComponents
 from ..tools.enthalpy import get_RH_from_Tdb_and_Hr
 from ..tools.enthalpy import get_Dew_from_HumRatio
 
+
 class WheelS2(BaseComponents):
     
-    def __init__(self):
-        super().__init__()
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
@@ -57,21 +58,20 @@ class WheelS2(BaseComponents):
             'ToutR':ToutR_mu,'HoutR':HoutR_mu,'DoutR':DoutR_mu,
         }
     
-    @classmethod
-    def prior(cls,name):
+    def prior(self):
         param = {
-            'beta_Q1': pm.HalfNormal(f'{name}_beta_Q1',sigma=10),
-            'beta_H1': pm.HalfNormal(f'{name}_beta_H1',sigma=10),
-            'beta_H2': pm.HalfNormal(f'{name}_beta_H2',sigma=10),
-            'beta_H3': pm.HalfNormal(f'{name}_beta_H3',sigma=10),
-            'beta_H4': pm.Uniform(f'{name}_beta_H4',lower=0,upper=1),
+            'beta_Q1': pm.HalfNormal(f'{self.name}_beta_Q1',sigma=10),
+            'beta_H1': pm.HalfNormal(f'{self.name}_beta_H1',sigma=10),
+            'beta_H2': pm.HalfNormal(f'{self.name}_beta_H2',sigma=10),
+            'beta_H3': pm.HalfNormal(f'{self.name}_beta_H3',sigma=10),
+            'beta_H4': pm.Uniform(f'{self.name}_beta_H4',lower=0,upper=1),
         }
         return param
 
 
 class WheelS3(BaseComponents):
-    def __init__(self):
-        super().__init__()
+    def __init__(self, name):
+        super().__init__(name)
     
     @classmethod
     def model(
@@ -131,17 +131,16 @@ class WheelS3(BaseComponents):
             'ToutC':ToutC,'HoutC':HoutC,'DoutC':DoutC,'FC':FC,
         } 
         
-    @classmethod
-    def prior(cls,name):
+    def prior(self):
         param = {
-            'beta_P1': pm.TruncatedNormal(f'{name}_beta_P1',mu=5,sigma=10,initval=5,lower=0),
-            'beta_P2': pm.TruncatedNormal(f'{name}_beta_P2',mu=0.5,sigma=1,initval=0.02,lower=0),
-            'beta_P3': pm.TruncatedNormal(f'{name}_beta_P3',mu=1,sigma=2,initval=1.5,lower=0),
-            'beta_P4': pm.TruncatedNormal(f'{name}_beta_P4',mu=1,sigma=0.3,initval=1,lower=0),
-            'beta_P5': pm.TruncatedNormal(f'{name}_beta_P5',mu=5,sigma=2,initval=5,lower=0),
-            'beta_C1': pm.TruncatedNormal(f'{name}_beta_C1',mu=60,sigma=10,initval=60,lower=10),
-            'beta_C2': pm.TruncatedNormal(f'{name}_beta_C2',mu=30,sigma=10,initval=30,lower=1),
-            'beta_C3': pm.TruncatedNormal(f'{name}_beta_C3',mu=0.05,sigma=0.1,initval=0.05,lower=0),
-            'beta_C4': pm.TruncatedNormal(f'{name}_beta_C4',mu=1,sigma=1,initval=1,lower=0),
+            'beta_P1': pm.TruncatedNormal(f'{self.name}_beta_P1',mu=5,sigma=10,initval=5,lower=0),
+            'beta_P2': pm.TruncatedNormal(f'{self.name}_beta_P2',mu=0.5,sigma=1,initval=0.02,lower=0),
+            'beta_P3': pm.TruncatedNormal(f'{self.name}_beta_P3',mu=1,sigma=2,initval=1.5,lower=0),
+            'beta_P4': pm.TruncatedNormal(f'{self.name}_beta_P4',mu=1,sigma=0.3,initval=1,lower=0),
+            'beta_P5': pm.TruncatedNormal(f'{self.name}_beta_P5',mu=5,sigma=2,initval=5,lower=0),
+            'beta_C1': pm.TruncatedNormal(f'{self.name}_beta_C1',mu=60,sigma=10,initval=60,lower=10),
+            'beta_C2': pm.TruncatedNormal(f'{self.name}_beta_C2',mu=30,sigma=10,initval=30,lower=1),
+            'beta_C3': pm.TruncatedNormal(f'{self.name}_beta_C3',mu=0.05,sigma=0.1,initval=0.05,lower=0),
+            'beta_C4': pm.TruncatedNormal(f'{self.name}_beta_C4',mu=1,sigma=1,initval=1,lower=0),
         }
         return param

+ 0 - 0
doc/整体框架.drawio → doc/DHU_A.drawio


+ 40 - 0
doc/框架.drawio

@@ -0,0 +1,40 @@
+<mxfile host="65bd71144e">
+    <diagram id="nKO4jLVDNhHVAdHXDorT" name="Page-1">
+        <mxGraphModel dx="1382" dy="1951" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="850" pageHeight="1100" math="0" shadow="0">
+            <root>
+                <mxCell id="0"/>
+                <mxCell id="1" parent="0"/>
+                <mxCell id="2" value="配置文件" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="260" y="-110" width="120" height="60" as="geometry"/>
+                </mxCell>
+                <mxCell id="20" style="edgeStyle=none;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" edge="1" parent="1" source="3" target="13">
+                    <mxGeometry relative="1" as="geometry"/>
+                </mxCell>
+                <mxCell id="3" value="加载数据&lt;br&gt;(模型输入)" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="160" width="120" height="60" as="geometry"/>
+                </mxCell>
+                <mxCell id="8" style="edgeStyle=none;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" edge="1" parent="1" source="5" target="7">
+                    <mxGeometry relative="1" as="geometry"/>
+                </mxCell>
+                <mxCell id="5" value="训练模型" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="220" y="320" width="120" height="60" as="geometry"/>
+                </mxCell>
+                <mxCell id="7" value="保存模型" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="220" y="420" width="120" height="60" as="geometry"/>
+                </mxCell>
+                <mxCell id="13" value="模型监控" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="400" y="210" width="120" height="60" as="geometry"/>
+                </mxCell>
+                <mxCell id="15" value="实时优化" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="30" y="210" width="120" height="60" as="geometry"/>
+                </mxCell>
+                <mxCell id="21" style="edgeStyle=none;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;" edge="1" parent="1" source="19" target="13">
+                    <mxGeometry relative="1" as="geometry"/>
+                </mxCell>
+                <mxCell id="19" value="加载数据&lt;br&gt;(模型输出)" style="rounded=0;whiteSpace=wrap;html=1;" vertex="1" parent="1">
+                    <mxGeometry x="400" width="120" height="60" as="geometry"/>
+                </mxCell>
+            </root>
+        </mxGraphModel>
+    </diagram>
+</mxfile>

+ 71 - 44
model/DHU/DHU_A.py

@@ -5,12 +5,13 @@ import pytensor.tensor as pt
 
 from .._base._base_device import BaseDevice
 from ...components.coil_water import CoolingCoil2
-from ...components.coil_steam import SteamCoilFs,SteamCoilFs2
+from ...components.coil_steam import SteamCoilFs,SteamCoilFs2,SteamCoil
 from ...components.wheel import WheelS3
 from ...components.mixed import Mixed
 from ..utils.fit_utils import (
     observe,record,reorder_posterior,get_fitted_result
 )
+from ...tools.optimizer import optimizer
 
 
 
@@ -19,8 +20,8 @@ class DHU_A(BaseDevice):
     model_input_data_columns = {
         'Tin_F'       : 'coil_1_ToutA',
         'Hin_F'       : 'coil_1_HoutA',
-        'HzP'         : 'HzP',
-        'HzR'         : 'HzR',
+        'fan_1_Hz'    : 'fan_1_Hz',
+        'fan_2_Hz'    : 'fan_2_Hz',
         'coil_1_TinW' : 'coil_1_TinW',
         'coil_2_TinW' : 'coil_2_TinW',
         'coil_3_TinW' : 'coil_3_TinW',
@@ -53,16 +54,19 @@ class DHU_A(BaseDevice):
     
     def __init__(self) -> None:
         super().__init__()
-        self.components = {
-            'wheel_1'    : WheelS3,
-            'wheel_2'    : WheelS3,
-            'coil_2'     : CoolingCoil2,
-            'coil_3'     : CoolingCoil2,
-            'steamcoil_1': SteamCoilFs2,
-            'steamcoil_2': SteamCoilFs,
-            'mixed_1'    : Mixed,
-            'mixed_2'    : Mixed,
-        }
+        self.components = [
+            WheelS3('wheel_1'),
+            WheelS3('wheel_2'),
+            CoolingCoil2('coil_2'),
+            CoolingCoil2('coil_3'),
+            # SteamCoil('steamcoil_1'),
+            # SteamCoil('steamcoil_2'),
+            SteamCoilFs2('steamcoil_1'),
+            SteamCoilFs('steamcoil_2'),
+            Mixed('mixed_1'),
+            Mixed('mixed_2'),
+        ]
+        self.components = {comp.name:comp for comp in self.components}
         
     def fit(
         self,
@@ -72,7 +76,7 @@ class DHU_A(BaseDevice):
         plot_TVP     : bool = True
     ):
         with pm.Model() as self.MODEL_PYMC:
-            param_prior = {name:comp.prior(name) for name,comp in self.components.items()}
+            param_prior = {name:comp.prior() for name,comp in self.components.items()}
             param_prior['F_air'] = AirFlow.prior(rw_FA_val=rw_FA_val,N=len(input_data))
             
             res = DHU_A.model(
@@ -93,18 +97,12 @@ class DHU_A(BaseDevice):
             observe('wheel_2_ToutP',res['wheel_2']['ToutP'],observed=observed_data)
             observe('wheel_2_DoutP',res['wheel_2']['DoutP'],observed=observed_data)
             observe('wheel_2_ToutR',res['wheel_2']['ToutR'],observed=observed_data)
-            if {'steamcoil_1_FP','steamcoil_1_Fs','steamcoil_2_Fs'}.issubset(observed_data.columns):
-                # 在有传感器的情况下估计蒸气流量
-                observe('steamcoil_1_FP',res['steamcoil_1']['FP'],observed=observed_data,sigma=1000)
-                observe('steamcoil_1_Fs',res['steamcoil_1']['Fs'],observed=observed_data,sigma=20)
-                observe('steamcoil_2_Fs',res['steamcoil_2']['Fs'],observed=observed_data,sigma=20)
-            elif {'steamcoil_1_Val','steamcoil_2_Val'}.issubset(observed_data.columns):
-                observe('steamcoil_1_Val',res['steamcoil_1']['Val'],observed=observed_data,sigma=20)
-                observe('steamcoil_2_Val',res['steamcoil_2']['Val'],observed=observed_data,sigma=20)
-                record('steamcoil_1_Fs',res['steamcoil_1']['Fs'])
-                record('steamcoil_2_Fs',res['steamcoil_2']['Fs'])
-            else:
-                raise Exception('WRONG')
+            
+            observe('steamcoil_1_FP',res['steamcoil_1']['FP'],observed=observed_data,sigma=1000)
+            observe('steamcoil_1_Fs',res['steamcoil_1']['Fs'],observed=observed_data,sigma=20)
+            observe('steamcoil_2_Fs',res['steamcoil_2']['Fs'],observed=observed_data,sigma=20)
+            # record('steamcoil_1_Fs',res['steamcoil_1']['Fs'])
+            # record('steamcoil_2_Fs',res['steamcoil_2']['Fs'])
             
             record('wheel_2_ToutC',res['wheel_2']['ToutC'])
             record('mixed_2_ToutA',res['mixed_2']['ToutA'])
@@ -116,16 +114,16 @@ class DHU_A(BaseDevice):
             record('F_air_S',res['Fa']['Fa_S'])
             record('F_air_H',res['Fa']['Fa_H'])
             record('F_air_X',res['Fa']['Fa_X'])
-            
             self.param_posterior = pm.find_MAP(maxeval=50000,include_transformed=False)
-            self.record_model(
-                model_name   = 'DHU',
-                model        = reorder_posterior(param_prior,self.param_posterior),
-                train_data   = {'x':np.array([1])},
-                train_metric = {'R2':1,'MAE':1,'MAPE':1}
-            )
-            self.TVP_data,self.TVP_metric = get_fitted_result(self.param_posterior,observed_data,plot_TVP)
-            return self
+            
+        self.record_model(
+            model_name   = 'DHU',
+            model        = reorder_posterior(param_prior,self.param_posterior),
+            train_data   = {'x':np.array([1])},
+            train_metric = {'R2':1,'MAE':1,'MAPE':1}
+        )
+        self.TVP_data,self.TVP_metric = get_fitted_result(self.param_posterior,observed_data,plot_TVP)
+        return self
     
     def predict(self,input_data:pd.DataFrame) -> dict:
         param_posterior = self.model_info['model_DHU']
@@ -146,15 +144,44 @@ class DHU_A(BaseDevice):
         system_output       = pd.DataFrame(system_output)
         system_output['Fs'] = system_output.steamcoil_1_Fs + system_output.steamcoil_2_Fs
         return system_output
-        
+    
+    def optimize(
+        self,
+        cur_input_data : pd.DataFrame,
+        wheel_1_TinR_ub: float = 120,
+        wheel_1_TinR_lb: float = 70,
+        wheel_2_TinR_ub: float = 120,
+        wheel_2_TinR_lb: float = 70,
+        constrains     : list  = None
+    ) -> list:
+        constrains = [] if constrains is None else constrains
+        cur_input_data = cur_input_data.iloc[[0],:]
+        opt_var_boundary  = {
+            'wheel_1_TinR':{'lb':wheel_1_TinR_lb,'ub':wheel_1_TinR_ub},
+            'wheel_2_TinR':{'lb':wheel_2_TinR_lb,'ub':wheel_2_TinR_ub},
+        }
+        opt_var_value = cur_input_data.loc[:,list(opt_var_boundary.keys())]
+        oth_var_value = (
+            cur_input_data
+            .loc[:,list(self.model_input_data_columns.values())]
+            .drop(opt_var_value.columns,axis=1)
+        )
+        opt_res = optimizer(
+            model            = self,
+            opt_var_boundary = opt_var_boundary,
+            opt_var_value    = opt_var_value,
+            oth_var_value    = oth_var_value,
+            constrains       = constrains
+        )
+        return opt_res
     
     @classmethod
     def model(
         cls,
         Tin_F,        # 前表冷后温度
         Hin_F,        # 前表冷后湿度
-        HzP,          # 处理侧风机频率 
-        HzR,          # 再生侧风机频率
+        fan_1_Hz,     # 处理侧风机频率 
+        fan_2_Hz,     # 再生侧风机频率
         coil_1_TinW,  # 前表冷进水温度
         coil_2_TinW,  # 中表冷进水温度
         coil_3_TinW,  # 后表冷进水温度
@@ -176,7 +203,7 @@ class DHU_A(BaseDevice):
         coil_2_FW = coil_2_Val / 100
         coil_3_FW = coil_3_Val / 100
         # 空气的质量流量
-        air_flow = AirFlow.model(HzP=HzP,HzR=HzR,param=param)
+        air_flow = AirFlow.model(fan_1_Hz=fan_1_Hz,fan_2_Hz=fan_2_Hz,param=param)
         
         # 前转轮
         wheel_1_res = components['wheel_1'].model(
@@ -317,7 +344,7 @@ class DHU_A(BaseDevice):
 class AirFlow:
     
     @classmethod
-    def model(cls,HzP,HzR,param):
+    def model(cls,fan_1_Hz,fan_2_Hz,param):
         # 空气的质量流量
         F_air_HzP_H = param['F_air']['HzP_H']
         F_air_HzP_X = param['F_air']['HzP_X']
@@ -335,10 +362,10 @@ class AirFlow:
         F_air_H_base_adj = F_air_H_base - F_air_val_rw * F_air_val_pct
         F_air_B_base_adj = F_air_B_base - F_air_val_rw * (1 - F_air_val_pct)
         
-        Fa_S = F_air_S_base + F_air_HzP_S * (HzP / 50)            
-        Fa_H = F_air_H_base_adj + F_air_HzP_H * (HzP / 50)            
-        Fa_X = F_air_X_base_adj + F_air_HzP_X * (HzP / 50)
-        Fa_B = F_air_B_base_adj + F_air_HzR_B * (HzR / 50)           
+        Fa_S = F_air_S_base + F_air_HzP_S * (fan_1_Hz / 50)            
+        Fa_H = F_air_H_base_adj + F_air_HzP_H * (fan_1_Hz / 50)            
+        Fa_X = F_air_X_base_adj + F_air_HzP_X * (fan_1_Hz / 50)
+        Fa_B = F_air_B_base_adj + F_air_HzR_B * (fan_2_Hz / 50)           
         Fa_P = Fa_B + Fa_X + Fa_H - Fa_S  
         
         wheel_1_FaP    = Fa_S - Fa_H

+ 0 - 0
model/DHU/SDHU_A.py


+ 68 - 1
model/_base/_base_device.py

@@ -1,3 +1,7 @@
+import numpy as np
+import pandas as pd
+import plotnine as gg
+
 from ._base import BaseModel
 
 class BaseDevice(BaseModel):
@@ -5,4 +9,67 @@ class BaseDevice(BaseModel):
     def __init__(self) -> None:
         super().__init__()
     
-    
+    def curve(
+        self,
+        input_data: pd.DataFrame,
+        x         : str,
+        y         : str,
+        color     : str = None,
+        grid_x    : str = None,
+        grid_y    : str = None,
+    ):
+        
+        if x not in input_data.columns: 
+            raise Exception(f'{x} is not in input_data')
+        
+        product = [np.linspace(input_data.loc[:,x].min(),input_data.loc[:,x].max(),100)]
+        names   = [x]
+        
+        if color is not None:
+            if color not in input_data.columns: 
+                raise Exception(f'{color} is not in input_data')
+            product.append(np.quantile(input_data.loc[:,color],q=[0.25,0.5,0.75]))
+            names.append(color)
+        if grid_x is not None:
+            if grid_x not in input_data.columns: 
+                raise Exception(f'{grid_x} is not in input_data')
+            product.append(np.quantile(input_data.loc[:,grid_x],q=[0.25,0.5,0.75]))
+            names.append(grid_x)
+        if grid_y is not None:
+            if grid_y not in input_data.columns: 
+                raise Exception(f'{grid_y} is not in input_data')
+            product.append(np.quantile(input_data.loc[:,grid_y],q=[0.25,0.5,0.75]))
+            names.append(grid_y)
+        
+        curve_input = (
+            pd.MultiIndex.from_product(
+                product,
+                names=names
+            )
+            .to_frame(index=False)
+        )
+        curve_input_all = curve_input.copy(deep=True)
+        
+        for col in input_data.columns:
+            if col not in curve_input_all:
+                curve_input_all.loc[:,col] = input_data.loc[:,col].median()
+        pred_data  = self.predict_system(curve_input_all)
+        if y not in pred_data.columns:
+            raise Exception(f'{y} is not in Prediction')
+        
+        curve_data = pd.concat([curve_input,pred_data],axis=1)
+        plot = (
+            curve_data
+            .round(2)
+            .pipe(gg.ggplot)
+            + gg.aes(x=x,y=y)
+            + gg.geom_line()
+        )
+        if color is not None:
+            plot += gg.aes(color=f'factor({color})',group=color)
+            plot += gg.labs(color=color)
+        
+        if grid_x is not None or grid_y is not None:
+            plot += gg.facet_grid(rows=grid_x,cols=grid_y,labeller='label_both')
+        
+        return plot

+ 44 - 0
tools/config_reader.py

@@ -0,0 +1,44 @@
+from typing import Union
+
+import pandas as pd
+
class ConfigReader:
    """Reads the equipment ('设备') and point ('点位') sheets of an Excel config file."""

    def __init__(self, path):
        self.path  = path
        self.equp  = pd.read_excel(self.path, sheet_name='设备')
        self.point = pd.read_excel(self.path, sheet_name='点位')

    @property
    def all_equp_names(self):
        # every configured equipment id (column '编号')
        return self.equp.loc[:, '编号'].to_list()

    def get_equp_info(self, equp_name) -> dict:
        """Return one equipment row (columns from '名称' onward) as a flat dict."""
        self.check_equp_is_exist(equp_name)
        row = self.equp.loc[self.equp.loc[:, '编号'] == equp_name, '名称':]
        return {col: values[0] for col, values in row.to_dict(orient='list').items()}

    def get_equp_point(self, equp_name, equp_class: Union[str, list] = None) -> dict:
        """Return {point id -> point address} for one equipment.

        Point classes ('类型' column):
          A: model input
          B: model output
        A column named after the equipment overrides the global point address.
        """
        self.check_equp_is_exist(equp_name)
        if equp_class is None:
            equp_class = self.point.类型.to_list()
        if isinstance(equp_class, str):
            equp_class = [equp_class]
        selected  = self.point.loc[self.point.类型.isin(equp_class)]
        point_map = dict(zip(selected.编号.to_list(), selected.点位.to_list()))
        # apply per-equipment overrides where a string address is configured
        if equp_name in self.point.columns:
            for point_name in point_map:
                override = self.point.loc[self.point.编号 == point_name, equp_name].iat[0]
                if isinstance(override, str):
                    point_map[point_name] = override
        return point_map

    def check_equp_is_exist(self, equp_name):
        """Raise when the equipment id is not configured."""
        if equp_name not in self.all_equp_names:
            raise Exception(f"设备{equp_name}不存在")

+ 122 - 0
tools/data_loader.py

@@ -0,0 +1,122 @@
+import os
+import re
+from pathlib import Path
+import shutil
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import psychrolib
+psychrolib.SetUnitSystem(psychrolib.SI)
+get_Dew = np.vectorize(psychrolib.GetTDewPointFromRelHum)
+get_Hr  = np.vectorize(psychrolib.GetHumRatioFromTDewPoint)
+
+from .._data.main import get_data
+
class DataLoader:
    """Downloads per-point equipment data into <path>/<equp>/<point>.pkl caches and reloads them."""

    def __init__(self, path, start_time, end_time):
        self.path       = path
        self.start_time = start_time
        self.end_time   = end_time
        self.int_time   = 'min'   # 1-minute sampling interval
        self.date_range = pd.date_range(start=self.start_time, end=self.end_time, freq=self.int_time)

    # NOTE: method name (with its 'dowload' typo) kept for caller compatibility
    def dowload_equp_data(
        self,
        equp_name  : str,
        point      : dict,
        url        : str,
        clean_cache: bool
    ):
        """Fetch every configured point of *equp_name* into its cache directory.

        *point* maps point name -> point class, where the class is one of:
          '/'             -> skipped
          a plain number  -> constant-valued series over the date range
          'Dew(Tdb,RH)'   -> dew point derived from dry-bulb temp and RH points
          '[point_id]'    -> a point belonging to another device (verbatim id)
          anything else   -> the point id '<equp_name>_<class>'
        """
        equp_path = os.path.join(self.path, equp_name)
        # guard: rmtree on a missing directory raises FileNotFoundError
        if clean_cache and os.path.exists(equp_path):
            shutil.rmtree(equp_path)
        if not os.path.exists(equp_path):
            os.makedirs(equp_path)

        for point_name, point_class in point.items():
            point_path  = os.path.join(equp_path, f'{point_name}.pkl')
            point_class = str(point_class)

            if point_class in ['/']:
                continue

            # plain number -> constant series
            elif bool(re.match(r'^[-+]?(\d+(\.\d*)?|\.\d+)$', point_class)):
                point_value = float(point_class)
                data        = pd.DataFrame({point_name: point_value}, index=self.date_range)
                pd.to_pickle(data, point_path)

            # formula: dew point from dry-bulb temperature and relative humidity
            elif bool(re.match(r'^Dew\(.*?\)$', point_class)):
                # use a capture group: str.strip('Dew(') strips a CHARACTER SET
                # and would also eat leading 'D'/'e'/'w'/'(' of the first argument
                Tdb, RH     = re.match(r'^Dew\((.*?)\)$', point_class).group(1).split(',')
                points_id   = [f'{equp_name}_{Tdb}', f'{equp_name}_{RH}']
                points_path = [point_path.replace('_D', '_T'), point_path.replace('_D', '_R')]
                for sub_id, sub_path in zip(points_id, points_path):
                    get_data(
                        points_id = [sub_id],
                        time_start= self.start_time,
                        time_end  = self.end_time,
                        int_time  = 'M',
                        url       = url,
                        from_cache= True,
                        PATH      = Path(sub_path)
                    )

            # a point that belongs to another device: '[point_id]'
            elif bool(re.match(r'^\[.*\]$', point_class)):
                get_data(
                    points_id = [point_class.replace('[', '').replace(']', '')],
                    time_start= self.start_time,
                    time_end  = self.end_time,
                    int_time  = 'M',
                    url       = url,
                    from_cache= True,
                    PATH      = Path(point_path)
                )

            # regular point of this equipment
            else:
                get_data(
                    points_id = [f'{equp_name}_{point_class}'],
                    time_start= self.start_time,
                    time_end  = self.end_time,
                    int_time  = 'M',
                    url       = url,
                    from_cache= True,
                    PATH      = Path(point_path)
                )

        # derive the series that were not downloaded directly
        # NOTE(review): substring matching below assumes '_T'/'_R'/'_D' occur
        # only as the point-name suffix — confirm against the naming convention
        all_file_path = os.listdir(equp_path)
        for file in all_file_path:
            # dew point from dry-bulb temperature (_T) and relative humidity (_R)
            if '_T' in file and file.replace('_T', '_R') in all_file_path:
                Tdb = pd.read_pickle(os.path.join(equp_path, file)).iloc[:, 0].values
                RH  = pd.read_pickle(os.path.join(equp_path, file.replace('_T', '_R'))).iloc[:, 0].values
                Dew = pd.DataFrame({file.replace('_T', '_D'): get_Dew(Tdb, np.clip(RH, 0, 100) / 100)}, index=self.date_range)
                pd.to_pickle(Dew, os.path.join(equp_path, file.replace('_T', '_D')))
            # humidity ratio from dew point at standard pressure (101325 Pa)
            if '_D' in file:
                Dew = pd.read_pickle(os.path.join(equp_path, file)).iloc[:, 0].values
                Hr  = pd.DataFrame({file.replace('_D', '_H'): get_Hr(Dew, 101325)}, index=self.date_range)
                pd.to_pickle(Hr, os.path.join(equp_path, file.replace('_D', '_H')))

    def get_equp_data(self, equp_name: str):
        """Load every cached .pkl of *equp_name* and concatenate into one wide DataFrame."""
        equp_path     = os.path.join(self.path, equp_name)
        all_file_path = os.listdir(equp_path)
        all_data      = []
        for file in all_file_path:
            if '.pkl' not in file:
                continue
            data_path = os.path.join(equp_path, file)
            data = (
                pd.read_pickle(data_path)
                .set_axis([file.replace('.pkl', '')], axis=1)   # column named after the file
                .loc[self.start_time:self.end_time, :]
            )
            all_data.append(data)
        all_data = pd.concat(all_data, axis=1)
        return all_data

+ 76 - 0
tools/optimizer.py

@@ -0,0 +1,76 @@
+import time 
+
+import pandas as pd
+
+from ..model._base._base_device import BaseDevice
+from .._opt.algorithm.sim_config import simulate_config as sim_opt_config
+from .._opt.algorithm.main import main as main_opt
+from .._opt.boundary.sim_config import simulate_config as sim_config_bound
+from .._opt.boundary.main import main as main_bound
+from .._opt.algorithm.model.model import SystemModel
+
def optimizer(
    model           : BaseDevice,
    opt_var_boundary: dict,
    opt_var_value   : pd.DataFrame,
    oth_var_value   : pd.DataFrame,
    constrains      : list
):
    """Run the genetic-algorithm steam-flow ('Fs') minimization for *model*.

    Builds one boundary object per decision variable, freezes every other
    system variable at its first-row value, and hands everything to the
    project optimization entry point.
    """
    # one boundary object per decision variable (first data row only)
    boundary_info = {}
    for var_name, var_info in opt_var_boundary.items():
        boundary_info[var_name] = main_bound(
            None, None, None,
            opt_var_value.iloc[[0], :].loc[:, [var_name]],
            config = sim_config_bound(
                opt_var    = [var_name],
                syn_opt    = False,
                var_type   = True,
                lb_static  = var_info['lb'],
                ub_static  = var_info['ub'],
                var_precis = var_info.get('precis', 1)
            )
        )
    var_name_opt = list(boundary_info.keys())

    # fixed (non-optimized) system variables, first row only
    oth_var_info = {
        var_name: oth_var_value.iloc[[0], :].loc[:, [var_name]]
        for var_name in oth_var_value.columns
    }
    var_name_oth = list(oth_var_info.keys())

    opt_config = sim_opt_config(
        target        = 'Fs',
        dir_min       = True,
        var_opt       = var_name_opt,
        var_sys       = var_name_oth,
        diag_model    = False,
        algorithm     = 'soea_DE_best_1_L_templet',
        NIND          = 1000,
        MAXGEN        = 200,
        constrains    = constrains,
        allow_neg_opt = False,
    )

    return main_opt(
        *boundary_info.values(),
        *oth_var_info.values(),
        system_model = AirSystem(model=model),
        config       = opt_config
    )
+
class AirSystem(SystemModel):
    """Thin SystemModel adapter that delegates prediction to a fitted device model."""

    def __init__(self, model):
        super().__init__()
        self.model = model

    def predict(self, data: pd.DataFrame) -> pd.DataFrame:
        """Predict system outputs for *data*, recording the wall-clock cost of each call."""
        started      = time.time()
        self.PENALTY = {}           # reset per-call penalty bookkeeping
        self.index   = data.index
        sys_out      = self.model.predict_system(data)
        elapsed      = round(time.time() - started, 2)
        self.PAST_TIME.append(elapsed)
        return sys_out