Modbus Interface
- class pydoover.docker.ModbusInterface(app_key: str, modbus_uri: str = '127.0.0.1:50054', service_name: str = 'doover.ModbusInterface', timeout: int = 7, config: Schema = None)[source]
ModbusInterface is a gRPC interface for interacting with modbus devices.
It allows for opening and closing modbus buses, reading and writing registers, and subscribing to register updates. It is designed to be used with the modbus_iface gRPC service.
- config
Configuration schema for the modbus interface, containing modbus bus definitions. This is loaded from application config automatically and should be specified in your app_config.py file.
- Type:
Schema
- add_read_register_subscription(bus_id: str = 'default', modbus_id: int = 1, start_address: int = 0, num_registers: int = 1, register_type: int = 4, poll_secs: int = 3, callback: Callable[[list[int]], None] | Coroutine[[list[int]], None] = None)[source]
Add a subscription to read registers from a modbus bus.
This method creates a subscription that periodically reads registers from the specified modbus device and invokes the provided callback each time a read request succeeds.
The provided callback can be a regular function or a coroutine.
Examples
>>> def my_callback(values: list[int]):
...     print("Received new register values:", values)
>>> self.modbus_iface.add_read_register_subscription(
...     bus_id="my_bus",
...     modbus_id=1,
...     start_address=0,
...     num_registers=10,
...     callback=my_callback,
... )
- Parameters:
bus_id (str, optional) – The bus ID to read registers from (default is “default”)
modbus_id (int, optional) – The modbus ID of the device to read registers from (default is 1)
start_address (int, optional) – The starting address of the registers to read (default is 0)
num_registers (int, optional) – The number of registers to read (default is 1)
register_type (int, optional) – The type of registers to read (default is 4, which is typically holding registers)
poll_secs (int, optional) – The polling interval in seconds for the subscription (default is 3 seconds)
callback (Callable) – The function or coroutine to invoke when a read request succeeds. It receives the register values as a list of integers, or as a single integer if only one register is read.
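Because the callback payload is described as either a list of integers or a single integer, a callback may want to normalise it before use. The following is a minimal sketch of one way to do that; `as_register_list`, `make_callback`, and the `latest` cache are hypothetical helpers for illustration and are not part of pydoover.

```python
from __future__ import annotations


def as_register_list(values: int | list[int] | None) -> list[int]:
    """Normalise a subscription payload to a list of register values."""
    if values is None:
        return []
    if isinstance(values, int):
        # Single-register reads are documented as arriving as a bare int.
        return [values]
    return list(values)


# Hypothetical cache: most recent value keyed by register address.
latest: dict[int, int] = {}


def make_callback(start_address: int):
    """Build a callback that records values against their register addresses."""

    def on_values(values: int | list[int]) -> None:
        for offset, value in enumerate(as_register_list(values)):
            latest[start_address + offset] = value

    return on_values
```

Such a callback could then be passed as `callback=make_callback(0)` alongside `start_address=0`, so the cache keys line up with the subscribed addresses.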
- fetch_bus_status(bus_id: str = 'default') bool[source]
Get the status of a modbus bus.
- Parameters:
bus_id (str, optional) – The bus ID to fetch the status for (default is “default”)
- Returns:
True if the bus is open, False otherwise.
- Return type:
bool
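Since the status is a plain boolean, an application may want to poll it a few times before giving up on a bus. The sketch below assumes `fetch_bus_status` can be called synchronously as documented above; `wait_for_bus` and its `attempts`, `delay_secs`, and `sleep` parameters are hypothetical and not part of pydoover.

```python
import time


def wait_for_bus(iface, bus_id="default", attempts=5, delay_secs=1.0, sleep=time.sleep):
    """Poll the bus status until it reports open or attempts run out.

    `iface` is any object exposing fetch_bus_status(bus_id=...) -> bool.
    """
    for attempt in range(attempts):
        if iface.fetch_bus_status(bus_id=bus_id):
            return True
        # Do not sleep after the final failed attempt.
        if attempt < attempts - 1:
            sleep(delay_secs)
    return False
```

The `sleep` argument is injectable only so the helper can be exercised without real delays; in an application the default is likely sufficient.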
- read_registers(bus_id: str = 'default', modbus_id: int = 1, start_address: int = 0, num_registers: int = 1, register_type: int = 4, configure_bus: bool = True) int | list[int] | None[source]
Read registers from a modbus bus.
Examples
>>> self.modbus_iface.read_registers(bus_id="default", modbus_id=1, start_address=0, num_registers=10)
- Parameters:
bus_id (str, optional) – The bus ID to read registers from (default is “default”)
modbus_id (int, optional) – The modbus ID of the device to read registers from (default is 1)
start_address (int, optional) – The starting address of the registers to read (default is 0)
num_registers (int, optional) – The number of registers to read (default is 1)
register_type (int, optional) – The type of registers to read (default is 4, which is typically holding registers)
configure_bus (bool, optional) – Whether to configure the bus if it is not available (default is True)
- Returns:
The values read from the registers. If only one register is read, returns an int. If multiple registers are read, returns a list of ints. If the response failed, returns None.
- Return type:
int | list[int] | None
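Callers need to handle all three return shapes. As an illustration, the sketch below reads two consecutive 16-bit registers and combines them into one unsigned 32-bit value, returning None when the read fails. The helpers `registers_to_uint32` and `read_uint32` are hypothetical, and the word order (high word first) is an assumption that depends on the device; check the device's register map before relying on it.

```python
from __future__ import annotations


def registers_to_uint32(regs: list[int], high_word_first: bool = True) -> int:
    """Combine two 16-bit register values into one unsigned 32-bit integer."""
    if len(regs) != 2:
        raise ValueError("expected exactly two registers")
    if high_word_first:
        hi, lo = regs[0], regs[1]
    else:
        hi, lo = regs[1], regs[0]
    return ((hi & 0xFFFF) << 16) | (lo & 0xFFFF)


def read_uint32(iface, start_address, bus_id="default", modbus_id=1, high_word_first=True):
    """Read two registers via iface.read_registers and decode them.

    Returns None if the read failed (read_registers returned None).
    """
    result = iface.read_registers(
        bus_id=bus_id,
        modbus_id=modbus_id,
        start_address=start_address,
        num_registers=2,
    )
    if result is None:
        return None
    return registers_to_uint32(list(result), high_word_first=high_word_first)
```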
- stub
alias of modbusIfaceStub
- test_comms(message: str = 'Comms Check Message') str | None[source]
Test the connection by sending a message to the modbus interface container and having it echoed back.
- Parameters:
message (str) – Message to send to the modbus interface, to be echoed back as the response
- Returns:
The response from the modbus interface.
- Return type:
str | None
- write_registers(bus_id: str = 'default', modbus_id: int = 1, start_address: int = 0, values: list[int] = None, register_type: int = 4, configure_bus: bool = True) bool[source]
Write values to registers on a modbus bus.
Examples
>>> self.modbus_iface.write_registers(
...     bus_id="my_bus",
...     modbus_id=1,
...     start_address=0,
...     values=[100, 200, 300],
...     register_type=4,
...     configure_bus=True,
... )
- Parameters:
bus_id (str) – The bus ID to write registers to (default is “default”)
modbus_id (int) – The modbus ID of the device to write registers to (default is 1)
start_address (int) – The starting address of the registers to write (default is 0)
values (list[int]) – Register values to write
register_type (int) – The type of registers to write (default is 4, which is typically holding registers)
configure_bus (bool) – Whether to configure the bus if it is not available (default is True)
- Returns:
True if the write operation was successful, False otherwise.
- Return type:
bool
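Where a write matters, one option is to read the registers back and compare them with what was sent. The sketch below is one possible write-then-verify pattern built from the two documented calls; `write_and_verify` is a hypothetical helper, and it assumes the device reports back written values unchanged, which is not true of every register.

```python
def write_and_verify(iface, start_address, values, bus_id="default", modbus_id=1):
    """Write values, then read them back and confirm they match.

    `iface` is any object exposing write_registers and read_registers
    with the keyword arguments documented above.
    """
    values = list(values)
    ok = iface.write_registers(
        bus_id=bus_id,
        modbus_id=modbus_id,
        start_address=start_address,
        values=values,
    )
    if not ok:
        return False

    readback = iface.read_registers(
        bus_id=bus_id,
        modbus_id=modbus_id,
        start_address=start_address,
        num_registers=len(values),
    )
    if readback is None:
        return False
    if isinstance(readback, int):
        # A single-register read is documented as returning a bare int.
        readback = [readback]
    return list(readback) == values
```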