from datetime import datetime

from workflowlib.utils import cache as workflow_cache


class WfCache:
    """Typed read/write helper on top of ``workflow_cache``.

    Values are converted to strings before being handed to
    ``workflow_cache.set`` and converted back to a requested type after
    ``workflow_cache.get``.

    Round-trip caveats that follow from the string encoding:

    * ``None`` is stored as ``''``, so an empty string written to the cache
      reads back as ``None``.
    * Datetimes are formatted with ``datetime_fmt``; anything that pattern
      does not capture (e.g. microseconds) is not preserved.
    * ``bool`` is an ``int`` subclass, so it is stored as ``'True'`` /
      ``'False'`` and cannot be read back with ``data_type='int'``.
    """

    # strftime/strptime pattern used for every datetime value.
    datetime_fmt = "%Y-%m-%d %H:%M:%S"

    def __init__(self) -> None:
        """Create the helper; it keeps no per-instance state."""

    def convert_data_to_str(self, data) -> str:
        """Return the string form of *data* used for storage.

        Args:
            data: A ``str``, ``int``, ``float``, ``datetime`` or ``None``.

        Returns:
            ``str(data)`` for str/int/float, the ``datetime_fmt`` rendering
            for a datetime, and ``''`` for ``None``.

        Raises:
            TypeError: If *data* is of any other type.
        """
        if data is None:
            return ''
        if isinstance(data, datetime):
            return data.strftime(self.datetime_fmt)
        if isinstance(data, (str, int, float)):
            return str(data)
        # Previously this path fell through and failed with an unrelated
        # UnboundLocalError; fail with a message that names the problem.
        raise TypeError(
            "cannot convert value of type {!r} to a cache string; expected "
            "str, int, float, datetime or None".format(type(data).__name__)
        )

    def convert_str_to_data(self, str_data, data_type: str):
        """Parse *str_data* back into the type named by *data_type*.

        Args:
            str_data: The stored string, or ``None``.
            data_type: One of ``'int'``, ``'float'``, ``'str'`` or
                ``'datetime'``.

        Returns:
            ``None`` when *str_data* is ``None`` or ``''`` (regardless of
            *data_type*); otherwise the parsed value.

        Raises:
            ValueError: If *data_type* is not one of the supported names, or
                if *str_data* cannot be parsed as the requested type.
        """
        # Both "no value" encodings map to None before data_type is examined.
        if str_data is None or str_data == '':
            return None

        if data_type == 'int':
            return int(str_data)
        if data_type == 'float':
            return float(str_data)
        if data_type == 'str':
            return str_data
        if data_type == 'datetime':
            return datetime.strptime(str_data, self.datetime_fmt)
        # Previously an unknown name surfaced as an UnboundLocalError.
        raise ValueError(
            "unsupported data_type {!r}; expected 'int', 'float', 'str' or "
            "'datetime'".format(data_type)
        )

    def read(self, key, data_type: str):
        """Fetch *key* from ``workflow_cache`` and parse it as *data_type*.

        Args:
            key: Passed unchanged to ``workflow_cache.get``.
            data_type: Target type name; see ``convert_str_to_data``.

        Returns:
            The parsed value, or ``None`` when the cache returns ``None`` or
            an empty string.
            NOTE(review): presumably ``get`` returns ``None`` for a missing
            key -- confirm against workflowlib.
        """
        return self.convert_str_to_data(workflow_cache.get(key), data_type)

    def write(self, key, data) -> None:
        """Store *data* under *key* in ``workflow_cache`` as a string.

        Args:
            key: Passed unchanged to ``workflow_cache.set``.
            data: Value to store; see ``convert_data_to_str`` for the
                accepted types.

        Raises:
            TypeError: If *data* is of an unsupported type; nothing is
                written in that case.
        """
        workflow_cache.set(key, self.convert_data_to_str(data))