| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import os
- import numpy as np
- from typing import Union
- from datetime import datetime
- import pandas as pd
# Base directory used to build the CSV paths in this module.
- DIR_PATH = '/mnt/workflow_data'
def write_file(
    data: Union[pd.DataFrame, dict, None],
    file_name: str,
    keep_n: Union[int, None] = None,
    keep_index: bool = True,
) -> None:
    """Append a DataFrame, or a dict of DataFrames, to long-format CSV storage.

    Each frame is melted into ``_index`` / ``variable`` / ``values`` rows,
    stamped with one shared ``write_ts`` taken at call time, and appended to
    whatever the target CSV already contains. Target paths are obtained from
    ``get_file_path``.

    Args:
        data: A DataFrame (written to the single file for ``file_name``), a
            dict mapping data names to DataFrames (each written to its own
            file; values that are not DataFrames are skipped), or ``None``
            (nothing is written).
        file_name: Base name passed to ``get_file_path``.
        keep_n: When an int, only rows belonging to the ``keep_n`` most recent
            distinct ``write_ts`` values are kept in the file. Any other value
            disables pruning.
        keep_index: When true, the frame's index is stored as the
            ``_raw_index`` variable; otherwise the index is discarded.

    Raises:
        TypeError: ``data`` is not a DataFrame, a dict or ``None``.
        ValueError: ``keep_n`` is an int smaller than 1.
    """

    def write(file_path: str, df: pd.DataFrame, time: datetime):
        # Reject a meaningless retention count up front instead of failing
        # later with an IndexError on an empty selection.
        if isinstance(keep_n, int) and keep_n < 1:
            raise ValueError(f'keep_n must be a positive integer, got {keep_n!r}')

        if keep_index:
            df = df.rename_axis(index='_raw_index').reset_index(drop=False)
        else:
            df = df.reset_index(drop=True)

        # Wide -> long: one row per (row position, column) pair.
        current_df = (
            df
            .assign(_index=np.arange(len(df)))
            .melt(id_vars='_index', var_name='variable', value_name='values')
            .assign(write_ts=time)
        )

        if os.path.exists(file_path):
            exist_df = pd.read_csv(file_path)
            save_df = pd.concat([exist_df, current_df], axis=0)
        else:
            save_df = current_df

        # Rows read back from CSV carry string timestamps; normalise them so
        # they compare correctly with the rows added by this call.
        save_df = save_df.assign(write_ts=pd.to_datetime(save_df['write_ts']))

        if isinstance(keep_n, int):
            newest = save_df['write_ts'].drop_duplicates().nlargest(keep_n)
            # No timestamps at all (e.g. an empty frame written to a new
            # file) means there is nothing to prune.
            if not newest.empty:
                keep_time = newest.min()
                save_df = save_df.loc[save_df['write_ts'] >= keep_time]
                print(f'仅保留{keep_time}以后的数据')

        save_df.to_csv(file_path, index=False)
        print(f'文件已保存:{file_path}')

    if data is None:
        return None

    # One timestamp for the whole call so every frame of a dict shares it.
    now = datetime.now()

    if isinstance(data, pd.DataFrame):
        file_path = get_file_path(file_name=file_name, data_name=None)
        write(file_path=file_path, df=data, time=now)
    elif isinstance(data, dict):
        for df_name, df in data.items():
            if not isinstance(df, pd.DataFrame):
                continue
            file_path = get_file_path(file_name=file_name, data_name=df_name)
            write(file_path=file_path, df=df, time=now)
    else:
        raise TypeError('Wrong Result Type')
    return None
def write_file_simple(data: pd.DataFrame, file_name: str, path_name=None, overwrite=False):
    """Write ``data`` to ``<path_name>/<file_name>.csv`` without its index.

    When the file already exists and ``overwrite`` is false, the existing
    contents are read back and ``data`` is concatenated below them before the
    file is rewritten. ``path_name`` falls back to ``DIR_PATH`` when it is
    empty or ``None``.
    """
    base_dir = path_name if path_name else DIR_PATH
    target = f'{base_dir}/{file_name}.csv'

    output = data
    append_to_existing = os.path.exists(target) and not overwrite
    if append_to_existing:
        previous = pd.read_csv(target)
        output = pd.concat([previous, data], axis=0)

    output.to_csv(target, index=False)
def read_file(
    file_name,
    data_name,
    file_path=None,
    write_ts_start=None,
    write_ts_end=None,
) -> pd.DataFrame:
    """Load a long-format CSV and pivot it back into one column per variable.

    The file is expected to have ``_index``, ``variable``, ``values`` and
    ``write_ts`` columns.

    Args:
        file_name: Base name passed to ``get_file_path`` when ``file_path``
            is not given.
        data_name: Data name passed to ``get_file_path`` when ``file_path``
            is not given.
        file_path: Explicit CSV path; when provided, ``file_name`` and
            ``data_name`` are not used.
        write_ts_start: Inclusive lower bound on ``write_ts``; defaults to
            the earliest timestamp in the file.
        write_ts_end: Inclusive upper bound on ``write_ts``; defaults to the
            latest timestamp in the file.

    Returns:
        The selected rows in wide form. If the file contains a ``_raw_index``
        variable the result is indexed by ``(_raw_index, write_ts)``;
        otherwise it is indexed by ``write_ts`` alone.
    """
    if file_path is None:
        file_path = get_file_path(file_name=file_name, data_name=data_name)

    data = (
        pd.read_csv(file_path)
        .assign(write_ts=lambda dt: pd.to_datetime(dt.write_ts))
        .pivot(index=['write_ts', '_index'], columns='variable', values='values')
        # _index only keeps rows of one write apart during the pivot.
        .reset_index(level=1, drop=True)
        .rename_axis(columns=None)
    )

    write_ts_start = data.index.min() if write_ts_start is None else write_ts_start
    write_ts_end = data.index.max() if write_ts_end is None else write_ts_end
    data = data[write_ts_start:write_ts_end]

    # Files without a stored index have no _raw_index variable; indexing on
    # it unconditionally would raise KeyError.
    if '_raw_index' not in data.columns:
        return data
    return data.reset_index(drop=False).set_index(['_raw_index', 'write_ts'])
def get_file_path(file_name, data_name) -> str:
    """Build the CSV path for a stored result, creating its folder if needed.

    The workflow type code from ``get_workflow_type`` is appended to
    ``file_name``.

    Args:
        file_name: Base name of the result.
        data_name: ``None`` for a single-file result, otherwise the name of
            one data set stored inside the result's own folder.

    Returns:
        ``<DIR_PATH>/<file_name>_<type>.csv`` when ``data_name`` is ``None``,
        else ``<DIR_PATH>/<file_name>_<type>/<data_name>.csv``. In the second
        case the folder is created as a side effect.
    """
    wf_type = get_workflow_type()
    if data_name is None:
        return f'{DIR_PATH}/{file_name}_{wf_type}.csv'

    folder_name = f'{file_name}_{wf_type}'
    # Make sure the per-result folder exists. exist_ok avoids the
    # check-then-create race when two writers target the same folder.
    os.makedirs(os.path.join(DIR_PATH, folder_name), exist_ok=True)
    return f'{DIR_PATH}/{folder_name}/{data_name}.csv'
-
def get_workflow_type() -> str:
    """Return a two-letter code for the workflow environment, from env vars.

    Environment variables and the environments they mark:
        SIMULATOR_JOB_ID            simulation environment
        MONITOR_TYPE                monitoring environment
        OPTIM_CALC_LOG_ID           optimization back-calculation
        OPTIM_JOB_ID                real-time optimization environment
        JOB_SCHEDULE_CALC_TIME_NOW  scheduled task
        WORKFLOW_DEBUG              debug environment

    MONITOR_TYPE is listed above but is not checked by this function. When
    several of the checked variables are set, the one listed last wins. When
    none is set, 'UNKNOW' is returned.
    """
    # Insertion order matters: later entries take precedence over earlier ones.
    code_by_marker = {
        'SIMULATOR_JOB_ID': 'FZ',
        'OPTIM_CALC_LOG_ID': 'HS',
        'OPTIM_JOB_ID': 'YH',
        'JOB_SCHEDULE_CALC_TIME_NOW': 'RW',
        'WORKFLOW_DEBUG': 'TS',
    }
    detected = [code for marker, code in code_by_marker.items() if marker in os.environ]
    return detected[-1] if detected else 'UNKNOW'
|