import datetime from dateutil import tz import time from functools import wraps import traceback import sys import threading # 获取带上海时区的当前时间,可指定以天为单位的延时,用于本地历史数据测试 def get_now(delay_day=0): """ :param delay_day: int 表示延时天数,只有测试历史数据时使用 :return: datetime """ return datetime.datetime.now(tz=tz.gettz('Asia/Shanghai')) - datetime.timedelta(days=delay_day) # decorator 使被修饰任务整点运行,并处理异常 def timed_exec(intv_timed: datetime.timedelta, round_mode, sleep=60): """ 被修饰任务的第一个参数必须为 now 且修饰后不用再向任务传入该参数,由装饰器向任务传送整点时间 :param intv_timed: timedelta 运行的时间间隔 :param round_mode: str 时间的取整方式 支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'} :param sleep: int=60 当指定间隔未到,或出现异常,休眠指定时间,以秒为单位 :return: function """ def timed_exec_deco(func): @wraps(func) def timed_execute_wraps(*args, **kwargs): now = round_time(get_now(), round_mode) - intv_timed while True: try: now = round_time(time_block(now, intv_timed, sleep=sleep), round_mode) func(now, *args, **kwargs) except Exception as e: print() print(f"{datetime.datetime.now()}:") print(f"when run {now} task an error occur!") print(f"Exception Info:") e_type, e_value, e_traceback = sys.exc_info() print(e_type) print(e_value) traceback.print_tb(e_traceback) print() time.sleep(sleep) return timed_execute_wraps return timed_exec_deco # decorator 使被修饰任务整点运行,并处理异常,且告知任务这次和上次的时间 def timed_exec_last_now(intv_timed: datetime.timedelta, round_mode, bg_last_now='auto', sleep=60): """ 被修饰任务的前两个参数必须为 now,last_now,分别为本次运行时间和上次运行时间 且修饰后不用再向任务传入该参数,由装饰器向任务传送两个时间 :param intv_timed: timedelta 运行的时间间隔 :param round_mode: str 时间的取整方式 支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'} :param bg_last_now: datetime 第一次执行时给的 last_now default='auto' 自动向前减去 intv_timed 作为 last_now :param sleep: int=60 当指定间隔未到,或出现异常,休眠指定时间,以秒为单位 :return: function """ def timed_exec_deco(func): @wraps(func) def timed_execute_wraps(*args, **kwargs): if bg_last_now == 'auto': last_now = round_time(get_now(), round_mode) - intv_timed else: last_now = bg_last_now while True: try: now = round_time(time_block(last_now, intv_timed, sleep=sleep), round_mode) func(now, last_now, *args, **kwargs) last_now = now except Exception as e: print() print(f"{datetime.datetime.now()}:") print(f"when run {now} task an error occur!") print(f"Exception Info:") e_type, e_value, e_traceback = sys.exc_info() print(e_type) print(e_value) traceback.print_tb(e_traceback) print() time.sleep(sleep) return timed_execute_wraps return timed_exec_deco # decorator 使被修饰任务整点运行,并处理异常,且在出错时会打印线程信息 def timed_exec_multhd(intv_timed: datetime.timedelta, round_mode, sleep=60): """ 被修饰任务的第一个参数必须为 now 且修饰后不用再向任务传入该参数,由装饰器向任务传送整点时间 :param intv_timed: timedelta 运行的时间间隔 :param round_mode: str 时间的取整方式 支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'} :param sleep: int=60 当指定间隔未到,或出现异常,休眠指定时间,以秒为单位 :return: function """ def timed_exec_deco(func): @wraps(func) def timed_execute_wraps(*args, **kwargs): now = round_time(get_now(), round_mode) - intv_timed while True: try: now = round_time(time_block(now, intv_timed, sleep=sleep), round_mode) func(now, *args, **kwargs) except Exception as e: thread_curr = threading.currentThread() print() print(f"{datetime.datetime.now()}:") print(f"In {thread_curr.name} (target: {thread_curr._target}, args: {thread_curr._args})") print(f"when run {now} task an error occur!") print(f"Exception Info:") e_type, e_value, e_traceback = sys.exc_info() print(e_type) print(e_value) traceback.print_tb(e_traceback) print() time.sleep(sleep) return timed_execute_wraps return timed_exec_deco # 对 datetime 向前取整 def round_time(t_: datetime.datetime, lvl=None): """ :param t_: datetime :param lvl: str=None 取整方式,目前支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'} :return: datetime or "No support" """ if lvl is None: return t_ round_lvl = "round_" + lvl if round_lvl in globals(): return globals()[round_lvl](t_) else: return "No support" def round_sec(t_: datetime.datetime): return t_ - datetime.timedelta(microseconds=t_.microsecond) def round_min(t_: datetime.datetime): return round_sec(t_) - datetime.timedelta(seconds=t_.second) def round_hou(t_: datetime.datetime): return round_min(t_) - datetime.timedelta(minutes=t_.minute) def round_day(t_: datetime.datetime): return round_hou(t_) - datetime.timedelta(hours=t_.hour) def round_10min(t_: datetime.datetime): t_ = round_min(t_) return t_ - datetime.timedelta(minutes=t_.minute % 10) def round_30min(t_: datetime.datetime): t_ = round_min(t_) if t_.minute < 30: return t_ - datetime.timedelta(minutes=t_.minute) else: return t_ - datetime.timedelta(minutes=t_.minute - 30) # 半成品的时间比较函数 def time_check(t_, st, timed_thre): if t_ - st >= timed_thre: return True else: return False # 阻塞并不断检查,直到当前时间和指定时间的差大于指定间隔时,释放并返回当前时间 def time_block(t_: datetime.datetime, timed_thre: datetime.timedelta, sleep=1): while True: now = get_now() if now - t_ >= timed_thre: return now else: time.sleep(sleep)