from typing import Any, Iterator, NoReturn, overload
from .manager import TagsManager
from ..config import Schema
class NotSet:
"""Sentinel used when a tag has not been assigned a value."""
def _coerce_tag_value(value: Any, tag_type: str) -> Any:
"""Coerce a tag value to match its declared type.
JSON does not distinguish between integers and floats, so values
from channels often arrive as float even when the tag is declared
as ``"integer"``. This function normalises the value based on the
declared ``tag_type``.
"""
if value is None or value is NotSet:
return value
if tag_type == "integer" and isinstance(value, float) and value.is_integer():
return int(value)
if tag_type == "boolean" and not isinstance(value, bool):
return bool(value)
return value
[docs]
class Tag:
"""Represents a single declared tag definition.
The ``log_on`` parameter accepts the same descriptor objects as the
typed subclasses (:class:`Number`, :class:`Boolean`, :class:`String`)
so the legacy ``Tag("number", log_on=...)`` form supports auto-logging
too. Descriptors are validated against ``tag_type``: numerics accept
:class:`Cross` / :class:`Rise` / :class:`Fall` / :class:`Delta`, and
booleans/strings accept :class:`AnyChange` / :class:`Enter` /
:class:`Exit`.
"""
def __init__(
self,
tag_type: str,
default: Any = NotSet,
name: str | None = None,
log_on: "_LogTrigger | list[_LogTrigger] | None" = None,
live: bool = False,
):
self.tag_type = tag_type
self.name = name
self.default = default
# When True, the docker application republishes this tag's current
# value as a one-shot message on every main-loop iteration (see
# ``TagsManagerDocker.flush_live_tags``) so a watching UI gets a
# fresh value without it being persisted as a logged data point.
self.live = live
self._declared_attr_name: str | None = None
if log_on is None:
self.log_on: list[_LogTrigger] = []
else:
self.log_on = _normalise_log_on(
log_on, _allowed_log_on_for_type(tag_type), type(self).__name__
)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert this tag definition to its schema-style dictionary form."""
data = {"type": self.tag_type}
if self.default is not NotSet:
data["default"] = self.default
if self.live:
data["live"] = True
return data
[docs]
def to_schema(self) -> dict[str, Any]:
"""Alias for :meth:`to_dict` to match the rest of the declarative API."""
return self.to_dict()
def __repr__(self) -> str:
return f"Tag(name={self.name!r}, tag_type={self.tag_type!r}, default={self.default!r})"
def _raise_unbound_error(self) -> NoReturn:
raise RuntimeError(
"Declared Tag definitions are not runtime values. "
"Access tags through a Tags instance."
)
@property
def value(self) -> Any:
self._raise_unbound_error()
def get(self) -> Any:
self._raise_unbound_error()
def set(self, value: Any, log: bool = False) -> Any:
del value, log
self._raise_unbound_error()
def delete(self, log: bool = False) -> Any:
del log
self._raise_unbound_error()
def clear(self) -> Any:
self._raise_unbound_error()
def is_set(self) -> bool:
self._raise_unbound_error()
def increment(self, amount: int | float = 1, log: bool = False) -> Any:
del amount, log
self._raise_unbound_error()
def decrement(self, amount: int | float = 1, log: bool = False) -> Any:
del amount, log
self._raise_unbound_error()
def __str__(self) -> str:
self._raise_unbound_error()
def __bool__(self) -> bool:
self._raise_unbound_error()
def __int__(self) -> int:
self._raise_unbound_error()
def __float__(self) -> float:
self._raise_unbound_error()
def __lt__(self, other: Any) -> bool:
del other
self._raise_unbound_error()
def __le__(self, other: Any) -> bool:
del other
self._raise_unbound_error()
def __gt__(self, other: Any) -> bool:
del other
self._raise_unbound_error()
def __ge__(self, other: Any) -> bool:
del other
self._raise_unbound_error()
def _resolve_target(
self, tags: "Tags", declaration: "_DeclaredTag"
) -> tuple[str | None, str]:
"""Return ``(app_key, key_name)`` for manager-backed get/set calls.
The default implementation routes through the owning ``Tags`` instance's
own ``app_key`` and the declared tag name. Subclasses (e.g.
:class:`RemoteTag`) override this to redirect reads and writes
elsewhere.
"""
return tags.app_key, declaration.name
def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
"""Return ``True`` if this update should be promoted to an immediate log.
Evaluates every descriptor in ``self.log_on``; returns ``True`` if
any of them fired. Tags declared without ``log_on=`` always return
``False``. ``state`` is a mutable per-tag scratchpad owned by the
:class:`Tags` instance.
"""
return _evaluate_triggers(self.log_on, prev, new, state)
def _as_list(value: Any) -> list[Any]:
"""Normalise ``None | scalar | iterable`` into a list."""
if value is None:
return []
if isinstance(value, (list, tuple, set, frozenset)):
return list(value)
return [value]
# ---------------------------------------------------------------------------
# Log-on descriptors
# ---------------------------------------------------------------------------
#
# Each descriptor encodes one auto-logging rule. The typed tag classes
# (Number, Boolean, String) accept ``log_on=descriptor`` (or a list of
# descriptors); on every set, every configured descriptor is consulted
# so its private state stays consistent — the tag fires an immediate
# log if any descriptor returns ``True``.
class _LogTrigger:
"""Base class for ``log_on=`` descriptors.
Subclasses implement :meth:`evaluate`. ``state`` is a per-descriptor
dict owned by the :class:`Tags` instance — descriptors are shared
across :class:`Tags` instances, so they must not store runtime state
on themselves.
"""
def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
raise NotImplementedError
class _Crossing(_LogTrigger):
"""Shared crossing-detection state machine for ``Cross``/``Rise``/``Fall``."""
_DIRECTIONS: frozenset[str] = frozenset()
def __init__(
self,
*thresholds: float,
deadband: float = 0.0,
):
if not thresholds:
raise ValueError(f"{type(self).__name__} requires at least one threshold.")
self.thresholds: list[float] = sorted(float(t) for t in thresholds)
self.deadband: float = float(deadband or 0.0)
def __repr__(self) -> str:
return f"{type(self).__name__}({self.thresholds!r}, deadband={self.deadband!r})"
def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
# ``prev`` is unused — the recorded side per threshold (in
# ``state``) is the source of truth for crossing detection.
if new is None or new is NotSet or isinstance(new, bool):
return False
if not isinstance(new, (int, float)):
return False
sides: dict[float, str] = state.setdefault("sides", {})
half_band = self.deadband / 2
fired = False
for t in self.thresholds:
upper = t + half_band
lower = t - half_band
current = sides.get(t, "below")
if new >= upper and current == "below":
sides[t] = "above"
if "rise" in self._DIRECTIONS:
fired = True
elif new <= lower and current == "above":
sides[t] = "below"
if "fall" in self._DIRECTIONS:
fired = True
return fired
[docs]
class Cross(_Crossing):
"""Log on crossing any of the given threshold(s) in either direction.
Parameters
----------
thresholds:
A single threshold, or an iterable of thresholds.
deadband:
Hysteresis band — crossings only fire when the value moves at
least ``deadband / 2`` beyond the threshold, suppressing repeat
logs while the value oscillates near it. Defaults to ``0``.
"""
_DIRECTIONS = frozenset({"rise", "fall"})
[docs]
class Rise(_Crossing):
"""Log only on rising crossings (value moving from below to above).
Use for high-side alarms. See :class:`Cross` for parameter details.
"""
_DIRECTIONS = frozenset({"rise"})
[docs]
class Fall(_Crossing):
"""Log only on falling crossings (value moving from above to below).
Use for low-side alarms. See :class:`Cross` for parameter details.
"""
_DIRECTIONS = frozenset({"fall"})
[docs]
class Delta(_LogTrigger):
"""Log when the value moves far enough from the last logged value.
Unlike :class:`Cross`, ``Delta`` doesn't track threshold sides — it
compares each new value against the *last value this descriptor
fired on*. The first set fires unconditionally (and seeds the
baseline) so graphs always have an initial data point.
Exactly one of ``amount=`` or ``percent=`` must be provided.
Parameters
----------
amount:
Minimum absolute change required to fire — e.g. ``amount=5``
fires on any move of at least 5 units up or down from the last
logged value.
percent:
Minimum percentage change required to fire, computed against
the magnitude of the last logged value. ``percent=10`` fires on
a 10% swing. When the last logged value is ``0``, any non-zero
new value fires.
"""
def __init__(
self,
*,
amount: float | None = None,
percent: float | None = None,
):
if (amount is None) == (percent is None):
raise ValueError("Delta requires exactly one of `amount=` or `percent=`.")
self.amount: float | None = float(amount) if amount is not None else None
self.percent: float | None = float(percent) if percent is not None else None
def __repr__(self) -> str:
if self.amount is not None:
return f"Delta(amount={self.amount!r})"
return f"Delta(percent={self.percent!r})"
def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
if new is None or new is NotSet or isinstance(new, bool):
return False
if not isinstance(new, (int, float)):
return False
last_logged = state.get("last_logged", NotSet)
if last_logged is NotSet:
# First set — log unconditionally and seed the baseline.
state["last_logged"] = new
return True
diff = abs(new - last_logged)
if self.amount is not None:
fired = diff >= self.amount
elif last_logged == 0:
# Percent change against zero is undefined; treat any
# non-zero new value as significant so the baseline shifts
# off zero and percentage comparisons can resume normally.
fired = new != 0
else:
fired = (diff / abs(last_logged)) * 100 >= self.percent
if fired:
state["last_logged"] = new
return fired
[docs]
class AnyChange(_LogTrigger):
"""Log on every value transition (for :class:`Boolean` / :class:`String`)."""
def __repr__(self) -> str:
return "AnyChange()"
def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
if prev is NotSet:
prev = None
return prev != new
class _StateMembership(_LogTrigger):
"""Shared logic for ``Enter`` / ``Exit`` descriptors."""
_CHECK_NEW: bool = False
_CHECK_PREV: bool = False
def __init__(self, value: Any):
self.value: Any = value
def __repr__(self) -> str:
return f"{type(self).__name__}({self.value!r})"
def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
if prev is NotSet:
prev = None
if prev == new:
return False
if self._CHECK_NEW and new == self.value:
return True
if self._CHECK_PREV and prev == self.value:
return True
return False
[docs]
class Enter(_StateMembership):
"""Log only on entering the given value."""
_CHECK_NEW = True
[docs]
class Exit(_StateMembership):
"""Log only on exiting the given value."""
_CHECK_PREV = True
# ---------------------------------------------------------------------------
# Typed tag classes
# ---------------------------------------------------------------------------
_NUMERIC_TAG_TYPES: frozenset[str] = frozenset({"number", "integer", "float"})
_STATE_TAG_TYPES: frozenset[str] = frozenset({"boolean", "string"})
def _allowed_log_on_for_type(tag_type: str) -> tuple[type, ...]:
"""Return the descriptor classes accepted by ``log_on=`` for ``tag_type``.
Used by the base :class:`Tag` constructor so the legacy
``Tag("number", log_on=...)`` form validates against the same rules
the typed subclasses enforce.
"""
if tag_type in _NUMERIC_TAG_TYPES:
return (Cross, Rise, Fall, Delta)
if tag_type in _STATE_TAG_TYPES:
return (AnyChange, Enter, Exit)
raise TypeError(
f"log_on is not supported for tag_type {tag_type!r}; "
f"supported types are: number, integer, float, boolean, string."
)
def _normalise_log_on(
log_on: Any, allowed: tuple[type, ...], owner: str
) -> list[_LogTrigger]:
"""Validate and unpack ``log_on=`` into a list of trigger descriptors."""
triggers = _as_list(log_on)
for t in triggers:
if not isinstance(t, allowed):
allowed_names = ", ".join(c.__name__ for c in allowed)
raise TypeError(
f"{owner} log_on accepts {allowed_names} descriptors, "
f"got {type(t).__name__}."
)
return triggers
[docs]
class Number(Tag):
"""A numeric tag declaration with optional crossing-based auto-logging.
Parameters
----------
default:
Value returned by reads when the manager has nothing stored.
name:
Optional explicit declaration name (otherwise inherits the
attribute name on the owning :class:`Tags` subclass).
log_on:
One :class:`Cross` / :class:`Rise` / :class:`Fall` /
:class:`Delta` descriptor, or a list of them. Each describes a
rule that promotes the update to an immediate log when fired.
"""
_ALLOWED_TRIGGERS: tuple[type, ...] = (Cross, Rise, Fall, Delta)
def __init__(
self,
*,
default: Any = NotSet,
name: str | None = None,
log_on: _LogTrigger | list[_LogTrigger] | None = None,
live: bool = False,
):
super().__init__("number", default=default, name=name, live=live)
self.log_on: list[_LogTrigger] = _normalise_log_on(
log_on, self._ALLOWED_TRIGGERS, type(self).__name__
)
def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
return _evaluate_triggers(self.log_on, prev, new, state)
[docs]
class Boolean(Tag):
"""A boolean tag declaration with optional state-transition auto-logging.
Parameters
----------
default, name:
See :class:`Tag`.
log_on:
One :class:`AnyChange` / :class:`Enter` / :class:`Exit` descriptor,
or a list of them.
"""
_ALLOWED_TRIGGERS: tuple[type, ...] = (AnyChange, Enter, Exit)
def __init__(
self,
*,
default: Any = NotSet,
name: str | None = None,
log_on: _LogTrigger | list[_LogTrigger] | None = None,
live: bool = False,
):
super().__init__("boolean", default=default, name=name, live=live)
self.log_on: list[_LogTrigger] = _normalise_log_on(
log_on, self._ALLOWED_TRIGGERS, type(self).__name__
)
def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
return _evaluate_triggers(self.log_on, prev, new, state)
[docs]
class String(Tag):
"""A string tag declaration with optional state-transition auto-logging.
See :class:`Boolean` for the meaning of ``log_on``.
"""
_ALLOWED_TRIGGERS: tuple[type, ...] = (AnyChange, Enter, Exit)
def __init__(
self,
*,
default: Any = NotSet,
name: str | None = None,
log_on: _LogTrigger | list[_LogTrigger] | None = None,
live: bool = False,
):
super().__init__("string", default=default, name=name, live=live)
self.log_on: list[_LogTrigger] = _normalise_log_on(
log_on, self._ALLOWED_TRIGGERS, type(self).__name__
)
def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
return _evaluate_triggers(self.log_on, prev, new, state)
def _evaluate_triggers(
triggers: list[_LogTrigger],
prev: Any,
new: Any,
state: dict[str, Any],
) -> bool:
"""Run every trigger so each updates its private state, OR results."""
if not triggers:
return False
fired = False
for i, trigger in enumerate(triggers):
sub_state = state.setdefault(f"_t{i}", {})
if trigger.evaluate(prev, new, sub_state):
fired = True
return fired
class _UnresolvedRemoteTag(Exception):
"""Internal: an optional :class:`RemoteTag` has no resolved upstream target.
Raised by :meth:`RemoteTag._resolve_target` when the matching
:class:`pydoover.config.TagRef` was left blank by the operator. Caught
by :meth:`Tags._get_tag_value` (returns the declared default) and
:meth:`Tags._set_tag_value` (silently no-ops).
"""
[docs]
class RemoteTag(Tag):
"""A tag declaration that resolves to a tag published by another application.
The runtime target is described by a :class:`pydoover.config.TagRef` config
element whose ``reference_name`` matches this tag's ``reference_name``. The
binding happens at setup time via :meth:`Tags._resolve_remote_tags`.
When ``republish_locally`` is true (the default), the resolved upstream
value is mirrored into this app's own tag namespace under the
``reference_name`` key — so other consumers on the device (UIs,
downstream tags) can read it as if it were a local tag.
Cross-agent references (``agent_id`` set on the underlying
:class:`~pydoover.config.TagRef`) are accepted in the schema but raise
:class:`NotImplementedError` at runtime: the wiring is deferred so the
schema does not need to change later.
Parameters
----------
tag_type:
Asserted by the developer (matches the upstream type).
reference_name:
Local handle; must match a ``TagRef`` config element's
``reference_name``.
republish_locally:
Mirror upstream changes into this app's namespace under
``reference_name``. Defaults to ``True``. No-op on managers that do
not support subscriptions (e.g. processor contexts).
default:
Returned when the manager has no value for the upstream tag.
name:
Optional explicit declaration name (otherwise inherits the attribute
name on the owning :class:`Tags` subclass).
"""
def __init__(
self,
tag_type: str,
*,
reference_name: str,
republish_locally: bool = True,
default: Any = NotSet,
name: str | None = None,
optional: bool = False,
live: bool = False,
):
# Optional RemoteTags need a sensible read-fallback when the matching
# TagRef is left blank. Default to None so callers don't have to
# restate it on every declaration.
if optional and default is NotSet:
default = None
super().__init__(tag_type, default=default, name=name, live=live)
self.reference_name = reference_name
self.republish_locally = republish_locally
self.optional = optional
def __repr__(self) -> str:
return (
f"RemoteTag(name={self.name!r}, reference_name={self.reference_name!r}, "
f"tag_type={self.tag_type!r}, republish_locally={self.republish_locally}, "
f"optional={self.optional})"
)
def _resolve_target(
self, tags: "Tags", declaration: "_DeclaredTag"
) -> tuple[str | None, str]:
# Resolution state lives on the Tags instance because templates are
# shared across all instances of a given Tags subclass.
try:
target = tags._remote_tag_targets[declaration.attr_name]
except KeyError as exc:
if self.optional:
# Operator left the matching TagRef blank — caller falls
# back to the declared default (read) or no-ops (write).
raise _UnresolvedRemoteTag from exc
raise RuntimeError(
f"RemoteTag '{declaration.attr_name}' has not been resolved "
f"against the config. The application framework should call "
f"`await tags._resolve_remote_tags()` after `Tags.setup()`."
) from exc
if target["agent_id"]:
raise NotImplementedError(
f"Cross-agent RemoteTag (agent_id={target['agent_id']!r}) "
"is not yet supported."
)
return target["app_key"], target["tag_name"]
[docs]
class BoundTag:
"""Manager-backed runtime view of a declared tag.
A ``BoundTag`` is what you interact with on a :class:`Tags` instance.
Reads and writes are delegated to the registered tag manager.
"""
_NUMERIC_TAG_TYPES = {"number", "integer", "float"}
def __init__(self, tags: "Tags", declaration: "_DeclaredTag"):
self._tags = tags
self._declaration = declaration
@property
def _is_async(self) -> bool:
return self._tags._is_async
@property
def name(self) -> str:
"""str: The resolved runtime name of this tag."""
return self._declaration.name
@property
def tag_type(self) -> str:
"""str: The declared tag type."""
return self._declaration.template.tag_type
@property
def default(self) -> Any:
"""Any: The default value returned when no runtime value exists."""
return self._declaration.template.default
@property
def live(self) -> bool:
"""bool: Whether the underlying tag was declared with ``live=True``."""
return bool(getattr(self._declaration.template, "live", False))
@property
def value(self) -> Any:
"""Any: Convenience alias for :meth:`get`."""
return self.get()
[docs]
def get(self) -> Any:
"""Return the current value of this tag from the registered manager."""
return self._tags._get_tag_value(self._declaration.attr_name)
[docs]
async def set(self, value: Any, log: bool = False) -> None:
"""Set this tag's value via the registered manager.
Parameters
----------
value:
The new value.
log:
When ``True``, the update is also recorded as a logged data
point (a channel message) as soon as possible — typically at
the end of the current main-loop iteration in docker apps,
or at the end of the current invocation in processors —
rather than waiting for the next periodic log flush (up to
15 minutes).
"""
await self._tags._set_tag_value(self._declaration.attr_name, value, log=log)
[docs]
async def delete(self, log: bool = False) -> None:
"""Delete this tag from the cloud.
Removes the tag's entry from the aggregate channel. Prefer this
over ``tag.set(None)`` to make the deletion intent explicit.
"""
await self._tags._delete_tag_value(self._declaration.attr_name, log=log)
[docs]
async def clear(self) -> None:
"""Reset this tag back to its declared default value."""
await self.set(self.default)
[docs]
def is_set(self) -> bool:
"""Return ``True`` when this tag currently has a concrete value."""
return self.get() is not NotSet
[docs]
async def increment(self, amount: int | float = 1, log: bool = False) -> Any:
"""Increment a numeric tag and return the new value."""
self._validate_numeric("increment")
current = self.get()
if current is NotSet:
raise ValueError(f"Cannot increment unset tag '{self.name}'.")
new_value = current + amount
await self.set(new_value, log=log)
return new_value
[docs]
async def decrement(self, amount: int | float = 1, log: bool = False) -> Any:
"""Decrement a numeric tag and return the new value."""
self._validate_numeric("decrement")
current = self.get()
if current is NotSet:
raise ValueError(f"Cannot decrement unset tag '{self.name}'.")
new_value = current - amount
await self.set(new_value, log=log)
return new_value
def _validate_numeric(self, operation: str) -> None:
if self.tag_type not in self._NUMERIC_TAG_TYPES:
raise TypeError(
f"Cannot {operation} non-numeric tag '{self.name}' of type '{self.tag_type}'."
)
def _compare(self, other: Any, comparator):
return comparator(self.get(), other)
def __repr__(self) -> str:
return f"BoundTag(name={self.name!r}, value={self.get()!r})"
def __copy__(self):
return self
def __deepcopy__(self, _memo):
return self
def __str__(self) -> str:
return str(self.get())
def __bool__(self) -> bool:
return bool(self.get())
def __int__(self) -> int:
return int(self.get())
def __float__(self) -> float:
return float(self.get())
def __eq__(self, other: Any) -> bool:
return self.get() == other
def __lt__(self, other: Any) -> bool:
return self._compare(other, lambda a, b: a < b)
def __le__(self, other: Any) -> bool:
return self._compare(other, lambda a, b: a <= b)
def __gt__(self, other: Any) -> bool:
return self._compare(other, lambda a, b: a > b)
def __ge__(self, other: Any) -> bool:
return self._compare(other, lambda a, b: a >= b)
class _DeclaredTag:
def __init__(self, attr_name: str, template: Tag):
self.attr_name = attr_name
self.template = template
@property
def name(self) -> str:
return self.template.name or self.attr_name
@overload
def __get__(self, instance: None, owner: type["Tags"]) -> Tag: ...
@overload
def __get__(self, instance: "Tags", owner: type["Tags"]) -> BoundTag: ...
def __get__(self, instance: "Tags | None", owner: type["Tags"]) -> Tag | BoundTag:
if instance is None:
return self.template
return BoundTag(instance, self)
def __set__(self, instance: "Tags", value: Any) -> None:
if isinstance(value, BoundTag):
return
instance._set_tag_value(self.attr_name, value)