testframework.utils.modbus_utils module

Provides functions to read and write Modbus devices with pymodbus

class testframework.utils.modbus_utils.DataType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: str, Enum

The β€œraw” type of data in a register (or sequence of registers).

Can be further refined using the transform on the register (sequence).

RAW = 'RAW'
S16 = 'S16'
S32 = 'S32'
STRING = 'STRING'
U16 = 'U16'
U32 = 'U32'
exception testframework.utils.modbus_utils.DecodeError

Bases: Exception

Decoding failed.

exception testframework.utils.modbus_utils.EncodeError

Bases: Exception

Encoding failed.

class testframework.utils.modbus_utils.ModbusFunctionCodesEnum(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

Enum for defining different types of access

READ_COILS = 1
READ_DISCRETE_INPUT = 2
READ_HOLDING_REGISTER = 3
READ_INPUT_REGISTER = 4
UNDEFINED = 0
WRITE_MULTIPLE_COILS = 15
WRITE_MULTIPLE_HOLDING_REGISTERS = 16
WRITE_SINGLE_COIL = 5
WRITE_SINGLE_HOLDING_REGISTER = 6
class testframework.utils.modbus_utils.RegisterDefinition(address: int, quantity: int, data_type: DataType, unit_type: UnitType, writeable: bool = False, readable: bool = True)

Bases: object

Base class for register definitions.

address: int

β€œThe start address for the modbus definition

data_type: DataType

β€œThe data type for this register

quantity: int

β€œThe quantity of addresses to use for this register

readable: bool = True

β€œWeather the register is readable

unit_type: UnitType

β€œThe data type for this register

writeable: bool = False

β€œWeather the register is writable

exception testframework.utils.modbus_utils.SlaveFailureException

Bases: Exception

Possibly fatal exception while trying to read from device

exception testframework.utils.modbus_utils.WriteException(*args, modbus_exception_code: int | None = None, **kwargs)

Bases: Exception

Exception writing register to device

testframework.utils.modbus_utils.convert_string_to_registers(string_to_convert: str, byteorder: Endian) List[int]

Converts a string into a list of Modbus register values using ASCII encoding. Each register contains two characters. Can handle both little-endian and big-endian.

Parameters:
  • string_to_convert – The string to convert.

  • big_endian – Boolean indicating if big-endian encoding should be used.

Returns:

A list of integers for Modbus registers.

testframework.utils.modbus_utils.create_modbus_client(connection_type: CommType, args: Dict[str, Any]) ModbusTcpClient | AsyncModbusSerialClient

create Modbus Client

testframework.utils.modbus_utils.decode_modbus_response(response: ModbusResponse, register_def: RegisterDefinition, byteorder: Endian, wordorder: Endian) int | str | Tuple[int, ...] | bool

Take a raw modbus response and decode it

testframework.utils.modbus_utils.get_decoding_function(register_data_type: DataType, decoder: BinaryPayloadDecoder) Callable

Finds the correct decoding function based on the unit type

testframework.utils.modbus_utils.get_result_from_client(client: ModbusBaseSyncClient, byte_order: Endian, word_order: Endian, node_address: int, function_code, write_value: None | List[int] | int | str, register_definition)

gets result from client based on function code

testframework.utils.modbus_utils.int_to_modbus_registers(value, byteorder: Endian) List[int]

Splits a large integer into 16-bit chunks for Modbus registers.

Parameters:
  • value – The large integer to split.

  • big_endian – If True, split in big-endian order, else little-endian.

Returns:

A list of integers, each fitting in a 16-bit Modbus register.

testframework.utils.modbus_utils.read_coils(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int) bool

Read coil from client

testframework.utils.modbus_utils.read_discrete_inputs(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int) bool

Read discrete input from client

async testframework.utils.modbus_utils.read_holding_register_multiple_values(client: AsyncModbusSerialClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int) int | str | Tuple[int, ...] | float

Read mulitple values from holding registers

testframework.utils.modbus_utils.read_holding_register_value(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int) int | str | Tuple[int, ...] | float

Read single value from holding register

testframework.utils.modbus_utils.read_input_registers(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int) int | str | Tuple[int, ...] | float

Read input register from client

testframework.utils.modbus_utils.write_multiple_coils(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int, values: List[bool]) int | str | Tuple[int, ...]

write multiple coil to client

testframework.utils.modbus_utils.write_multiple_holding_registers(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, node_address: int, values: List[int]) ModbusResponse

write multiple holding registers to client

testframework.utils.modbus_utils.write_single_coil(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, byteorder: Endian, wordorder: Endian, node_address: int, value: bool) int | str | Tuple[int, ...]

write single coil to client

testframework.utils.modbus_utils.write_single_holding_register(client: ModbusBaseSyncClient, register_definition: RegisterDefinition, node_address: int, value: int) ModbusResponse

write single holding register to client