| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- from datetime import datetime
- from workflowlib.utils import cache as workflow_cache
class WfCache:
    """Typed wrapper around ``workflow_cache`` that stores values as strings.

    Values are serialised to ``str`` on write and parsed back on read using a
    caller-supplied type name (``'int'``, ``'float'``, ``'str'`` or
    ``'datetime'``). ``None`` is stored as the empty string and an empty
    string (or ``None``) read back is returned as ``None``.
    """

    # Format used for datetime (de)serialisation. It has no fractional-second
    # or timezone field, so those are not preserved across a write/read.
    datetime_fmt = "%Y-%m-%d %H:%M:%S"

    def __init__(self) -> None:
        """Create a wrapper; no instance state is kept."""
        return

    def convert_data_to_str(self, data):
        """Serialise *data* to the string form stored in the cache.

        Args:
            data: A ``str``, ``int``, ``float``, ``datetime`` or ``None``.

        Returns:
            ``str(data)`` for str/int/float, the ``datetime_fmt`` rendering
            for a datetime, or ``''`` for ``None``.

        Raises:
            TypeError: If *data* is of any other type.
        """
        if isinstance(data, (str, int, float)):
            return str(data)
        if isinstance(data, datetime):
            return data.strftime(self.datetime_fmt)
        if data is None:
            return ''
        # Previously fell through and failed with UnboundLocalError; report
        # the real problem instead.
        raise TypeError(
            "unsupported data type for cache: " + type(data).__name__
        )

    def convert_str_to_data(self, str_data, data_type):
        """Parse a cached string back into a value of the requested type.

        Args:
            str_data: The string read from the cache (may be ``None``).
            data_type: One of ``'int'``, ``'float'``, ``'str'``,
                ``'datetime'``.

        Returns:
            ``None`` when *str_data* is ``None`` or ``''`` (checked before
            *data_type* is looked at); otherwise the parsed value.

        Raises:
            ValueError: If *data_type* is not one of the supported names, or
                if the underlying ``int``/``float``/``strptime`` parse fails.
        """
        # Empty and missing values both represent "no value".
        if str_data is None or str_data == '':
            return None

        if data_type == 'int':
            return int(str_data)
        if data_type == 'float':
            return float(str_data)
        if data_type == 'str':
            return str_data
        if data_type == 'datetime':
            return datetime.strptime(str_data, self.datetime_fmt)
        # Previously fell through and failed with UnboundLocalError.
        raise ValueError("unsupported data_type: " + repr(data_type))

    def read(self, key, data_type):
        """Fetch *key* via ``workflow_cache.get`` and parse it as *data_type*.

        NOTE(review): presumably ``workflow_cache.get`` returns ``None`` or
        ``''`` for a missing key, which yields ``None`` here -- confirm
        against the cache implementation.
        """
        str_data = workflow_cache.get(key)
        return self.convert_str_to_data(str_data, data_type)

    def write(self, key, data):
        """Serialise *data* and store it under *key* via ``workflow_cache.set``.

        Raises:
            TypeError: If *data* is not a supported type (raised before the
                cache is touched).
        """
        str_data = self.convert_data_to_str(data)
        workflow_cache.set(key, str_data)
|