wf_cache.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. from datetime import datetime
  2. from workflowlib.utils import cache as workflow_cache
  3. class WfCache:
  4. datetime_fmt = "%Y-%m-%d %H:%M:%S"
  5. def __init__(self) -> None:
  6. return
  7. def convert_data_to_str(self,data):
  8. if isinstance(data,(str,int,float)):
  9. str_data = str(data)
  10. elif isinstance(data,datetime):
  11. str_data = datetime.strftime(data,self.datetime_fmt)
  12. elif data is None:
  13. str_data = ''
  14. return str_data
  15. def convert_str_to_data(self,str_data,data_type):
  16. if str_data == '':
  17. data = None
  18. return data
  19. if str_data is None:
  20. return None
  21. if data_type == 'int':
  22. data = int(str_data)
  23. elif data_type == 'float':
  24. data = float(str_data)
  25. elif data_type == 'str':
  26. data = str_data
  27. elif data_type == 'datetime':
  28. data = datetime.strptime(str_data,self.datetime_fmt)
  29. return data
  30. def read(self,key,data_type):
  31. str_data = workflow_cache.get(key)
  32. data = self.convert_str_to_data(str_data,data_type)
  33. return data
  34. def write(self,key,data):
  35. str_data = self.convert_data_to_str(data)
  36. workflow_cache.set(key,str_data)