modbus_codec.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. from __future__ import annotations
  2. import struct
  3. from typing import Any
  4. def register_quantity_for_type(point_type: str) -> int:
  5. if point_type in {"bool", "int16", "uint16"}:
  6. return 1
  7. if point_type in {"int64", "uint64", "float64"}:
  8. return 4
  9. return 2
  10. def ordered_bytes(registers: list[int], word_byte_order: str) -> bytes:
  11. raw = b"".join(register.to_bytes(2, byteorder="big", signed=False) for register in registers)
  12. words = [raw[index : index + 2] for index in range(0, len(raw), 2)]
  13. if word_byte_order in {"BADC", "DCBA"}:
  14. words = [word[::-1] for word in words]
  15. if word_byte_order in {"CDAB", "DCBA"}:
  16. words = list(reversed(words))
  17. return b"".join(words)
  18. def convert_register_value(registers: list[int], point_type: str, word_byte_order: str, bit: int | None) -> Any:
  19. if point_type == "bool":
  20. value = registers[0]
  21. if bit is None:
  22. return value != 0
  23. return ((value >> bit) & 1) == 1
  24. payload = ordered_bytes(registers, word_byte_order)
  25. if point_type == "int16":
  26. return int.from_bytes(payload, byteorder="big", signed=True)
  27. if point_type == "uint16":
  28. return int.from_bytes(payload, byteorder="big", signed=False)
  29. if point_type == "int32":
  30. return int.from_bytes(payload, byteorder="big", signed=True)
  31. if point_type == "uint32":
  32. return int.from_bytes(payload, byteorder="big", signed=False)
  33. if point_type == "int64":
  34. return int.from_bytes(payload, byteorder="big", signed=True)
  35. if point_type == "uint64":
  36. return int.from_bytes(payload, byteorder="big", signed=False)
  37. if point_type == "float32":
  38. return struct.unpack(">f", payload)[0]
  39. if point_type == "float64":
  40. return struct.unpack(">d", payload)[0]
  41. raise ValueError(f"unsupported point type: {point_type}")