Platform Interface

class pydoover.docker.PlatformInterface(app_key: str, plt_uri: str = 'localhost:50053', service_name: str = 'doover.PlatformInterface')[source]

Docker interface for interacting with the platform interface container.

This interface allows you to interact with the platform interface gRPC service, providing access to device IO.

Some implementations are platform-specific, and it is your responsibility to ensure that every hardware platform your application targets implements the methods you call. Most methods return None if they are not supported or if you pass a bad input.

An example of bad input is requesting Digital Input #10 on a Doovit that only supports 4.
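Because a failed or unsupported request yields None rather than an exception, callers may want to guard each read. The following is a minimal sketch, not pydoover code: FakePlatformInterface is a hypothetical stand-in that accepts only four digital input pins (numbered 0-3 here as an assumption), and read_di_or_default is a hypothetical helper.

```python
import asyncio


class FakePlatformInterface:
    """Hypothetical stand-in for PlatformInterface; accepts DI pins 0-3 only."""

    SUPPORTED_DI = range(4)

    async def fetch_di(self, *di):
        # Mimic the documented behaviour: return None on bad input.
        if any(pin not in self.SUPPORTED_DI for pin in di):
            return None
        values = [True for _ in di]
        return values[0] if len(values) == 1 else values


async def read_di_or_default(iface, pin, default=False):
    """Return the pin value, or `default` when the platform returns None."""
    value = await iface.fetch_di(pin)
    return default if value is None else value


async def main():
    iface = FakePlatformInterface()
    return (
        await read_di_or_default(iface, 1),   # supported pin
        await iface.fetch_di(10),             # unsupported pin, raw result
        await read_di_or_default(iface, 10),  # unsupported pin, guarded
    )


ok_value, raw_bad, safe_bad = asyncio.run(main())
```

With a real interface, the same guard would wrap self.platform_iface.fetch_di(...) instead of the fake.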

fetch_ai(*ai: int) float | list[float][source]

Get analogue input values.

Examples

Get a single analogue input pin value:

pin1 = await self.platform_iface.fetch_ai(1)

Get analogue input pins 1, 2 and 3 in the same transaction:

pin1, pin2, pin3 = await self.platform_iface.fetch_ai(1, 2, 3)
Parameters:

*ai – Pin numbers to get the values of. Can be one or more integer pin numbers.

Returns:

If you requested one analogue input, returns a single value. If you requested more than one pin, returns a list of values. Returns None if the request failed.

Return type:

float | list[float]
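Since the return shape depends on how many pins were requested, code that handles a variable number of pins may find it easier to normalise the result first. A minimal sketch, assuming a hypothetical helper named as_list (not part of pydoover):

```python
def as_list(result):
    """Normalise a fetch_* result to a list (empty if the request failed)."""
    if result is None:
        return []
    if isinstance(result, (list, tuple)):
        return list(result)
    # A single pin was requested, so wrap the lone value.
    return [result]


single = as_list(4.2)
several = as_list([1.0, 2.5, 3.3])
failed = as_list(None)
```

The same helper applies to the other fetch methods that return either a single value or a list.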

fetch_ao(*ao: int) float | list[float][source]

Get analogue output values.

Examples

Get a single analogue output pin value:

pin1 = await self.platform_iface.fetch_ao(1)

Get analogue output pins 1, 2 and 3 in the same transaction:

await self.platform_iface.fetch_ao(1, 2, 3)
Parameters:

*ao – Pin numbers to get the values of. Must be an integer.

Returns:

If you requested multiple pins, returns a list of values. Otherwise, returns a single float. If the request failed, returns None.

Return type:

float | list[float]

fetch_di(*di: int) bool | list[bool][source]

Get digital input values.

Examples

Get a single digital input pin value:

pin1 = await self.platform_iface.fetch_di(1)

Get digital input pins 1, 2 and 3 in the same transaction:

pin1, pin2, pin3 = await self.platform_iface.fetch_di(1, 2, 3)
Parameters:

*di – Pin numbers to get the values of. Can be one or more integers.

Returns:

Returns one or more booleans where True means the pin is high (1) and False means the pin is low (0). If you requested one pin, returns a single value. If you requested more than one pin, returns a list of values. Returns None if the request failed.

Return type:

bool | list[bool]

fetch_di_config(pin: int) DIConfig | None[source]

Get the stored configuration for a digital input pin.

Parameters:

pin (int) – The digital input pin number.

Returns:

The pin configuration with pnp_mode, irq_edge, debounce_ms and wake_on_event attributes. Returns None if the request failed.

Return type:

DIConfig

fetch_di_events(di_pin: int, edge: str, include_system_events: bool = False, events_from: int = 0) tuple[bool, list[Event]][source]

Get digital input events.

Parameters:
  • di_pin (int) – Pin number to check events for.

  • edge ("rising" or "falling" or "both") – The edge to listen to events on.

  • include_system_events (bool = False) – Whether to include system events. On a Doovit, for example, these include the CM4 turning on and off and the IO board starting up.

  • events_from (None or int) – Starting event id or timestamp (in milliseconds). Defaults to all available events.

Returns:

Whether the events are synced and a list of events for the given digital input pin.

Return type:

bool, List[pydoover.docker.platform.Event]
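The two-part return value can be unpacked and filtered by event type. The sketch below is illustrative only: Event is a hypothetical stand-in dataclass mirroring the documented fields, count_edges is a hypothetical helper, and treating an unsynced result as incomplete is an assumption about what the synced flag means.

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in mirroring the documented Event fields."""

    event_id: int
    event: str
    pin: int
    value: str
    time: int


def count_edges(result, event_type="DI_R"):
    """Count events of one type in a (synced, events) result."""
    synced, events = result
    if not synced:
        # Assumption: an unsynced list may be incomplete, so report nothing.
        return None
    return sum(1 for e in events if e.event == event_type)


sample = (
    True,
    [
        Event(1, "DI_R", 0, "1", 1_700_000_000_000),
        Event(2, "DI_F", 0, "0", 1_700_000_000_250),
        Event(3, "DI_R", 0, "1", 1_700_000_000_900),
    ],
)
rising = count_edges(sample)
unsynced = count_edges((False, []))
```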

fetch_do(*do: int) list[bool] | None[source]

Get digital output values.

Examples

Get a single digital output pin value:

await self.platform_iface.fetch_do(1)

Get digital output pins 1, 2 and 3 in the same transaction:

await self.platform_iface.fetch_do(1, 2, 3)
Parameters:

*do – Pin numbers to get the values of. Can be one or more integers.

Returns:

If you requested one pin, returns a single value. If you requested more than one pin, returns a list of values. Returns None if the request failed.

Return type:

bool | list[bool]

async fetch_events(events_from: int = 0)[source]

Get all events.

Parameters:

events_from (None or int) – Starting event id or timestamp (in milliseconds). Defaults to all available events.

Returns:

List of events.

Return type:

List[pydoover.docker.platform.Event]

fetch_immunity_seconds() float[source]

Get the number of seconds the device is immune for.

Immunity is the time for which the device will ignore any shutdown requests.

Examples

Print the number of seconds the device is immune for:

immunity_secs = await self.platform_iface.fetch_immunity_seconds()
print(f"Device immune for: {immunity_secs} seconds")
Returns:

The number of seconds the device is immune for.

Return type:

float

fetch_location() Location[source]

Get the device location.

Doovits with 4G cards generally implement this using the ModemManager (mmcli).

Examples

Print the current location:

location = await self.platform_iface.fetch_location()
print(f"Latitude: {location.latitude}, Longitude: {location.longitude}, Altitude: {location.altitude_m}")
Returns:

The location of the device. Returns None if the request failed.

Return type:

pydoover.docker.platform.Location

fetch_sleep_log(since: int = 0) list[SleepLogEntry][source]

Get system-status snapshots captured while the device was asleep.

While the compute module is powered off, the platform periodically records a snapshot of system voltage, current and IO state (see set_sleep_log_interval()). This method returns those snapshots.

Parameters:

since (int = 0) – Only return snapshots captured at or after this time (epoch milliseconds). 0 returns all stored snapshots (capped at 100, oldest dropped first).

Returns:

Snapshots in ascending order (oldest first). Each entry has timestamp (epoch ms), input_voltage, system_current and the di/do/ai/ao IO readings.

Return type:

list of SleepLogEntry
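One use of the sleep log is finding how far the supply voltage sagged while the device was off. A minimal sketch, assuming a hypothetical stand-in SleepLogEntry carrying only a subset of the documented fields and a hypothetical helper lowest_voltage:

```python
from dataclasses import dataclass


@dataclass
class SleepLogEntry:
    """Hypothetical stand-in with a subset of the documented fields."""

    timestamp: int         # epoch milliseconds
    input_voltage: float   # volts
    system_current: float


def lowest_voltage(entries):
    """Return (timestamp, voltage) of the lowest-voltage snapshot, or None."""
    if not entries:
        return None
    worst = min(entries, key=lambda entry: entry.input_voltage)
    return worst.timestamp, worst.input_voltage


log = [
    SleepLogEntry(1_700_000_000_000, 12.6, 0.02),
    SleepLogEntry(1_700_000_600_000, 11.9, 0.02),
    SleepLogEntry(1_700_001_200_000, 12.3, 0.02),
]
lowest = lowest_voltage(log)
empty = lowest_voltage([])
```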

fetch_sleep_log_interval() int | None[source]

Get the interval between sleep-log snapshots, in seconds.

Returns:

The snapshot interval in seconds. 0 means sleep logging is disabled.

Return type:

int

fetch_system_power() float[source]

Get the system input power.

This is the power supplied to the system in watts.

Examples

Get the system input power:

power_watts = await self.platform_iface.fetch_system_power()
Returns:

The system input power in watts. Returns None if the request failed.

Return type:

float

fetch_system_temperature() float[source]

Get the system temperature.

On a Doovit, this is the temperature of the Raspberry Pi CM4.

Examples

Print the system temperature:

temperature = await self.platform_iface.fetch_system_temperature()
print(f"System temperature: {temperature}°C")
Returns:

The system temperature in degrees Celsius. Returns None if the request failed.

Return type:

float

fetch_system_voltage() float[source]

Get the system input voltage.

This is the voltage supplied to the system, typically from a power supply or battery.

Examples

Get the system input voltage:

voltage = await self.platform_iface.fetch_system_voltage()
Returns:

The system input voltage in volts. Returns None if the request failed.

Return type:

float

fetch_wake_on_voltage() float | None[source]

Get the input voltage threshold at which the device wakes from shutdown.

Returns:

The wake-on-voltage threshold, in volts.

Return type:

float

fetch_wake_reason() str | None[source]

Get the reason the device was most recently woken from shutdown.

Examples

Print why the device last woke:

reason = await self.platform_iface.fetch_wake_reason()
print(f"Last wake reason: {reason}")
Returns:

The wake reason: one of rpc, button, voltage, di_<pin>_event, scheduled, max_off, external or reboot. Returns None if the device has never been woken or the request failed.

Return type:

str
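Most wake reasons are fixed strings, but di_<pin>_event embeds a pin number. The sketch below extracts it with a hypothetical helper named wake_pin; it assumes <pin> is rendered as a plain decimal integer, which the text above does not state explicitly.

```python
import re

# Assumption: the pin appears as a decimal integer, e.g. "di_2_event".
_DI_WAKE = re.compile(r"^di_(\d+)_event$")


def wake_pin(reason):
    """Return the DI pin number for a di_<pin>_event reason, else None."""
    if reason is None:
        return None
    match = _DI_WAKE.match(reason)
    return int(match.group(1)) if match else None


pin = wake_pin("di_2_event")
not_di = wake_pin("scheduled")
never_woken = wake_pin(None)
```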

get_new_event_counter(di: int, edge: str = 'rising', callback: Callable[[int, bool, int, int, str], None] | Coroutine[[int, bool, int, int, str], None] = None, rate_window_secs: int = 20, auto_collect: bool = True) PulseCounter[source]

Create a new Pulse Counter for counting events.

Examples

Basic event counter:

counter = self.platform_iface.get_new_event_counter(0, "rising")
print(counter.get_counter())
print(counter.get_pulses_per_minute())
Parameters:
  • di (int) – Pin number to check events for.

  • edge ("rising" or "falling" or "both") – The edge to listen to events on.

  • callback (PulseCounterCallback) – Callback called when an event is processed.

  • rate_window_secs (int = 20) – The size of window for which the rate of events is calculated.

  • auto_collect (bool = True) – Whether to automatically collect the events from the platform interface.

Returns:

The pulse counter object for the given pin.

Return type:

PulseCounter

get_new_pulse_counter(di: int, edge: str = 'rising', callback: Callable[[int, bool, int, int, str], None] | Coroutine[[int, bool, int, int, str], None] = None, rate_window_secs: int = 20, auto_start: bool = True) PulseCounter[source]

Create a new Pulse Counter for counting pulses on a digital input pin.

Examples

Basic pulse counter:

def pulse_callback(di, di_value, dt_secs, count, edge):
    print(f"Pulse on di={di} with value={di_value}, dt={dt_secs}s, count={count}, edge={edge}")

counter = self.platform_iface.get_new_pulse_counter(0, "rising", callback=pulse_callback)
Parameters:
  • di (int) – Digital input pin to listen for pulses on.

  • edge ("rising" or "falling" or "both") – The edge to listen for pulses on.

  • callback (Callable) – Callback function to call when a pulse is received. The callback can be synchronous or asynchronous. The function should take the following arguments:

      - di (int): The pin number the pulse was received on.
      - di_value (bool): The value of the digital input pin (1 for high, 0 for low).
      - dt_secs (int): The time since the last pulse in seconds.
      - count (int): The total number of pulses received.
      - edge (str): The edge that the pulse was received on (“rising” or “falling”).

  • rate_window_secs (int) – The size of window for which the rate of pulses is calculated. Default is 20.

  • auto_start (bool) – Whether to automatically start listening for pulses. Default is True.
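A callback can also be a bound method, which is convenient when it needs to keep state between pulses. The following sketch is illustrative: FlowTotaliser and its litres_per_pulse factor are hypothetical, and the loop only simulates how a pulse counter would invoke the callback with the five documented arguments.

```python
class FlowTotaliser:
    """Hypothetical helper that converts a pulse count into a volume."""

    def __init__(self, litres_per_pulse):
        self.litres_per_pulse = litres_per_pulse
        self.total_litres = 0.0
        self.last_edge = None

    def on_pulse(self, di, di_value, dt_secs, count, edge):
        # `count` is documented as the total number of pulses received,
        # so the volume is recomputed from it rather than incremented.
        self.total_litres = count * self.litres_per_pulse
        self.last_edge = edge


totaliser = FlowTotaliser(litres_per_pulse=0.5)

# Simulate three pulses arriving two seconds apart on pin 0.
for pulse_number in range(1, 4):
    totaliser.on_pulse(0, True, 2, pulse_number, "rising")
```

In an application, totaliser.on_pulse would be passed as the callback argument in place of the simulated loop.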

reboot()[source]

Reboot the device.

You should not call this method directly. Instead, see the [guide for shutting down](https://docs.doover.com/guide/app-shutdown) for how to safely initiate a shutdown in an application.

schedule_ao(ao: int | list[int], value: bool | list[bool], in_secs: int) None[source]

Schedule analogue output values.

This is similar to set_ao, but schedules the change in a specified number of seconds.

Examples

Schedule a single analogue output pin 1 to be set to 3.3V in 10 seconds:

await self.platform_iface.schedule_ao(1, 3.3, 10)  # Set analogue output pin 1 to 3.3V in 10 seconds

Schedule multiple analogue output pins in 5 seconds:

await self.platform_iface.schedule_ao([2, 3], [1.5, 2.5], 5)  # Set analogue output pins 2 and 3 to 1.5V and 2.5V respectively.
Parameters:
  • ao (int or list[int]) – Pin numbers to set the values of. Can be a single pin number or a list of pin numbers.

  • value (float or list[float]) – Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value.

  • in_secs (int) – Time in seconds to schedule the change in analogue output values. Must be positive.

schedule_do(do: int | list[int], value: bool | list[bool], in_secs: int) None[source]

Schedule digital output values.

This is similar to set_do, but schedules the change in a specified number of seconds.

Examples

Schedule a single digital output pin to be set high in 10 seconds:

await self.platform_iface.schedule_do(1, True, 10)  # Set digital output pin 1 to high in 10 seconds

Schedule multiple pins in 5 seconds:

await self.platform_iface.schedule_do([2, 3], [False, True], 5)
Parameters:
  • do (Union[int, list[int]]) – Pin numbers to set the values of. Can be a single pin number or a list of pin numbers.

  • value (Union[bool, list[bool]]) – Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value.

  • in_secs (int) – Time in seconds to schedule the change in digital output values. Must be positive.

set_ao(ao: int | list[int], value: float | list[float]) list[bool] | None[source]

Set analogue output values.

Examples

Set a single analogue output pin 1 to 3.3V:

await self.platform_iface.set_ao(1, 3.3)

Set analogue output pins 2 and 3 to 1.5V and 2.5V in a single transaction:

await self.platform_iface.set_ao([2, 3], [1.5, 2.5])
Parameters:
  • ao (int or list[int]) – Pin numbers to set the values of. Can be a single pin number or a list of pin numbers.

  • value (float or list[float]) – Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value.

Returns:

List of boolean values indicating whether the analogue outputs were set successfully.

Return type:

list[bool]

set_di_config(pin: int, pnp_mode: bool | None = None, irq_edge: str | None = None, debounce_ms: int | None = None, wake_on_event: bool | None = None) DIConfig | None[source]

Update the configuration for a digital input pin.

Only the parameters you pass are changed; any left as None keep their existing stored value.

Examples

Enable waking the device when an event fires on DI 0, leaving other settings as-is:

await self.platform_iface.set_di_config(0, wake_on_event=True)
Parameters:
  • pin (int) – The digital input pin number to configure.

  • pnp_mode (bool, optional) – Whether the input is in PNP (sourcing) mode.

  • irq_edge ("rising" or "falling" or "both", optional) – The edge(s) that trigger an interrupt/event on this pin.

  • debounce_ms (int, optional) – Debounce time in milliseconds.

  • wake_on_event (bool, optional) – Whether an event on this pin should wake the device from shutdown.

Returns:

The resulting pin configuration after the update.

Return type:

DIConfig

set_do(do: int | list[int], value: int | list[int]) list[bool] | None[source]

Set digital output values.

Examples

Set a single digital output pin value:

await self.platform_iface.set_do(1, True)

Set digital output pin 2 to low and 3 to high in a single transaction:

await self.platform_iface.set_do([2, 3], [False, True])
Parameters:
  • do (Union[int, list[int]]) – Pin numbers to set the values of. Can be a single pin number or a list of pin numbers.

  • value (Union[int, list[int]]) – Values to set the pins to. Can be a single value or a list of values. If a single value is provided, all pins will be set to that value.

Note: The length of the do and value lists must be the same.

Returns:

A list of digital output values that were set. This should ordinarily return all True values. Returns None if the request failed.

Return type:

list[bool]
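The pin and value rules described above (a single value applies to every pin, and lists must have equal length) can be checked before a request is made. This is a sketch of those rules only, using a hypothetical helper named pair_pins_and_values; it is not how pydoover validates its arguments internally.

```python
def pair_pins_and_values(pins, values):
    """Return parallel pin and value lists, broadcasting a single value."""
    pin_list = list(pins) if isinstance(pins, (list, tuple)) else [pins]
    if isinstance(values, (list, tuple)):
        value_list = list(values)
    else:
        # A single value is applied to every requested pin.
        value_list = [values] * len(pin_list)
    if len(pin_list) != len(value_list):
        raise ValueError(
            f"got {len(pin_list)} pins but {len(value_list)} values"
        )
    return pin_list, value_list


broadcast = pair_pins_and_values([2, 3], True)
paired = pair_pins_and_values([2, 3], [False, True])
single = pair_pins_and_values(1, True)
```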

set_immunity_seconds(immunity_secs: int) float[source]

Set the number of seconds the device is immune for.

Immunity is the time for which the device will ignore any shutdown requests.

Examples

Set the number of seconds the device is immune for:

immunity_secs = await self.platform_iface.set_immunity_seconds(120)
Returns:

The number of seconds the device is immune for.

Return type:

float

set_sleep_log_interval(interval_secs: int) int | None[source]

Set the interval between sleep-log snapshots, in seconds.

Parameters:

interval_secs (int) – Seconds between snapshots while the device is asleep. Pass 0 to disable sleep logging.

Returns:

The interval that was set, in seconds.

Return type:

int

set_wake_on_voltage(voltage: float | None) float | None[source]

Set the input voltage threshold at which the device wakes from shutdown.

Returns:

The wake-on-voltage threshold, in volts.

Return type:

float

shutdown()[source]

Shut down the device.

You should not call this method directly. Instead, see the [guide for shutting down](https://docs.doover.com/guide/app-shutdown) for how to safely initiate a shutdown in an application.

stub

alias of platformIfaceStub

sync_rtc()[source]

Synchronize the real-time clock (RTC) with the system (network) time.

For Doovits, you shouldn’t need to do this as this is handled automatically by doovitd.

test_comms(message: str = 'Comms Check Message') str | None[source]

Test the connection by sending a basic echo request to the platform interface container.

Parameters:

message (str) – Message to send to the platform interface, which echoes it back as the response.

Returns:

The response from platform interface.

Return type:

str

class pydoover.docker.platform.Location[source]

Dataclass for a Location object as returned by platform interface.

latitude

Latitude in degrees.

Type:

float

longitude

Longitude in degrees.

Type:

float

altitude_m

Altitude in meters above sea level.

Type:

float

accuracy_m

Accuracy of the location in meters.

Type:

float

speed_mps

Speed in meters per second.

Type:

float

heading_deg

Heading in degrees (0-360).

Type:

float

sat_count

Number of satellites used to determine the location.

Type:

int

timestamp

Timestamp of the location in ISO 8601 format (e.g., “2023-10-01T12:00:00Z”).

Type:

str
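The timestamp string can be turned into a timezone-aware datetime with the standard library. A minimal sketch, assuming the value always follows the ISO 8601 form shown in the example; parse_location_timestamp is a hypothetical helper name.

```python
from datetime import datetime, timezone


def parse_location_timestamp(timestamp):
    """Parse an ISO 8601 timestamp such as '2023-10-01T12:00:00Z'."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11,
    # so it is swapped for an explicit UTC offset to support older versions.
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
    return datetime.fromisoformat(timestamp)


fix_time = parse_location_timestamp("2023-10-01T12:00:00Z")
```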

class pydoover.docker.platform.Event[source]

Dataclass for an Event object as returned by platform interface.

event_id

Unique identifier for the event.

Type:

int

event

The type of event, e.g., “DI_R” for rising edge, “DI_F” for falling edge.

Type:

str

pin

The digital input pin number the event occurred on.

Type:

int

value

The value of the digital input pin at the time of the event (e.g., “1” for high, “0” for low).

Type:

str

time

The timestamp of the event in milliseconds since epoch.

Type:

int

cm4_online

Whether the CM4 is online at the time of the event. This can be None if not applicable.

Type:

bool | None
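Event times are in milliseconds since the epoch, whereas Python's datetime works in seconds. A short sketch of the conversion, using a hypothetical helper named event_time_utc:

```python
from datetime import datetime, timezone


def event_time_utc(time_ms):
    """Convert an event time in epoch milliseconds to an aware UTC datetime."""
    return datetime.fromtimestamp(time_ms / 1000, tz=timezone.utc)


when = event_time_utc(1_696_161_600_000)
```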