common.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import json
  2. import os
  3. import re
  4. import urllib.error
  5. import urllib.request
  6. BASE_URL = os.getenv("DATA_COLLECTOR_BASE_URL", "http://127.0.0.1:8000").rstrip("/")
  7. REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "20"))
  8. DEVICE_IP = os.getenv("MODBUS_DEVICE_IP", "192.168.75.240")
  9. MODBUS_TCP_PORT = int(os.getenv("MODBUS_TCP_PORT", "505"))
  10. SLAVE_ID = int(os.getenv("MODBUS_SLAVE_ID", "1"))
  11. FUNC_CODE_COIL = 1
  12. FUNC_CODE_DISCRETE_INPUT = 2
  13. FUNC_CODE_HOLDING_REGISTER = 3
  14. FUNC_CODE_INPUT_REGISTER = 4
  15. REGISTER_POINT_SAMPLES = [
  16. {"address": 0, "type": "bool", "bit": 0},
  17. {"address": 1, "type": "float32"},
  18. {"address": 3, "type": "float64"},
  19. {"address": 7, "type": "int16"},
  20. {"address": 8, "type": "uint16"},
  21. {"address": 9, "type": "int32"},
  22. {"address": 11, "type": "uint32"},
  23. {"address": 13, "type": "int64"},
  24. {"address": 17, "type": "uint64"},
  25. ]
  26. FRAME_PATTERN = re.compile(r"^(Tx|Rx):\d{3}-[0-9A-F]{2}(?: [0-9A-F]{2})*$")
  27. def post_json(path: str, payload: dict) -> tuple[int, dict]:
  28. data = json.dumps(payload).encode("utf-8")
  29. request = urllib.request.Request(
  30. f"{BASE_URL}{path}",
  31. data=data,
  32. headers={"Content-Type": "application/json"},
  33. method="POST",
  34. )
  35. try:
  36. with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT) as response:
  37. body = response.read().decode("utf-8")
  38. return response.status, json.loads(body)
  39. except urllib.error.HTTPError as exc:
  40. body = exc.read().decode("utf-8")
  41. return exc.code, json.loads(body)
  42. def raw_read_payload(function_code: int, address: int, quantity: int) -> dict:
  43. return {
  44. "device_type": "ModbusTCP",
  45. "ip": DEVICE_IP,
  46. "port": MODBUS_TCP_PORT,
  47. "word_byte_order": "ABCD",
  48. "address_base": 0,
  49. "slave_id": SLAVE_ID,
  50. "read": {
  51. "function_code": function_code,
  52. "address": address,
  53. "quantity": quantity,
  54. },
  55. }
  56. def point_read_payload(points: list[dict]) -> dict:
  57. return {
  58. "device_type": "ModbusTCP",
  59. "ip": DEVICE_IP,
  60. "port": MODBUS_TCP_PORT,
  61. "word_byte_order": "ABCD",
  62. "address_base": 0,
  63. "slave_id": SLAVE_ID,
  64. "points": points,
  65. }
  66. def all_type_point_specs() -> list[dict]:
  67. points = []
  68. for function_code in [FUNC_CODE_COIL, FUNC_CODE_DISCRETE_INPUT]:
  69. for address in [0, 1, 16, 17]:
  70. points.append({"function_code": function_code, "address": address, "type": "bool"})
  71. for function_code in [FUNC_CODE_HOLDING_REGISTER, FUNC_CODE_INPUT_REGISTER]:
  72. for sample in REGISTER_POINT_SAMPLES:
  73. point = {"function_code": function_code, "address": sample["address"], "type": sample["type"]}
  74. if "bit" in sample:
  75. point["bit"] = sample["bit"]
  76. points.append(point)
  77. return points
  78. def parse_frame(frame: str) -> tuple[str, list[int]]:
  79. direction, payload = frame.split("-", 1)
  80. return direction[:2], [int(item, 16) for item in payload.split()]
  81. def assert_response_contract(testcase, status: int, data: dict) -> None:
  82. testcase.assertEqual(200, status, data)
  83. testcase.assertIn("code", data)
  84. testcase.assertIn("msg", data)
  85. testcase.assertIn("data", data)
  86. testcase.assertIsInstance(data["data"], dict)
  87. def assert_frame_format(testcase, communication: list[str]) -> None:
  88. testcase.assertTrue(communication, "communication should not be empty")
  89. for frame in communication:
  90. testcase.assertRegex(frame, FRAME_PATTERN)