modbus_service.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. from __future__ import annotations
  2. from collections.abc import Callable
  3. from typing import Any
  4. from pymodbus.client import ModbusTcpClient
  5. from app.response import response_payload
  6. from app.schemas.modbus import ModbusPointReadRequest, ModbusRawReadRequest
  7. from app.services.modbus_codec import convert_register_value, register_quantity_for_type
  8. class ModbusCommunicationError(RuntimeError):
  9. def __init__(self, message: str) -> None:
  10. super().__init__(message)
  11. class PacketTrace:
  12. def __init__(self) -> None:
  13. self._sequence = 0
  14. self._messages: list[str] = []
  15. def capture(self, sending: bool, data: bytes) -> bytes:
  16. self._sequence += 1
  17. direction = "Tx" if sending else "Rx"
  18. payload = " ".join(f"{value:02X}" for value in data)
  19. self._messages.append(f"{direction}:{self._sequence:03d}-{payload}")
  20. return data
  21. def as_list(self) -> list[str]:
  22. return list(self._messages)
  23. def calculate_protocol_address(address: int, address_base: int) -> int:
  24. return address + address_base
  25. def function_name(function_code: int) -> str:
  26. return {
  27. 1: "read_coils",
  28. 2: "read_discrete_inputs",
  29. 3: "read_holding_registers",
  30. 4: "read_input_registers",
  31. }[function_code]
  32. def create_client(ip: str, port: int, trace_packet: Callable[[bool, bytes], bytes]) -> ModbusTcpClient:
  33. client = ModbusTcpClient(host=ip, port=port, trace_packet=trace_packet)
  34. if not client.connect():
  35. raise ModbusCommunicationError(f"failed to connect to {ip}:{port}")
  36. return client
  37. def read_modbus_values(
  38. client: ModbusTcpClient,
  39. function_code: int,
  40. protocol_address: int,
  41. quantity: int,
  42. slave_id: int,
  43. ) -> list[int] | list[bool]:
  44. if function_code == 1:
  45. response = client.read_coils(address=protocol_address, count=quantity, device_id=slave_id)
  46. elif function_code == 2:
  47. response = client.read_discrete_inputs(address=protocol_address, count=quantity, device_id=slave_id)
  48. elif function_code == 3:
  49. response = client.read_holding_registers(address=protocol_address, count=quantity, device_id=slave_id)
  50. elif function_code == 4:
  51. response = client.read_input_registers(address=protocol_address, count=quantity, device_id=slave_id)
  52. else:
  53. raise ValueError("function_code must be one of 1, 2, 3, 4")
  54. if response.isError():
  55. raise ModbusCommunicationError(str(response))
  56. if function_code in {1, 2}:
  57. return [bool(value) for value in response.bits[:quantity]]
  58. return [int(value) for value in response.registers[:quantity]]
  59. def device_payload(request: ModbusRawReadRequest | ModbusPointReadRequest) -> dict[str, Any]:
  60. return {
  61. "device_type": request.device_type,
  62. "ip": request.ip,
  63. "port": request.port,
  64. "word_byte_order": request.word_byte_order,
  65. "address_base": request.address_base,
  66. "slave_id": request.slave_id,
  67. }
  68. def read_raw(request: ModbusRawReadRequest) -> dict[str, Any]:
  69. trace = PacketTrace()
  70. client: ModbusTcpClient | None = None
  71. try:
  72. client = create_client(request.ip, request.port, trace.capture)
  73. read_modbus_values(
  74. client=client,
  75. function_code=request.read.function_code,
  76. protocol_address=calculate_protocol_address(request.read.address, request.address_base),
  77. quantity=request.read.quantity,
  78. slave_id=request.slave_id,
  79. )
  80. except ModbusCommunicationError as exc:
  81. return response_payload(1, str(exc), {"device": device_payload(request), "communication": trace.as_list()})
  82. except Exception as exc:
  83. return response_payload(1, str(exc), {"device": device_payload(request), "communication": trace.as_list()})
  84. finally:
  85. if client is not None:
  86. client.close()
  87. return response_payload(0, "success", {"device": device_payload(request), "communication": trace.as_list()})
  88. def read_points(request: ModbusPointReadRequest) -> dict[str, Any]:
  89. trace = PacketTrace()
  90. client: ModbusTcpClient | None = None
  91. points: list[dict[str, Any]] = []
  92. try:
  93. client = create_client(request.ip, request.port, trace.capture)
  94. for point in request.points:
  95. quantity = register_quantity_for_type(point.type)
  96. protocol_address = calculate_protocol_address(point.address, request.address_base)
  97. raw_values = read_modbus_values(
  98. client=client,
  99. function_code=point.function_code,
  100. protocol_address=protocol_address,
  101. quantity=quantity,
  102. slave_id=request.slave_id,
  103. )
  104. if point.function_code in {1, 2}:
  105. value = bool(raw_values[0])
  106. else:
  107. value = convert_register_value(
  108. registers=[int(value) for value in raw_values],
  109. point_type=point.type,
  110. word_byte_order=request.word_byte_order,
  111. bit=point.bit,
  112. )
  113. result = {
  114. "function_code": point.function_code,
  115. "address": point.address,
  116. "type": point.type,
  117. "value": value,
  118. }
  119. if point.bit is not None:
  120. result["bit"] = point.bit
  121. points.append(result)
  122. except ModbusCommunicationError as exc:
  123. return response_payload(1, str(exc), {"device": device_payload(request), "points": points})
  124. except Exception as exc:
  125. return response_payload(1, str(exc), {"device": device_payload(request), "points": points})
  126. finally:
  127. if client is not None:
  128. client.close()
  129. return response_payload(0, "success", {"device": device_payload(request), "points": points})