Device Agent

class pydoover.docker.DeviceAgentInterface(app_key: str, dda_uri: str = '127.0.0.1:50051', dda_timeout: int = 7, max_conn_attempts: int = 5, time_between_connection_attempts: int = 10, service_name: str = 'doover.DeviceAgent')

Interface for interacting with the Device Agent gRPC service.

dda_timeout

Timeout for requests to the Device Agent service.

Type:

int

max_connection_attempts

Maximum number of attempts to connect to the Device Agent service.

Type:

int

time_between_connection_attempts

Time to wait between connection attempts to the Device Agent service.

Type:

int
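
The two connection attributes above describe a bounded retry policy. The block below is a minimal sketch of such a policy, not the library's implementation: `connect_with_retries`, `try_connect` and `flaky_connect` are hypothetical names, and the wait is assumed to be in seconds (the reference does not state the unit).

```python
import asyncio


async def connect_with_retries(try_connect, max_attempts=5, wait_seconds=10.0):
    """Call try_connect() until it returns True or the attempts run out.

    Returns the number of attempts used, or None if every attempt failed.
    """
    for attempt in range(1, max_attempts + 1):
        if await try_connect():
            return attempt
        # Only wait if another attempt is still to come.
        if attempt < max_attempts:
            await asyncio.sleep(wait_seconds)
    return None


# Hypothetical connection that only succeeds on the third attempt.
calls = {"n": 0}


async def flaky_connect():
    calls["n"] += 1
    return calls["n"] >= 3


attempts_used = asyncio.run(
    connect_with_retries(flaky_connect, max_attempts=5, wait_seconds=0.01)
)
```

With the documented defaults (5 attempts, 10 between attempts), a policy like this would give up after roughly 40 units of waiting plus the time spent in each attempt.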

is_dda_available

Whether the Device Agent service is available. This is set to True once a successful request has been made to the service.

Type:

bool

is_dda_online

Whether the Device Agent service is currently online.

Type:

bool

has_dda_been_online

Whether the Device Agent service has been online at least once since the interface was created.

Type:

bool

last_channel_message_ts

A dictionary that stores the last time a message was received from each channel.

Type:

dict

add_event_callback(channel_name: str, callback: Callable, events: EventSubscription = <EventSubscription.all: 31>) -> None

Register a callback for events on a channel.

The callback receives a single event argument, one of MessageCreateEvent, MessageUpdateEvent, AggregateUpdateEvent, or OneShotMessage, filtered by the events parameter.

The channel name is accessible via the event payload itself.

Starts the event stream for the channel if not already running.

Parameters:
  • channel_name (str) – Name of channel to subscribe to.

  • callback (Callable) – An async callback (event) -> None.

  • events (EventSubscription, optional) – Which event types to deliver. Defaults to EventSubscription.all.
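
To illustrate the registration pattern without a running Device Agent, the sketch below uses a stand-in class. `FakeDeviceAgent`, its `emit` helper, and the `channel_name` and `data` attributes on the event object are assumptions made for this example; only the shape of `add_event_callback` and the async `(event) -> None` callback come from the reference above.

```python
import asyncio
from types import SimpleNamespace


class FakeDeviceAgent:
    """Hypothetical stand-in that mimics only add_event_callback's shape."""

    def __init__(self):
        self._callbacks = {}

    def add_event_callback(self, channel_name, callback, events=None):
        # The events filter is accepted but ignored in this sketch.
        self._callbacks.setdefault(channel_name, []).append(callback)

    async def emit(self, channel_name, event):
        # Demo helper, not part of the documented interface.
        for callback in self._callbacks.get(channel_name, []):
            await callback(event)


received = []


async def on_event(event):
    # The callback takes a single event argument and returns nothing.
    received.append((event.channel_name, event.data))


async def main():
    agent = FakeDeviceAgent()
    agent.add_event_callback("my_channel", on_event)
    event = SimpleNamespace(channel_name="my_channel", data={"temp": 21})
    await agent.emit("my_channel", event)


asyncio.run(main())
```

Against the real interface, the equivalent call would presumably be `self.device_agent.add_event_callback("my_channel", on_event)`, with the events argument narrowing which event types reach the callback.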

fetch_channel_aggregate(channel_name: str) -> Aggregate

Fetch a channel’s current aggregate payload.

If the channel has been subscribed to via add_event_callback(), the cached aggregate is returned. Otherwise, a gRPC call is made to fetch it.

Examples

>>> aggregate = await self.device_agent.fetch_channel_aggregate("my_channel")
>>> print(aggregate.data)

Parameters:

channel_name (str) – Name of channel to get aggregate from.

Returns:

Aggregate from channel.

Return type:

Aggregate

Raises:
  • NotFoundError – If the channel does not exist.

  • DooverAPIError – If the request fails.
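
The cached-or-remote behaviour described for this method can be pictured with the sketch below. It is an illustration under stated assumptions, not the library's code: `AggregateCache`, `on_aggregate_update` and `fake_remote` are hypothetical, and plain dicts stand in for `Aggregate` objects.

```python
import asyncio


class AggregateCache:
    """Hypothetical cache-first lookup with a remote fallback."""

    def __init__(self, fetch_remote):
        self._fetch_remote = fetch_remote  # async callable: name -> aggregate
        self._cached = {}  # filled only for channels with an event stream

    def on_aggregate_update(self, channel_name, aggregate):
        # A subscription's event stream would call this on each update.
        self._cached[channel_name] = aggregate

    async def fetch(self, channel_name):
        if channel_name in self._cached:
            return self._cached[channel_name]
        # No subscription yet: fall back to a remote request.
        return await self._fetch_remote(channel_name)


remote_calls = []


async def fake_remote(channel_name):
    remote_calls.append(channel_name)
    return {"source": "remote"}


async def demo():
    cache = AggregateCache(fake_remote)
    first = await cache.fetch("my_channel")  # not cached, goes remote
    cache.on_aggregate_update("my_channel", {"source": "stream"})
    second = await cache.fetch("my_channel")  # served from the cache
    return first, second


first, second = asyncio.run(demo())
```

In this sketch the remote result is deliberately not stored, so only subscribed channels are ever served from the cache; whether the real interface behaves the same way is not stated in the reference.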

static has_persistent_connection()

For the Device Agent, this always returns True. This method exists to provide interoperability with the API client.

is_channel_synced(channel_name)

Check whether a channel is synced with the Device Agent (DDA).

During normal operation, this should always return True while DDA is active.

This makes it mainly useful for timing during the startup process.

Parameters:

channel_name (str) – Name of the channel to check.

Returns:

True if the channel is synced, False otherwise.

Return type:

bool

listen_channel(channel_name: str) -> None

Listen to channel events, printing the output to the console.

Parameters:

channel_name (str) – Name of channel to listen to.

stub

alias of deviceAgentStub

async wait_for_channels_sync(channel_names: list[str], timeout: int = 5, inter_wait: float = 0.2) -> bool

Wait for all specified channels to be synced with DDA.

This is invoked internally at startup to ensure that all channels are ready before proceeding with operations that depend on them.

You shouldn’t need to use this during normal operation.

Parameters:
  • channel_names (list[str]) – List of channel names to check for sync status.

  • timeout (int) – Maximum time to wait for all channels to sync, in seconds.

  • inter_wait (float) – Time to wait between checks, in seconds.

Returns:

True if all channels are synced within the timeout, False otherwise.

Return type:

bool
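
The timeout and inter_wait parameters suggest a polling loop over is_channel_synced(). The following is a sketch of that pattern under that assumption, not the actual implementation: `wait_until_all_synced` and the `synced` set are hypothetical, and a set membership test stands in for the real sync check.

```python
import asyncio
import time


async def wait_until_all_synced(is_synced, channel_names, timeout=5, inter_wait=0.2):
    """Poll is_synced(name) for every channel until all pass or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        if all(is_synced(name) for name in channel_names):
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(inter_wait)


# Channel "a" starts synced; "b" becomes synced shortly afterwards.
synced = {"a"}


async def demo():
    async def sync_b_later():
        await asyncio.sleep(0.05)
        synced.add("b")

    task = asyncio.create_task(sync_b_later())
    ok = await wait_until_all_synced(
        synced.__contains__, ["a", "b"], timeout=1, inter_wait=0.01
    )
    await task
    # A channel that never syncs makes the wait give up and return False.
    timed_out = await wait_until_all_synced(
        synced.__contains__, ["missing"], timeout=0.05, inter_wait=0.01
    )
    return ok, timed_out


ok, timed_out = asyncio.run(demo())
```

A smaller inter_wait notices the sync sooner at the cost of more frequent checks; the documented default of 0.2 seconds allows roughly 25 checks within the default 5 second timeout.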