import os
from datetime import datetime
from typing import Union

import numpy as np
import pandas as pd

DIR_PATH = '/mnt/workflow_data'


def write_file(
    data: Union[pd.DataFrame, dict, None],
    file_name: str,
    keep_n: Union[int, None] = None,
    keep_index: bool = True,
) -> None:
    """Append one or more DataFrames to long-format CSV files.

    Each frame is melted into ``_index`` / ``variable`` / ``values`` rows,
    stamped with a single ``write_ts`` taken once per call, and appended to
    the CSV resolved by ``get_file_path``.

    Args:
        data: A DataFrame (written to a single file), a dict mapping a data
            name to a DataFrame (one file per entry; non-DataFrame values
            are skipped), or ``None`` (nothing is written).
        file_name: Base name handed to ``get_file_path``.
        keep_n: When an int, only rows belonging to the ``keep_n`` most
            recent distinct ``write_ts`` values are kept in the file.
        keep_index: When true, the frame's index is stored as a
            ``_raw_index`` variable; otherwise the index is discarded.

    Raises:
        TypeError: If ``data`` is not a DataFrame, a dict or ``None``.
        ValueError: If ``keep_n`` is an int smaller than 1 and a frame is
            about to be written.
    """

    def write(file_path: str, df: pd.DataFrame, time: datetime) -> None:
        """Melt ``df``, merge it with the existing file and save the result."""
        # Fail before any I/O with a clear message instead of the opaque
        # IndexError the trimming step would otherwise produce.
        if isinstance(keep_n, int) and keep_n < 1:
            raise ValueError(f'keep_n must be a positive integer, got {keep_n!r}')

        if keep_index:
            df = df.rename_axis(index='_raw_index').reset_index(drop=False)
        else:
            df = df.reset_index(drop=True)

        # `_index` is the positional row number; together with `write_ts`
        # it lets read_file pivot the long table back into a wide one.
        current_df = (
            df
            .assign(_index=np.arange(len(df)))
            .melt(id_vars='_index', var_name='variable', value_name='values')
            .assign(write_ts=time)
        )

        if os.path.exists(file_path):
            exist_df = pd.read_csv(file_path)
            save_df = pd.concat([exist_df, current_df], axis=0)
        else:
            save_df = current_df

        # Rows loaded from disk carry string timestamps; normalise them.
        save_df['write_ts'] = pd.to_datetime(save_df['write_ts'])

        if isinstance(keep_n, int):
            latest = save_df['write_ts'].drop_duplicates().nlargest(keep_n)
            # `latest` is empty when there are no rows at all (e.g. an empty
            # frame and no previous file); there is nothing to trim then.
            if not latest.empty:
                keep_time = latest.min()
                save_df = save_df.loc[save_df['write_ts'] >= keep_time]
                print(f'仅保留{keep_time}以后的数据')

        save_df.to_csv(file_path, index=False)
        print(f'文件已保存:{file_path}')

    # One timestamp per call so every frame of a dict shares the same write_ts.
    now = datetime.now()

    if data is None:
        return None

    if isinstance(data, pd.DataFrame):
        file_path = get_file_path(file_name=file_name, data_name=None)
        write(file_path=file_path, df=data, time=now)
    elif isinstance(data, dict):
        for df_name, df in data.items():
            if not isinstance(df, pd.DataFrame):
                continue
            file_path = get_file_path(file_name=file_name, data_name=df_name)
            write(file_path=file_path, df=df, time=now)
    else:
        raise TypeError('Wrong Result Type')


def write_file_simple(data: pd.DataFrame, file_name: str, path_name=None, overwrite=False):
    """Write ``data`` as a plain wide CSV, appending to an existing file by default.

    Args:
        data: Frame to store; its index is not written.
        file_name: File name without the ``.csv`` extension.
        path_name: Target directory; falls back to ``DIR_PATH`` when falsy.
        overwrite: When true, an existing file is replaced instead of
            being extended.
    """
    path_name = path_name or DIR_PATH
    file_path = f'{path_name}/{file_name}.csv'

    if os.path.exists(file_path) and not overwrite:
        exist_df = pd.read_csv(file_path)
        save_df = pd.concat([exist_df, data], axis=0)
    else:
        save_df = data

    save_df.to_csv(file_path, index=False)


def read_file(
    file_name,
    data_name,
    file_path=None,
    write_ts_start=None,
    write_ts_end=None,
) -> pd.DataFrame:
    """Load a long-format CSV produced by ``write_file`` back into wide form.

    Args:
        file_name: Base name handed to ``get_file_path`` when ``file_path``
            is not given.
        data_name: Data name handed to ``get_file_path`` when ``file_path``
            is not given.
        file_path: Explicit CSV path; skips the ``get_file_path`` lookup.
        write_ts_start: Lower ``write_ts`` bound of the slice; defaults to
            the earliest timestamp in the file.
        write_ts_end: Upper ``write_ts`` bound of the slice; defaults to
            the latest timestamp in the file.

    Returns:
        One column per stored variable. The index is
        ``(_raw_index, write_ts)`` when the file contains a ``_raw_index``
        variable, otherwise just ``write_ts``.
    """
    if file_path is None:
        file_path = get_file_path(file_name=file_name, data_name=data_name)

    data = (
        pd.read_csv(file_path)
        .assign(write_ts=lambda dt: pd.to_datetime(dt.write_ts))
        .pivot(index=['write_ts', '_index'], columns='variable', values='values')
        .reset_index(level=1, drop=True)
        .rename_axis(columns=None)
    )

    write_ts_start = data.index.min() if write_ts_start is None else write_ts_start
    write_ts_end = data.index.max() if write_ts_end is None else write_ts_end

    data = data[write_ts_start:write_ts_end].reset_index(drop=False)

    # Files written with keep_index=False have no `_raw_index` variable;
    # index them by timestamp only instead of failing with a KeyError.
    if '_raw_index' in data.columns:
        return data.set_index(['_raw_index', 'write_ts'])
    return data.set_index('write_ts')


def get_file_path(file_name, data_name) -> str:
    """Build the CSV path for ``file_name`` / ``data_name`` under ``DIR_PATH``.

    Without a ``data_name`` the path is ``<DIR_PATH>/<file_name>_<type>.csv``.
    With one it is ``<DIR_PATH>/<file_name>_<type>/<data_name>.csv`` and the
    containing folder is created if it does not exist yet. ``<type>`` is the
    code returned by ``get_workflow_type``.
    """
    wf_type = get_workflow_type()

    if data_name is None:
        return f'{DIR_PATH}/{file_name}_{wf_type}.csv'

    folder_name = f'{file_name}_{wf_type}'
    file_path = f'{DIR_PATH}/{folder_name}/{data_name}.csv'

    # Make sure the per-file folder exists; exist_ok avoids the
    # check-then-create race of a separate os.path.exists test.
    os.makedirs(os.path.join(DIR_PATH, folder_name), exist_ok=True)
    return file_path


def get_workflow_type() -> str:
    """Return a two-letter code for the workflow environment, or ``'UNKNOW'``.

    The environment is detected from which marker variables are present in
    ``os.environ``. When several markers are set, the one listed last wins.
    """
    # Marker environment variable -> code:
    #   SIMULATOR_JOB_ID            simulation environment
    #   OPTIM_CALC_LOG_ID           optimisation back-calculation
    #   OPTIM_JOB_ID                real-time optimisation environment
    #   JOB_SCHEDULE_CALC_TIME_NOW  scheduled job
    #   WORKFLOW_DEBUG              debug environment
    # NOTE(review): the original notes also mention MONITOR_TYPE (monitoring
    # environment), but no code is mapped to it here - confirm whether that
    # is intentional.
    type_codes = {
        'SIMULATOR_JOB_ID': 'FZ',
        'OPTIM_CALC_LOG_ID': 'HS',
        'OPTIM_JOB_ID': 'YH',
        'JOB_SCHEDULE_CALC_TIME_NOW': 'RW',
        'WORKFLOW_DEBUG': 'TS',
    }

    wf_type = 'UNKNOW'
    for env_name, code in type_codes.items():
        if env_name in os.environ:
            wf_type = code
    return wf_type