Docker Applications

class pydoover.docker.Application(app_key: str = None, device_agent: DeviceAgentInterface = None, platform_iface: PlatformInterface = None, modbus_iface: ModbusInterface = None, name: str = None, test_mode: bool = False, config_fp: str = None, healthcheck_port: int = None)

Base class for a Doover application. All apps inherit from this class and override the setup and main_loop methods.

You generally don’t need to pass parameters to this class yourself, as that is done through run_app.

Examples

The following minimal example shows how to subclass and instantiate the Application class. In practice, the template application repository is the suggested starting point, as it provides a more structured scaffold for building apps.

A basic application:

from pydoover.docker import Application, run_app
from pydoover.config import Schema
from pydoover import ui
from pydoover.tags import Boolean, Tags

class MyTags(Tags):
    ready = Boolean(default=False)

class MyUI(ui.UI):
    ready = ui.BooleanVariable("ready", "Ready", value=MyTags.ready)

class MyApp(Application):
    config_cls = Schema
    tags_cls = MyTags
    ui_cls = MyUI

    def setup(self):
        self.tags.ready.set(True)

    def main_loop(self):
        # Your main loop logic here
        pass

if __name__ == "__main__":
    run_app(MyApp())
config

The configuration schema for the application. See [] for more information about config schemas.

Type:

Schema

device_agent

The interface to the Doover Device Agent, which allows the app to communicate with the Doover cloud and other devices.

Type:

DeviceAgentInterface

platform_iface

The interface to the Doover Platform, which allows the app to interact with the device’s hardware.

Type:

PlatformInterface

modbus_iface

The interface to the Modbus communication protocol, allowing the app to read and write Modbus registers.

Type:

ModbusInterface

ui_manager

The UI manager for the application, which handles the user interface elements and commands.

Type:

UIManager

app_key

The application key for the app, used to identify it in the Doover cloud. This is globally unique.

Type:

str

async check_can_shutdown() -> bool

Check if the application can shut down.

This method is called when the application is requested to shut down, and should be overridden by an application if specific logic is required when a shutdown is requested.

See [https://docs.doover.com/docker/shutdown-behaviour] for a detailed explanation of the shutdown behaviour.

This must be implemented as an asynchronous function, take no parameters and return a boolean value.

A return value of True indicates that the application can shut down, while False indicates that it cannot.

By default, this method always returns True, meaning the application can shut down without any checks.

Examples

A simple example that allows shutdown only when Digital Output 0 (for example, an engine or fan) is low:

import logging

log = logging.getLogger(__name__)

class MyApplication(Application):
    # setup, main_loop, etc...

    async def check_can_shutdown(self) -> bool:
        # Only allow shutdown once the output has been switched off.
        if await self.platform_iface.fetch_do(0) == 0:
            log.info("Digital Output 0 is Low. Can shutdown.")
            return True
        else:
            log.warning("Digital Output 0 is High. Cannot shutdown.")
            return False
config_cls

alias of Schema

async create_message(channel_name: str, data: dict[str, Any], files: list[File] = None, timestamp: datetime = None) -> int

Create a new message on a channel.

Parameters:
  • channel_name (str) – The name of the channel to create the message on.

  • data (dict) – The message data.

  • files (list[File], optional) – Files to attach to the message.

  • timestamp (datetime, optional) – The timestamp for the message. Defaults to now (UTC).

Returns:

The ID of the created message.

Return type:

int

get_global_tag(tag_key: str, default: Any = None) -> Any | None

Get a global tag value.

Global tags are tags that are not specific to an app, but are shared across all apps.

Warning

Due to namespacing concerns, it’s best practice to use global tags sparingly and only for values that are truly global in nature. For example, you might use a global tag for a shutdown request or a system-wide status indicator. If you need to get a tag for a specific app, use get_tag() instead.

Examples

>>> is_flag_set = self.get_global_tag("my_global_flag")
>>> print(f"Global flag my_global_flag is set to {is_flag_set}")
Parameters:
  • tag_key (str) – The global tag to fetch.

  • default (Any, optional) – The default value to return if the tag does not exist. Defaults to None.

Returns:

The value of the global tag, or default (None unless specified) if the tag does not exist.

Return type:

Any

get_tag(tag_key: str, app_key: str = None, default: Any = None) -> Any | None

Get a tag value for a specific app.

If you want to get a global tag, use get_global_tag() instead.

Examples

>>> tag_value = self.get_tag("other_tag", "some-other-app-1234")
>>> print(f"other_tag is {tag_value} for app some-other-app-1234")
>>> tag_value = self.get_tag("my_tag")
>>> print(f"my_tag is {tag_value} for current app {self.app_key}")
Parameters:
  • tag_key (str) – The tag to fetch.

  • app_key (str, optional) – The app key to get the tag for. This defaults to the current app.

  • default (Any, optional) – The default value to return if the tag does not exist. Defaults to None.

Returns:

The value of the tag, or default (None unless specified) if the tag does not exist.

Return type:

Any
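The fallback behaviour of the default parameter can be illustrated with a small stand-in. This is a sketch only: TagStoreSketch is a hypothetical class that keeps tags in a plain dictionary keyed by app key, and it makes no claim about how pydoover stores tags internally.

```python
from typing import Any


class TagStoreSketch:
    """Hypothetical stand-in for tag storage; not part of pydoover."""

    def __init__(self) -> None:
        # Tags are namespaced per app: {app_key: {tag_key: value}}
        self._tags: dict[str, dict[str, Any]] = {}

    def set_tag(self, tag_key: str, value: Any, app_key: str) -> None:
        self._tags.setdefault(app_key, {})[tag_key] = value

    def get_tag(self, tag_key: str, app_key: str, default: Any = None) -> Any:
        # A missing app or a missing tag both fall back to `default`.
        return self._tags.get(app_key, {}).get(tag_key, default)


store = TagStoreSketch()
store.set_tag("pump_on", True, app_key="app-a")
```

Passing an explicit default avoids a separate None check at the call site, for example when a counter tag should start at 0.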

property is_agent_open: bool

Whether some user has this agent’s page open.

property is_app_open: bool

Whether some user has this application expanded on the agent page.

property is_being_observed: bool

Whether any user has an active claim on this agent (any bucket).

property is_group_open: bool

Whether some user is viewing a context that renders this agent.

is_live_tag_open(tag_name: str, app_key: str | None = None) -> bool

Whether some user has the named tag in live mode on this agent.

property is_ready: bool

Check if the application is ready.

The application is ready when all initialization tasks have completed and the UI is set up. In practice, this means your setup method has completed and the application is connected to the cloud.

Returns:

True if the application is ready, False otherwise.

Return type:

bool

async main_loop()

The main loop function for the application.

Your application should override this method to perform the main logic of your application.

Generally, this involves running and checking any state machines, setting tags, reading sensors, etc. depending on your application.

This function is called in a continuous loop, so it should generally not perform any long blocking calls, instead deferring to checking if a result is ready to be processed in a future loop.

You can control the speed at which this loop runs by setting the loop_target_period attribute of the application instance. By default, this is set to a target invocation period of 1 second.

This function can be asynchronous or synchronous, depending on your needs.

You do not need to call super() inside your main_loop method; this function does nothing by default.
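One way to keep main_loop free of long blocking calls is to start slow work as a background task and check on a later iteration whether it has finished. The sketch below uses only asyncio; PollingSketch, _slow_read and demo are hypothetical names, and the class is a stand-in rather than a real Application subclass.

```python
import asyncio
from typing import Optional


class PollingSketch:
    """Hypothetical stand-in for an app; it is not a pydoover Application."""

    def __init__(self) -> None:
        self._pending: Optional[asyncio.Task] = None
        self.results: list[int] = []

    async def _slow_read(self) -> int:
        # Placeholder for slow work, such as a sensor read or network call.
        await asyncio.sleep(0.02)
        return 42

    async def main_loop(self) -> None:
        if self._pending is None:
            # Start the slow work in the background and return immediately.
            self._pending = asyncio.create_task(self._slow_read())
        elif self._pending.done():
            # The result is ready: process it and free the slot.
            self.results.append(self._pending.result())
            self._pending = None
        # Otherwise the work is still running; check again next iteration.


async def demo(iterations: int = 2, period: float = 0.05) -> list[int]:
    app = PollingSketch()
    for _ in range(iterations):
        await app.main_loop()
        await asyncio.sleep(period)  # stands in for loop_target_period
    return app.results
```

Each call to main_loop returns almost immediately, so the loop period stays close to its target even when the underlying work takes longer than one iteration.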

async next()

Run a single iteration of the main loop. This is only available in test mode.

Normally, the main loop runs indefinitely, once every loop_target_period seconds.

During testing, it is helpful to be able to control the flow of the main loop. Call this method to run the next iteration of the main loop manually.

Examples

A simple example:

import asyncio

from pydoover.docker import Application, run_app
from pydoover.config import Schema

async def test_app():
    class MyApp(Application):
        config_cls = Schema

    app = MyApp(test_mode=True)
    asyncio.create_task(run_app(app, start=False))

    # wait for app to start
    await app.wait_until_ready()

    # increment the main loop once
    await app.next()
Raises:

RuntimeError – If this method is called when the app is not in test mode.

async on_aggregate_update(event: AggregateUpdateEvent)

Called when the aggregate is updated on a subscribed channel.

Override this method in your application to handle aggregate update events.

You do not need to call super().on_aggregate_update() — this method does nothing by default.

Parameters:

event (AggregateUpdateEvent) – The aggregate update event.

async on_channel_sync(event: ChannelSyncEvent)

Called once per channel when the initial aggregate is fetched on subscription.

Override this method in your application to handle the initial channel state.

You do not need to call super().on_channel_sync() — this method does nothing by default.

Parameters:

event (ChannelSyncEvent) – The channel sync event containing the initial aggregate.

async on_message_create(event: MessageCreateEvent)

Called when a new message is created on a subscribed channel.

Override this method in your application to handle message creation events.

You do not need to call super().on_message_create() — this method does nothing by default.

Parameters:

event (MessageCreateEvent) – The message creation event.

async on_message_update(event: MessageUpdateEvent)

Called when a message is updated on a subscribed channel.

Override this method in your application to handle message update events.

You do not need to call super().on_message_update() — this method does nothing by default.

Parameters:

event (MessageUpdateEvent) – The message update event.

async on_oneshot_message(event: OneShotMessage)

Called when a one-shot message is received on a subscribed channel.

Override this method in your application to handle one-shot message events.

You do not need to call super().on_oneshot_message() — this method does nothing by default.

Parameters:

event (OneShotMessage) – The one-shot message event.

async on_shutdown_at(dt: datetime) -> None

Callback for when a shutdown is scheduled.

See [https://docs.doover.com/docker/shutdown-behaviour] for a detailed explanation of the shutdown behaviour.

This method is called when a shutdown is scheduled, and can be overridden by an application to perform specific actions before the imminent system shutdown.

By default, this method does nothing.

Examples

Simple logging example:

import logging
from datetime import datetime

log = logging.getLogger(__name__)

class MyApplication(Application):
    # setup, main_loop, etc...

    async def on_shutdown_at(self, dt: datetime):
        log.info(f"Shutdown scheduled at {dt}. Performing cleanup...")
Parameters:

dt (datetime) – The datetime when the shutdown is scheduled.

async request_shutdown() -> None

Request a system shutdown.

async send_notification(message: str | Notification, *, title: str | None = None, severity: NotificationSeverity | int | None = None, topic: str | None = None) -> int

Send a notification via the notifications channel.

The Doover cloud fans this out to any notification subscriptions (email, SMS, web push, HTTP) that match the given severity and topic.

Parameters:
  • message (str | Notification) – Either the notification body, or a fully-constructed Notification (in which case title, severity and topic are ignored).

  • title (str, optional) – Optional title / headline for the notification.

  • severity (NotificationSeverity | int, optional) – Severity level. Subscribers only receive notifications at or above their subscription severity.

  • topic (str, optional) – Optional topic used to match subscription topic_filter entries.

Returns:

The ID of the created channel message.

Return type:

int
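The severity rule described above ("at or above their subscription severity") amounts to an integer comparison. The following sketch illustrates that rule only. SeveritySketch and its member names and values are hypothetical and may not match the real NotificationSeverity, and topic matching is left out because its semantics are not described here.

```python
import enum


class SeveritySketch(enum.IntEnum):
    """Hypothetical levels; the real NotificationSeverity members may differ."""

    INFO = 1
    WARNING = 2
    CRITICAL = 3


def is_delivered(notification_severity: int, subscription_severity: int) -> bool:
    # A subscriber receives notifications at or above its own threshold.
    return notification_severity >= subscription_severity
```

Under these assumed levels, a subscription at WARNING would receive WARNING and CRITICAL notifications but not INFO ones.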

async set_global_tag(tag_key: str, value: Any, only_if_changed: bool = True, log: bool = False) -> None

Set a global tag value.

As with get_global_tag(), global tags are not specific to an app but are shared across all apps, so they should be used sparingly.

Examples

>>> await self.set_global_tag("my_global_flag", True)
>>> await self.set_global_tag("system_status", "operational")
Parameters:
  • tag_key (str) – The global tag to set.

  • value (Any) – The value to set the global tag to.

  • only_if_changed (bool, optional) – If True, the tag will only be set if the value is different from the current value. Defaults to True.

  • log (bool, optional) – If True, the update is also recorded as a logged data point at the end of the current main loop iteration. Defaults to False.

async set_tag(tag_key: str, value: Any, app_key: str = None, only_if_changed: bool = True, log: bool = False) -> None

Set a tag value.

This method sets a tag value for a specific app. If you want to set a global tag, use set_global_tag() instead.

Tag updates are accumulated and flushed to the aggregate once per main loop cycle. Call flush_tags() to force an immediate flush.

Examples

>>> await self.set_tag("my_tag", "my_value")
>>> await self.set_tag("other_tag", "other_value", app_key="some-other-app-1234")
Parameters:
  • tag_key (str) – The tag to set.

  • value (Any) – The value to set the tag to.

  • app_key (str, optional) – The app key to set the tag for. This defaults to the current app’s key.

  • only_if_changed (bool, optional) – If True, the tag will only be set if the value is different from the current value. Defaults to True.

  • log (bool, optional) – If True, the update is also recorded as a logged data point at the end of the current main loop iteration, rather than waiting for the next periodic log flush (up to 15 minutes). Defaults to False.
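The interaction between only_if_changed and the once-per-loop flush can be pictured with a small buffer. This is a simplified, synchronous sketch under stated assumptions: TagBufferSketch is hypothetical, it ignores app keys and the log option, and it is not how pydoover is known to implement tag batching.

```python
from typing import Any

_MISSING = object()  # sentinel for "no value recorded yet"


class TagBufferSketch:
    """Hypothetical stand-in that batches tag updates between flushes."""

    def __init__(self) -> None:
        self.current: dict[str, Any] = {}   # values already flushed
        self.pending: dict[str, Any] = {}   # updates waiting for the next flush

    def set_tag(self, tag_key: str, value: Any, only_if_changed: bool = True) -> bool:
        # Compare against the newest known value, pending or flushed.
        latest = self.pending.get(tag_key, self.current.get(tag_key, _MISSING))
        if only_if_changed and latest == value:
            return False  # nothing to do; the value is unchanged
        self.pending[tag_key] = value
        return True

    def flush_tags(self) -> dict[str, Any]:
        # Hand over all accumulated updates in one batch.
        flushed, self.pending = self.pending, {}
        self.current.update(flushed)
        return flushed
```

In this sketch, repeated writes of the same value between flushes produce a single update, which is the main reason to leave only_if_changed at its default.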

async set_tags(tags: dict[str, Any], app_key: str = None, only_if_changed: bool = True, log: bool = False) -> None

Set multiple tags at once.

async setup()

The main setup function for the application.

Your application should override this method to perform any setup tasks that need to be done before the main loop starts.

Generally, that involves setting up UI, registering callbacks, starting state machines, etc.

This function can be asynchronous or synchronous, depending on your needs.

You do not need to call super() inside your setup method; this function does nothing by default.

async subscribe(channel_name: str, events: EventSubscription = <EventSubscription.all: 31>)

Subscribe to events on a channel.

When events are received on the channel, the appropriate on_* callback methods will be called (e.g. on_message_create(), on_aggregate_update()).

You can subscribe to specific event types using the events parameter, or subscribe to all events (the default).

Examples

Subscribe to all events on a channel:

async def setup(self):
    await self.subscribe("my_channel")

Subscribe to only message creation events:

async def setup(self):
    await self.subscribe("my_channel", EventSubscription.message_create)

Combine event types:

async def setup(self):
    await self.subscribe(
        "my_channel",
        EventSubscription.message_create | EventSubscription.aggregate_update,
    )
Parameters:
  • channel_name (str) – The name of the channel to subscribe to.

  • events (EventSubscription, optional) – Which event types to subscribe to. Defaults to EventSubscription.all.
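Combining event types with | works because the subscription is a set of bit flags. The sketch below reproduces the idea with a stand-in enum. EventSubscriptionSketch is hypothetical: only message_create, aggregate_update and all (value 31) come from this page, while the remaining member names and every individual bit value are assumptions chosen so that the flags add up to 31.

```python
import enum


class EventSubscriptionSketch(enum.IntFlag):
    """Hypothetical stand-in; member names and bit values are assumed."""

    message_create = 1
    message_update = 2
    aggregate_update = 4
    channel_sync = 8
    oneshot_message = 16
    all = 31  # every flag above combined


def wants(subscription: EventSubscriptionSketch, event: EventSubscriptionSketch) -> bool:
    """Return True if `event` is included in `subscription`."""
    return bool(subscription & event)


# Same combination as in the "Combine event types" example.
combined = (
    EventSubscriptionSketch.message_create
    | EventSubscriptionSketch.aggregate_update
)
```

A subscription built this way would trigger only the callbacks whose flag is set, so combined here corresponds to on_message_create() and on_aggregate_update().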

tags_cls

alias of Tags

async update_channel_aggregate(channel_name: str, data: dict[str, Any], files: list[File] = None, clear_attachments: bool = False, replace_data: bool = False, max_age_secs: float = None) -> Aggregate

Update the aggregate data on a channel.

Parameters:
  • channel_name (str) – The name of the channel to update the aggregate on.

  • data (dict) – The aggregate data. By default this is merged with existing data.

  • files (list[File], optional) – Files to attach to the aggregate.

  • clear_attachments (bool, optional) – If True, clear existing attachments before adding new ones. Defaults to False.

  • replace_data (bool, optional) – If True, replace the aggregate data entirely instead of merging. Defaults to False.

  • max_age_secs (float, optional) – Maximum age in seconds before the aggregate is published to the cloud.

Returns:

The updated aggregate.

Return type:

Aggregate
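The difference between merging and replacing can be shown with plain dictionaries. This sketch uses flat data only, because how nested keys are merged is not specified here. apply_update is a hypothetical helper, not a pydoover function.

```python
from typing import Any


def apply_update(
    existing: dict[str, Any], data: dict[str, Any], replace_data: bool = False
) -> dict[str, Any]:
    """Return the aggregate data after an update, leaving `existing` untouched."""
    if replace_data:
        # Replace: the new data becomes the whole aggregate.
        return dict(data)
    # Merge: new keys are added and matching keys are overwritten.
    merged = dict(existing)
    merged.update(data)
    return merged


existing = {"temp": 20, "mode": "auto"}
merged = apply_update(existing, {"temp": 21})
replaced = apply_update(existing, {"temp": 21}, replace_data=True)
```

With the default merge, the mode key survives the update, whereas with replace_data=True only the keys passed in data remain.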

async update_message(channel_name: str, message_id: int, data: dict[str, Any], files: list[File] = None, replace_data: bool = False, clear_attachments: bool = False) -> Message

Update an existing message on a channel.

Parameters:
  • channel_name (str) – The name of the channel the message belongs to.

  • message_id (int) – The ID of the message to update.

  • data (dict) – The updated message data. By default this is merged with existing data.

  • files (list[File], optional) – Files to attach to the message.

  • replace_data (bool, optional) – If True, replace the message data entirely instead of merging. Defaults to False.

  • clear_attachments (bool, optional) – If True, clear existing attachments before adding new ones. Defaults to False.

Returns:

The updated message.

Return type:

Message

async wait_for_interval(target_time: float)

Waits for the necessary amount of time to maintain a consistent interval of target_time seconds between calls to this method.
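A common way to hold a steady interval is to record when the method was last called and sleep only for the time that remains. The sketch below shows that technique with asyncio and a monotonic clock. IntervalWaiterSketch is a hypothetical class written for illustration and is not taken from the pydoover source.

```python
import asyncio
import time
from typing import Optional


class IntervalWaiterSketch:
    """Hypothetical helper that keeps a steady interval between calls."""

    def __init__(self) -> None:
        self._last_call: Optional[float] = None

    async def wait_for_interval(self, target_time: float) -> float:
        """Sleep for whatever is left of `target_time`; return seconds slept."""
        slept = 0.0
        if self._last_call is not None:
            elapsed = time.monotonic() - self._last_call
            remaining = target_time - elapsed
            if remaining > 0:
                await asyncio.sleep(remaining)
                slept = remaining
        # Record the end of this call as the start of the next interval.
        self._last_call = time.monotonic()
        return slept


async def demo() -> tuple[float, float]:
    waiter = IntervalWaiterSketch()
    first = await waiter.wait_for_interval(0.05)   # first call: nothing to wait for
    await asyncio.sleep(0.02)                      # simulated work
    second = await waiter.wait_for_interval(0.05)  # sleeps only the remainder
    return first, second
```

Because the time spent working is subtracted from the wait, the total period stays near target_time as long as the work itself is shorter than the target.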

async wait_until_ready()

Wait until the application is ready.

This method suspends the calling coroutine until the application is ready (see is_ready).
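The relationship between is_ready and wait_until_ready() resembles a readiness gate, which can be sketched with asyncio.Event. This is an illustration under assumptions: ReadinessSketch is a hypothetical class, and pydoover may track readiness differently.

```python
import asyncio


class ReadinessSketch:
    """Hypothetical stand-in showing a readiness gate built on asyncio.Event."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()

    @property
    def is_ready(self) -> bool:
        return self._ready.is_set()

    async def setup(self) -> None:
        await asyncio.sleep(0.01)  # placeholder for real initialisation work
        self._ready.set()          # wake up everything waiting on readiness

    async def wait_until_ready(self) -> None:
        await self._ready.wait()


async def demo() -> tuple[bool, bool]:
    app = ReadinessSketch()
    before = app.is_ready
    setup_task = asyncio.create_task(app.setup())
    await app.wait_until_ready()  # other tasks keep running while this waits
    await setup_task
    return before, app.is_ready
```

Waiting on an event only pauses the coroutine that awaits it, so setup can continue to run on the same event loop in the meantime.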

pydoover.docker.run_app(app: Application, start: bool = True, setup_logging: bool = True, log_formatter: Formatter = None, log_filters: Filter | list[Filter] = None)

Run the application.

This function initializes the application, sets up the interfaces, and runs the main loop. If start is True, it runs the application in a blocking manner; otherwise it returns an async runner function. Passing start=False is useful for testing, or when you want to run the application in an event loop without blocking the main thread, but it is not recommended for production use.

Examples

The general recommended structure for starting applications in the __init__.py file:

from pydoover.docker import run_app

from .application import SampleApplication
from .app_config import SampleConfig

def main():
    run_app(SampleApplication())
Parameters:
  • app (Application) – The application instance to run.

  • start (bool, optional) – If True, the application will run in a blocking manner. If False, it will return an async runner function. Defaults to True.

  • setup_logging (bool, optional) – If True, logging is set up for you. Defaults to True. To customise the output format, pass a logging formatter to the log_formatter parameter.

  • log_formatter (logging.Formatter, optional) – The logging formatter to use. Defaults to None, which will use a simple custom formatter defined in pydoover.utils.LogFormatter.

  • log_filters (logging.Filter | list[logging.Filter], optional) – The logging filters to use. Defaults to None, which will not apply any filters.