from ipaddress import ip_address from typing import Literal from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator SUPPORTED_FUNCTION_CODES = {1, 2, 3, 4} SUPPORTED_POINT_TYPES = { "bool", "int16", "uint16", "int32", "uint32", "int64", "uint64", "float32", "float64", } class ModbusBaseRequest(BaseModel): model_config = ConfigDict(extra="forbid") device_type: Literal["ModbusTCP"] = "ModbusTCP" ip: str = Field(min_length=1) port: int = Field(ge=1, le=65535) word_byte_order: Literal["ABCD", "BADC", "CDAB", "DCBA"] = "ABCD" address_base: int = Field(default=0, ge=0) slave_id: int = Field(default=1, ge=0, le=247) @field_validator("ip") @classmethod def validate_ip(cls, value: str) -> str: try: ip_address(value) except ValueError as exc: raise ValueError("ip must be a valid IP address") from exc return value class ModbusReadSpec(BaseModel): model_config = ConfigDict(extra="forbid") function_code: int address: int = Field(ge=0) quantity: int = Field(ge=1, le=125) @field_validator("function_code") @classmethod def validate_function_code(cls, value: int) -> int: if value not in SUPPORTED_FUNCTION_CODES: raise ValueError("function_code must be one of 1, 2, 3, 4") return value class ModbusRawReadRequest(ModbusBaseRequest): read: ModbusReadSpec class ModbusPointSpec(BaseModel): model_config = ConfigDict(extra="forbid") function_code: int address: int = Field(ge=0) type: str bit: int | None = Field(default=None, ge=0, le=15) @field_validator("function_code") @classmethod def validate_function_code(cls, value: int) -> int: if value not in SUPPORTED_FUNCTION_CODES: raise ValueError("function_code must be one of 1, 2, 3, 4") return value @field_validator("type") @classmethod def validate_type(cls, value: str) -> str: normalized = value.lower() if normalized not in SUPPORTED_POINT_TYPES: raise ValueError("type must be one of bool, int16, uint16, int32, uint32, int64, uint64, float32, float64") return normalized @model_validator(mode="after") def validate_point(self) -> "ModbusPointSpec": if self.function_code in {1, 2} and self.type != "bool": raise ValueError("function_code 1 and 2 only support bool points") if self.function_code in {1, 2} and self.bit is not None: raise ValueError("bit is only supported for register points") return self class ModbusPointReadRequest(ModbusBaseRequest): points: list[ModbusPointSpec] = Field(min_length=1)