Source code for pydoover.docker.platform.platform

import asyncio
import logging
import time
import json

from collections.abc import Iterable, Coroutine, Callable

import grpc

from ...models.generated.platform import platform_iface_pb2, platform_iface_pb2_grpc
from .platform_types import Location, Event
from ..grpc_interface import GRPCInterface
from ...utils import call_maybe_async, deprecated
from ...cli.decorators import command as cli_command

log = logging.getLogger(__name__)
PulseCounterCallback = (
    Callable[[int, bool, int, int, str], None]
    | Coroutine[[int, bool, int, int, str], None]
)


class PulseCounter:
    """PulseCounter object for counting pulses on a digital input pin.

    This is generally not created manually, but rather through the PlatformInterface's `get_new_pulse_counter` method.

    Attributes
    ----------
    platform_iface : PlatformInterface
        The platform interface to use for communication.
    pin : int
        The digital input pin number to listen for pulses on.
    edge : str
        The edge to listen for pulses on. Can be "rising", "falling", or "both".
    callback : Callable[[int, bool, int, int, str], Any]
        A callback function to call when a pulse is received. The function should take the following arguments:
            - pin: int - The pin number the pulse was received on.
            - di_value: bool - The value of the digital input pin (1 for high, 0 for low).
            - dt_secs: int - The time since the last pulse in seconds.
            - count: int - The total number of pulses received.
            - edge: str - The edge that the pulse was received on ("rising" or "falling").
        This function can be synchronous or asynchronous.
    rate_window_secs : int
        The size of the window in seconds for which the rate of pulses is calculated.
    count : int
        The total number of pulses received.
    pulse_timestamps : list[float]
        A list of timestamps for each pulse received.
    receiving_pulses : bool
        Whether the PulseCounter is currently receiving pulses.

    """

    def __init__(
        self,
        plt_iface: "PlatformInterface",
        pin: int,
        edge: str = "rising",
        callback: PulseCounterCallback = None,
        rate_window_secs: int = 60,
        auto_start: bool = True,
    ):
        self.platform_iface = plt_iface
        self.pin = pin
        self.edge = edge
        self.callback = callback
        self.rate_window_secs = rate_window_secs
        self.count = 0

        self.start_time = time.time()
        self.pulse_grace_period = (
            0.2  # Need to ignore pulses for a short period after starting
        )
        self.pulse_timestamps = []

        self.receiving_pulses = False
        self.receiving_events = False

        if auto_start:
            self.start_listener_pulses()

    @deprecated("Replaced with PulseCounter.start_listener_pulses")
    def start_listener(self):
        self.start_listener_pulses()

    def start_listener_pulses(self):
        """Start listening for pulses on the digital input pin."""
        # Check each pulse counter is only used for a single type of pulse
        if self.receiving_events:
            log.error("Using a pulse counter for both pulses and offline events")
            return
        self.receiving_pulses = True

        self.start_time = time.time()
        self.platform_iface.start_di_pulse_listener(
            self.pin, self.receive_pulse, edge=self.edge, start_count=self.count
        )

    def update_events(self):
        """Listen for offline events for a digital input pin.

        This is **not** compatible with a PulseCounter that is already receiving pulses.
        """
        # Check each pulse counter is only used for a single type of pulse
        if self.receiving_pulses:
            log.error("Using a pulse counter for both pulses and offline events")
            return
        self.receiving_events = True
        self.receive_events(
            self.platform_iface.fetch_di_events(
                self.pin, self.edge, include_system_events=True
            )
        )

    def add_existing_events(self, time_stamps):
        # Check each pulse counter is only used for a single type of pulse
        if self.receiving_pulses:
            log.error("Using a pulse counter for both pulses and offline events")
            return
        self.receiving_events = True

        self.pulse_timestamps += time_stamps
        self.count = len(self.pulse_timestamps)

    async def receive_pulse(self, di, di_value, dt_secs, counter, edge):
        """Receive active pulses on the digital input pin.

        This is **not** compatible with a PulseCounter that is already receiving events.
        """
        # Check each pulse counter is only used for a single type of pulse
        if self.receiving_events:
            log.error("Using a pulse counter for both pulses and offline events")
            return
        self.receiving_pulses = True

        if time.time() - self.start_time < self.pulse_grace_period:
            log.info(f"Ignoring pulse on di={di} with dt={dt_secs}s")
            return

        log.debug(f"Received pulse on di={di} with dt={dt_secs}s")
        self.count += 1
        self.pulse_timestamps += [time.time()]
        if self.callback is not None:
            await call_maybe_async(
                self.callback, self.pin, di_value, dt_secs, self.count, edge
            )

    def receive_events(self, events):
        # Check each pulse counter is only used for a single type of pulse
        if self.receiving_pulses:
            log.error("Using a pulse counter for both pulses and offline events")
            return
        self.receiving_events = True

        for event in events:
            edge = ""
            di_value = 0
            if event.event == "DI_R":
                di_value = 1
                edge = "rising"
            elif event.event == "DI_F":
                edge = "falling"
            elif event.event == "VI":
                event = "VI"
            else:  # Could be a system event
                self.handle_system_event(event)
                continue

            timestamp = event.time / 1000 or time.time()
            dt_secs = 0
            if len(self.pulse_timestamps) > 0:
                if timestamp <= self.pulse_timestamps[-1] + 0.01:
                    log.warning(
                        f"Ignoring old event on di={event.pin} t={timestamp} latest event: {self.pulse_timestamps[-1]}"
                    )
                    continue
                dt_secs = timestamp - self.pulse_timestamps[-1]
            log.info(f"Received event on di={event.pin} with t={dt_secs}s")
            self.count += 1
            self.pulse_timestamps += [timestamp]
            if self.callback is not None:
                self.callback(self.pin, di_value, dt_secs, timestamp, self.count, edge)

    def handle_system_event(self, event: str):
        """This is called when a system event occurs. You can override this to handle system events as needed."""

    @deprecated("Use get_pulses_in_window to not damage record of pulses/events")
    def clean_pulse_timestamps(self):
        if len(self.pulse_timestamps) == 0:
            return

        ## Remove timestamps older than the rate window
        while (
            len(self.pulse_timestamps) > 0
            and self.pulse_timestamps[0] < time.time() - self.rate_window_secs
        ):
            self.pulse_timestamps.pop(0)

    def get_pulses_in_window(self):
        if len(self.pulse_timestamps) == 0:
            return []

        pulses = []
        ## Remove timestamps older than the rate window
        for timestamp in self.pulse_timestamps:
            if timestamp > self.pulse_timestamps[-1] - self.rate_window_secs:
                pulses.append(timestamp)
        return pulses

    def set_rate_window(self, rate_window_secs):
        self.rate_window_secs = rate_window_secs

    def get_rate_window(self):
        return self.rate_window_secs

    def get_pulses_per_minute(self):
        pulses = self.get_pulses_in_window()
        return len(pulses) * 60 / self.rate_window_secs

    def set_counter(self, counter):
        self.count = counter

    def get_counter(self):
        return self.count


[docs] class PlatformInterface(GRPCInterface): """Docker interface for interacting with the platform interface container. This interface allows you to interact with the platform interface gRPC service, providing access to device IO. Some implementations are platform-specific, and it is your responsibility to ensure that all hardware that your application is compatible with implements the methods you are trying to fetch. Most methods will return `None` if they are not supported or you pass a bad input. An example of bad input is requesting Digital Input #10 on a Doovit that only supports 4. """ stub = platform_iface_pb2_grpc.platformIfaceStub def __init__( self, app_key: str, plt_uri: str = "localhost:50053", service_name: str = "doover.PlatformInterface", ): super().__init__(app_key, plt_uri, service_name) self.pulse_counter_listeners = [] async def close(self): log.info("Closing platform interface...") for listener in self.pulse_counter_listeners: listener.cancel() def process_response(self, stub_call: str, response, **kwargs): response = super().process_response(stub_call, response, **kwargs) try: response_field = kwargs.pop("response_field") except KeyError: return response res = getattr(response, response_field, None) if isinstance(res, Iterable) and not isinstance( res, str ): # don't iterate over strings res = list(res) if isinstance(res, list) and len(res) == 1: return res[0] return res
[docs] def get_new_pulse_counter( self, di: int, edge: str = "rising", callback: PulseCounterCallback = None, rate_window_secs: int = 20, auto_start: bool = True, ) -> PulseCounter: """Create a new Pulse Counter for counting pulses on a digital input pin. Examples -------- Basic pulse counter:: def pulse_callback(di, di_value, dt_secs, count, edge): print(f"Pulse on di={di} with value={di_value}, dt={dt_secs}s, count={count}, edge={edge}") counter = self.platform_interface.get_new_pulse_counter(0, "rising", callback=pulse_callback) Parameters ---------- di: int Digital input pin to listen for pulses on. edge: "rising" or "falling" or "both" The edge to listen for pulses on. callback : Callable Callback function to call when a pulse is received. The function should take the following arguments - di: int - The pin number the pulse was received on. - di_value: bool - The value of the digital input pin (1 for high, 0 for low). - dt_secs: int - The time since the last pulse in seconds. - count: int - The total number of pulses received. - edge: str - The edge that the pulse was received on ("rising" or "falling"). The callback can be synchronous or asynchronous. rate_window_secs: int The size of window for which the rate of pulses is calculated. Default is 20. auto_start: bool Whether to automatically start listening for pulses. Default is True. """ return PulseCounter( self, di, edge=edge, callback=callback, rate_window_secs=rate_window_secs, auto_start=auto_start, )
[docs] def get_new_event_counter( self, di: int, edge: str = "rising", callback: PulseCounterCallback = None, rate_window_secs: int = 20, auto_collect: bool = True, ) -> PulseCounter: """Create a new Pulse Counter for counting events. Examples -------- Basic event counter:: counter = self.platform_interface.get_new_event_counter(0, "rising") print(counter.get_counter()) print(counter.get_pulses_per_minute()) Parameters ---------- di : int Pin number to check events for. edge : "rising" or "falling" or "both" The edge to listen to evenets on. callback : PulseCounterCallback Callback called when an event is processed. rate_window_secs : int = 20 The size of window for which the rate of events is calculated. auto_collect : bool = True Whether to automatically collect the events from the platform interface. Returns ------- PulseCounter The pulse counter object for the given pin. """ counter = PulseCounter( self, di, edge=edge, callback=callback, rate_window_secs=rate_window_secs, auto_start=False, ) if auto_collect: counter.update_events() return counter
def start_di_pulse_listener( self, di: int, callback, edge: str = "rising", start_count: int = 0 ): ## Callback should be a function that takes the following arguments: ## di, di_value, dt_secs, counter, edge listener = asyncio.create_task( self.recv_di_pulses(di, callback, edge=edge, start_count=start_count) ) self.pulse_counter_listeners.append(listener) listener.add_done_callback(self.pulse_counter_listeners.remove) async def recv_di_pulses( self, di: int, callback, edge: str = "rising", start_count: int = 0 ): counter = start_count active_callbacks = set() while True: try: # Setup the connection to the platform interface async with grpc.aio.insecure_channel(self.uri) as channel: channel_stream = platform_iface_pb2_grpc.platformIfaceStub( channel ).startPulseCounter( platform_iface_pb2.pulseCounterRequest(di=di, edge=edge) ) while True: response: platform_iface_pb2.pulseCounterResponse = ( await channel_stream.read() ) if response is None or response == grpc.aio.EOF: log.info(f"pulseCounter for di={di} ended.") break log.debug(f"Received response from pulseCounter for di={di}") if ( hasattr(response, "dt_secs") and response.dt_secs is not None and response.dt_secs > 0 ): ## Increment the counter counter += 1 ## Call the callback function with the response task = await call_maybe_async( callback, di, response.value, response.dt_secs, counter, edge, as_task=True, ) if task: active_callbacks.add(task) task.add_done_callback(active_callbacks.remove) except asyncio.CancelledError: log.info(f"pulseCounter for di={di} cancelled.") break except StopAsyncIteration: log.info(f"pulseCounter for di={di} ended.") break except Exception as e: log.error(f"Error receiving pulse for di={di}: {e}", exc_info=e) # await asyncio.sleep(1) ## Loop again await asyncio.sleep(1) ## Wait for all active callbacks to finish while active_callbacks: await asyncio.wait(active_callbacks, timeout=1) @staticmethod def _cast_pins(pins): if isinstance(pins, int): return [pins] if isinstance(pins, float) and pins.is_integer(): return [int(pins)] if not isinstance(pins, Iterable): raise ValueError("Pins must be iterable or integer.") result = [] for p in pins: result.extend(PlatformInterface._cast_pins(p)) return result @staticmethod def _cast_values(values): if isinstance(values, (bool, int)): return [bool(values)] if not isinstance(values, Iterable): raise ValueError("Values must be iterable, bool or integer.") result = [] for p in values: result.extend(PlatformInterface._cast_values(p)) return result @staticmethod def _cast_ao_values(values): if isinstance(values, int): return [float(values)] elif isinstance(values, float): return [values] elif isinstance(values, list): return [float(v) for v in values] else: raise ValueError( f"Invalid type for values: {type(values)}. Must be float or list." ) def _cast_ao_pin_values(self, pins, values): pins = self._cast_pins(pins) values = self._cast_ao_values(values) if len(pins) != len(values): if len(values) == 1: values = [values[0]] * len(pins) else: raise ValueError( "Analogue output and value lists are not the same length." ) return pins, values def _cast_pin_values(self, pins, values): pins = self._cast_pins(pins) values = self._cast_values(values) if len(pins) != len(values): if len(values) == 1: values = [values[0]] * len(pins) else: raise ValueError( "Digital output and value lists are not the same length." ) return pins, values
[docs] @cli_command() async def test_comms(self, message: str = "Comms Check Message") -> str | None: """Test connection by sending a basic echo response to platform interface container. Parameters ---------- message : str Message to send to platform interface to have echo'd as a response Returns ------- str The response from platform interface. """ return await self.make_request( "TestComms", platform_iface_pb2.TestCommsRequest(message=message), response_field="response", )
[docs] @cli_command() async def fetch_di(self, *di: int) -> bool | list[bool]: """Get digital input values. Examples -------- Get a single digital input pin value:: pin1 = await self.platform_iface.fetch_di(1) Get digital input pins 1, 2 and 3 in the same transaction:: pin1, pin2, pin3 = await self.platform_iface.fetch_di(1, 2, 3) Parameters ---------- *di Pin numbers to get the values of. Can be one or more integers. Returns ------- bool | list[bool] Returns one or more booleans where True means the pin is high (1) and False means the pin is low (0). If you requested one pin, returns a single value. If you requested more than one pin, returns a list of values. Returns None if the request failed. """ pins = self._cast_pins(di) return await self.make_request( "getDI", platform_iface_pb2.getDIRequest(di=pins), response_field="di" )
[docs] @cli_command() async def fetch_ai(self, *ai: int) -> float | list[float]: """Get analogue input values. Examples -------- Get a single analogue input pin value:: pin1 = await self.platform_iface.fetch_ai(1) Get analogue input pins 1, 2 and 3 in the same transaction:: pin1, pin2, pin3 = await self.platform_iface.fetch_ai(1, 2, 3) Parameters ---------- *ai Pin numbers to get the values of. Can be one or more integer pin mumber. Returns ------- float | list[float] If you requested one analog input, returns a single value. If you requested more than one pin, returns a list of values. Returns None if the request failed. """ # Above section is to facilitate the following: # fetch_ai(1) # fetch_ai([1,4,2]) # Proposal: fetch_ai(*pins) # allows for fetch_ai(1, 2, 3) or fetch_ai(1) or fetch_ai(*[1, 2, 3]) pins = self._cast_pins(ai) return await self.make_request( "getAI", platform_iface_pb2.getAIRequest(ai=pins), response_field="ai" )
[docs] @cli_command() async def fetch_do(self, *do: int) -> list[bool] | None: """Get digital output values. Examples -------- Get a single digital output pin value:: await self.platform_iface.fetch_do(1) Get digital output pins 1, 2 and 3 in the same transaction:: await self.platform_iface.fetch_do(1, 2, 3) Parameters ---------- *do Pin numbers to get the values of. Can be one or more integers. Returns ------- bool | list[bool] If you requested one, returns a single value. If you requested more than one pin, returns a list of values. Returns None if the request failed. """ pins = self._cast_pins(do) return await self.make_request( "getDO", platform_iface_pb2.getDORequest(do=pins), response_field="do" )
[docs] @cli_command() async def set_do( self, do: int | list[int], value: int | list[int] ) -> list[bool] | None: """Set digital output values. Examples -------- Set a single digital output pin value:: await self.platform_iface.set_do(1, True) Set digital output pin 2 to low and 3 to high in a single transaction:: await self.platform_iface.set_do([2, 3], [False, True]) Parameters ---------- do : Union[int, list[int]] Pin numbers to set the values of. Can be a single pin number or a list of pin numbers. value : Union[int, list[int]] Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value. .. note:: The length of the `do` and `value` lists must be the same! Returns ------- list[bool] A list of digital output values that were set. This should ordinarily return all `True` values. Returns None if the request failed. """ pins, values = self._cast_pin_values(do, value) return await self.make_request( "setDO", platform_iface_pb2.setDORequest(do=pins, value=values), response_field="do", )
[docs] @cli_command() async def schedule_do( self, do: int | list[int], value: bool | list[bool], in_secs: int ) -> None: """Schedule digital output values. This is similar to `set_do`, but schedules the change in a specified number of seconds. Examples -------- Schedule a single digital output pin to be set high in 10 seconds:: await self.platform_iface.schedule_do(1, True, 10) # Set digital output pin 1 to high in 10 seconds Schedule multiple pins in 5 seconds:: await self.platform_iface.schedule_do([2, 3], [False, True], 5) Parameters ---------- do : Union[int, list[int]] Pin numbers to set the values of. Can be a single pin number or a list of pin numbers. value : Union[bool, list[bool]] Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value. in_secs : int Time in seconds to schedule the change in digital output values. Must be positive. """ if not isinstance(in_secs, int) or in_secs < 0: raise ValueError( f"Invalid value for in_secs: {in_secs}. Must be a positive integer." ) pins, values = self._cast_pin_values(do, value) # Above section is to facilitate the following: # schedule_do(1, 1, 1) => [1],[1],1 # schedule_do([1,4,2], 0, 1) => [1,4,2], [0,0,0], 1 # schedule_do([1,4,2], [0,1,0], 1) => [1,4,2], [0,1,0], 1 return await self.make_request( "scheduleDO", platform_iface_pb2.scheduleDORequest( do=pins, value=values, time_secs=in_secs ), response_field="do", )
[docs] @cli_command() async def fetch_ao(self, *ao: int) -> float | list[float]: """Get analogue output values. Examples -------- Get a single analogue output pin value:: pin1 = await self.platform_iface.fetch_ao(1) Get analogue output pins 1, 2 and 3 in the same transaction:: await self.platform_iface.fetch_ao(1, 2, 3) Parameters ---------- *ao Pin numbers to get the values of. Must be an integer. Returns ------- tuple[float] | float If you requested multiple pins, returns a tuple of values. Otherwise, returns a single float. If the request failed, returns None. """ pins = self._cast_pins(ao) return await self.make_request( "getAO", platform_iface_pb2.getAORequest(ao=pins), response_field="ao" )
[docs] @cli_command() async def set_ao( self, ao: int | list[int], value: float | list[float] ) -> list[bool] | None: """Set analogue output values. Examples -------- Set a single analogue output pin 1 to 3.3V:: await self.platform_iface.set_ao(1, 3.3) Set analogue output pins 2 and 3 to 1.5V and 2.5V in a single transaction:: await self.platform_iface.set_ao([2, 3], [1.5, 2.5]) Parameters ---------- ao : int or list[int] Pin numbers to set the values of. Can be a single pin number or a list of pin numbers. value : bool or list[bool] Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value. Returns ------- list[bool] List of boolean values indicating whether the analogue outputs were set successfully. """ # if not isinstance(value, list): # value = [value] pins, values = self._cast_ao_pin_values(ao, value) return await self.make_request( "setAO", platform_iface_pb2.setAORequest(ao=pins, value=values), response_field="ao", )
[docs] @cli_command() async def schedule_ao( self, ao: int | list[int], value: bool | list[bool], in_secs: int ) -> None: """Schedule analogue output values. This is similar to `set_ao`, but schedules the change in a specified number of seconds. Examples -------- Schedule a single analogue output pin 1 to be set to 3.3V in 10 seconds:: await self.platform_iface.schedule_ao(1, 3.3, 10) # Set analogue output pin 1 to 3.3V in 10 seconds Schedule multiple analogue output pins in 5 seconds:: await self.platform_iface.schedule_ao([2, 3], [1.5, 2.5], 5) # Set analogue output pins 2 and 3 to 1.5V and 2.5V respectively. Parameters ---------- ao : int or list[int] Pin numbers to set the values of. Can be a single pin number or a list of pin numbers. value : float or list[float] Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value. in_secs : int Time in seconds to schedule the change in analogue output values. Must be positive. """ if not isinstance(in_secs, int) or in_secs < 0: raise ValueError( f"Invalid value for in_secs: {in_secs}. Must be a positive integer." ) pins, values = self._cast_ao_pin_values(ao, value) # Above section is to facilitate the following: # schedule_ao(1, 1, 1) => [1],[1],1 # schedule_ao([1,4,2], 0, 1) => [1,4,2], [0,0,0], 1 # schedule_ao([1,4,2], [0,1,0], 1) => [1,4,2], [0,1,0], 1 return await self.make_request( "scheduleAO", platform_iface_pb2.scheduleAORequest( ao=pins, value=values, time_secs=in_secs ), response_field="ao", )
[docs] @cli_command() async def fetch_system_voltage(self) -> float: """Get the system input voltage. This is the voltage supplied to the system, typically from a power supply or battery. Examples -------- Get the system input voltage:: voltage = await self.platform_iface.fetch_system_voltage() Returns ------- float The system input voltage in volts. Returns None if the request failed. """ return await self.make_request( "getInputVoltage", platform_iface_pb2.getInputVoltageRequest(), response_field="voltage", )
[docs] @cli_command() async def fetch_system_power(self) -> float: """Get the system input power. This is the power supplied to the system in watts. Examples -------- Get the system input power:: power_watts = await self.platform_iface.fetch_system_power() Returns ------- float The system input power in watts. Returns None if the request failed. """ return await self.make_request( "getSystemPower", platform_iface_pb2.getSystemPowerRequest(), response_field="power_watts", )
[docs] @cli_command() async def fetch_system_temperature(self) -> float: """Get the system temperature. On a Doovit, this is the temperature of the Raspberry Pi CM4. Examples -------- Print the system temperature:: temperature = await self.platform_iface.fetch_system_temperature() print(f"System temperature: {temperature}°C") Returns ------- float The system temperature in degrees Celsius. Returns None if the request failed. """ return await self.make_request( "getTemperature", platform_iface_pb2.getTemperatureRequest(), response_field="temperature", )
[docs] @cli_command() async def fetch_location(self) -> Location: """Get the device location. Doovits with 4G cards generally implement this using the ModemManager (mmcli). Examples -------- Print the current location:: location = await self.platform_iface.fetch_location() print(f"Latitude: {location.latitude}, Longitude: {location.longitude}, Altitude: {location.altitude}") Returns ------- :class:`pydoover.docker.platform.Location` The location of the device. Returns None if the request failed. """ return await self.make_request( "getLocation", platform_iface_pb2.getLocationRequest(), response_field="location", )
[docs] @cli_command() async def reboot(self): """Reboot the device. You should **not** call this method directly, instead see [guide for shutting down](https://docs.doover.com/guide/app-shutdown) for more information on how to safely initiate a shutdown in an application. """ return await self.make_request("reboot", platform_iface_pb2.rebootRequest())
[docs] @cli_command() async def shutdown(self): """Shutdown the device. You should **not** call this method directly, instead see [guide for shutting down](https://docs.doover.com/guide/app-shutdown) for more information on how to safely initiate a shutdown in an application. """ return await self.make_request("shutdown", platform_iface_pb2.shutdownRequest())
[docs] @cli_command() async def fetch_immunity_seconds(self) -> float: """Get the number of seconds the device is immune for. Immunity is the time for which the device will ignore any shutdown requests. Examples -------- Print the number of seconds the device is immune for:: immunity_secs = await self.platform_iface.fetch_immunity_seconds() print(f"Device immune for: {immunity_secs} seconds") Returns ------- float The number of seconds the device is immune for. """ return await self.make_request( "getShutdownImmunity", platform_iface_pb2.getShutdownImmunityRequest(), response_field="immunity_secs", )
[docs] @cli_command() async def set_immunity_seconds(self, immunity_secs: int) -> float: """Set the number of seconds the device is immune for. Immunity is the time for which the device will ignore any shutdown requests. Examples -------- Set the number of seconds the device is immune for:: immunity_secs = await self.platform_iface.set_immunity_seconds(120) Returns ------- float The number of seconds the device is immune for. """ return await self.make_request( "setShutdownImmunity", platform_iface_pb2.setShutdownImmunityRequest(immunity_secs=immunity_secs), response_field="immunity_secs", )
[docs] @cli_command() async def fetch_wake_on_voltage(self) -> float | None: """Get the input voltage threshold at which the device wakes from shutdown. Returns ------- float The wake-on-voltage threshold, in volts. """ return await self.make_request( "getWakeOnVoltage", platform_iface_pb2.getWakeOnVoltageRequest(), response_field="voltage", )
[docs] @cli_command() async def set_wake_on_voltage(self, voltage: float | None) -> float | None: """Set the input voltage threshold at which the device wakes from shutdown. Returns ------- float The wake-on-voltage threshold, in volts. """ return await self.make_request( "setWakeOnVoltage", platform_iface_pb2.setWakeOnVoltageRequest(voltage=voltage), response_field="voltage", )
[docs] @cli_command() async def fetch_wake_reason(self) -> str | None: """Get the reason the device was most recently woken from shutdown. Examples -------- Print why the device last woke:: reason = await self.platform_iface.fetch_wake_reason() print(f"Last wake reason: {reason}") Returns ------- str The wake reason: one of ``rpc``, ``button``, ``voltage``, ``di_<pin>_event``, ``scheduled``, ``max_off``, ``external`` or ``reboot``. Returns None if the device has never been woken or the request failed. """ return await self.make_request( "getWakeReason", platform_iface_pb2.getWakeReasonRequest(), response_field="wake_reason", )
[docs] @cli_command() async def fetch_sleep_log( self, since: int = 0 ) -> list[platform_iface_pb2.SleepLogEntry]: """Get system-status snapshots captured while the device was asleep. While the compute module is powered off the platform periodically records a snapshot of system voltage, current and IO state (see :meth:`set_sleep_log_interval`). This returns those snapshots. Parameters ---------- since : int = 0 Only return snapshots captured at or after this time (epoch milliseconds). 0 returns all stored snapshots (capped at 100, oldest dropped first). Returns ------- list of :class:`SleepLogEntry` Snapshots in ascending order (oldest first). Each entry has ``timestamp`` (epoch ms), ``input_voltage``, ``system_current`` and the ``di``/``do``/ ``ai``/``ao`` IO readings. """ resp: platform_iface_pb2.getSleepLogResponse = await self.make_request( "getSleepLog", platform_iface_pb2.getSleepLogRequest(since=int(since)), ) if resp is None: return [] return list(resp.entries)
[docs] @cli_command() async def fetch_sleep_log_interval(self) -> int | None: """Get the interval between sleep-log snapshots, in seconds. Returns ------- int The snapshot interval in seconds. 0 means sleep logging is disabled. """ return await self.make_request( "getSleepLogInterval", platform_iface_pb2.getSleepLogIntervalRequest(), response_field="interval_secs", )
[docs] @cli_command() async def set_sleep_log_interval(self, interval_secs: int) -> int | None: """Set the interval between sleep-log snapshots, in seconds. Parameters ---------- interval_secs : int Seconds between snapshots while the device is asleep. Pass 0 to disable sleep logging. Returns ------- int The interval that was set, in seconds. """ return await self.make_request( "setSleepLogInterval", platform_iface_pb2.setSleepLogIntervalRequest( interval_secs=int(interval_secs) ), response_field="interval_secs", )
async def schedule_startup(self, time_secs: int) -> None: return await self.make_request( "scheduleStartup", platform_iface_pb2.scheduleStartupRequest(time_secs=time_secs), response_field="time_secs", ) @cli_command() async def schedule_shutdown(self, time_secs: int) -> None: return await self.make_request( "scheduleShutdown", platform_iface_pb2.scheduleShutdownRequest(time_secs=time_secs), response_field="time_secs", ) @cli_command() async def fetch_io_table(self): res = await self.make_request( "getIoTable", platform_iface_pb2.getIoTableRequest(), response_field="io_table", ) # result = json.loads("".join(await self.make_request("getIoTable", platform_iface_pb2.getIoTableRequest()))) if res is None: return None string = "" for i in res: string += i result = json.loads(string) return result
[docs] @cli_command() async def sync_rtc(self): """Synchronize the real-time clock (RTC) with the system (network) time. For Doovits, you shouldn't need to do this as this is handled automatically by `doovitd`. """ return await self.make_request( "syncRtcTime", platform_iface_pb2.syncRtcTimeRequest() )
[docs] async def fetch_events(self, events_from: int = 0): """Get all events. Parameters ---------- events_from : None or int Starting event id or timestamp (in milliseconds), defaults to all availible. Returns ------- List[:class:`pydoover.docker.platform.Event`] List of events. """ return await self.make_request( "getEvents", platform_iface_pb2.getEventsRequest(events_from=events_from), response_field="events", )
[docs] @cli_command() async def fetch_di_events( self, di_pin: int, edge: str, include_system_events: bool = False, events_from: int = 0, ) -> (bool, list[Event]): """Get digital input events. Parameters ---------- di_pin : int Pin number to check events for. edge : "rising" or "falling" or "both" The edge to listen to events on. include_system_events : bool = False Whether to include system events like for a doovit the cm4 turning on and off or the io board starting up. events_from : None or int Starting event id or timestamp (in milliseconds), defaults to all availible. Returns ------- bool, List[:class:`pydoover.docker.platform.Event`] Whether the events are synced and a list of events for the given digital input pin. """ rising = False falling = False if edge == "rising": rising = True elif edge == "falling": falling = True elif edge == "both": rising = True falling = True resp: platform_iface_pb2.getDIEventsResponse = await self.make_request( "getDIEvents", platform_iface_pb2.getDIEventsRequest( pin=int(di_pin), rising=rising, falling=falling, include_system_events=include_system_events, events_from=int(events_from), ), ) if resp: return resp.events_synced, resp.events return None, []
[docs] @cli_command() async def fetch_di_config(self, pin: int) -> platform_iface_pb2.DIConfig | None: """Get the stored configuration for a digital input pin. Parameters ---------- pin : int The digital input pin number. Returns ------- :class:`DIConfig` The pin configuration with ``pnp_mode``, ``irq_edge``, ``debounce_ms`` and ``wake_on_event`` attributes. Returns None if the request failed. """ return await self.make_request( "getDIConfig", platform_iface_pb2.getDIConfigRequest(pin=int(pin)), response_field="config", )
[docs] @cli_command() async def set_di_config( self, pin: int, pnp_mode: bool | None = None, irq_edge: str | None = None, debounce_ms: int | None = None, wake_on_event: bool | None = None, ) -> platform_iface_pb2.DIConfig | None: """Update the configuration for a digital input pin. Only the parameters you pass are changed; any left as None keep their existing stored value. Examples -------- Enable waking the device when an event fires on DI 0, leaving other settings as-is:: await self.platform_iface.set_di_config(0, wake_on_event=True) Parameters ---------- pin : int The digital input pin number to configure. pnp_mode : bool, optional Whether the input is in PNP (sourcing) mode. irq_edge : "rising" or "falling" or "both", optional The edge(s) that trigger an interrupt/event on this pin. debounce_ms : int, optional Debounce time in milliseconds. wake_on_event : bool, optional Whether an event on this pin should wake the device from shutdown. Returns ------- :class:`DIConfig` The resulting pin configuration after the update. """ kwargs = {"pin": int(pin)} if pnp_mode is not None: kwargs["pnp_mode"] = pnp_mode if irq_edge is not None: kwargs["irq_edge"] = irq_edge if debounce_ms is not None: kwargs["debounce_ms"] = int(debounce_ms) if wake_on_event is not None: kwargs["wake_on_event"] = wake_on_event return await self.make_request( "setDIConfig", platform_iface_pb2.setDIConfigRequest(**kwargs), response_field="config", )
platform_iface = PlatformInterface pulse_counter = PulseCounter