data_service.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. import json
  2. import datetime
  3. import time
  4. import pandas as pd
  5. import traceback
  6. import sys
  7. import os
  8. from functools import partial
  9. from dateutil import tz
  10. try:
  11. from workflowlib import requests
  12. except:
  13. import requests
  # Relative endpoint paths of the data service. The client classes in this
  # file look these up by key and append them to a base URL taken from the
  # environment.
  urlcfg = {
      'getpointdata_url': "data/getpointdata",
      'getpointsdata_url': "data/getpointsdata",
      'getpointsdataforai_url': "data/getpointsdataforai",
      'getpointsruntime_url': "data/getpointsruntime",
      'getcurrdata_url': "ai/getcurrdata",
      'gethisdata_url': "ai/gethisdata",
      'putaidata_url': "ai/putaidata",
      'uploadaifile_url': "ai/uploadaifile",
      'addpointdatum_url': "ai/addpointdatum",
  }
  class PointReader:
      """Read point time-series data from the data service over HTTP.

      Endpoint paths are taken from the module-level ``urlcfg`` mapping and the
      base URLs from the ``DATA_ROOT_URL`` / ``DATA_UPLOAD_URL`` environment
      variables; both are read once, when the class body is executed.
      """

      root_url = os.environ.get('DATA_ROOT_URL')
      upload_url = os.environ.get('DATA_UPLOAD_URL')
      getpointdata_url = urlcfg["getpointdata_url"]
      getpointsdata_url = urlcfg["getpointsdata_url"]
      getpointsdataforai_url = urlcfg["getpointsdataforai_url"]
      getpointsruntime_url = urlcfg["getpointsruntime_url"]
      getcurrdata_url = urlcfg["getcurrdata_url"]
      gethisdata_url = urlcfg["gethisdata_url"]
      putaidata_url = urlcfg["putaidata_url"]
      uploadaifile_url = urlcfg["uploadaifile_url"]
      addpointdatum_url = urlcfg["addpointdatum_url"]
      # Converts an epoch timestamp to a timezone-aware datetime in
      # Asia/Shanghai. Wrapped in staticmethod so that `self.dtfromts(ts)` is
      # never passed `self`: a bare functools.partial stored on a class warns
      # on Python 3.13 and is bound like a method from Python 3.14 on.
      dtfromts = staticmethod(
          partial(datetime.datetime.fromtimestamp, tz=tz.gettz('Asia/Shanghai'))
      )
      # Maximum number of request attempts, and the number of seconds to wait
      # after a failed attempt.
      max_try = 10
      post_sleep = 1

      # Values accepted for the `return_type` argument of get_points_data.
      _RETURN_TYPES = ('dict', 'df', 'dfcol')

      def __init__(self, url=None) -> None:
          """Choose the endpoint that `get_points_data` posts to.

          :param url: full endpoint URL used instead of the environment-derived
              one; when given it is used as-is
          :raises Exception: if `url` is None and DATA_ROOT_URL is not set
          """
          if url is not None:
              self.url = url
              print(f'使用临时url:{self.url}')
          elif self.root_url is None:
              raise Exception('未在环境变量中获取到 DATA_ROOT_URL')
          else:
              self.url = self.root_url + self.getpointsdata_url

      def get_points_data(self, point_ids, from_time, to_time, interval=1, type_=3,
                          ts2dt_col=None, return_type='dict'):
          """Fetch data for several point ids, retrying on failure.

          The request is retried up to `max_try` times; an attempt counts as
          failed when it raises or when the response's ``data`` field is empty.

          :param point_ids: list of point ids to query
          :param from_time: datetime, start of the queried range
          :param to_time: datetime, end of the queried range
          :param interval: int, interval value forwarded to the service
          :param type_: value forwarded as ``type`` (the original note says it
              is required by the backend)
          :param ts2dt_col: list of column names whose timestamps should be
              converted to datetimes, or None for no conversion
          :param return_type: one of {'dict', 'df', 'dfcol'}
              'dict'  -> {point_id: DataFrame}
              'df'    -> every frame gets a ``point_id`` column and the frames
                         are concatenated row-wise
              'dfcol' -> every frame is indexed by ``ts``, its ``value`` column
                         is renamed to the point id, and the frames are
                         concatenated column-wise
          :return: the structure selected by `return_type`, or None when every
              attempt failed
          :raises ValueError: if `return_type` is not a supported value
          """
          # Reject an unknown return_type up front: inside the retry loop it
          # would neither return nor consume an attempt, so the request would
          # be re-sent forever.
          if return_type not in self._RETURN_TYPES:
              raise ValueError(
                  f"return_type must be one of {self._RETURN_TYPES}, got {return_type!r}"
              )

          post_data = {
              "point_ids": point_ids,
              "begin": round(from_time.timestamp()),
              "end": round(to_time.timestamp()),
              "interval": interval,
              "type": type_,
          }

          for _ in range(self.max_try):
              try:
                  resp = requests.post(url=self.url, data=json.dumps(post_data), timeout=60)
                  data = resp.json()['data']
                  if data:
                      frames = {}
                      for point in data:
                          frame = pd.DataFrame(point['data'])
                          if ts2dt_col is not None:
                              frame = self.ts2dt(frame, ts2dt_col)
                          frames[point['point_id']] = frame
                      return self._shape_result(frames, return_type)
              except Exception:
                  # Best-effort client: report the error and try again.
                  self.error_print('get_points_data')
              time.sleep(self.post_sleep)

          print("\nget_points_data failed")
          return None

      def _shape_result(self, frames, return_type):
          """Convert the {point_id: DataFrame} mapping into the requested shape."""
          if return_type == 'dict':
              return frames
          if return_type == 'df':
              for point_id, frame in frames.items():
                  frame['point_id'] = point_id
              return pd.concat(list(frames.values()), axis=0)
          # 'dfcol': the frames are joined one at a time onto an initially empty
          # frame, exactly as before, so the resulting index and column labels
          # stay what existing callers receive.
          wide = pd.DataFrame()
          for point_id, frame in frames.items():
              wide = pd.concat(
                  [wide, frame.set_index('ts').rename(columns={'value': point_id})],
                  axis=1)
          return wide.reset_index()

      def error_print(self, func_name):
          """Internal helper: print the current time and the active exception.

          Meant to be called from inside an ``except`` block; the exception is
          read from ``sys.exc_info()``.

          :param func_name: name of the function that failed
          """
          print()
          print(f"{self.dtfromts(time.time())}:")
          print(f"function {func_name} error!")
          print("Exception Info:")
          e_type, e_value, e_traceback = sys.exc_info()
          print(e_type)
          print(e_value)
          traceback.print_tb(e_traceback)
          print()

      def ts2dt(self, df: pd.DataFrame, cols):
          """Internal helper: convert timestamp columns to datetimes in place.

          :param df: frame to modify
          :param cols: names of the columns to convert with `dtfromts`
          :return: the same frame object
          """
          for col in cols:
              # Series.map keeps the frame's own index, so the converted values
              # stay on their rows even when the index is not 0..n-1.
              df[col] = df[col].map(self.dtfromts)
          return df
  class PointWriter:
      """Upload point values to the data service over HTTP.

      The base URL comes from the ``DATA_UPLOAD_URL`` environment variable and
      the endpoint path from the module-level ``urlcfg`` mapping; both are read
      once, when the class body is executed.
      """

      upload_url = os.environ.get('DATA_UPLOAD_URL')
      addpointdatum_url = urlcfg.get('addpointdatum_url')
      # Converts an epoch timestamp to a timezone-aware datetime in
      # Asia/Shanghai; error_print needs it for its time header. Wrapped in
      # staticmethod so that `self.dtfromts(ts)` is never passed `self`.
      dtfromts = staticmethod(
          partial(datetime.datetime.fromtimestamp, tz=tz.gettz('Asia/Shanghai'))
      )
      # Maximum number of request attempts, and the number of seconds to wait
      # after a failed attempt.
      max_try = 10
      post_sleep = 1

      def __init__(self, url=None) -> None:
          """Choose the endpoint that `ai_add_point_data` posts to.

          :param url: full endpoint URL used instead of the environment-derived
              one; when given it is used as-is
          :raises Exception: if `url` is None and DATA_UPLOAD_URL is not set
          """
          if url is not None:
              self.url = url
              print(f'使用临时url:{self.url}')
          elif self.upload_url is None:
              raise Exception('未在环境变量中获取到 DATA_UPLOAD_URL')
          else:
              self.url = self.upload_url + self.addpointdatum_url

      def ai_add_point_data(self, point_id, ts, value):
          """Upload one value for one point, retrying on failure.

          The request is retried up to `max_try` times; an attempt counts as
          failed when it raises or when the response's ``state`` is not 0.

          :param point_id: point identifier, sent as a string (the original
              note says the point must already exist in the point database)
          :param ts: datetime of the value; sent as whole epoch seconds
          :param value: value of the point, sent as a string
          :return: the decoded JSON response (a dict with ``state == 0``) on
              success, or None when every attempt failed
          """
          post_data = [
              {
                  "point_id": str(point_id),
                  "data": [{"ts": int(ts.timestamp()), "value": str(value)}],
              }
          ]
          headers = {'Content-Type': 'application/json; charset=UTF-8'}

          for _ in range(self.max_try):
              resp = None
              try:
                  # Post to the endpoint selected in __init__, so that a
                  # temporary url passed to the constructor is honoured.
                  resp = requests.post(url=self.url, headers=headers,
                                       data=json.dumps(post_data), timeout=60)
                  body = resp.json()
                  if body['state'] == 0:
                      print(f"\nput {point_id} data success!")
                      return body
                  print(f"strange resp: {body}")
              except Exception:
                  # Best-effort client: report the error and try again.
                  self.error_print('ai_add_point_data')
              finally:
                  # Release the response on every path, including the
                  # successful return above.
                  if resp is not None:
                      resp.close()
              time.sleep(self.post_sleep)

          print("\nai_add_point_data failed")
          return None

      def error_print(self, func_name):
          """Internal helper: print the current time and the active exception.

          Meant to be called from inside an ``except`` block; the exception is
          read from ``sys.exc_info()``.

          :param func_name: name of the function that failed
          """
          print()
          print(f"{self.dtfromts(time.time())}:")
          print(f"function {func_name} error!")
          print("Exception Info:")
          e_type, e_value, e_traceback = sys.exc_info()
          print(e_type)
          print(e_value)
          traceback.print_tb(e_traceback)
          print()
  181. # # 内部函数:将datetime转换成timestamp
  182. # def dt2ts(df: pd.DataFrame, cols):
  183. # for col in cols:
  184. # df[col] = pd.Series([dt.timestamp() for dt in df[col]])
  185. # return df
  186. # # 传入一个point id,返回一个dataframe里面包含了ts,point_id,value
  187. # def get_point_data(point_id, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
  188. # """
  189. # :param point_id: string
  190. # :param from_time: datetime 开始时间
  191. # :param to_time: datetime 结束时间
  192. # :param interval: int=1 时间间隔
  193. # :param type_: =3 后端业务要求
  194. # :param ts2dt_col: list timestamp需要转换为datetime的列名
  195. # :return: DataFrame 包含时间
  196. # """
  197. # url = root_url + getpointdata_url
  198. # post_data = {
  199. # "point_id": point_id,
  200. # "begin": round(from_time.timestamp()),
  201. # "end": round(to_time.timestamp()),
  202. # "interval": interval,
  203. # "type": type_,
  204. # }
  205. # rem_try = max_try
  206. # while rem_try > 0:
  207. # try:
  208. # resp = requests.post(url=url, data=json.dumps(post_data))
  209. # ts_data = resp.json()['data']
  210. # if ts_data:
  211. # res = pd.DataFrame(ts_data)
  212. # if ts2dt_col is not None:
  213. # res = ts2dt(res, ts2dt_col)
  214. # # res.set_index(['ts'], inplace=True)
  215. # return res
  216. # else:
  217. # rem_try -= 1
  218. # except Exception as e:
  219. # error_print(sys._getframe().f_code.co_name)
  220. # rem_try -= 1
  221. # time.sleep(post_sleep)
  222. # if rem_try == 0:
  223. # print("\nget_point_data failed")
  224. # # 传入包含多个point id的list,返回一个dataframe里面包含了ts,多个point_id求总后的value
  225. # def sum_points_by_ts(point_ids, from_time, to_time, interval=1, type_=3, ts2dt_col=None):
  226. # """
  227. # :param point_ids: list
  228. # :param from_time: datetime 开始时间
  229. # :param to_time: datetime 结束时间
  230. # :param interval: int=1 时间间隔
  231. # :param type_: =3 后端业务要求
  232. # :param ts2dt_col: list timestamp需要转换为datetime的列名
  233. # :return: DataFrame
  234. # """
  235. # try:
  236. # data_dict = get_points_data(point_ids, from_time, to_time, interval, type_, ts2dt_col)
  237. # for point_id in data_dict.keys():
  238. # data_dict[point_id].set_index(['ts'], inplace=True)
  239. # return sum(_ for _ in data_dict.values()).reset_index()
  240. # except Exception as e:
  241. # error_print(sys._getframe().f_code.co_name)
  242. # time.sleep(post_sleep)
  243. # print("\nsum_points_by_ts failed")
  244. # # 传入包含多个point id的list,返回所有point_id最新的一条数据
  245. # def get_points_run_time(point_ids, ts2dt_col=['ts']):
  246. # """
  247. # :param point_ids: list
  248. # :param ts2dt_col: list=['ts'] timestamp需要转换为datetime的列名
  249. # :return: DataFrame
  250. # """
  251. # url = root_url + getpointsruntime_url
  252. # post_data = {
  253. # "point_ids": point_ids,
  254. # }
  255. # rem_try = max_try
  256. # while rem_try > 0:
  257. # try:
  258. # resp = requests.post(url=url, data=json.dumps(post_data))
  259. # data = resp.json()['data']
  260. # if data:
  261. # res = pd.DataFrame(data)
  262. # if ts2dt_col is not None:
  263. # res = ts2dt(res, ts2dt_col)
  264. # return res
  265. # else:
  266. # rem_try -= 1
  267. # time.sleep(post_sleep)
  268. # except Exception as e:
  269. # error_print(sys._getframe().f_code.co_name)
  270. # rem_try -= 1
  271. # time.sleep(post_sleep)
  272. # if rem_try == 0:
  273. # print("\nget_points_run_time failed")
  274. # # 获取最新的AI数据
  275. # def ai_get_curr_data(
  276. # model_id, model_version, algo, algo_version, module_id,
  277. # ts2dt_col=None,
  278. # ):
  279. # """
  280. # :param model_id: str 模型编号
  281. # :param model_version: str 模型版本
  282. # :param algo: str 算法名称
  283. # :param algo_version: str 算法版本
  284. # :param module_id: str 模块编号
  285. # :param ts2dt_col: list timestamp需要转换为datetime的列名
  286. # :return: datetime, DataFrame 时间戳和数据
  287. # """
  288. # url = upload_url + getcurrdata_url
  289. # post_data = {
  290. # "model_id": model_id,
  291. # "model_version": model_version,
  292. # "algo": algo,
  293. # "algo_version": algo_version,
  294. # "module_id": module_id,
  295. # }
  296. # rem_try = max_try
  297. # while rem_try > 0:
  298. # try:
  299. # resp = requests.post(url=url, data=json.dumps(post_data))
  300. # data_resp = resp.json()['data']
  301. # if data_resp:
  302. # # res = dict()
  303. # # for row in data_resp:
  304. # # res[row['ts']] = pd.DataFrame(json.loads(row['data']))
  305. # ts_dt = dtfromts(data_resp[0]['ts'])
  306. # res = pd.DataFrame(json.loads(data_resp[0]['data']))
  307. # if ts2dt_col is not None:
  308. # res = ts2dt(res, ts2dt_col)
  309. # return ts_dt, res
  310. # else:
  311. # rem_try -= 1
  312. # time.sleep(post_sleep)
  313. # except Exception as e:
  314. # error_print(sys._getframe().f_code.co_name)
  315. # rem_try -= 1
  316. # time.sleep(post_sleep)
  317. # if rem_try == 0:
  318. # print("\nai_get_curr_data failed")
  319. # # 获取历史AI数据
  320. # def ai_get_his_data(
  321. # model_id, model_version, algo, algo_version, module_id,
  322. # from_time: datetime.datetime,
  323. # to_time: datetime.datetime,
  324. # ts2dt_col=None,
  325. # return_type='df'
  326. # ):
  327. # """
  328. # :param model_id: str 模型编号
  329. # :param model_version: str 模型版本
  330. # :param algo: str 算法名称
  331. # :param algo_version: str 算法版本
  332. # :param module_id: str 模块编号
  333. # :param from_time: datetime 开始时间
  334. # :param to_time: datetime 结束时间
  335. # :param ts2dt_col: list timestamp需要转换为datetime的列名
  336. # :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构
  337. # 'dict' 返回 {ts: DataFrame} (原始结构)
  338. # 'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果
  339. # :return: DataFrame里面包含了ts, value
  340. # """
  341. # url = upload_url + gethisdata_url
  342. # post_data = {
  343. # "model_id": model_id,
  344. # "model_version": model_version,
  345. # "algo": algo,
  346. # "algo_version": algo_version,
  347. # "module_id": module_id,
  348. # "begin": round(from_time.timestamp()),
  349. # "end": round(to_time.timestamp()),
  350. # }
  351. # rem_try = max_try
  352. # while rem_try > 0:
  353. # try:
  354. # resp = requests.post(url=url, data=json.dumps(post_data))
  355. # data_resp = resp.json()['data']
  356. # if data_resp:
  357. # res = dict()
  358. # for row in data_resp:
  359. # ts = dtfromts(row['ts'])
  360. # res[ts] = pd.DataFrame(json.loads(row['data']))
  361. # if ts2dt_col is not None:
  362. # res[ts] = ts2dt(res[ts], ts2dt_col)
  363. # if return_type == 'dict':
  364. # return res
  365. # elif return_type == 'df':
  366. # for ts in res.keys():
  367. # res[ts]['ts'] = ts
  368. # return pd.concat(res.values(), axis=0)
  369. # else:
  370. # rem_try -= 1
  371. # time.sleep(post_sleep)
  372. # except Exception as e:
  373. # error_print(sys._getframe().f_code.co_name)
  374. # rem_try -= 1
  375. # time.sleep(post_sleep)
  376. # if rem_try == 0:
  377. # print("\nai_get_his_data failed")
  378. # def ai_get_his_data2(
  379. # model_id, model_version, algo, algo_version, module_id,
  380. # from_time: datetime.datetime,
  381. # to_time: datetime.datetime,
  382. # ts2dt_col=None,
  383. # return_type='df'
  384. # ):
  385. # """
  386. # :param model_id: str 模型编号
  387. # :param model_version: str 模型版本
  388. # :param algo: str 算法名称
  389. # :param algo_version: str 算法版本
  390. # :param module_id: str 模块编号
  391. # :param from_time: datetime 开始时间
  392. # :param to_time: datetime 结束时间
  393. # :param ts2dt_col: list timestamp需要转换为datetime的列名
  394. # :param return_type: str='df' in {'dict', 'df'} 指定返回的数据结构
  395. # 'dict' 返回 {ts: DataFrame} (原始结构)
  396. # 'df' 返回各 DataFrame 加入 ts 时间列再按行拼合后的结果
  397. # :return: DataFrame里面包含了ts, value
  398. # """
  399. # url = root_url + gethisdata_url
  400. # post_data = {
  401. # "model_id": model_id,
  402. # "model_version": model_version,
  403. # "algo": algo,
  404. # "algo_version": algo_version,
  405. # "module_id": module_id,
  406. # "begin": round(from_time.timestamp()),
  407. # "end": round(to_time.timestamp()),
  408. # }
  409. # rem_try = max_try
  410. # while rem_try > 0:
  411. # try:
  412. # resp = requests.post(url=url, data=json.dumps(post_data))
  413. # data_resp = resp.json()['data']
  414. # if data_resp:
  415. # res = dict()
  416. # for row in data_resp:
  417. # ts = dtfromts(row['ts'])
  418. # res[ts] = pd.DataFrame(json.loads(row['data']))
  419. # if ts2dt_col is not None:
  420. # res[ts] = ts2dt(res[ts], ts2dt_col)
  421. # if return_type == 'dict':
  422. # return res
  423. # elif return_type == 'df':
  424. # for ts in res.keys():
  425. # res[ts]['ts'] = ts
  426. # return pd.concat(res.values(), axis=0)
  427. # else:
  428. # rem_try -= 1
  429. # time.sleep(post_sleep)
  430. # except Exception as e:
  431. # error_print(sys._getframe().f_code.co_name)
  432. # rem_try -= 1
  433. # time.sleep(post_sleep)
  434. # if rem_try == 0:
  435. # print("\nai_get_his_data failed")
  436. # # 将 DataFrame 格式数据转换为 jsonlike 的 list 格式数据
  437. # def df2jsonlike(df: pd.DataFrame):
  438. # res = []
  439. # for _, row in df.iterrows():
  440. # res.append(row.to_dict())
  441. # return res
  442. # # 将模型所预测的数据存入数据库
  443. # # 点位数据
  444. # def ai_put_ai_data(
  445. # model_id, model_version, algo, algo_version, module_id,
  446. # ts_dt: datetime.datetime,
  447. # data_df: pd.DataFrame,
  448. # dt2ts_col=None
  449. # ):
  450. # """
  451. # :param model_id: str 模型编号
  452. # :param model_version: str 模型版本
  453. # :param algo: str 算法名称
  454. # :param algo_version: str 算法版本
  455. # :param module_id: str 模块编号
  456. # :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
  457. # :param data_df: DataFrame 业务数据
  458. # :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
  459. # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
  460. # """
  461. # url = upload_url + putaidata_url
  462. # if dt2ts_col is not None:
  463. # data_df = dt2ts(data_df, dt2ts_col)
  464. # post_data = {
  465. # "model_id": model_id,
  466. # "model_version": model_version,
  467. # "algo": algo,
  468. # "algo_version": algo_version,
  469. # "module_id": module_id,
  470. # "ts": round(ts_dt.timestamp()),
  471. # "data": df2jsonlike(data_df),
  472. # }
  473. # rem_try = max_try
  474. # while rem_try > 0:
  475. # try:
  476. # resp = requests.post(url=url, data=json.dumps(post_data))
  477. # state = resp.json()['state']
  478. # if state == 0:
  479. # print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!")
  480. # return resp.json()
  481. # else:
  482. # print(f"strange resp: {resp.json()}")
  483. # rem_try -= 1
  484. # time.sleep(post_sleep)
  485. # except Exception as e:
  486. # error_print(sys._getframe().f_code.co_name)
  487. # rem_try -= 1
  488. # time.sleep(post_sleep)
  489. # if rem_try == 0:
  490. # print("\nai_put_ai_data failed")
  491. # return None
  492. # # 将模型所预测的数据存入数据库
  493. # # 能耗基线数据
  494. # def ai_put_ai_data2(
  495. # model_id, model_version, algo, algo_version, module_id,
  496. # ts_dt: datetime.datetime,
  497. # data_df: pd.DataFrame,
  498. # dt2ts_col=None
  499. # ):
  500. # """
  501. # :param model_id: str 模型编号
  502. # :param model_version: str 模型版本
  503. # :param algo: str 算法名称
  504. # :param algo_version: str 算法版本
  505. # :param module_id: str 模块编号
  506. # :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
  507. # :param data_df: DataFrame 业务数据
  508. # :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
  509. # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
  510. # """
  511. # url = root_url + putaidata_url
  512. # if dt2ts_col is not None:
  513. # data_df = dt2ts(data_df, dt2ts_col)
  514. # post_data = {
  515. # "model_id" : model_id,
  516. # "model_version": model_version,
  517. # "algo" : algo,
  518. # "algo_version" : algo_version,
  519. # "module_id" : module_id,
  520. # "ts" : round(ts_dt.timestamp()),
  521. # "data" : df2jsonlike(data_df) if isinstance(data_df, pd.DataFrame) else data_df,
  522. # }
  523. # rem_try = max_try
  524. # while rem_try > 0:
  525. # try:
  526. # resp = requests.post(url=url, data=json.dumps(post_data))
  527. # state = resp.json()['state']
  528. # if state == 0:
  529. # print(f"\nput {model_id}_{model_version}_{algo}_{algo_version}_{module_id} data success!")
  530. # return resp.json()
  531. # else:
  532. # print(f"strange resp: {resp.json()}")
  533. # rem_try -= 1
  534. # time.sleep(post_sleep)
  535. # except Exception as e:
  536. # error_print(sys._getframe().f_code.co_name)
  537. # rem_try -= 1
  538. # time.sleep(post_sleep)
  539. # if rem_try == 0:
  540. # print("\nai_put_ai_data failed")
  541. # return None
  542. # # 通过接口上传文件
  543. # def ai_upload_ai_file(
  544. # model_id, algo, algo_version, module_id,
  545. # file
  546. # ):
  547. # """
  548. # :param model_id: str 模型编号
  549. # :param model_version: str 模型版本
  550. # :param algo: str 算法名称
  551. # :param algo_version: str 算法版本
  552. # :param module_id: str 模块编号
  553. # :param ts_dt: datetime 业务发生时间,取用时的唯一可用时间戳
  554. # :param data_df: DataFrame 业务数据
  555. # :param dt2ts_col: list 业务数据中 datetime 列转换为 timestamp
  556. # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
  557. # """
  558. # url = upload_url + uploadaifile_url
  559. # post_data = {
  560. # "model_id" : str(model_id),
  561. # "algo" : str(algo),
  562. # "algo_version": str(algo_version),
  563. # "module_id" : str(module_id),
  564. # "file" : file
  565. # }
  566. # multipart_encoder = MultipartEncoder(
  567. # fields=post_data
  568. # )
  569. # rem_try = max_try
  570. # while rem_try > 0:
  571. # try:
  572. # resp = requests.post(url=url,headers={'Content-Type': multipart_encoder.content_type}, data=multipart_encoder)
  573. # state = resp.json()['state']
  574. # if state == 0:
  575. # print(f"\nput {model_id}_{algo}_{algo_version}_{module_id} data success!")
  576. # return resp.json()
  577. # else:
  578. # print(f"strange resp: {resp.json()}")
  579. # rem_try -= 1
  580. # time.sleep(post_sleep)
  581. # resp.close()
  582. # except Exception as e:
  583. # error_print(sys._getframe().f_code.co_name)
  584. # rem_try -= 1
  585. # time.sleep(post_sleep)
  586. # if rem_try == 0:
  587. # print("\nai_upload_ai_file failed")
  588. # return None
  589. # # 上传数据至点位数据库
  590. # def ai_add_point_data(
  591. # point_id, ts, value
  592. # ):
  593. # """
  594. # :param point_id: str 数据点位,需要预先在点位库中创建好
  595. # :param timestamp: str 当前时刻的时间 datetime
  596. # :param value: str 点位数值
  597. # :return: dict: 数据服务返回值,可用 dict['state'] == 0 判断存数据是否成功
  598. # """
  599. # url = root_url + addpointdatum_url
  600. # post_data = [{"point_id": str(point_id),
  601. # "data":[{"ts": int(ts.timestamp()),
  602. # "value": str(value)}]}]
  603. # rem_try = max_try
  604. # while rem_try > 0:
  605. # try:
  606. # resp = requests.post(url=url,headers = {'Content-Type': 'application/json; charset=UTF-8'}, data = json.dumps(post_data))
  607. # state = resp.json()['state']
  608. # if state == 0:
  609. # print(f"\nput {point_id} data success!")
  610. # return resp.json()
  611. # else:
  612. # print(f"strange resp: {resp.json()}")
  613. # rem_try -= 1
  614. # time.sleep(post_sleep)
  615. # resp.close()
  616. # except Exception as e:
  617. # error_print(sys._getframe().f_code.co_name)
  618. # rem_try -= 1
  619. # time.sleep(post_sleep)
  620. # if rem_try == 0:
  621. # print("\nai_add_point_data failed")
  622. # return None