| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import json
- import os
- import re
- import urllib.error
- import urllib.request
# --- Connection settings, each overridable through an environment variable ---

# Root URL of the data-collector service; trailing slashes are stripped so
# request paths can be appended directly.
BASE_URL = os.environ.get("DATA_COLLECTOR_BASE_URL", "http://127.0.0.1:8000").rstrip("/")
# Timeout passed to urlopen for each HTTP request.
REQUEST_TIMEOUT = int(os.environ.get("REQUEST_TIMEOUT", "20"))

# Target device parameters placed into every request payload.
DEVICE_IP = os.environ.get("MODBUS_DEVICE_IP", "192.168.75.240")
MODBUS_TCP_PORT = int(os.environ.get("MODBUS_TCP_PORT", "505"))
SLAVE_ID = int(os.environ.get("MODBUS_SLAVE_ID", "1"))

# --- Values used for the "function_code" field of read requests ---
FUNC_CODE_COIL = 1
FUNC_CODE_DISCRETE_INPUT = 2
FUNC_CODE_HOLDING_REGISTER = 3
FUNC_CODE_INPUT_REGISTER = 4

# One sample point per supported value type; all_type_point_specs() reuses
# this list for both holding and input registers.
# NOTE(review): addresses look spaced to fit each type's register width —
# confirm against the device's register map.
REGISTER_POINT_SAMPLES = [
    {"address": 0, "type": "bool", "bit": 0},
    {"address": 1, "type": "float32"},
    {"address": 3, "type": "float64"},
    {"address": 7, "type": "int16"},
    {"address": 8, "type": "uint16"},
    {"address": 9, "type": "int32"},
    {"address": 11, "type": "uint32"},
    {"address": 13, "type": "int64"},
    {"address": 17, "type": "uint64"},
]

# A communication frame line: "Tx:" or "Rx:", a three-digit number, a dash,
# then one or more space-separated two-character uppercase hex bytes.
FRAME_PATTERN = re.compile(r"^(Tx|Rx):\d{3}-[0-9A-F]{2}(?: [0-9A-F]{2})*$")
def post_json(path: str, payload: dict) -> tuple[int, dict]:
    """POST ``payload`` as JSON to ``BASE_URL + path`` and return ``(status, body)``.

    Args:
        path: URL path appended verbatim to ``BASE_URL``.
        payload: JSON-serialisable request body.

    Returns:
        The HTTP status code and the decoded JSON response body. HTTP error
        responses (``urllib.error.HTTPError``) are returned the same way
        instead of being raised. When an error response body is not valid
        JSON (for example an HTML error page), it is returned as
        ``{"raw_body": <text>}`` so the status code is not hidden behind a
        decoding exception.

    Raises:
        json.JSONDecodeError: If a successful response body is not JSON.

    Failures other than ``HTTPError`` raised by ``urlopen`` are not caught
    here and propagate to the caller.
    """
    data = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        f"{BASE_URL}{path}",
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT) as response:
            body = response.read().decode("utf-8")
            return response.status, json.loads(body)
    except urllib.error.HTTPError as exc:
        # HTTPError wraps the response stream; close it once the body is read
        # so the underlying connection is not leaked.
        with exc:
            body = exc.read().decode("utf-8")
        try:
            return exc.code, json.loads(body)
        except json.JSONDecodeError:
            # Keep the status code visible to the caller even when the server
            # (or a proxy in front of it) answered with a non-JSON body.
            return exc.code, {"raw_body": body}
def raw_read_payload(function_code: int, address: int, quantity: int) -> dict:
    """Build the request body for a raw read.

    The device settings come from the module-level constants; the three
    arguments are nested unchanged under the ``"read"`` key.
    """
    read_request = {
        "function_code": function_code,
        "address": address,
        "quantity": quantity,
    }
    body = {
        "device_type": "ModbusTCP",
        "ip": DEVICE_IP,
        "port": MODBUS_TCP_PORT,
        "word_byte_order": "ABCD",
        "address_base": 0,
        "slave_id": SLAVE_ID,
    }
    # Added last so the key order matches the serialized layout exactly.
    body["read"] = read_request
    return body
def point_read_payload(points: list[dict]) -> dict:
    """Build the request body for a point-based read.

    The device settings come from the module-level constants; ``points`` is
    stored as-is (not copied) under the ``"points"`` key.
    """
    body = {
        "device_type": "ModbusTCP",
        "ip": DEVICE_IP,
        "port": MODBUS_TCP_PORT,
        "word_byte_order": "ABCD",
        "address_base": 0,
        "slave_id": SLAVE_ID,
    }
    # Added last so the key order matches the serialized layout exactly.
    body["points"] = points
    return body
def all_type_point_specs() -> list[dict]:
    """Return point specs covering every function code and value type.

    The result lists bool points at addresses 0, 1, 16 and 17 for the coil
    and discrete-input function codes, followed by one point per entry of
    ``REGISTER_POINT_SAMPLES`` for the holding- and input-register function
    codes. A sample's ``"bit"`` key is carried over when present.
    """
    specs = [
        {"function_code": code, "address": addr, "type": "bool"}
        for code in (FUNC_CODE_COIL, FUNC_CODE_DISCRETE_INPUT)
        for addr in (0, 1, 16, 17)
    ]

    for code in (FUNC_CODE_HOLDING_REGISTER, FUNC_CODE_INPUT_REGISTER):
        for sample in REGISTER_POINT_SAMPLES:
            spec = {
                "function_code": code,
                "address": sample["address"],
                "type": sample["type"],
            }
            # Only bit-addressed samples carry a "bit" entry.
            if "bit" in sample:
                spec["bit"] = sample["bit"]
            specs.append(spec)

    return specs
def parse_frame(frame: str) -> tuple[str, list[int]]:
    """Split a frame line into its direction tag and byte values.

    The text before the first ``-`` supplies the direction (its first two
    characters); the text after it is read as whitespace-separated hex
    numbers.

    Raises:
        ValueError: If ``frame`` contains no ``-`` or a token is not hex.
    """
    header, hex_text = frame.split("-", 1)
    byte_values = []
    for token in hex_text.split():
        byte_values.append(int(token, 16))
    tag = header[:2]
    return tag, byte_values
def assert_response_contract(testcase, status: int, data: dict) -> None:
    """Assert that a response has status 200 and the expected envelope.

    Checks, in order: ``status`` equals 200 (``data`` is used as the failure
    message), the keys ``"code"``, ``"msg"`` and ``"data"`` are present, and
    ``data["data"]`` is a dict. ``testcase`` supplies the assertion methods.
    """
    testcase.assertEqual(200, status, data)
    for required_key in ("code", "msg", "data"):
        testcase.assertIn(required_key, data)
    inner = data["data"]
    testcase.assertIsInstance(inner, dict)
def assert_frame_format(testcase, communication: list[str]) -> None:
    """Assert that ``communication`` is non-empty and every entry matches ``FRAME_PATTERN``.

    ``testcase`` supplies the assertion methods; the first failing entry
    stops the check.
    """
    testcase.assertTrue(communication, "communication should not be empty")
    for line in communication:
        testcase.assertRegex(line, FRAME_PATTERN)
|