Modbus Interface

class pydoover.docker.ModbusInterface(app_key: str, modbus_uri: str = '127.0.0.1:50054', service_name: str = 'doover.ModbusInterface', timeout: int = 7, config: Schema = None)[source]

ModbusInterface is a gRPC interface for interacting with modbus devices.

It allows for opening and closing modbus buses, reading and writing registers, and subscribing to register updates. It is designed to be used with the modbus_iface gRPC service.

config

Configuration schema for the modbus interface, containing modbus bus definitions. This is loaded from application config automatically and should be specified in your app_config.py file.

Type:

Schema

add_read_register_subscription(bus_id: str = 'default', modbus_id: int = 1, start_address: int = 0, num_registers: int = 1, register_type: int = 4, poll_secs: int = 3, callback: Callable[[list[int]], None] | Coroutine[[list[int]], None] = None)[source]

Add a subscription to read registers from a modbus bus.

This method creates a subscription that periodically reads registers from the specified modbus device and invokes the provided callback each time a read request succeeds.

The provided callback can be a regular function or a coroutine.

Examples

>>> def my_callback(values: list[int]):
...     print("Received new register values:", values)
>>> self.modbus_iface.add_read_register_subscription(
...     bus_id="my_bus",
...     modbus_id=1,
...     start_address=0,
...     num_registers=10,
...     callback=my_callback,
... )
Parameters:
  • bus_id (str, optional) – The bus ID to read registers from (default is “default”)

  • modbus_id (int, optional) – The modbus ID of the device to read registers from (default is 1)

  • start_address (int, optional) – The starting address of the registers to read (default is 0)

  • num_registers (int, optional) – The number of registers to read (default is 1)

  • register_type (int, optional) – The type of registers to read (default is 4, which is typically holding registers)

  • poll_secs (int, optional) – The polling interval in seconds for the subscription (default is 3 seconds)

  • callback (Callback) – The callback to invoke when a read request succeeds. It receives the register values as a list of integers, or as a single integer if only one register is read. The callback can be a regular function or a coroutine.
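Because the callback may receive either a list of integers or, for a single register, a bare integer, it can help to normalize the argument before using it. The following is a minimal sketch only: `normalize_values`, `on_registers`, `on_registers_async`, and `latest` are hypothetical names, not part of pydoover, and the callbacks are invoked directly here to simulate a delivery instead of going through a real subscription.

```python
import asyncio

# Hypothetical store for the most recent values seen by each callback.
latest: dict[str, list[int]] = {}


def normalize_values(values):
    """Return register values as a list, whether one int or many were delivered."""
    if isinstance(values, int):
        return [values]
    return list(values)


def on_registers(values):
    # Regular (synchronous) callback.
    latest["sync"] = normalize_values(values)


async def on_registers_async(values):
    # Coroutine callback; awaiting other coroutines is allowed here.
    await asyncio.sleep(0)
    latest["async"] = normalize_values(values)


# Simulated deliveries (assumption: called directly, no modbus service involved).
on_registers(42)
asyncio.run(on_registers_async([1, 2, 3]))
```

Either callback could then be passed as the `callback` argument of `add_read_register_subscription`.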

fetch_bus_status(bus_id: str = 'default') bool[source]

Get the status of a modbus bus.

Parameters:

bus_id (str, optional) – The ID of the bus whose status should be fetched (default is “default”)

Returns:

True if the bus is open, False otherwise.

Return type:

bool
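One possible use of the boolean status is to wait for a bus to open before issuing reads. The sketch below assumes nothing about pydoover beyond the `fetch_bus_status` signature shown above: `FakeModbusInterface` is a hypothetical stand-in so the example runs without the gRPC service, and `wait_for_bus` is an illustrative helper, not a library function.

```python
import time


class FakeModbusInterface:
    """Hypothetical stand-in that mimics only fetch_bus_status()."""

    def __init__(self, statuses):
        # Sequence of statuses to report on successive calls.
        self._statuses = iter(statuses)

    def fetch_bus_status(self, bus_id: str = "default") -> bool:
        # Report False once the scripted statuses are exhausted.
        return next(self._statuses, False)


def wait_for_bus(iface, bus_id="default", attempts=3, delay_secs=0.0):
    """Poll fetch_bus_status until it returns True or attempts run out."""
    for _ in range(attempts):
        if iface.fetch_bus_status(bus_id=bus_id):
            return True
        time.sleep(delay_secs)
    return False


# The fake bus reports closed twice, then open.
bus_ready = wait_for_bus(FakeModbusInterface([False, False, True]), bus_id="my_bus")
```

In an application, the real interface object (for example `self.modbus_iface`) would take the place of the fake.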

read_registers(bus_id: str = 'default', modbus_id: int = 1, start_address: int = 0, num_registers: int = 1, register_type: int = 4, configure_bus: bool = True) int | list[int] | None[source]

Read registers from a modbus bus.

Examples

>>> self.modbus_iface.read_registers(bus_id="default", modbus_id=1, start_address=0, num_registers=10)
Parameters:
  • bus_id (str, optional) – The bus ID to read registers from (default is “default”)

  • modbus_id (int, optional) – The modbus ID of the device to read registers from (default is 1)

  • start_address (int, optional) – The starting address of the registers to read (default is 0)

  • num_registers (int, optional) – The number of registers to read (default is 1)

  • register_type (int, optional) – The type of registers to read (default is 4, which is typically holding registers)

  • configure_bus (bool, optional) – Whether to configure the bus if it is not available (default is True)

Returns:

The values read from the registers. If only one register is read, returns an int. If multiple registers are read, returns a list of ints. If the response failed, returns None.

Return type:

int | list[int] | None
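Since the return value has three possible shapes (int, list of ints, or None), callers may want to fold them into one. The following is a sketch under stated assumptions: `FakeModbusInterface` is a hypothetical stand-in that returns values shaped like the documented result, and `read_as_list` is an illustrative wrapper, not part of pydoover. Raising on failure is a design choice of this sketch; returning an empty list would work equally well.

```python
class FakeModbusInterface:
    """Hypothetical stand-in returning values shaped like the documented result."""

    def __init__(self, registers):
        self.registers = registers  # None simulates a failed read

    def read_registers(self, bus_id="default", modbus_id=1, start_address=0,
                       num_registers=1, register_type=4, configure_bus=True):
        if self.registers is None:
            return None
        chunk = self.registers[start_address:start_address + num_registers]
        # A single register comes back as an int, several as a list.
        return chunk[0] if num_registers == 1 else chunk


def read_as_list(iface, **kwargs):
    """Call read_registers and always return a list, raising on failure."""
    result = iface.read_registers(**kwargs)
    if result is None:
        raise RuntimeError("modbus read failed")
    return [result] if isinstance(result, int) else list(result)


values = read_as_list(FakeModbusInterface([10, 20, 30]), num_registers=2)
```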

stub

alias of modbusIfaceStub

test_comms(message: str = 'Comms Check Message') str | None[source]

Test the connection by sending a basic echo request to the modbus interface container.

Parameters:

message (str) – Message to send to the modbus interface, which is echoed back as the response

Returns:

The response from modbus interface.

Return type:

str | None
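A simple health check can be built on the fact that the signature allows a None result. This is a sketch only: `FakeModbusInterface` is a hypothetical stand-in, and treating None as "container unreachable" is an assumption of this example, since the reference does not state when None is returned.

```python
class FakeModbusInterface:
    """Hypothetical stand-in that mimics only test_comms()."""

    def __init__(self, reachable=True):
        self.reachable = reachable

    def test_comms(self, message: str = "Comms Check Message"):
        # Echo the message when reachable; otherwise return None (assumed failure mode).
        return message if self.reachable else None


def comms_ok(iface, message="ping"):
    """Return True if the interface produced any response to the echo request."""
    return iface.test_comms(message) is not None


up = comms_ok(FakeModbusInterface(reachable=True))
down = comms_ok(FakeModbusInterface(reachable=False))
```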

write_registers(bus_id: str = 'default', modbus_id: int = 1, start_address: int = 0, values: list[int] = None, register_type: int = 4, configure_bus: bool = True) bool[source]

Write values to registers on a modbus bus.

Examples

>>> self.modbus_iface.write_registers(
...     bus_id="my_bus",
...     modbus_id=1,
...     start_address=0,
...     values=[100, 200, 300],
...     register_type=4,
...     configure_bus=True,
... )
Parameters:
  • bus_id (str) – The bus ID to write registers to (default is “default”)

  • modbus_id (int) – The modbus ID of the device to write registers to (default is 1)

  • start_address (int) – The starting address of the registers to write (default is 0)

  • values (list[int]) – Register values to write

  • register_type (int) – The type of registers to write (default is 4, which is typically holding registers)

  • configure_bus (bool) – Whether to configure the bus if it is not available (default is True)

Returns:

True if the write operation was successful, False otherwise.

Return type:

bool
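Modbus registers are 16 bits wide, so it may be worth checking values before writing them. The sketch below is illustrative: `FakeModbusInterface` is a hypothetical stand-in that stores writes in a dict, and `write_checked` is a helper invented for this example, not a pydoover function. Whether the real service validates ranges itself is not stated in this reference.

```python
class FakeModbusInterface:
    """Hypothetical stand-in that records writes in memory."""

    def __init__(self):
        self.registers = {}

    def write_registers(self, bus_id="default", modbus_id=1, start_address=0,
                        values=None, register_type=4, configure_bus=True):
        # Store each value at consecutive addresses starting at start_address.
        for offset, value in enumerate(values or []):
            self.registers[start_address + offset] = value
        return True


def write_checked(iface, values, **kwargs):
    """Reject values outside the unsigned 16-bit range, then write them."""
    if not all(0 <= v <= 0xFFFF for v in values):
        raise ValueError("register values must fit in 16 bits (0..65535)")
    return iface.write_registers(values=values, **kwargs)


iface = FakeModbusInterface()
ok = write_checked(iface, [100, 200, 300], bus_id="my_bus", start_address=5)
```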

class pydoover.docker.modbus.ModbusConfig(display_name: str = 'Modbus Config')[source]

class pydoover.docker.modbus.ManyModbusConfig(display_name: str = 'Modbus Config')[source]