| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- from __future__ import annotations
- from collections.abc import Callable
- from typing import Any
- from pymodbus.client import ModbusTcpClient
- from app.response import response_payload
- from app.schemas.modbus import ModbusPointReadRequest, ModbusRawReadRequest
- from app.services.modbus_codec import convert_register_value, register_quantity_for_type
- class ModbusCommunicationError(RuntimeError):
- def __init__(self, message: str) -> None:
- super().__init__(message)
- class PacketTrace:
- def __init__(self) -> None:
- self._sequence = 0
- self._messages: list[str] = []
- def capture(self, sending: bool, data: bytes) -> bytes:
- self._sequence += 1
- direction = "Tx" if sending else "Rx"
- payload = " ".join(f"{value:02X}" for value in data)
- self._messages.append(f"{direction}:{self._sequence:03d}-{payload}")
- return data
- def as_list(self) -> list[str]:
- return list(self._messages)
- def calculate_protocol_address(address: int, address_base: int) -> int:
- return address + address_base
- def function_name(function_code: int) -> str:
- return {
- 1: "read_coils",
- 2: "read_discrete_inputs",
- 3: "read_holding_registers",
- 4: "read_input_registers",
- }[function_code]
- def create_client(ip: str, port: int, trace_packet: Callable[[bool, bytes], bytes]) -> ModbusTcpClient:
- client = ModbusTcpClient(host=ip, port=port, trace_packet=trace_packet)
- if not client.connect():
- raise ModbusCommunicationError(f"failed to connect to {ip}:{port}")
- return client
- def read_modbus_values(
- client: ModbusTcpClient,
- function_code: int,
- protocol_address: int,
- quantity: int,
- slave_id: int,
- ) -> list[int] | list[bool]:
- if function_code == 1:
- response = client.read_coils(address=protocol_address, count=quantity, device_id=slave_id)
- elif function_code == 2:
- response = client.read_discrete_inputs(address=protocol_address, count=quantity, device_id=slave_id)
- elif function_code == 3:
- response = client.read_holding_registers(address=protocol_address, count=quantity, device_id=slave_id)
- elif function_code == 4:
- response = client.read_input_registers(address=protocol_address, count=quantity, device_id=slave_id)
- else:
- raise ValueError("function_code must be one of 1, 2, 3, 4")
- if response.isError():
- raise ModbusCommunicationError(str(response))
- if function_code in {1, 2}:
- return [bool(value) for value in response.bits[:quantity]]
- return [int(value) for value in response.registers[:quantity]]
- def device_payload(request: ModbusRawReadRequest | ModbusPointReadRequest) -> dict[str, Any]:
- return {
- "device_type": request.device_type,
- "ip": request.ip,
- "port": request.port,
- "word_byte_order": request.word_byte_order,
- "address_base": request.address_base,
- "slave_id": request.slave_id,
- }
- def read_raw(request: ModbusRawReadRequest) -> dict[str, Any]:
- trace = PacketTrace()
- client: ModbusTcpClient | None = None
- try:
- client = create_client(request.ip, request.port, trace.capture)
- read_modbus_values(
- client=client,
- function_code=request.read.function_code,
- protocol_address=calculate_protocol_address(request.read.address, request.address_base),
- quantity=request.read.quantity,
- slave_id=request.slave_id,
- )
- except ModbusCommunicationError as exc:
- return response_payload(1, str(exc), {"device": device_payload(request), "communication": trace.as_list()})
- except Exception as exc:
- return response_payload(1, str(exc), {"device": device_payload(request), "communication": trace.as_list()})
- finally:
- if client is not None:
- client.close()
- return response_payload(0, "success", {"device": device_payload(request), "communication": trace.as_list()})
- def read_points(request: ModbusPointReadRequest) -> dict[str, Any]:
- trace = PacketTrace()
- client: ModbusTcpClient | None = None
- points: list[dict[str, Any]] = []
- try:
- client = create_client(request.ip, request.port, trace.capture)
- for point in request.points:
- quantity = register_quantity_for_type(point.type)
- protocol_address = calculate_protocol_address(point.address, request.address_base)
- raw_values = read_modbus_values(
- client=client,
- function_code=point.function_code,
- protocol_address=protocol_address,
- quantity=quantity,
- slave_id=request.slave_id,
- )
- if point.function_code in {1, 2}:
- value = bool(raw_values[0])
- else:
- value = convert_register_value(
- registers=[int(value) for value in raw_values],
- point_type=point.type,
- word_byte_order=request.word_byte_order,
- bit=point.bit,
- )
- result = {
- "function_code": point.function_code,
- "address": point.address,
- "type": point.type,
- "value": value,
- }
- if point.bit is not None:
- result["bit"] = point.bit
- points.append(result)
- except ModbusCommunicationError as exc:
- return response_payload(1, str(exc), {"device": device_payload(request), "points": points})
- except Exception as exc:
- return response_payload(1, str(exc), {"device": device_payload(request), "points": points})
- finally:
- if client is not None:
- client.close()
- return response_payload(0, "success", {"device": device_payload(request), "points": points})
|