from __future__ import annotations from collections.abc import Callable from typing import Any from pymodbus.client import ModbusTcpClient from app.response import response_payload from app.schemas.modbus import ModbusPointReadRequest, ModbusRawReadRequest from app.services.modbus_codec import convert_register_value, register_quantity_for_type class ModbusCommunicationError(RuntimeError): def __init__(self, message: str) -> None: super().__init__(message) class PacketTrace: def __init__(self) -> None: self._sequence = 0 self._messages: list[str] = [] def capture(self, sending: bool, data: bytes) -> bytes: self._sequence += 1 direction = "Tx" if sending else "Rx" payload = " ".join(f"{value:02X}" for value in data) self._messages.append(f"{direction}:{self._sequence:03d}-{payload}") return data def as_list(self) -> list[str]: return list(self._messages) def calculate_protocol_address(address: int, address_base: int) -> int: return address + address_base def function_name(function_code: int) -> str: return { 1: "read_coils", 2: "read_discrete_inputs", 3: "read_holding_registers", 4: "read_input_registers", }[function_code] def create_client(ip: str, port: int, trace_packet: Callable[[bool, bytes], bytes]) -> ModbusTcpClient: client = ModbusTcpClient(host=ip, port=port, trace_packet=trace_packet) if not client.connect(): raise ModbusCommunicationError(f"failed to connect to {ip}:{port}") return client def read_modbus_values( client: ModbusTcpClient, function_code: int, protocol_address: int, quantity: int, slave_id: int, ) -> list[int] | list[bool]: if function_code == 1: response = client.read_coils(address=protocol_address, count=quantity, device_id=slave_id) elif function_code == 2: response = client.read_discrete_inputs(address=protocol_address, count=quantity, device_id=slave_id) elif function_code == 3: response = client.read_holding_registers(address=protocol_address, count=quantity, device_id=slave_id) elif function_code == 4: response = client.read_input_registers(address=protocol_address, count=quantity, device_id=slave_id) else: raise ValueError("function_code must be one of 1, 2, 3, 4") if response.isError(): raise ModbusCommunicationError(str(response)) if function_code in {1, 2}: return [bool(value) for value in response.bits[:quantity]] return [int(value) for value in response.registers[:quantity]] def device_payload(request: ModbusRawReadRequest | ModbusPointReadRequest) -> dict[str, Any]: return { "device_type": request.device_type, "ip": request.ip, "port": request.port, "word_byte_order": request.word_byte_order, "address_base": request.address_base, "slave_id": request.slave_id, } def read_raw(request: ModbusRawReadRequest) -> dict[str, Any]: trace = PacketTrace() client: ModbusTcpClient | None = None try: client = create_client(request.ip, request.port, trace.capture) read_modbus_values( client=client, function_code=request.read.function_code, protocol_address=calculate_protocol_address(request.read.address, request.address_base), quantity=request.read.quantity, slave_id=request.slave_id, ) except ModbusCommunicationError as exc: return response_payload(1, str(exc), {"device": device_payload(request), "communication": trace.as_list()}) except Exception as exc: return response_payload(1, str(exc), {"device": device_payload(request), "communication": trace.as_list()}) finally: if client is not None: client.close() return response_payload(0, "success", {"device": device_payload(request), "communication": trace.as_list()}) def read_points(request: ModbusPointReadRequest) -> dict[str, Any]: trace = PacketTrace() client: ModbusTcpClient | None = None points: list[dict[str, Any]] = [] try: client = create_client(request.ip, request.port, trace.capture) for point in request.points: quantity = register_quantity_for_type(point.type) protocol_address = calculate_protocol_address(point.address, request.address_base) raw_values = read_modbus_values( client=client, function_code=point.function_code, protocol_address=protocol_address, quantity=quantity, slave_id=request.slave_id, ) if point.function_code in {1, 2}: value = bool(raw_values[0]) else: value = convert_register_value( registers=[int(value) for value in raw_values], point_type=point.type, word_byte_order=request.word_byte_order, bit=point.bit, ) result = { "function_code": point.function_code, "address": point.address, "type": point.type, "value": value, } if point.bit is not None: result["bit"] = point.bit points.append(result) except ModbusCommunicationError as exc: return response_payload(1, str(exc), {"device": device_payload(request), "points": points}) except Exception as exc: return response_payload(1, str(exc), {"device": device_payload(request), "points": points}) finally: if client is not None: client.close() return response_payload(0, "success", {"device": device_payload(request), "points": points})