| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- from ipaddress import ip_address
- from typing import Literal
- from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
- SUPPORTED_FUNCTION_CODES = {1, 2, 3, 4}
- SUPPORTED_POINT_TYPES = {
- "bool",
- "int16",
- "uint16",
- "int32",
- "uint32",
- "int64",
- "uint64",
- "float32",
- "float64",
- }
- class ModbusBaseRequest(BaseModel):
- model_config = ConfigDict(extra="forbid")
- device_type: Literal["ModbusTCP"] = "ModbusTCP"
- ip: str = Field(min_length=1)
- port: int = Field(ge=1, le=65535)
- word_byte_order: Literal["ABCD", "BADC", "CDAB", "DCBA"] = "ABCD"
- address_base: int = Field(default=0, ge=0)
- slave_id: int = Field(ge=0, le=247)
- @field_validator("ip")
- @classmethod
- def validate_ip(cls, value: str) -> str:
- try:
- ip_address(value)
- except ValueError as exc:
- raise ValueError("ip must be a valid IP address") from exc
- return value
- class ModbusReadSpec(BaseModel):
- model_config = ConfigDict(extra="forbid")
- function_code: int
- address: int = Field(ge=0)
- quantity: int = Field(ge=1, le=125)
- @field_validator("function_code")
- @classmethod
- def validate_function_code(cls, value: int) -> int:
- if value not in SUPPORTED_FUNCTION_CODES:
- raise ValueError("function_code must be one of 1, 2, 3, 4")
- return value
- class ModbusRawReadRequest(ModbusBaseRequest):
- read: ModbusReadSpec
- class ModbusPointSpec(BaseModel):
- model_config = ConfigDict(extra="forbid")
- function_code: int
- address: int = Field(ge=0)
- type: str
- bit: int | None = Field(default=None, ge=0, le=15)
- @field_validator("function_code")
- @classmethod
- def validate_function_code(cls, value: int) -> int:
- if value not in SUPPORTED_FUNCTION_CODES:
- raise ValueError("function_code must be one of 1, 2, 3, 4")
- return value
- @field_validator("type")
- @classmethod
- def validate_type(cls, value: str) -> str:
- normalized = value.lower()
- if normalized not in SUPPORTED_POINT_TYPES:
- raise ValueError("type must be one of bool, int16, uint16, int32, uint32, int64, uint64, float32, float64")
- return normalized
- @model_validator(mode="after")
- def validate_point(self) -> "ModbusPointSpec":
- if self.function_code in {1, 2} and self.type != "bool":
- raise ValueError("function_code 1 and 2 only support bool points")
- if self.function_code in {1, 2} and self.bit is not None:
- raise ValueError("bit is only supported for register points")
- return self
- class ModbusPointReadRequest(ModbusBaseRequest):
- points: list[ModbusPointSpec] = Field(min_length=1)
|