| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- import datetime
- from dateutil import tz
- import time
- from functools import wraps
- import traceback
- import sys
- import threading
- # 获取带上海时区的当前时间,可指定以天为单位的延时,用于本地历史数据测试
- def get_now(delay_day=0):
- """
- :param delay_day: int 表示延时天数,只有测试历史数据时使用
- :return: datetime
- """
- return datetime.datetime.now(tz=tz.gettz('Asia/Shanghai')) - datetime.timedelta(days=delay_day)
- # decorator 使被修饰任务整点运行,并处理异常
- def timed_exec(intv_timed: datetime.timedelta, round_mode, sleep=60):
- """
- 被修饰任务的第一个参数必须为 now
- 且修饰后不用再向任务传入该参数,由装饰器向任务传送整点时间
- :param intv_timed: timedelta 运行的时间间隔
- :param round_mode: str 时间的取整方式
- 支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
- :param sleep: int=60 当指定间隔未到,或出现异常,休眠指定时间,以秒为单位
- :return: function
- """
- def timed_exec_deco(func):
- @wraps(func)
- def timed_execute_wraps(*args, **kwargs):
- now = round_time(get_now(), round_mode) - intv_timed
- while True:
- try:
- now = round_time(time_block(now, intv_timed, sleep=sleep), round_mode)
- func(now, *args, **kwargs)
- except Exception as e:
- print()
- print(f"{datetime.datetime.now()}:")
- print(f"when run {now} task an error occur!")
- print(f"Exception Info:")
- e_type, e_value, e_traceback = sys.exc_info()
- print(e_type)
- print(e_value)
- traceback.print_tb(e_traceback)
- print()
- time.sleep(sleep)
- return timed_execute_wraps
- return timed_exec_deco
- # decorator 使被修饰任务整点运行,并处理异常,且告知任务这次和上次的时间
- def timed_exec_last_now(intv_timed: datetime.timedelta, round_mode, bg_last_now='auto', sleep=60):
- """
- 被修饰任务的前两个参数必须为 now,last_now,分别为本次运行时间和上次运行时间
- 且修饰后不用再向任务传入该参数,由装饰器向任务传送两个时间
- :param intv_timed: timedelta 运行的时间间隔
- :param round_mode: str 时间的取整方式
- 支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
- :param bg_last_now: datetime 第一次执行时给的 last_now
- default='auto' 自动向前减去 intv_timed 作为 last_now
- :param sleep: int=60 当指定间隔未到,或出现异常,休眠指定时间,以秒为单位
- :return: function
- """
- def timed_exec_deco(func):
- @wraps(func)
- def timed_execute_wraps(*args, **kwargs):
- if bg_last_now == 'auto':
- last_now = round_time(get_now(), round_mode) - intv_timed
- else:
- last_now = bg_last_now
- while True:
- try:
- now = round_time(time_block(last_now, intv_timed, sleep=sleep), round_mode)
- func(now, last_now, *args, **kwargs)
- last_now = now
- except Exception as e:
- print()
- print(f"{datetime.datetime.now()}:")
- print(f"when run {now} task an error occur!")
- print(f"Exception Info:")
- e_type, e_value, e_traceback = sys.exc_info()
- print(e_type)
- print(e_value)
- traceback.print_tb(e_traceback)
- print()
- time.sleep(sleep)
- return timed_execute_wraps
- return timed_exec_deco
- # decorator 使被修饰任务整点运行,并处理异常,且在出错时会打印线程信息
- def timed_exec_multhd(intv_timed: datetime.timedelta, round_mode, sleep=60):
- """
- 被修饰任务的第一个参数必须为 now
- 且修饰后不用再向任务传入该参数,由装饰器向任务传送整点时间
- :param intv_timed: timedelta 运行的时间间隔
- :param round_mode: str 时间的取整方式
- 支持 {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
- :param sleep: int=60 当指定间隔未到,或出现异常,休眠指定时间,以秒为单位
- :return: function
- """
- def timed_exec_deco(func):
- @wraps(func)
- def timed_execute_wraps(*args, **kwargs):
- now = round_time(get_now(), round_mode) - intv_timed
- while True:
- try:
- now = round_time(time_block(now, intv_timed, sleep=sleep), round_mode)
- func(now, *args, **kwargs)
- except Exception as e:
- thread_curr = threading.currentThread()
- print()
- print(f"{datetime.datetime.now()}:")
- print(f"In {thread_curr.name} (target: {thread_curr._target}, args: {thread_curr._args})")
- print(f"when run {now} task an error occur!")
- print(f"Exception Info:")
- e_type, e_value, e_traceback = sys.exc_info()
- print(e_type)
- print(e_value)
- traceback.print_tb(e_traceback)
- print()
- time.sleep(sleep)
- return timed_execute_wraps
- return timed_exec_deco
- # 对 datetime 向前取整
- def round_time(t_: datetime.datetime, lvl=None):
- """
- :param t_: datetime
- :param lvl: str=None 取整方式,目前支持
- {None, 'sec', 'min', 'hou', 'day', '10min', '30min'}
- :return: datetime or "No support"
- """
- if lvl is None:
- return t_
- round_lvl = "round_" + lvl
- if round_lvl in globals():
- return globals()[round_lvl](t_)
- else:
- return "No support"
- def round_sec(t_: datetime.datetime):
- return t_ - datetime.timedelta(microseconds=t_.microsecond)
- def round_min(t_: datetime.datetime):
- return round_sec(t_) - datetime.timedelta(seconds=t_.second)
- def round_hou(t_: datetime.datetime):
- return round_min(t_) - datetime.timedelta(minutes=t_.minute)
- def round_day(t_: datetime.datetime):
- return round_hou(t_) - datetime.timedelta(hours=t_.hour)
- def round_10min(t_: datetime.datetime):
- t_ = round_min(t_)
- return t_ - datetime.timedelta(minutes=t_.minute % 10)
- def round_30min(t_: datetime.datetime):
- t_ = round_min(t_)
- if t_.minute < 30:
- return t_ - datetime.timedelta(minutes=t_.minute)
- else:
- return t_ - datetime.timedelta(minutes=t_.minute - 30)
- # 半成品的时间比较函数
- def time_check(t_, st, timed_thre):
- if t_ - st >= timed_thre:
- return True
- else:
- return False
- # 阻塞并不断检查,直到当前时间和指定时间的差大于指定间隔时,释放并返回当前时间
- def time_block(t_: datetime.datetime, timed_thre: datetime.timedelta, sleep=1):
- while True:
- now = get_now()
- if now - t_ >= timed_thre:
- return now
- else:
- time.sleep(sleep)
|