"""HTTP client helpers for reading and writing point time-series data."""
import datetime
import json
import os
import sys
import time
import traceback
from functools import partial

import pandas as pd
from dateutil import tz

try:
    # Prefer the `requests` object exposed by workflowlib when it is importable.
    from workflowlib import requests
except Exception:
    # Otherwise use the regular `requests` package. `Exception` (rather than a
    # bare `except`) keeps KeyboardInterrupt / SystemExit from being swallowed.
    import requests

# Endpoint paths, appended to the root / upload base URLs.
urlcfg = {
    'getpointdata_url': "data/getpointdata",
    'getpointsdata_url': "data/getpointsdata",
    'getpointsdataforai_url': "data/getpointsdataforai",
    'getpointsruntime_url': "data/getpointsruntime",
    'getcurrdata_url': "ai/getcurrdata",
    'gethisdata_url': "ai/gethisdata",
    'putaidata_url': "ai/putaidata",
    'uploadaifile_url': "ai/uploadaifile",
    'addpointdatum_url': "ai/addpointdatum",
}

# Converts a POSIX timestamp into a datetime in the Asia/Shanghai time zone.
_dtfromts = partial(datetime.datetime.fromtimestamp, tz=tz.gettz('Asia/Shanghai'))

# Values accepted by PointReader.get_points_data(return_type=...).
_RETURN_TYPES = ('dict', 'df', 'dfcol')


def _print_error(func_name, dtfromts=_dtfromts):
    """Print the current time, *func_name* and the exception being handled.

    Must be called from inside an ``except`` block, because the exception is
    read from ``sys.exc_info()``.
    """
    print()
    print(f"{dtfromts(time.time())}:")
    print(f"function {func_name} error!")
    print("Exception Info:")
    e_type, e_value, e_traceback = sys.exc_info()
    print(e_type)
    print(e_value)
    traceback.print_tb(e_traceback)
    print()


class PointReader:
    """Reads point time-series data from the data service."""

    root_url = os.environ.get('DATA_ROOT_URL')
    upload_url = os.environ.get('DATA_UPLOAD_URL')

    getpointdata_url = urlcfg["getpointdata_url"]
    getpointsdata_url = urlcfg["getpointsdata_url"]
    getpointsdataforai_url = urlcfg["getpointsdataforai_url"]
    getpointsruntime_url = urlcfg["getpointsruntime_url"]
    getcurrdata_url = urlcfg["getcurrdata_url"]
    gethisdata_url = urlcfg["gethisdata_url"]
    putaidata_url = urlcfg["putaidata_url"]
    uploadaifile_url = urlcfg["uploadaifile_url"]
    addpointdatum_url = urlcfg["addpointdatum_url"]

    # staticmethod() keeps the partial from being bound as a method on Python
    # versions where functools.partial is a method descriptor.
    dtfromts = staticmethod(_dtfromts)

    # Maximum number of attempts, and seconds to wait after a failed attempt.
    max_try = 10
    post_sleep = 1

    def __init__(self, url=None) -> None:
        """
        :param url: full endpoint URL to use instead of the one derived from
            the DATA_ROOT_URL environment variable
        :raises Exception: if ``url`` is None and DATA_ROOT_URL is not set
        """
        if url is not None:
            self.url = url
            print(f'使用临时url:{self.url}')
        elif self.root_url is None:
            raise Exception('未在环境变量中获取到 DATA_ROOT_URL')
        else:
            self.url = self.root_url + self.getpointsdata_url

    # Given a list of point ids, fetch ts / value data for each point.
    def get_points_data(self, point_ids, from_time, to_time, interval=1, type_=3,
                        ts2dt_col=None, return_type='dict'):
        """
        :param point_ids: list of point ids
        :param from_time: datetime, start time
        :param to_time: datetime, end time
        :param interval: int=1, time interval
        :param type_: =3, required by the backend
        :param ts2dt_col: list of timestamp column names to convert to datetime
        :param return_type: str in {'dict', 'df', 'dfcol'}, default 'dict';
            selects the returned structure
            'dict'   {point_id: DataFrame} (raw structure)
            'df'     each point's frame gets a point_id column, then all
                     frames are concatenated row-wise
            'dfcol'  each point's frame is indexed by 'ts' and the frames are
                     concatenated column-wise, with 'value' renamed to the
                     point_id; only meaningful when a value column exists
        :return: dict or DataFrame as selected by ``return_type``; None when
            every attempt failed or returned no data
        :raises ValueError: if ``return_type`` is not one of the accepted values
        """
        # Reject a bad return_type before any request is made: previously a
        # successful response combined with an unknown return_type neither
        # returned nor consumed an attempt, so the loop never terminated.
        if return_type not in _RETURN_TYPES:
            raise ValueError(
                f"return_type must be one of {_RETURN_TYPES}, got {return_type!r}"
            )

        post_data = {
            "point_ids": point_ids,
            "begin": round(from_time.timestamp()),
            "end": round(to_time.timestamp()),
            "interval": interval,
            "type": type_,
        }
        rem_try = self.max_try
        while rem_try > 0:
            try:
                resp = requests.post(url=self.url, data=json.dumps(post_data), timeout=60)
                data = resp.json()['data']
                if data:
                    return self._shape_points(data, ts2dt_col, return_type)
            except Exception:
                self.error_print('get_points_data')
            # Reached on empty data or on any exception: consume one attempt.
            rem_try -= 1
            time.sleep(self.post_sleep)
        print("\nget_points_data failed")
        return None

    def _shape_points(self, data, ts2dt_col, return_type):
        """Turn the service's list of point records into the requested structure."""
        res = {}
        for point in data:
            frame = pd.DataFrame(point['data'])
            if ts2dt_col is not None:
                frame = self.ts2dt(frame, ts2dt_col)
            res[point['point_id']] = frame

        if return_type == 'dict':
            return res
        if return_type == 'df':
            for point_id, frame in res.items():
                frame['point_id'] = point_id
            return pd.concat(list(res.values()), axis=0)
        # 'dfcol': one concat over all frames instead of growing a frame
        # pairwise inside the loop.
        columns = [
            frame.set_index('ts').rename(columns={'value': point_id})
            for point_id, frame in res.items()
        ]
        return pd.concat(columns, axis=1).reset_index()

    # Internal helper: print error information.
    def error_print(self, func_name):
        """Print details of the exception currently being handled."""
        _print_error(func_name, self.dtfromts)

    # Internal helper: convert timestamp columns to datetime.
    def ts2dt(self, df: pd.DataFrame, cols):
        """Convert the timestamp columns ``cols`` of ``df`` to datetimes in place.

        :param df: DataFrame whose columns are converted
        :param cols: iterable of column names holding POSIX timestamps
        :return: the same DataFrame object
        """
        for col in cols:
            # Build the series on df.index so the assignment aligns row by row
            # even when df does not use the default RangeIndex.
            df[col] = pd.Series([self.dtfromts(v) for v in df[col]], index=df.index)
        return df


class PointWriter:
    """Uploads point values to the point database."""

    upload_url = os.environ.get('DATA_UPLOAD_URL')
    addpointdatum_url = urlcfg.get('addpointdatum_url')

    # Needed by error_print; previously missing on this class, which made
    # every failed request raise AttributeError from the except handler.
    dtfromts = staticmethod(_dtfromts)

    # Maximum number of attempts, and seconds to wait after a failed attempt.
    max_try = 10
    post_sleep = 1

    def __init__(self, url=None) -> None:
        """
        :param url: full endpoint URL to use instead of the one derived from
            the DATA_UPLOAD_URL environment variable
        :raises Exception: if ``url`` is None and DATA_UPLOAD_URL is not set
        """
        if url is not None:
            self.url = url
            print(f'使用临时url:{self.url}')
        elif self.upload_url is None:
            raise Exception('未在环境变量中获取到 DATA_UPLOAD_URL')
        else:
            self.url = self.upload_url + self.addpointdatum_url

    # Upload data to the point database.
    def ai_add_point_data(self, point_id, ts, value):
        """
        :param point_id: str, the data point; per the original note it must
            already exist in the point library
        :param ts: datetime of the sample
        :param value: point value (sent as a string)
        :return: dict returned by the data service (``dict['state'] == 0``
            means the write succeeded), or None when every attempt failed
        """
        post_data = [
            {
                "point_id": str(point_id),
                "data": [{"ts": int(ts.timestamp()), "value": str(value)}],
            }
        ]
        headers = {'Content-Type': 'application/json; charset=UTF-8'}
        rem_try = self.max_try
        while rem_try > 0:
            try:
                # Post to the URL chosen in __init__, so an explicitly supplied
                # url is honoured and a missing DATA_UPLOAD_URL cannot crash here.
                resp = requests.post(url=self.url, headers=headers,
                                     data=json.dumps(post_data), timeout=60)
                try:
                    body = resp.json()
                finally:
                    resp.close()
                if body['state'] == 0:
                    print(f"\nput {point_id} data success!")
                    return body
                print(f"strange resp: {body}")
            except Exception:
                self.error_print('ai_add_point_data')
            # Reached on a non-zero state or on any exception: consume one attempt.
            rem_try -= 1
            time.sleep(self.post_sleep)
        print("\nai_add_point_data failed")
        return None

    # Internal helper: print error information.
    def error_print(self, func_name):
        """Print details of the exception currently being handled."""
        _print_error(func_name, self.dtfromts)


# ---- Commented-out legacy module-level functions follow ----

# # Internal helper: convert datetime columns to timestamps
# def dt2ts(df: pd.DataFrame, cols):
#     for col in cols:
#         df[col] = pd.Series([dt.timestamp() for dt in df[col]])
#     return df

# # Given a single point id, return a DataFrame containing ts, point_id, value
# def get_point_data(point_id, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
#     """
#     :param point_id: string
#     :param from_time: datetime, start time
#     :param to_time: datetime, end time
#     :param interval: int=1, time interval
#     :param type_: =3, required by the backend
#     :param ts2dt_col: list of timestamp column names to convert to datetime
#     :return: DataFrame including the time column
#     """
#     url = root_url + getpointdata_url
#     post_data = {
#         "point_id": point_id,
#         "begin": round(from_time.timestamp()),
#         "end": round(to_time.timestamp()),
#         "interval": interval,
#         "type": type_,
#     }
# rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # ts_data = resp.json()['data'] # if ts_data: # res = pd.DataFrame(ts_data) # if ts2dt_col is not None: # res = ts2dt(res, ts2dt_col) # # res.set_index(['ts'], inplace=True) # return res # else: # rem_try -= 1 # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nget_point_data failed") # # 传入包含多个point id的list,返回一个dataframe里面包含了ts,多个point_id求总后的value # def sum_points_by_ts(point_ids, from_time, to_time, interval=1, type_=3, ts2dt_col=None): # """ # :param point_ids: list # :param from_time: datetime 开始时间 # :param to_time: datetime 结束时间 # :param interval: int=1 时间间隔 # :param type_: =3 后端业务要求 # :param ts2dt_col: list timestamp需要转换为datetime的列名 # :return: DataFrame # """ # try: # data_dict = get_points_data(point_ids, from_time, to_time, interval, type_, ts2dt_col) # for point_id in data_dict.keys(): # data_dict[point_id].set_index(['ts'], inplace=True) # return sum(_ for _ in data_dict.values()).reset_index() # except Exception as e: # error_print(sys._getframe().f_code.co_name) # time.sleep(post_sleep) # print("\nsum_points_by_ts failed") # # 传入包含多个point id的list,返回所有point_id最新的一条数据 # def get_points_run_time(point_ids, ts2dt_col=['ts']): # """ # :param point_ids: list # :param ts2dt_col: list=['ts'] timestamp需要转换为datetime的列名 # :return: DataFrame # """ # url = root_url + getpointsruntime_url # post_data = { # "point_ids": point_ids, # } # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # data = resp.json()['data'] # if data: # res = pd.DataFrame(data) # if ts2dt_col is not None: # res = ts2dt(res, ts2dt_col) # return res # else: # rem_try -= 1 # time.sleep(post_sleep) # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # 
print("\nget_points_run_time failed") # # 获取最新的AI数据 # def ai_get_curr_data( # model_id, model_version, algo, algo_version, module_id, # ts2dt_col=None, # ): # """ # :param model_id: str 模型编号 # :param model_version: str 模型版本 # :param algo: str 算法名称 # :param algo_version: str 算法版本 # :param module_id: str 模块编号 # :param ts2dt_col: list timestamp需要转换为datetime的列名 # :return: datetime, DataFrame 时间戳和数据 # """ # url = upload_url + getcurrdata_url # post_data = { # "model_id": model_id, # "model_version": model_version, # "algo": algo, # "algo_version": algo_version, # "module_id": module_id, # } # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # data_resp = resp.json()['data'] # if data_resp: # # res = dict() # # for row in data_resp: # # res[row['ts']] = pd.DataFrame(json.loads(row['data'])) # ts_dt = dtfromts(data_resp[0]['ts']) # res = pd.DataFrame(json.loads(data_resp[0]['data'])) # if ts2dt_col is not None: # res = ts2dt(res, ts2dt_col) # return ts_dt, res # else: # rem_try -= 1 # time.sleep(post_sleep) # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nai_get_curr_data failed") # # 获取历史AI数据 # def ai_get_his_data( # model_id, model_version, algo, algo_version, module_id, # from_time: datetime.datetime, # to_time: datetime.datetime, # ts2dt_col=None, # return_type='df' # ): # """ # :param model_id: str 模型编号 # :param model_version: str 模型版本 # :param algo: str 算法名称 # :param algo_version: str 算法版本 # :param module_id: str 模块编号 # :param from_time: datetime 开始时间 # :param to_time: datetime 结束时间 # :param ts2dt_col: list timestamp需要转换为datetime的列名 # :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构 # 'dict' 返回 {ts: DataFrame} (原始结构) # 'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果 # :return: DataFrame里面包含了ts, value # """ # url = upload_url + gethisdata_url # post_data = { # "model_id": model_id, # "model_version": model_version, # 
"algo": algo, # "algo_version": algo_version, # "module_id": module_id, # "begin": round(from_time.timestamp()), # "end": round(to_time.timestamp()), # } # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # data_resp = resp.json()['data'] # if data_resp: # res = dict() # for row in data_resp: # ts = dtfromts(row['ts']) # res[ts] = pd.DataFrame(json.loads(row['data'])) # if ts2dt_col is not None: # res[ts] = ts2dt(res[ts], ts2dt_col) # if return_type == 'dict': # return res # elif return_type == 'df': # for ts in res.keys(): # res[ts]['ts'] = ts # return pd.concat(res.values(), axis=0) # else: # rem_try -= 1 # time.sleep(post_sleep) # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nai_get_his_data failed") # def ai_get_his_data2( # model_id, model_version, algo, algo_version, module_id, # from_time: datetime.datetime, # to_time: datetime.datetime, # ts2dt_col=None, # return_type='df' # ): # """ # :param model_id: str 模型编号 # :param model_version: str 模型版本 # :param algo: str 算法名称 # :param algo_version: str 算法版本 # :param module_id: str 模块编号 # :param from_time: datetime 开始时间 # :param to_time: datetime 结束时间 # :param ts2dt_col: list timestamp需要转换为datetime的列名 # :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构 # 'dict' 返回 {ts: DataFrame} (原始结构) # 'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果 # :return: DataFrame里面包含了ts, value # """ # url = root_url + gethisdata_url # post_data = { # "model_id": model_id, # "model_version": model_version, # "algo": algo, # "algo_version": algo_version, # "module_id": module_id, # "begin": round(from_time.timestamp()), # "end": round(to_time.timestamp()), # } # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # data_resp = resp.json()['data'] # if data_resp: # res = dict() # for row in data_resp: # ts = dtfromts(row['ts']) # res[ts] = 
pd.DataFrame(json.loads(row['data'])) # if ts2dt_col is not None: # res[ts] = ts2dt(res[ts], ts2dt_col) # if return_type == 'dict': # return res # elif return_type == 'df': # for ts in res.keys(): # res[ts]['ts'] = ts # return pd.concat(res.values(), axis=0) # else: # rem_try -= 1 # time.sleep(post_sleep) # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nai_get_his_data failed") # # 将 DataFrame 格式数据转换为 jsonlike 的 list 格式数据 # def df2jsonlike(df: pd.DataFrame): # res = [] # for _, row in df.iterrows(): # res.append(row.to_dict()) # return res # # 将模型所预测的数据存入数据库 # # 点位数据 # def ai_put_ai_data( # model_id, model_version, algo, algo_version, module_id, # ts_dt: datetime.datetime, # data_df: pd.DataFrame, # dt2ts_col=None # ): # """ # :param model_id: str 模型编号 # :param model_version: str 模型版本 # :param algo: str 算法名称 # :param algo_version: str 算法版本 # :param module_id: str 模块编号 # :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳 # :param data_df: DataFrame 业务数据 # :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功 # """ # url = upload_url + putaidata_url # if dt2ts_col is not None: # data_df = dt2ts(data_df, dt2ts_col) # post_data = { # "model_id": model_id, # "model_version": model_version, # "algo": algo, # "algo_version": algo_version, # "module_id": module_id, # "ts": round(ts_dt.timestamp()), # "data": df2jsonlike(data_df), # } # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # state = resp.json()['state'] # if state == 0: # print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!") # return resp.json() # else: # print(f"strange resp: {resp.json()}") # rem_try -= 1 # time.sleep(post_sleep) # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # 
print("\nai_put_ai_data failed") # return None # # 将模型所预测的数据存入数据库 # # 能耗基线数据 # def ai_put_ai_data2( # model_id, model_version, algo, algo_version, module_id, # ts_dt: datetime.datetime, # data_df: pd.DataFrame, # dt2ts_col=None # ): # """ # :param model_id: str 模型编号 # :param model_version: str 模型版本 # :param algo: str 算法名称 # :param algo_version: str 算法版本 # :param module_id: str 模块编号 # :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳 # :param data_df: DataFrame 业务数据 # :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功 # """ # url = root_url + putaidata_url # if dt2ts_col is not None: # data_df = dt2ts(data_df, dt2ts_col) # post_data = { # "model_id" : model_id, # "model_version": model_version, # "algo" : algo, # "algo_version" : algo_version, # "module_id" : module_id, # "ts" : round(ts_dt.timestamp()), # "data" : df2jsonlike(data_df) if isinstance(data_df, pd.DataFrame) else data_df, # } # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url, data=json.dumps(post_data)) # state = resp.json()['state'] # if state == 0: # print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!") # return resp.json() # else: # print(f"strange resp: {resp.json()}") # rem_try -= 1 # time.sleep(post_sleep) # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nai_put_ai_data failed") # return None # # 通过接口上传文件 # def ai_upload_ai_file( # model_id, algo, algo_version, module_id, # file # ): # """ # :param model_id: str 模型编号 # :param model_version: str 模型版本 # :param algo: str 算法名称 # :param algo_version: str 算法版本 # :param module_id: str 模块编号 # :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳 # :param data_df: DataFrame 业务数据 # :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功 # """ # url = upload_url + uploadaifile_url # post_data = { # "model_id" : 
str(model_id), # "algo" : str(algo), # "algo_version": str(algo_version), # "module_id" : str(module_id), # "file" : file # } # multipart_encoder = MultipartEncoder( # fields=post_data # ) # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url,headers={'Content-Type': multipart_encoder.content_type}, data=multipart_encoder) # state = resp.json()['state'] # if state == 0: # print(f"\nput {model_id}_{algo}_{algo_version}_{module_id} data success!") # return resp.json() # else: # print(f"strange resp: {resp.json()}") # rem_try -= 1 # time.sleep(post_sleep) # resp.close() # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nai_upload_ai_file failed") # return None # # 上传数据至点位数据库 # def ai_add_point_data( # point_id, ts, value # ): # """ # :param point_id: str 数据点位,需要预先在点位库中创建好 # :param timestamp: str 当前时刻的时间 datetime # :param value: str 点位数值 # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功 # """ # url = root_url + addpointdatum_url # post_data = [{"point_id": str(point_id), # "data":[{"ts": int(ts.timestamp()), # "value": str(value)}]}] # rem_try = max_try # while rem_try > 0: # try: # resp = requests.post(url=url,headers = {'Content-Type': 'application/json; charset=UTF-8'}, data = json.dumps(post_data)) # state = resp.json()['state'] # if state == 0: # print(f"\nput {point_id} data success!") # return resp.json() # else: # print(f"strange resp: {resp.json()}") # rem_try -= 1 # time.sleep(post_sleep) # resp.close() # except Exception as e: # error_print(sys._getframe().f_code.co_name) # rem_try -= 1 # time.sleep(post_sleep) # if rem_try == 0: # print("\nai_add_point_data failed") # return None