|
|
@@ -0,0 +1,671 @@
|
|
|
+import json
|
|
|
+import datetime
|
|
|
+import time
|
|
|
+import pandas as pd
|
|
|
+import traceback
|
|
|
+import sys
|
|
|
+import os
|
|
|
+from functools import partial
|
|
|
+from dateutil import tz
|
|
|
+
|
|
|
+try:
|
|
|
+ from workflowlib import requests
|
|
|
+except:
|
|
|
+ import requests
|
|
|
+
|
|
|
+
|
|
|
+urlcfg = {
|
|
|
+ 'getpointdata_url' : "data/getpointdata",
|
|
|
+ 'getpointsdata_url' : "data/getpointsdata",
|
|
|
+ 'getpointsdataforai_url': "data/getpointsdataforai",
|
|
|
+ 'getpointsruntime_url' : "data/getpointsruntime",
|
|
|
+ 'getcurrdata_url' : "ai/getcurrdata",
|
|
|
+ 'gethisdata_url' : "ai/gethisdata",
|
|
|
+ 'putaidata_url' : "ai/putaidata",
|
|
|
+ 'uploadaifile_url' : "ai/uploadaifile",
|
|
|
+ 'addpointdatum_url' : "ai/addpointdatum",
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+class PointReader:
|
|
|
+
|
|
|
+ root_url = os.environ.get('DATA_ROOT_URL')
|
|
|
+ upload_url = os.environ.get('DATA_UPLOAD_URL')
|
|
|
+ getpointdata_url = urlcfg["getpointdata_url"]
|
|
|
+ getpointsdata_url = urlcfg["getpointsdata_url"]
|
|
|
+ getpointsdataforai_url = urlcfg["getpointsdataforai_url"]
|
|
|
+ getpointsruntime_url = urlcfg["getpointsruntime_url"]
|
|
|
+ getcurrdata_url = urlcfg["getcurrdata_url"]
|
|
|
+ gethisdata_url = urlcfg["gethisdata_url"]
|
|
|
+ putaidata_url = urlcfg["putaidata_url"]
|
|
|
+ uploadaifile_url = urlcfg["uploadaifile_url"]
|
|
|
+ addpointdatum_url = urlcfg["addpointdatum_url"]
|
|
|
+
|
|
|
+ dtfromts = partial(datetime.datetime.fromtimestamp, tz=tz.gettz('Asia/Shanghai'))
|
|
|
+ # 最大连接重试次数,连接失败等待时间
|
|
|
+ max_try = 10
|
|
|
+ post_sleep = 1
|
|
|
+
|
|
|
+ def __init__(self,url=None) -> None:
|
|
|
+ if url is not None:
|
|
|
+ self.url = url
|
|
|
+ print(f'使用临时url:{self.url}')
|
|
|
+ elif self.root_url is None:
|
|
|
+ raise Exception('未在环境变量中获取到 DATA_ROOT_URL')
|
|
|
+ else:
|
|
|
+ self.url = self.root_url + self.getpointsdata_url
|
|
|
+
|
|
|
+ # 传入包含多个point id的list,返回一个dataframe里面包含了ts,point_id,value
|
|
|
+ def get_points_data(self,point_ids, from_time, to_time, interval=1, type_=3, ts2dt_col=None, return_type='dict'):
|
|
|
+ """
|
|
|
+ :param point_ids: list
|
|
|
+ :param from_time: datetime 开始时间
|
|
|
+ :param to_time: datetime 结束时间
|
|
|
+ :param interval: int=1 时间间隔
|
|
|
+ :param type_: =3 后端业务要求
|
|
|
+ :param ts2dt_col: list timestamp需要转换为datetime的列名
|
|
|
+ :param return_type: str in {'dict', 'df', 'dfcol'} default='dict' 指定返回的数据结构
|
|
|
+ 'dict' 返回 {point_id: DataFrame} (原始结构)
|
|
|
+ 'df' 返回各点位加入 point_id 列再按行拼合后的结果
|
|
|
+ 'dfcol' 返回各点位以时间戳为索引,按列拼合,并用 point_id 作为 value 的列名,有 value 时才生效
|
|
|
+ :return: DataFrame
|
|
|
+ """
|
|
|
+ post_data = {
|
|
|
+ "point_ids": point_ids,
|
|
|
+ "begin" : round(from_time.timestamp()),
|
|
|
+ "end" : round(to_time.timestamp()),
|
|
|
+ "interval" : interval,
|
|
|
+ "type" : type_,
|
|
|
+ }
|
|
|
+ rem_try = self.max_try
|
|
|
+ while rem_try > 0:
|
|
|
+ try:
|
|
|
+ resp = requests.post(url=self.url, data=json.dumps(post_data),timeout=60)
|
|
|
+ data = resp.json()['data']
|
|
|
+ if data:
|
|
|
+ res = dict()
|
|
|
+ for point in data:
|
|
|
+ res[point['point_id']] = pd.DataFrame(point['data'])
|
|
|
+ if ts2dt_col is not None:
|
|
|
+ res[point['point_id']] = self.ts2dt(res[point['point_id']], ts2dt_col)
|
|
|
+ # res[point['point_id']].set_index(['ts'], inplace=True)
|
|
|
+ if return_type == 'dict':
|
|
|
+ return res
|
|
|
+ elif return_type == 'df':
|
|
|
+ for point in res.keys():
|
|
|
+ res[point]['point_id'] = point
|
|
|
+ return pd.concat(res.values(), axis=0)
|
|
|
+ elif return_type == 'dfcol':
|
|
|
+ res_df = pd.DataFrame()
|
|
|
+ for point_id, df_ in res.items():
|
|
|
+ res_df = pd.concat(
|
|
|
+ [res_df, df_.set_index('ts').rename(columns={'value': point_id})],
|
|
|
+ axis=1)
|
|
|
+ return res_df.reset_index()
|
|
|
+ else:
|
|
|
+ rem_try -= 1
|
|
|
+ time.sleep(self.post_sleep)
|
|
|
+ except Exception as e:
|
|
|
+ self.error_print(sys._getframe().f_code.co_name)
|
|
|
+ rem_try -= 1
|
|
|
+ time.sleep(self.post_sleep)
|
|
|
+ if rem_try == 0:
|
|
|
+ print("\nget_points_data failed")
|
|
|
+
|
|
|
+ # 内部函数:打印报错信息
|
|
|
+ def error_print(self,func_name):
|
|
|
+ print()
|
|
|
+ print(f"{self.dtfromts(time.time())}:")
|
|
|
+ print(f"function {func_name} error!")
|
|
|
+ print(f"Exception Info:")
|
|
|
+ e_type, e_value, e_traceback = sys.exc_info()
|
|
|
+ print(e_type)
|
|
|
+ print(e_value)
|
|
|
+ traceback.print_tb(e_traceback)
|
|
|
+ print()
|
|
|
+
|
|
|
+ # 内部函数:将timestamp转换成datetime
|
|
|
+ def ts2dt(self,df: pd.DataFrame, cols):
|
|
|
+ for col in cols:
|
|
|
+ df[col] = pd.Series(map(self.dtfromts, df[col]))
|
|
|
+ return df
|
|
|
+
|
|
|
+class PointWriter:
|
|
|
+ upload_url = os.environ.get('DATA_UPLOAD_URL')
|
|
|
+ addpointdatum_url = urlcfg.get('addpointdatum_url')
|
|
|
+ # 最大连接重试次数,连接失败等待时间
|
|
|
+ max_try = 10
|
|
|
+ post_sleep = 1
|
|
|
+
|
|
|
+ def __init__(self,url=None) -> None:
|
|
|
+ if url is not None:
|
|
|
+ self.url = url
|
|
|
+ print(f'使用临时url:{self.url}')
|
|
|
+ elif self.upload_url is None:
|
|
|
+ raise Exception('未在环境变量中获取到 DATA_UPLOAD_URL')
|
|
|
+ else:
|
|
|
+ self.url = self.upload_url + self.addpointdatum_url
|
|
|
+
|
|
|
+ # 上传数据至点位数据库
|
|
|
+ def ai_add_point_data(self, point_id, ts, value):
|
|
|
+ """
|
|
|
+ :param point_id: str 数据点位,需要预先在点位库中创建好
|
|
|
+ :param timestamp: str 当前时刻的时间 datetime
|
|
|
+ :param value: str 点位数值
|
|
|
+ :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
|
|
|
+ """
|
|
|
+ url = self.upload_url + self.addpointdatum_url
|
|
|
+ post_data = [
|
|
|
+ {
|
|
|
+ "point_id": str(point_id),
|
|
|
+ "data":[{"ts": int(ts.timestamp()), "value": str(value)}]
|
|
|
+ }
|
|
|
+ ]
|
|
|
+
|
|
|
+ rem_try = self.max_try
|
|
|
+ while rem_try > 0:
|
|
|
+ try:
|
|
|
+ resp = requests.post(url=url,headers = {'Content-Type': 'application/json; charset=UTF-8'}, data = json.dumps(post_data),timeout=60)
|
|
|
+ state = resp.json()['state']
|
|
|
+ if state == 0:
|
|
|
+ print(f"\nput {point_id} data success!")
|
|
|
+ return resp.json()
|
|
|
+ else:
|
|
|
+ print(f"strange resp: {resp.json()}")
|
|
|
+ rem_try -= 1
|
|
|
+ time.sleep(self.post_sleep)
|
|
|
+ resp.close()
|
|
|
+ except Exception as e:
|
|
|
+ self.error_print(sys._getframe().f_code.co_name)
|
|
|
+ rem_try -= 1
|
|
|
+ time.sleep(self.post_sleep)
|
|
|
+ if rem_try == 0:
|
|
|
+ print("\nai_add_point_data failed")
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 内部函数:打印报错信息
|
|
|
+ def error_print(self,func_name):
|
|
|
+ print()
|
|
|
+ print(f"{self.dtfromts(time.time())}:")
|
|
|
+ print(f"function {func_name} error!")
|
|
|
+ print(f"Exception Info:")
|
|
|
+ e_type, e_value, e_traceback = sys.exc_info()
|
|
|
+ print(e_type)
|
|
|
+ print(e_value)
|
|
|
+ traceback.print_tb(e_traceback)
|
|
|
+ print()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# # 内部函数:将datetime转换成timestamp
|
|
|
+# def dt2ts(df: pd.DataFrame, cols):
|
|
|
+# for col in cols:
|
|
|
+# df[col] = pd.Series([dt.timestamp() for dt in df[col]])
|
|
|
+# return df
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# # 传入一个point id,返回一个dataframe里面包含了ts,point_id,value
|
|
|
+# def get_point_data(point_id, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
|
|
|
+# """
|
|
|
+# :param point_id: string
|
|
|
+# :param from_time: datetime 开始时间
|
|
|
+# :param to_time: datetime 结束时间
|
|
|
+# :param interval: int=1 时间间隔
|
|
|
+# :param type_: =3 后端业务要求
|
|
|
+# :param ts2dt_col: list timestamp需要转换为datetime的列名
|
|
|
+# :return: DataFrame 包含时间
|
|
|
+# """
|
|
|
+# url = root_url + getpointdata_url
|
|
|
+# post_data = {
|
|
|
+# "point_id": point_id,
|
|
|
+# "begin": round(from_time.timestamp()),
|
|
|
+# "end": round(to_time.timestamp()),
|
|
|
+# "interval": interval,
|
|
|
+# "type": type_,
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# ts_data = resp.json()['data']
|
|
|
+# if ts_data:
|
|
|
+# res = pd.DataFrame(ts_data)
|
|
|
+# if ts2dt_col is not None:
|
|
|
+# res = ts2dt(res, ts2dt_col)
|
|
|
+# # res.set_index(['ts'], inplace=True)
|
|
|
+# return res
|
|
|
+# else:
|
|
|
+# rem_try -= 1
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nget_point_data failed")
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+# # 传入包含多个point id的list,返回一个dataframe里面包含了ts,多个point_id求总后的value
|
|
|
+# def sum_points_by_ts(point_ids, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
|
|
|
+# """
|
|
|
+# :param point_ids: list
|
|
|
+# :param from_time: datetime 开始时间
|
|
|
+# :param to_time: datetime 结束时间
|
|
|
+# :param interval: int=1 时间间隔
|
|
|
+# :param type_: =3 后端业务要求
|
|
|
+# :param ts2dt_col: list timestamp需要转换为datetime的列名
|
|
|
+# :return: DataFrame
|
|
|
+# """
|
|
|
+# try:
|
|
|
+# data_dict = get_points_data(point_ids, from_time, to_time, interval, type_, ts2dt_col)
|
|
|
+# for point_id in data_dict.keys():
|
|
|
+# data_dict[point_id].set_index(['ts'], inplace=True)
|
|
|
+# return sum(_ for _ in data_dict.values()).reset_index()
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# print("\nsum_points_by_ts failed")
|
|
|
+
|
|
|
+
|
|
|
+# # 传入包含多个point id的list,返回所有point_id最新的一条数据
|
|
|
+# def get_points_run_time(point_ids, ts2dt_col=['ts']):
|
|
|
+# """
|
|
|
+# :param point_ids: list
|
|
|
+# :param ts2dt_col: list=['ts'] timestamp需要转换为datetime的列名
|
|
|
+# :return: DataFrame
|
|
|
+# """
|
|
|
+# url = root_url + getpointsruntime_url
|
|
|
+# post_data = {
|
|
|
+# "point_ids": point_ids,
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# data = resp.json()['data']
|
|
|
+# if data:
|
|
|
+# res = pd.DataFrame(data)
|
|
|
+# if ts2dt_col is not None:
|
|
|
+# res = ts2dt(res, ts2dt_col)
|
|
|
+# return res
|
|
|
+# else:
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nget_points_run_time failed")
|
|
|
+
|
|
|
+
|
|
|
+# # 获取最新的AI数据
|
|
|
+# def ai_get_curr_data(
|
|
|
+# model_id, model_version, algo, algo_version, module_id,
|
|
|
+# ts2dt_col=None,
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param model_id: str 模型编号
|
|
|
+# :param model_version: str 模型版本
|
|
|
+# :param algo: str 算法名称
|
|
|
+# :param algo_version: str 算法版本
|
|
|
+# :param module_id: str 模块编号
|
|
|
+# :param ts2dt_col: list timestamp需要转换为datetime的列名
|
|
|
+# :return: datetime, DataFrame 时间戳和数据
|
|
|
+# """
|
|
|
+# url = upload_url + getcurrdata_url
|
|
|
+# post_data = {
|
|
|
+# "model_id": model_id,
|
|
|
+# "model_version": model_version,
|
|
|
+# "algo": algo,
|
|
|
+# "algo_version": algo_version,
|
|
|
+# "module_id": module_id,
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# data_resp = resp.json()['data']
|
|
|
+# if data_resp:
|
|
|
+# # res = dict()
|
|
|
+# # for row in data_resp:
|
|
|
+# # res[row['ts']] = pd.DataFrame(json.loads(row['data']))
|
|
|
+# ts_dt = dtfromts(data_resp[0]['ts'])
|
|
|
+# res = pd.DataFrame(json.loads(data_resp[0]['data']))
|
|
|
+# if ts2dt_col is not None:
|
|
|
+# res = ts2dt(res, ts2dt_col)
|
|
|
+# return ts_dt, res
|
|
|
+# else:
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_get_curr_data failed")
|
|
|
+
|
|
|
+
|
|
|
+# # 获取历史AI数据
|
|
|
+# def ai_get_his_data(
|
|
|
+# model_id, model_version, algo, algo_version, module_id,
|
|
|
+# from_time: datetime.datetime,
|
|
|
+# to_time: datetime.datetime,
|
|
|
+# ts2dt_col=None,
|
|
|
+# return_type='df'
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param model_id: str 模型编号
|
|
|
+# :param model_version: str 模型版本
|
|
|
+# :param algo: str 算法名称
|
|
|
+# :param algo_version: str 算法版本
|
|
|
+# :param module_id: str 模块编号
|
|
|
+# :param from_time: datetime 开始时间
|
|
|
+# :param to_time: datetime 结束时间
|
|
|
+# :param ts2dt_col: list timestamp需要转换为datetime的列名
|
|
|
+# :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构
|
|
|
+# 'dict' 返回 {ts: DataFrame} (原始结构)
|
|
|
+# 'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果
|
|
|
+# :return: DataFrame里面包含了ts, value
|
|
|
+# """
|
|
|
+# url = upload_url + gethisdata_url
|
|
|
+# post_data = {
|
|
|
+# "model_id": model_id,
|
|
|
+# "model_version": model_version,
|
|
|
+# "algo": algo,
|
|
|
+# "algo_version": algo_version,
|
|
|
+# "module_id": module_id,
|
|
|
+# "begin": round(from_time.timestamp()),
|
|
|
+# "end": round(to_time.timestamp()),
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# data_resp = resp.json()['data']
|
|
|
+# if data_resp:
|
|
|
+# res = dict()
|
|
|
+# for row in data_resp:
|
|
|
+# ts = dtfromts(row['ts'])
|
|
|
+# res[ts] = pd.DataFrame(json.loads(row['data']))
|
|
|
+# if ts2dt_col is not None:
|
|
|
+# res[ts] = ts2dt(res[ts], ts2dt_col)
|
|
|
+# if return_type == 'dict':
|
|
|
+# return res
|
|
|
+# elif return_type == 'df':
|
|
|
+# for ts in res.keys():
|
|
|
+# res[ts]['ts'] = ts
|
|
|
+# return pd.concat(res.values(), axis=0)
|
|
|
+# else:
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_get_his_data failed")
|
|
|
+
|
|
|
+# def ai_get_his_data2(
|
|
|
+# model_id, model_version, algo, algo_version, module_id,
|
|
|
+# from_time: datetime.datetime,
|
|
|
+# to_time: datetime.datetime,
|
|
|
+# ts2dt_col=None,
|
|
|
+# return_type='df'
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param model_id: str 模型编号
|
|
|
+# :param model_version: str 模型版本
|
|
|
+# :param algo: str 算法名称
|
|
|
+# :param algo_version: str 算法版本
|
|
|
+# :param module_id: str 模块编号
|
|
|
+# :param from_time: datetime 开始时间
|
|
|
+# :param to_time: datetime 结束时间
|
|
|
+# :param ts2dt_col: list timestamp需要转换为datetime的列名
|
|
|
+# :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构
|
|
|
+# 'dict' 返回 {ts: DataFrame} (原始结构)
|
|
|
+# 'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果
|
|
|
+# :return: DataFrame里面包含了ts, value
|
|
|
+# """
|
|
|
+# url = root_url + gethisdata_url
|
|
|
+# post_data = {
|
|
|
+# "model_id": model_id,
|
|
|
+# "model_version": model_version,
|
|
|
+# "algo": algo,
|
|
|
+# "algo_version": algo_version,
|
|
|
+# "module_id": module_id,
|
|
|
+# "begin": round(from_time.timestamp()),
|
|
|
+# "end": round(to_time.timestamp()),
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# data_resp = resp.json()['data']
|
|
|
+# if data_resp:
|
|
|
+# res = dict()
|
|
|
+# for row in data_resp:
|
|
|
+# ts = dtfromts(row['ts'])
|
|
|
+# res[ts] = pd.DataFrame(json.loads(row['data']))
|
|
|
+# if ts2dt_col is not None:
|
|
|
+# res[ts] = ts2dt(res[ts], ts2dt_col)
|
|
|
+# if return_type == 'dict':
|
|
|
+# return res
|
|
|
+# elif return_type == 'df':
|
|
|
+# for ts in res.keys():
|
|
|
+# res[ts]['ts'] = ts
|
|
|
+# return pd.concat(res.values(), axis=0)
|
|
|
+# else:
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_get_his_data failed")
|
|
|
+
|
|
|
+
|
|
|
+# # 将 DataFrame 格式数据转换为 jsonlike 的 list 格式数据
|
|
|
+# def df2jsonlike(df: pd.DataFrame):
|
|
|
+# res = []
|
|
|
+# for _, row in df.iterrows():
|
|
|
+# res.append(row.to_dict())
|
|
|
+# return res
|
|
|
+
|
|
|
+
|
|
|
+# # 将模型所预测的数据存入数据库
|
|
|
+# # 点位数据
|
|
|
+# def ai_put_ai_data(
|
|
|
+# model_id, model_version, algo, algo_version, module_id,
|
|
|
+# ts_dt: datetime.datetime,
|
|
|
+# data_df: pd.DataFrame,
|
|
|
+# dt2ts_col=None
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param model_id: str 模型编号
|
|
|
+# :param model_version: str 模型版本
|
|
|
+# :param algo: str 算法名称
|
|
|
+# :param algo_version: str 算法版本
|
|
|
+# :param module_id: str 模块编号
|
|
|
+# :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
|
|
|
+# :param data_df: DataFrame 业务数据
|
|
|
+# :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
|
|
|
+# :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
|
|
|
+# """
|
|
|
+# url = upload_url + putaidata_url
|
|
|
+# if dt2ts_col is not None:
|
|
|
+# data_df = dt2ts(data_df, dt2ts_col)
|
|
|
+# post_data = {
|
|
|
+# "model_id": model_id,
|
|
|
+# "model_version": model_version,
|
|
|
+# "algo": algo,
|
|
|
+# "algo_version": algo_version,
|
|
|
+# "module_id": module_id,
|
|
|
+# "ts": round(ts_dt.timestamp()),
|
|
|
+# "data": df2jsonlike(data_df),
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# state = resp.json()['state']
|
|
|
+# if state == 0:
|
|
|
+# print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!")
|
|
|
+# return resp.json()
|
|
|
+# else:
|
|
|
+# print(f"strange resp: {resp.json()}")
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_put_ai_data failed")
|
|
|
+# return None
|
|
|
+
|
|
|
+
|
|
|
+# # 将模型所预测的数据存入数据库
|
|
|
+# # 能耗基线数据
|
|
|
+# def ai_put_ai_data2(
|
|
|
+# model_id, model_version, algo, algo_version, module_id,
|
|
|
+# ts_dt: datetime.datetime,
|
|
|
+# data_df: pd.DataFrame,
|
|
|
+# dt2ts_col=None
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param model_id: str 模型编号
|
|
|
+# :param model_version: str 模型版本
|
|
|
+# :param algo: str 算法名称
|
|
|
+# :param algo_version: str 算法版本
|
|
|
+# :param module_id: str 模块编号
|
|
|
+# :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
|
|
|
+# :param data_df: DataFrame 业务数据
|
|
|
+# :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
|
|
|
+# :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
|
|
|
+# """
|
|
|
+# url = root_url + putaidata_url
|
|
|
+# if dt2ts_col is not None:
|
|
|
+# data_df = dt2ts(data_df, dt2ts_col)
|
|
|
+# post_data = {
|
|
|
+# "model_id" : model_id,
|
|
|
+# "model_version": model_version,
|
|
|
+# "algo" : algo,
|
|
|
+# "algo_version" : algo_version,
|
|
|
+# "module_id" : module_id,
|
|
|
+# "ts" : round(ts_dt.timestamp()),
|
|
|
+# "data" : df2jsonlike(data_df) if isinstance(data_df, pd.DataFrame) else data_df,
|
|
|
+# }
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url, data=json.dumps(post_data))
|
|
|
+# state = resp.json()['state']
|
|
|
+# if state == 0:
|
|
|
+# print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!")
|
|
|
+# return resp.json()
|
|
|
+# else:
|
|
|
+# print(f"strange resp: {resp.json()}")
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_put_ai_data failed")
|
|
|
+# return None
|
|
|
+
|
|
|
+
|
|
|
+# # 通过接口上传文件
|
|
|
+# def ai_upload_ai_file(
|
|
|
+# model_id, algo, algo_version, module_id,
|
|
|
+# file
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param model_id: str 模型编号
|
|
|
+# :param model_version: str 模型版本
|
|
|
+# :param algo: str 算法名称
|
|
|
+# :param algo_version: str 算法版本
|
|
|
+# :param module_id: str 模块编号
|
|
|
+# :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
|
|
|
+# :param data_df: DataFrame 业务数据
|
|
|
+# :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
|
|
|
+# :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
|
|
|
+# """
|
|
|
+# url = upload_url + uploadaifile_url
|
|
|
+# post_data = {
|
|
|
+# "model_id" : str(model_id),
|
|
|
+# "algo" : str(algo),
|
|
|
+# "algo_version": str(algo_version),
|
|
|
+# "module_id" : str(module_id),
|
|
|
+# "file" : file
|
|
|
+# }
|
|
|
+
|
|
|
+# multipart_encoder = MultipartEncoder(
|
|
|
+# fields=post_data
|
|
|
+# )
|
|
|
+
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url,headers={'Content-Type': multipart_encoder.content_type}, data=multipart_encoder)
|
|
|
+# state = resp.json()['state']
|
|
|
+# if state == 0:
|
|
|
+# print(f"\nput {model_id}_{algo}_{algo_version}_{module_id} data success!")
|
|
|
+# return resp.json()
|
|
|
+# else:
|
|
|
+# print(f"strange resp: {resp.json()}")
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# resp.close()
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_upload_ai_file failed")
|
|
|
+# return None
|
|
|
+
|
|
|
+
|
|
|
+# # 上传数据至点位数据库
|
|
|
+# def ai_add_point_data(
|
|
|
+# point_id, ts, value
|
|
|
+# ):
|
|
|
+# """
|
|
|
+# :param point_id: str 数据点位,需要预先在点位库中创建好
|
|
|
+# :param timestamp: str 当前时刻的时间 datetime
|
|
|
+# :param value: str 点位数值
|
|
|
+# :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
|
|
|
+# """
|
|
|
+# url = root_url + addpointdatum_url
|
|
|
+# post_data = [{"point_id": str(point_id),
|
|
|
+# "data":[{"ts": int(ts.timestamp()),
|
|
|
+# "value": str(value)}]}]
|
|
|
+
|
|
|
+# rem_try = max_try
|
|
|
+# while rem_try > 0:
|
|
|
+# try:
|
|
|
+# resp = requests.post(url=url,headers = {'Content-Type': 'application/json; charset=UTF-8'}, data = json.dumps(post_data))
|
|
|
+# state = resp.json()['state']
|
|
|
+# if state == 0:
|
|
|
+# print(f"\nput {point_id} data success!")
|
|
|
+# return resp.json()
|
|
|
+# else:
|
|
|
+# print(f"strange resp: {resp.json()}")
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# resp.close()
|
|
|
+# except Exception as e:
|
|
|
+# error_print(sys._getframe().f_code.co_name)
|
|
|
+# rem_try -= 1
|
|
|
+# time.sleep(post_sleep)
|
|
|
+# if rem_try == 0:
|
|
|
+# print("\nai_add_point_data failed")
|
|
|
+# return None
|
|
|
+
|
|
|
+
|