# wf_file_io.py
  1. import os
  2. import numpy as np
  3. from typing import Union
  4. from datetime import datetime
  5. import pandas as pd
# Root directory for the files produced by this module: get_file_path builds
# every path under it, and write_file_simple falls back to it when no
# path_name is given.
DIR_PATH = '/mnt/workflow_data'
  7. def write_file(
  8. data : Union[pd.DataFrame,dict,None],
  9. file_name : str,
  10. keep_n : Union[int,None] = None,
  11. keep_index: bool = True,
  12. ) -> None:
  13. def write(
  14. file_path: str,
  15. df : pd.DataFrame,
  16. time : datetime
  17. ):
  18. if keep_index:
  19. df = df.rename_axis(index='_raw_index').reset_index(drop=False)
  20. else:
  21. df = df.reset_index(drop=True)
  22. current_df = (
  23. df
  24. .assign(_index=np.arange(len(df)))
  25. .melt(id_vars='_index',var_name='variable',value_name='values')
  26. .assign(write_ts=time)
  27. )
  28. if os.path.exists(file_path):
  29. exist_df = pd.read_csv(file_path)
  30. save_df = pd.concat([exist_df,current_df],axis=0)
  31. else:
  32. save_df = current_df
  33. save_df.write_ts = pd.to_datetime(save_df.write_ts)
  34. if isinstance(keep_n,int):
  35. keep_time = save_df.write_ts.sort_values().drop_duplicates().nlargest(keep_n).nsmallest(1).iat[0]
  36. save_df = save_df.loc[lambda dt:dt.write_ts>= keep_time]
  37. print(f'仅保留{keep_time}以后的数据')
  38. save_df.to_csv(file_path,index=False)
  39. print(f'文件已保存:{file_path}')
  40. now = datetime.now()
  41. if isinstance(data,pd.DataFrame):
  42. file_path = get_file_path(file_name=file_name,data_name=None)
  43. write(file_path=file_path,df=data,time=now)
  44. elif isinstance(data,dict):
  45. for df_name,df in data.items():
  46. if not isinstance(df,pd.DataFrame):
  47. continue
  48. file_path = get_file_path(file_name=file_name,data_name=df_name)
  49. write(file_path=file_path,df=df,time=now)
  50. elif data is None:
  51. return None
  52. else:
  53. raise Exception('Wrong Result Type')
  54. def write_file_simple(data:pd.DataFrame,file_name:str,path_name=None,overwrite=False):
  55. path_name = path_name or DIR_PATH
  56. file_path = f'{path_name}/{file_name}.csv'
  57. if os.path.exists(file_path) and not overwrite:
  58. exist_df = pd.read_csv(file_path)
  59. save_df = pd.concat([exist_df,data],axis=0)
  60. else:
  61. save_df = data
  62. save_df.to_csv(file_path,index=False)
  63. def read_file(
  64. file_name,
  65. data_name,
  66. file_path = None,
  67. write_ts_start = None,
  68. write_ts_end = None,
  69. ) -> pd.DataFrame:
  70. if file_path is None:
  71. file_path = get_file_path(file_name=file_name,data_name=data_name)
  72. data = (
  73. pd.read_csv(file_path)
  74. .assign(write_ts=lambda dt:pd.to_datetime(dt.write_ts))
  75. .pivot(index=['write_ts','_index'],columns='variable',values='values')
  76. .reset_index(level=1,drop=True)
  77. .rename_axis(columns=None)
  78. )
  79. write_ts_start = data.index.min() if write_ts_start is None else write_ts_start
  80. write_ts_end = data.index.max() if write_ts_end is None else write_ts_end
  81. data = (
  82. data[write_ts_start:write_ts_end]
  83. .reset_index(drop=False)
  84. .set_index(['_raw_index','write_ts'])
  85. )
  86. return data
  87. def get_file_path(file_name,data_name) -> str:
  88. wf_type = get_workflow_type()
  89. if data_name is None:
  90. file_path = f'{DIR_PATH}/{file_name}_{wf_type}.csv'
  91. else:
  92. folder_name = f'{file_name}_{wf_type}'
  93. file_path = f'{DIR_PATH}/{folder_name}/{data_name}.csv'
  94. # 检查指定父目录下是否存在某个文件夹,如果不存在则创建它
  95. folder_path = os.path.join(DIR_PATH, folder_name)
  96. if not os.path.exists(folder_path):
  97. os.makedirs(folder_path)
  98. return file_path
  99. def get_workflow_type() -> str:
  100. # SIMULATOR_JOB_ID 仿真环境
  101. # MONITOR_TYPE 监控环境
  102. # OPTIM_CALC_LOG_ID 优化回算
  103. # OPTIM_JOB_ID 实时优化环境
  104. # JOB_SCHEDULE_CALC_TIME_NOW 任务计划
  105. # WORKFLOW_DEBUG 调试环境
  106. wf_type = 'UNKNOW'
  107. for each_type,each_type_code in zip(
  108. ['SIMULATOR_JOB_ID','OPTIM_CALC_LOG_ID','OPTIM_JOB_ID','JOB_SCHEDULE_CALC_TIME_NOW','WORKFLOW_DEBUG'],
  109. ['FZ','HS','YH','RW','TS']
  110. ):
  111. if each_type in os.environ:
  112. wf_type = each_type_code
  113. return wf_type