import json import os import re import urllib.error import urllib.request BASE_URL = os.getenv("DATA_COLLECTOR_BASE_URL", "http://127.0.0.1:8000").rstrip("/") REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "20")) DEVICE_IP = os.getenv("MODBUS_DEVICE_IP", "192.168.75.240") MODBUS_TCP_PORT = int(os.getenv("MODBUS_TCP_PORT", "505")) SLAVE_ID = int(os.getenv("MODBUS_SLAVE_ID", "1")) FUNC_CODE_COIL = 1 FUNC_CODE_DISCRETE_INPUT = 2 FUNC_CODE_HOLDING_REGISTER = 3 FUNC_CODE_INPUT_REGISTER = 4 REGISTER_POINT_SAMPLES = [ {"address": 0, "type": "bool", "bit": 0}, {"address": 1, "type": "float32"}, {"address": 3, "type": "float64"}, {"address": 7, "type": "int16"}, {"address": 8, "type": "uint16"}, {"address": 9, "type": "int32"}, {"address": 11, "type": "uint32"}, {"address": 13, "type": "int64"}, {"address": 17, "type": "uint64"}, ] FRAME_PATTERN = re.compile(r"^(Tx|Rx):\d{3}-[0-9A-F]{2}(?: [0-9A-F]{2})*$") def post_json(path: str, payload: dict) -> tuple[int, dict]: data = json.dumps(payload).encode("utf-8") request = urllib.request.Request( f"{BASE_URL}{path}", data=data, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT) as response: body = response.read().decode("utf-8") return response.status, json.loads(body) except urllib.error.HTTPError as exc: body = exc.read().decode("utf-8") return exc.code, json.loads(body) def raw_read_payload(function_code: int, address: int, quantity: int) -> dict: return { "device_type": "ModbusTCP", "ip": DEVICE_IP, "port": MODBUS_TCP_PORT, "word_byte_order": "ABCD", "address_base": 0, "slave_id": SLAVE_ID, "read": { "function_code": function_code, "address": address, "quantity": quantity, }, } def point_read_payload(points: list[dict]) -> dict: return { "device_type": "ModbusTCP", "ip": DEVICE_IP, "port": MODBUS_TCP_PORT, "word_byte_order": "ABCD", "address_base": 0, "slave_id": SLAVE_ID, "points": points, } def all_type_point_specs() -> list[dict]: points = [] for function_code in [FUNC_CODE_COIL, FUNC_CODE_DISCRETE_INPUT]: for address in [0, 1, 16, 17]: points.append({"function_code": function_code, "address": address, "type": "bool"}) for function_code in [FUNC_CODE_HOLDING_REGISTER, FUNC_CODE_INPUT_REGISTER]: for sample in REGISTER_POINT_SAMPLES: point = {"function_code": function_code, "address": sample["address"], "type": sample["type"]} if "bit" in sample: point["bit"] = sample["bit"] points.append(point) return points def parse_frame(frame: str) -> tuple[str, list[int]]: direction, payload = frame.split("-", 1) return direction[:2], [int(item, 16) for item in payload.split()] def assert_response_contract(testcase, status: int, data: dict) -> None: testcase.assertEqual(200, status, data) testcase.assertIn("code", data) testcase.assertIn("msg", data) testcase.assertIn("data", data) testcase.assertIsInstance(data["data"], dict) def assert_frame_format(testcase, communication: list[str]) -> None: testcase.assertTrue(communication, "communication should not be empty") for frame in communication: testcase.assertRegex(frame, FRAME_PATTERN)