modbus.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. from ipaddress import ip_address
  2. from typing import Literal
  3. from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
  4. SUPPORTED_FUNCTION_CODES = {1, 2, 3, 4}
  5. SUPPORTED_POINT_TYPES = {
  6. "bool",
  7. "int16",
  8. "uint16",
  9. "int32",
  10. "uint32",
  11. "int64",
  12. "uint64",
  13. "float32",
  14. "float64",
  15. }
  16. class ModbusBaseRequest(BaseModel):
  17. model_config = ConfigDict(extra="forbid")
  18. device_type: Literal["ModbusTCP"] = "ModbusTCP"
  19. ip: str = Field(min_length=1)
  20. port: int = Field(ge=1, le=65535)
  21. word_byte_order: Literal["ABCD", "BADC", "CDAB", "DCBA"] = "ABCD"
  22. address_base: int = Field(default=0, ge=0)
  23. slave_id: int = Field(default=1, ge=0, le=247)
  24. @field_validator("ip")
  25. @classmethod
  26. def validate_ip(cls, value: str) -> str:
  27. try:
  28. ip_address(value)
  29. except ValueError as exc:
  30. raise ValueError("ip must be a valid IP address") from exc
  31. return value
  32. class ModbusReadSpec(BaseModel):
  33. model_config = ConfigDict(extra="forbid")
  34. function_code: int
  35. address: int = Field(ge=0)
  36. quantity: int = Field(ge=1, le=125)
  37. @field_validator("function_code")
  38. @classmethod
  39. def validate_function_code(cls, value: int) -> int:
  40. if value not in SUPPORTED_FUNCTION_CODES:
  41. raise ValueError("function_code must be one of 1, 2, 3, 4")
  42. return value
  43. class ModbusRawReadRequest(ModbusBaseRequest):
  44. read: ModbusReadSpec
  45. class ModbusPointSpec(BaseModel):
  46. model_config = ConfigDict(extra="forbid")
  47. function_code: int
  48. address: int = Field(ge=0)
  49. type: str
  50. bit: int | None = Field(default=None, ge=0, le=15)
  51. @field_validator("function_code")
  52. @classmethod
  53. def validate_function_code(cls, value: int) -> int:
  54. if value not in SUPPORTED_FUNCTION_CODES:
  55. raise ValueError("function_code must be one of 1, 2, 3, 4")
  56. return value
  57. @field_validator("type")
  58. @classmethod
  59. def validate_type(cls, value: str) -> str:
  60. normalized = value.lower()
  61. if normalized not in SUPPORTED_POINT_TYPES:
  62. raise ValueError("type must be one of bool, int16, uint16, int32, uint32, int64, uint64, float32, float64")
  63. return normalized
  64. @model_validator(mode="after")
  65. def validate_point(self) -> "ModbusPointSpec":
  66. if self.function_code in {1, 2} and self.type != "bool":
  67. raise ValueError("function_code 1 and 2 only support bool points")
  68. if self.function_code in {1, 2} and self.bit is not None:
  69. raise ValueError("bit is only supported for register points")
  70. return self
  71. class ModbusPointReadRequest(ModbusBaseRequest):
  72. points: list[ModbusPointSpec] = Field(min_length=1)