import asyncio
import logging
from collections.abc import Coroutine, Callable
import grpc
from .config import ModbusConfig, ModbusType, ManyModbusConfig
from ...models.generated.modbus import modbus_iface_pb2, modbus_iface_pb2_grpc
from ..grpc_interface import GRPCInterface
from ...utils import call_maybe_async
from ...cli.decorators import command as cli_command
from ...config import Schema
log = logging.getLogger(__name__)
ReadRegisterSubscriptionCallback = (
Callable[[list[int]], None] | Coroutine[[list[int]], None]
)
def two_words_to_32bit_float(word1: int, word2: int, swap: bool = False):
"""Convert two 16-bit words to a 32-bit float."""
if swap:
word1, word2 = word2, word1
return word1 + (word2 * 65536)
[docs]
class ModbusInterface(GRPCInterface):
"""ModbusInterface is a gRPC interface for interacting with modbus devices.
It allows for opening and closing modbus buses, reading and writing registers, and subscribing to register updates.
It is designed to be used with the modbus_iface gRPC service.
Attributes
----------
config : Schema
Configuration schema for the modbus interface, containing modbus bus definitions.
This is loaded from application config automatically and should be specified in your `app_config.py` file.
"""
stub = modbus_iface_pb2_grpc.modbusIfaceStub
def __init__(
self,
app_key: str,
modbus_uri: str = "127.0.0.1:50054",
service_name: str = "doover.ModbusInterface",
timeout: int = 7,
config: Schema = None,
):
super().__init__(app_key, modbus_uri, service_name, timeout)
self.subscription_tasks = []
self._setup_task = None
self.config = config
self.config_complete = False
async def setup(self):
self.config_complete = False
try:
config = self.config.modbus_config
except AttributeError:
log.info("No modbus interfaces defined in config")
self.config_complete = True
return
if isinstance(config, ModbusConfig):
elems: list[ModbusConfig] = [config]
elif isinstance(config, ManyModbusConfig):
elems = list(config.elements)
else:
log.warning(f"Unsupported modbus config type: {type(config)}")
self.config_complete = True
return
for elem in elems:
log.info(f"Setting up modbus bus: {elem}")
match ModbusType(elem.type.value):
case ModbusType.SERIAL:
await self.open_bus(
elem.type.value,
elem.name.value,
elem.serial_port.value,
elem.serial_baud.value,
elem.serial_method.value,
elem.serial_bits.value,
elem.serial_parity.value,
elem.serial_stop.value,
elem.serial_timeout.value,
)
case ModbusType.TCP:
await self.open_bus(
elem.type.value,
elem.name.value,
tcp_uri=elem.tcp_uri.value,
tcp_timeout=elem.tcp_timeout.value,
)
case default:
log.warning(
f"Invalid bus type: {default}. Expected 'serial' or 'tcp'."
)
self.config_complete = True
def process_response(self, stub_call: str, response, *args, **kwargs):
# On a successful (or empty) response defer to the base implementation,
# which returns the response or raises for an empty one.
if response is None or response.response_header.success:
return super().process_response(stub_call, response, *args, **kwargs)
# Response was not successful. For read/write register calls the bus may
# have been dropped (e.g. the modbus container was restarted mid-session),
# so try to recreate it rather than raising. This recovers the bus for the
# next call, not the current one.
#
# NOTE: this must run *instead of* super().process_response() for these
# calls — the base raises on a failed response, which would otherwise
# prevent us from ever reconfiguring the bus.
try:
configure_bus = kwargs["configure_bus"]
bus_id = kwargs["bus_id"]
except KeyError:
# not a bus-bound call (e.g. openBus, busStatus) — use the base's
# raising behaviour.
return super().process_response(stub_call, response, *args, **kwargs)
self.ensure_bus_available(bus_id, response.response_header, configure_bus)
return response
def ensure_bus_available(self, bus_id, response_header, configure: bool = True):
## if not config_complete, a setup() is already in progress — wait for it
if not self.config_complete:
log.debug("Modbus setup in progress, skipping reconfigure")
return False
## if the bus is present and open there is nothing to do
for b in response_header.bus_status:
if b.bus_id == bus_id:
if b.open:
return True
break
## bus is missing or closed (e.g. the modbus container was restarted) —
## recreate it. Guard against spawning overlapping setup tasks and keep a
## reference so the task isn't garbage collected before it completes.
log.warning(f"Bus {bus_id} not available, reconfiguring modbus interface")
if configure and (self._setup_task is None or self._setup_task.done()):
self._setup_task = asyncio.create_task(self.setup())
return False
async def close(self):
log.info("Closing modbus interface")
for task in self.subscription_tasks:
task.cancel()
@staticmethod
def _get_bus_request(
bus_type="serial",
name="default",
serial_port="/dev/ttyS0",
serial_baud=9600,
serial_method="rtu",
serial_bits=8,
serial_parity="N",
serial_stop=1,
serial_timeout=0.3,
tcp_uri="127.0.0.1:5000",
tcp_timeout=2,
):
if bus_type not in ("serial", "tcp"):
log.error("Invalid bus type: " + str(bus_type))
return None
kwargs = {"bus_id": str(name)}
if bus_type == "serial":
kwargs["serial_settings"] = modbus_iface_pb2.serialBusSettings(
port=serial_port,
baud=serial_baud,
modbus_method=serial_method,
data_bits=serial_bits,
parity=serial_parity,
stop_bits=serial_stop,
timeout=serial_timeout,
)
elif bus_type == "tcp":
ip, port = tcp_uri.split(":")
kwargs["ethernet_settings"] = modbus_iface_pb2.ethernetBusSettings(
ip=ip, port=int(port), timeout=tcp_timeout
)
else:
log.error("Invalid bus type: " + str(bus_type))
return None
return modbus_iface_pb2.openBusRequest(**kwargs)
@cli_command()
async def open_bus(
self,
bus_type="serial",
name="default",
serial_port="/dev/ttyS0",
serial_baud=9600,
serial_method="rtu",
serial_bits=8,
serial_parity="N",
serial_stop=1,
serial_timeout=0.3,
tcp_uri="127.0.0.1:5000",
tcp_timeout=2,
) -> bool:
req = self._get_bus_request(
bus_type,
name,
serial_port,
serial_baud,
serial_method,
serial_bits,
serial_parity,
serial_stop,
serial_timeout,
tcp_uri,
tcp_timeout,
)
if req is None:
return False
resp = await self.make_request("openBus", req)
return resp.response_header.success
@cli_command()
async def close_bus(self, bus_id: str = "default") -> bool:
req = modbus_iface_pb2.closeBusRequest(bus_id=str(bus_id))
resp = await self.make_request("closeBus", req)
return resp.response_header.success and resp.bus_status.open
def _validate_read_register_resp(self, resp, bus_id, configure_bus):
try:
if not resp.response_header.success:
log.error("Error reading registers from bus " + str(bus_id))
return False
# return self.ensure_bus_availabe(bus_id, resp.response_header, configure_bus)
return True
except Exception as e:
log.error("Error validating read register response: " + str(e))
return False
[docs]
@cli_command()
async def fetch_bus_status(self, bus_id: str = "default") -> bool:
"""Get the status of a modbus bus.
Parameters
----------
bus_id : str, optional
The bus ID to fetch an OK status for
Returns
-------
bool
True if the bus is open, False otherwise.
"""
req = modbus_iface_pb2.busStatusRequest(bus_id=str(bus_id))
resp = await self.make_request("busStatus", req)
return resp.response_header.success and resp.bus_status.open
@staticmethod
def _parse_register_output(values):
if len(values) == 0:
return None
if len(values) == 1:
return values[0]
return values
[docs]
@cli_command()
async def read_registers(
self,
bus_id: str = "default",
modbus_id: int = 1,
start_address: int = 0,
num_registers: int = 1,
register_type: int = 4,
configure_bus: bool = True,
) -> int | list[int] | None:
"""Read registers from a modbus bus.
Examples
--------
>>> self.modbus_iface.read_registers(bus_id="default", modbus_id=1, start_address=0, num_registers=10)
Parameters
----------
bus_id : str, optional
The bus ID to read registers from (default is "default")
modbus_id : int, optional
The modbus ID of the device to read registers from (default is 1)
start_address : int, optional
The starting address of the registers to read (default is 0)
num_registers : int, optional
The number of registers to read (default is 1)
register_type : int, optional
The type of registers to read (default is 4, which is typically holding registers)
configure_bus : bool, optional
Whether to configure the bus if it is not available (default is True)
Returns
-------
int | list[int] | None
The values read from the registers.
If only one register is read, returns an int.
If multiple registers are read, returns a list of ints.
If the response failed, returns None.
"""
req = modbus_iface_pb2.readRegisterRequest(
bus_id=str(bus_id),
modbus_id=modbus_id,
register_type=register_type,
address=start_address,
count=num_registers,
)
resp = await self.make_request(
"readRegisters", req, bus_id=bus_id, configure_bus=configure_bus
)
return resp and self._parse_register_output(resp.values)
[docs]
@cli_command()
async def write_registers(
self,
bus_id: str = "default",
modbus_id: int = 1,
start_address: int = 0,
values: list[int] = None,
register_type: int = 4,
configure_bus: bool = True,
) -> bool:
"""Write values to registers on a modbus bus.
Examples
--------
>>> self.modbus_iface.write_registers(
... bus_id="my_bus",
... modbus_id=1,
... start_address=0,
... values=[100, 200, 300],
... register_type=4,
... configure_bus=True,
... )
Parameters
----------
bus_id: str
The bus ID to write registers to (default is "default")
modbus_id: int
The modbus ID of the device to write registers to (default is 1)
start_address: int
The starting address of the registers to write (default is 0)
values: list[int]
Register values to write
register_type: int
The type of registers to write (default is 4, which is typically holding registers)
configure_bus: bool
Whether to configure the bus if it is not available (default is True)
Returns
-------
bool
True if the write operation was successful, False otherwise.
"""
values = values or []
req = modbus_iface_pb2.writeRegisterRequest(
bus_id=str(bus_id),
modbus_id=modbus_id,
register_type=register_type,
address=start_address,
values=values,
)
resp = await self.make_request(
"writeRegisters", req, bus_id=bus_id, configure_bus=configure_bus
)
return resp and self._validate_read_register_resp(resp, bus_id, configure_bus)
[docs]
def add_read_register_subscription(
self,
bus_id: str = "default",
modbus_id: int = 1,
start_address: int = 0,
num_registers: int = 1,
register_type: int = 4,
poll_secs: int = 3,
callback: ReadRegisterSubscriptionCallback = None,
):
"""Add a subscription to read registers from a modbus bus.
This method creates a subcscription that will periodically read registers from the specified modbus device and
invoke the provided callback when a read request succeeds.
The provided callback can be a regular function or a coroutine.
Examples
--------
>>> def my_callback(values: list[int]):
... print("Received new register values:", values)
>>> self.modbus_iface.add_read_register_subscription(
... bus_id="my_bus",
... modbus_id=1,
... start_address=0,
... num_registers=10,
... callback=my_callback,
... )
Parameters
----------
bus_id : str, optional
The bus ID to read registers from (default is "default")
modbus_id : int, optional
The modbus ID of the device to read registers from (default is 1)
start_address : int, optional
The starting address of the registers to read (default is 0)
num_registers : int, optional
The number of registers to read (default is 1)
register_type : int, optional
The type of registers to read (default is 4, which is typically holding registers)
poll_secs : int, optional
The polling interval in seconds for the subscription (default is 3 seconds)
callback : Callback
The callback function to invoke when a read request succeeds.
This accepts a list of integers representing the register values.
If only one register is read, this will be a single integer.
This callback can be a regular function or a coroutine.
"""
if callback is None:
log.error("No callback provided for read register subscription")
return None
try:
new_task = asyncio.create_task(
self.run_read_register_subscription_task(
bus_id=str(bus_id),
modbus_id=modbus_id,
start_address=start_address,
num_registers=num_registers,
register_type=register_type,
poll_secs=poll_secs,
callback=callback,
)
)
self.subscription_tasks.append(new_task)
new_task.add_done_callback(self.subscription_tasks.remove)
return new_task
except Exception as e:
log.error("Error adding read register subscription: " + str(e))
return None
async def run_read_register_subscription_task(
self,
bus_id: str,
modbus_id: int,
start_address: int,
num_registers: int,
register_type: int,
poll_secs: int,
callback: ReadRegisterSubscriptionCallback,
configure_bus: bool = True,
):
try:
async with grpc.aio.insecure_channel(self.uri) as channel:
stub = modbus_iface_pb2_grpc.modbusIfaceStub(channel)
request = modbus_iface_pb2.readRegisterSubscriptionRequest(
bus_id=str(bus_id),
modbus_id=modbus_id,
register_type=register_type,
address=start_address,
count=num_registers,
poll_secs=poll_secs,
)
try:
async for response in stub.readRegisterSubscription(request):
success = response.response_header.success
if not self._validate_read_register_resp(
response, bus_id, configure_bus
):
values = None
elif len(response.values) == 1:
values = response.values[0]
else:
values = response.values
log.debug(
f"Received new modbus subscription result on bus {bus_id}, for modbus_id {modbus_id}, result={success}"
)
if callback is not None:
await call_maybe_async(callback, values)
except Exception as e:
log.error("Error in read register subscription task: " + str(e))
return None
except Exception as e:
log.error("Error in read register subscription task: " + str(e))
return None
[docs]
@cli_command()
async def test_comms(self, message: str = "Comms Check Message") -> str | None:
"""Test connection by sending a basic echo response to modbus interface container.
Parameters
----------
message : str
Message to send to modbus interface to have echo'd as a response
Returns
-------
str
The response from modbus interface.
"""
return await self.make_request(
"testComms",
modbus_iface_pb2.testCommsRequest(message=message),
response_field="response",
)
modbus_iface = ModbusInterface