Source code for pydoover.tags

from typing import Any, Iterator, NoReturn, overload

from .manager import TagsManager
from ..config import Schema


class NotSet:
    """Marker type meaning "no value has been assigned".

    The class object itself is the sentinel: code in this module compares
    against it by identity (``value is NotSet``) and uses it as a default
    argument; it is not meant to be instantiated.
    """


def _coerce_tag_value(value: Any, tag_type: str) -> Any:
    """Normalise a raw tag value so it agrees with the declared ``tag_type``.

    JSON has a single number type, so values read back from channels often
    arrive as ``float`` even when the tag was declared as ``"integer"``.

    Rules, applied in order:

    * ``None`` and the :class:`NotSet` sentinel are returned untouched.
    * ``"integer"`` tags: a ``float`` holding a whole number becomes ``int``.
    * ``"boolean"`` tags: any non-``bool`` value is passed through ``bool()``.
    * Anything else is returned unchanged.
    """
    if value is None or value is NotSet:
        return value

    if tag_type == "integer":
        is_whole_float = isinstance(value, float) and value.is_integer()
        return int(value) if is_whole_float else value

    if tag_type == "boolean":
        return value if isinstance(value, bool) else bool(value)

    return value


class Tag:
    """Declaration of a single tag: its type, default value and logging rules.

    A ``Tag`` is a *definition*, not a runtime value: every value-style
    accessor on it (``get``, ``set``, comparisons, ``str()``, ...) raises
    :class:`RuntimeError`. Runtime values are reached through a
    :class:`Tags` instance.

    ``log_on`` takes the same descriptor objects as the typed subclasses
    (:class:`Number`, :class:`Boolean`, :class:`String`), so the legacy
    ``Tag("number", log_on=...)`` spelling supports auto-logging too.
    Descriptors are validated against ``tag_type``: numeric types accept
    :class:`Cross` / :class:`Rise` / :class:`Fall` / :class:`Delta`, while
    booleans and strings accept :class:`AnyChange` / :class:`Enter` /
    :class:`Exit`.

    Parameters
    ----------
    tag_type:
        Declared type name, e.g. ``"number"`` or ``"boolean"``.
    default:
        Value reported when nothing is stored; ``NotSet`` means "no default".
    name:
        Optional explicit tag name.
    log_on:
        One descriptor, a list of descriptors, or ``None`` for no
        auto-logging.
    live:
        Marks the tag as a "live" tag (see the comment in ``__init__``).

    Raises
    ------
    TypeError
        If ``log_on`` is given for a ``tag_type`` that does not support it,
        or contains a descriptor that is not allowed for ``tag_type``.
    """

    def __init__(
        self,
        tag_type: str,
        default: Any = NotSet,
        name: str | None = None,
        log_on: "_LogTrigger | list[_LogTrigger] | None" = None,
        live: bool = False,
    ):
        self.tag_type = tag_type
        self.name = name
        self.default = default
        # When True, the docker application republishes this tag's current
        # value as a one-shot message on every main-loop iteration (see
        # ``TagsManagerDocker.flush_live_tags``) so a watching UI gets a
        # fresh value without it being persisted as a logged data point.
        self.live = live
        self._declared_attr_name: str | None = None

        # Start with "no triggers"; only validate when the caller passed some.
        self.log_on: "list[_LogTrigger]" = []
        if log_on is not None:
            self.log_on = _normalise_log_on(
                log_on, _allowed_log_on_for_type(tag_type), type(self).__name__
            )

    def to_dict(self) -> dict[str, Any]:
        """Return the schema-style dictionary for this definition.

        ``"default"`` is included only when a default was declared and
        ``"live"`` only when the tag is live.
        """
        schema: dict[str, Any] = {"type": self.tag_type}
        if self.default is not NotSet:
            schema["default"] = self.default
        if self.live:
            schema["live"] = True
        return schema

    def to_schema(self) -> dict[str, Any]:
        """Alias for :meth:`to_dict` to match the rest of the declarative API."""
        return self.to_dict()

    def __repr__(self) -> str:
        return f"Tag(name={self.name!r}, tag_type={self.tag_type!r}, default={self.default!r})"

    # ------------------------------------------------------------------
    # Runtime-style accessors: a declaration has no value, so all of these
    # fail loudly instead of returning something misleading.
    # ------------------------------------------------------------------

    def _raise_unbound_error(self) -> NoReturn:
        """Raise the error shared by every value-style accessor."""
        raise RuntimeError(
            "Declared Tag definitions are not runtime values. "
            "Access tags through a Tags instance."
        )

    @property
    def value(self) -> Any:
        self._raise_unbound_error()

    def get(self) -> Any:
        self._raise_unbound_error()

    def set(self, value: Any, log: bool = False) -> Any:
        del value, log
        self._raise_unbound_error()

    def delete(self, log: bool = False) -> Any:
        del log
        self._raise_unbound_error()

    def clear(self) -> Any:
        self._raise_unbound_error()

    def is_set(self) -> bool:
        self._raise_unbound_error()

    def increment(self, amount: int | float = 1, log: bool = False) -> Any:
        del amount, log
        self._raise_unbound_error()

    def decrement(self, amount: int | float = 1, log: bool = False) -> Any:
        del amount, log
        self._raise_unbound_error()

    def __str__(self) -> str:
        self._raise_unbound_error()

    def __bool__(self) -> bool:
        self._raise_unbound_error()

    def __int__(self) -> int:
        self._raise_unbound_error()

    def __float__(self) -> float:
        self._raise_unbound_error()

    def __lt__(self, other: Any) -> bool:
        del other
        self._raise_unbound_error()

    def __le__(self, other: Any) -> bool:
        del other
        self._raise_unbound_error()

    def __gt__(self, other: Any) -> bool:
        del other
        self._raise_unbound_error()

    def __ge__(self, other: Any) -> bool:
        del other
        self._raise_unbound_error()

    # ------------------------------------------------------------------
    # Hooks used by ``Tags`` when reading / writing through the manager.
    # ------------------------------------------------------------------

    def _resolve_target(
        self, tags: "Tags", declaration: "_DeclaredTag"
    ) -> tuple[str | None, str]:
        """Return ``(app_key, key_name)`` for manager-backed get/set calls.

        The base implementation uses the owning ``Tags`` instance's
        ``app_key`` together with the declaration's name. Subclasses (e.g.
        :class:`RemoteTag`) override this to redirect reads and writes.
        """
        return tags.app_key, declaration.name

    def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` if this update should be promoted to an immediate log.

        Every descriptor in ``self.log_on`` is evaluated; the result is
        ``True`` if any of them fired. Tags declared without ``log_on=``
        always return ``False``. ``state`` is a mutable per-tag scratchpad
        owned by the :class:`Tags` instance.
        """
        return _evaluate_triggers(self.log_on, prev, new, state)
def _as_list(value: Any) -> list[Any]: """Normalise ``None | scalar | iterable`` into a list.""" if value is None: return [] if isinstance(value, (list, tuple, set, frozenset)): return list(value) return [value] # --------------------------------------------------------------------------- # Log-on descriptors # --------------------------------------------------------------------------- # # Each descriptor encodes one auto-logging rule. The typed tag classes # (Number, Boolean, String) accept ``log_on=descriptor`` (or a list of # descriptors); on every set, every configured descriptor is consulted # so its private state stays consistent — the tag fires an immediate # log if any descriptor returns ``True``. class _LogTrigger: """Base class for ``log_on=`` descriptors. Subclasses implement :meth:`evaluate`. ``state`` is a per-descriptor dict owned by the :class:`Tags` instance — descriptors are shared across :class:`Tags` instances, so they must not store runtime state on themselves. """ def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool: raise NotImplementedError class _Crossing(_LogTrigger): """Shared crossing-detection state machine for ``Cross``/``Rise``/``Fall``.""" _DIRECTIONS: frozenset[str] = frozenset() def __init__( self, *thresholds: float, deadband: float = 0.0, ): if not thresholds: raise ValueError(f"{type(self).__name__} requires at least one threshold.") self.thresholds: list[float] = sorted(float(t) for t in thresholds) self.deadband: float = float(deadband or 0.0) def __repr__(self) -> str: return f"{type(self).__name__}({self.thresholds!r}, deadband={self.deadband!r})" def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool: # ``prev`` is unused — the recorded side per threshold (in # ``state``) is the source of truth for crossing detection. 
if new is None or new is NotSet or isinstance(new, bool): return False if not isinstance(new, (int, float)): return False sides: dict[float, str] = state.setdefault("sides", {}) half_band = self.deadband / 2 fired = False for t in self.thresholds: upper = t + half_band lower = t - half_band current = sides.get(t, "below") if new >= upper and current == "below": sides[t] = "above" if "rise" in self._DIRECTIONS: fired = True elif new <= lower and current == "above": sides[t] = "below" if "fall" in self._DIRECTIONS: fired = True return fired
class Cross(_Crossing):
    """Log whenever the value crosses a threshold, in either direction.

    Parameters
    ----------
    thresholds:
        One or more thresholds, passed positionally (``Cross(10, 20)``).
    deadband:
        Hysteresis band — a crossing only fires once the value has moved at
        least ``deadband / 2`` past the threshold, which suppresses repeat
        logs while the value oscillates near it. Defaults to ``0``.
    """

    _DIRECTIONS = frozenset({"rise", "fall"})
class Rise(_Crossing):
    """Log only on upward crossings (the value moving from below to above).

    Intended for high-side alarms. Takes the same parameters as
    :class:`Cross`.
    """

    _DIRECTIONS = frozenset({"rise"})
class Fall(_Crossing):
    """Log only on downward crossings (the value moving from above to below).

    Intended for low-side alarms. Takes the same parameters as
    :class:`Cross`.
    """

    _DIRECTIONS = frozenset({"fall"})
class Delta(_LogTrigger):
    """Log when the value has moved far enough from the last logged value.

    Unlike :class:`Cross`, ``Delta`` keeps no per-threshold side. It
    compares each new value against the *last value this descriptor fired
    on*. The first numeric value always fires and seeds that baseline, so
    graphs get an initial data point.

    Exactly one of ``amount=`` or ``percent=`` must be given.

    Parameters
    ----------
    amount:
        Minimum absolute change required to fire — ``amount=5`` fires on
        any move of at least 5 units, up or down, from the last logged
        value.
    percent:
        Minimum percentage change required to fire, measured against the
        magnitude of the last logged value (``percent=10`` fires on a 10%
        swing). When the last logged value is ``0``, any non-zero new
        value fires.

    Raises
    ------
    ValueError
        If both or neither of ``amount`` / ``percent`` are supplied.
    """

    def __init__(
        self,
        *,
        amount: float | None = None,
        percent: float | None = None,
    ):
        amount_given = amount is not None
        percent_given = percent is not None
        if amount_given == percent_given:
            raise ValueError("Delta requires exactly one of `amount=` or `percent=`.")
        self.amount: float | None = float(amount) if amount_given else None
        self.percent: float | None = float(percent) if percent_given else None

    def __repr__(self) -> str:
        if self.amount is None:
            return f"Delta(percent={self.percent!r})"
        return f"Delta(amount={self.amount!r})"

    def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` (and move the baseline) when ``new`` is far enough away.

        ``prev`` is not consulted; the baseline lives in
        ``state["last_logged"]``. Non-numeric values, including booleans,
        never fire and never touch the baseline.
        """
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(new, bool) or not isinstance(new, (int, float)):
            return False

        baseline = state.get("last_logged", NotSet)
        if baseline is NotSet:
            # First set — log unconditionally and seed the baseline.
            state["last_logged"] = new
            return True

        change = abs(new - baseline)
        if self.amount is not None:
            should_log = change >= self.amount
        elif baseline == 0:
            # Percent change against zero is undefined; treat any
            # non-zero new value as significant so the baseline shifts
            # off zero and percentage comparisons can resume normally.
            should_log = new != 0
        else:
            should_log = (change / abs(baseline)) * 100 >= self.percent

        if should_log:
            state["last_logged"] = new
        return should_log
class AnyChange(_LogTrigger):
    """Log on every value transition (for :class:`Boolean` / :class:`String`)."""

    def __repr__(self) -> str:
        return "AnyChange()"

    def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` when ``new`` differs from ``prev``.

        An unset previous value (``NotSet``) is compared as ``None``.
        ``state`` is not used.
        """
        before = None if prev is NotSet else prev
        return before != new
class _StateMembership(_LogTrigger):
    """Shared logic for the ``Enter`` / ``Exit`` descriptors.

    Subclasses switch on ``_CHECK_NEW`` (fire when the new value equals the
    watched value) and/or ``_CHECK_PREV`` (fire when the previous value
    equalled it).
    """

    _CHECK_NEW: bool = False
    _CHECK_PREV: bool = False

    def __init__(self, value: Any):
        # The state value whose entry/exit is being watched.
        self.value: Any = value

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.value!r})"

    def evaluate(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` on a real transition into/out of the watched value.

        An unset previous value (``NotSet``) is compared as ``None``. An
        update that leaves the value unchanged never fires. ``state`` is
        not used.
        """
        before = None if prev is NotSet else prev
        if before == new:
            return False
        entered = self._CHECK_NEW and new == self.value
        # ``or`` keeps the original short-circuit: the "exit" comparison is
        # only evaluated when the "enter" check did not already fire.
        return bool(entered or (self._CHECK_PREV and before == self.value))
class Enter(_StateMembership):
    """Log only when the tag changes *to* the given value."""

    _CHECK_NEW = True
class Exit(_StateMembership):
    """Log only when the tag changes *away from* the given value."""

    _CHECK_PREV = True
# ---------------------------------------------------------------------------
# Typed tag classes
# ---------------------------------------------------------------------------

_NUMERIC_TAG_TYPES: frozenset[str] = frozenset({"number", "integer", "float"})
_STATE_TAG_TYPES: frozenset[str] = frozenset({"boolean", "string"})


def _allowed_log_on_for_type(tag_type: str) -> tuple[type, ...]:
    """Return the descriptor classes that ``log_on=`` accepts for ``tag_type``.

    The base :class:`Tag` constructor uses this so the legacy
    ``Tag("number", log_on=...)`` form is validated by the same rules the
    typed subclasses enforce.

    Raises
    ------
    TypeError
        If ``tag_type`` is neither a numeric nor a state (boolean/string)
        type.
    """
    if tag_type not in _NUMERIC_TAG_TYPES and tag_type not in _STATE_TAG_TYPES:
        raise TypeError(
            f"log_on is not supported for tag_type {tag_type!r}; "
            f"supported types are: number, integer, float, boolean, string."
        )
    if tag_type in _NUMERIC_TAG_TYPES:
        return (Cross, Rise, Fall, Delta)
    return (AnyChange, Enter, Exit)


def _normalise_log_on(
    log_on: Any, allowed: tuple[type, ...], owner: str
) -> "list[_LogTrigger]":
    """Validate ``log_on=`` and unpack it into a list of trigger descriptors.

    ``log_on`` may be ``None``, a single descriptor or a collection of
    descriptors. ``owner`` is the class name used in the error message.

    Raises
    ------
    TypeError
        If any entry is not an instance of one of the ``allowed`` classes.
    """
    triggers = _as_list(log_on)
    for candidate in triggers:
        if isinstance(candidate, allowed):
            continue
        allowed_names = ", ".join(cls.__name__ for cls in allowed)
        raise TypeError(
            f"{owner} log_on accepts {allowed_names} descriptors, "
            f"got {type(candidate).__name__}."
        )
    return triggers
class Number(Tag):
    """Numeric tag declaration with optional threshold/delta auto-logging.

    The tag type is always ``"number"``.

    Parameters
    ----------
    default:
        Value returned by reads when the manager has nothing stored.
    name:
        Optional explicit declaration name (otherwise the attribute name on
        the owning :class:`Tags` subclass is used).
    log_on:
        A single :class:`Cross` / :class:`Rise` / :class:`Fall` /
        :class:`Delta` descriptor, or a list of them. Each one describes a
        rule that promotes an update to an immediate log when it fires.
    live:
        Forwarded to :class:`Tag`.

    Raises
    ------
    TypeError
        If ``log_on`` contains a descriptor type that is not allowed here.
    """

    _ALLOWED_TRIGGERS: tuple[type, ...] = (Cross, Rise, Fall, Delta)

    def __init__(
        self,
        *,
        default: Any = NotSet,
        name: str | None = None,
        log_on: _LogTrigger | list[_LogTrigger] | None = None,
        live: bool = False,
    ):
        # The base class is initialised without ``log_on``; validation is
        # done here against this class's own ``_ALLOWED_TRIGGERS``.
        super().__init__("number", default=default, name=name, live=live)
        owner_name = type(self).__name__
        self.log_on: list[_LogTrigger] = _normalise_log_on(
            log_on, self._ALLOWED_TRIGGERS, owner_name
        )

    def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` if any configured descriptor fires for this update."""
        return _evaluate_triggers(self.log_on, prev, new, state)
class Boolean(Tag):
    """Boolean tag declaration with optional state-transition auto-logging.

    The tag type is always ``"boolean"``.

    Parameters
    ----------
    default, name, live:
        See :class:`Tag`.
    log_on:
        A single :class:`AnyChange` / :class:`Enter` / :class:`Exit`
        descriptor, or a list of them.

    Raises
    ------
    TypeError
        If ``log_on`` contains a descriptor type that is not allowed here.
    """

    _ALLOWED_TRIGGERS: tuple[type, ...] = (AnyChange, Enter, Exit)

    def __init__(
        self,
        *,
        default: Any = NotSet,
        name: str | None = None,
        log_on: _LogTrigger | list[_LogTrigger] | None = None,
        live: bool = False,
    ):
        # The base class is initialised without ``log_on``; validation is
        # done here against this class's own ``_ALLOWED_TRIGGERS``.
        super().__init__("boolean", default=default, name=name, live=live)
        owner_name = type(self).__name__
        self.log_on: list[_LogTrigger] = _normalise_log_on(
            log_on, self._ALLOWED_TRIGGERS, owner_name
        )

    def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` if any configured descriptor fires for this update."""
        return _evaluate_triggers(self.log_on, prev, new, state)
class String(Tag):
    """String tag declaration with optional state-transition auto-logging.

    The tag type is always ``"string"``. ``log_on`` accepts a single
    :class:`AnyChange` / :class:`Enter` / :class:`Exit` descriptor or a
    list of them; ``default``, ``name`` and ``live`` are as on :class:`Tag`.

    Raises
    ------
    TypeError
        If ``log_on`` contains a descriptor type that is not allowed here.
    """

    _ALLOWED_TRIGGERS: tuple[type, ...] = (AnyChange, Enter, Exit)

    def __init__(
        self,
        *,
        default: Any = NotSet,
        name: str | None = None,
        log_on: _LogTrigger | list[_LogTrigger] | None = None,
        live: bool = False,
    ):
        # The base class is initialised without ``log_on``; validation is
        # done here against this class's own ``_ALLOWED_TRIGGERS``.
        super().__init__("string", default=default, name=name, live=live)
        owner_name = type(self).__name__
        self.log_on: list[_LogTrigger] = _normalise_log_on(
            log_on, self._ALLOWED_TRIGGERS, owner_name
        )

    def _evaluate_log_trigger(self, prev: Any, new: Any, state: dict[str, Any]) -> bool:
        """Return ``True`` if any configured descriptor fires for this update."""
        return _evaluate_triggers(self.log_on, prev, new, state)
def _evaluate_triggers(
    triggers: "list[_LogTrigger]",
    prev: Any,
    new: Any,
    state: dict[str, Any],
) -> bool:
    """Evaluate every trigger and report whether at least one fired.

    Each trigger gets its own sub-dict of ``state`` under the key
    ``"_t<index>"``. There is deliberately no short-circuit: every trigger
    runs on every update so its private state stays current.
    """
    if not triggers:
        return False

    any_fired = False
    for index, trigger in enumerate(triggers):
        slot = state.setdefault(f"_t{index}", {})
        if trigger.evaluate(prev, new, slot):
            any_fired = True
    return any_fired


class _UnresolvedRemoteTag(Exception):
    """Internal: an optional :class:`RemoteTag` has no resolved upstream target.

    Raised by :meth:`RemoteTag._resolve_target` when the matching
    :class:`pydoover.config.TagRef` was left blank by the operator. Caught
    by :meth:`Tags._get_tag_value` (which returns the declared default)
    and :meth:`Tags._set_tag_value` (which silently no-ops).
    """
class RemoteTag(Tag):
    """A tag declaration that resolves to a tag published by another application.

    The runtime target is described by a :class:`pydoover.config.TagRef`
    config element whose ``reference_name`` matches this tag's
    ``reference_name``. Binding happens at setup time via
    :meth:`Tags._resolve_remote_tags`.

    With ``republish_locally`` enabled (the default), the resolved upstream
    value is mirrored into this app's own tag namespace under the
    ``reference_name`` key, so other consumers on the device (UIs,
    downstream tags) can read it like a local tag.

    Cross-agent references (``agent_id`` set on the underlying
    :class:`~pydoover.config.TagRef`) are accepted in the schema but raise
    :class:`NotImplementedError` at runtime: the wiring is deferred so the
    schema does not need to change later.

    Parameters
    ----------
    tag_type:
        Asserted by the developer (matches the upstream type).
    reference_name:
        Local handle; must match a ``TagRef`` config element's
        ``reference_name``.
    republish_locally:
        Mirror upstream changes into this app's namespace under
        ``reference_name``. Defaults to ``True``. No-op on managers that
        do not support subscriptions (e.g. processor contexts).
    default:
        Returned when the manager has no value for the upstream tag.
    name:
        Optional explicit declaration name (otherwise the attribute name on
        the owning :class:`Tags` subclass is used).
    optional:
        When ``True``, a missing ``TagRef`` is tolerated: resolution raises
        the internal ``_UnresolvedRemoteTag`` instead of
        :class:`RuntimeError`, and an unspecified ``default`` becomes
        ``None``.
    live:
        Forwarded to :class:`Tag`.
    """

    def __init__(
        self,
        tag_type: str,
        *,
        reference_name: str,
        republish_locally: bool = True,
        default: Any = NotSet,
        name: str | None = None,
        optional: bool = False,
        live: bool = False,
    ):
        # Optional RemoteTags need a sensible read-fallback when the matching
        # TagRef is left blank. Default to None so callers don't have to
        # restate it on every declaration.
        effective_default = None if (optional and default is NotSet) else default
        super().__init__(tag_type, default=effective_default, name=name, live=live)
        self.reference_name = reference_name
        self.republish_locally = republish_locally
        self.optional = optional

    def __repr__(self) -> str:
        return (
            f"RemoteTag(name={self.name!r}, reference_name={self.reference_name!r}, "
            f"tag_type={self.tag_type!r}, republish_locally={self.republish_locally}, "
            f"optional={self.optional})"
        )

    def _resolve_target(
        self, tags: "Tags", declaration: "_DeclaredTag"
    ) -> tuple[str | None, str]:
        """Return the upstream ``(app_key, tag_name)`` recorded for this tag.

        Raises
        ------
        _UnresolvedRemoteTag
            The tag is optional and no target has been recorded for it.
        RuntimeError
            The tag is required and no target has been recorded for it.
        NotImplementedError
            The recorded target names another agent (``agent_id`` is set).
        """
        # Resolution state lives on the Tags instance because templates are
        # shared across all instances of a given Tags subclass.
        attr_name = declaration.attr_name
        try:
            target = tags._remote_tag_targets[attr_name]
        except KeyError as exc:
            if self.optional:
                # Operator left the matching TagRef blank — caller falls
                # back to the declared default (read) or no-ops (write).
                raise _UnresolvedRemoteTag from exc
            raise RuntimeError(
                f"RemoteTag '{attr_name}' has not been resolved "
                f"against the config. The application framework should call "
                f"`await tags._resolve_remote_tags()` after `Tags.setup()`."
            ) from exc

        agent_id = target["agent_id"]
        if agent_id:
            raise NotImplementedError(
                f"Cross-agent RemoteTag (agent_id={agent_id!r}) "
                "is not yet supported."
            )
        return target["app_key"], target["tag_name"]
[docs] class BoundTag: """Manager-backed runtime view of a declared tag. A ``BoundTag`` is what you interact with on a :class:`Tags` instance. Reads and writes are delegated to the registered tag manager. """ _NUMERIC_TAG_TYPES = {"number", "integer", "float"} def __init__(self, tags: "Tags", declaration: "_DeclaredTag"): self._tags = tags self._declaration = declaration @property def _is_async(self) -> bool: return self._tags._is_async @property def name(self) -> str: """str: The resolved runtime name of this tag.""" return self._declaration.name @property def tag_type(self) -> str: """str: The declared tag type.""" return self._declaration.template.tag_type @property def default(self) -> Any: """Any: The default value returned when no runtime value exists.""" return self._declaration.template.default @property def live(self) -> bool: """bool: Whether the underlying tag was declared with ``live=True``.""" return bool(getattr(self._declaration.template, "live", False)) @property def value(self) -> Any: """Any: Convenience alias for :meth:`get`.""" return self.get()
[docs] def get(self) -> Any: """Return the current value of this tag from the registered manager.""" return self._tags._get_tag_value(self._declaration.attr_name)
[docs] async def set(self, value: Any, log: bool = False) -> None: """Set this tag's value via the registered manager. Parameters ---------- value: The new value. log: When ``True``, the update is also recorded as a logged data point (a channel message) as soon as possible — typically at the end of the current main-loop iteration in docker apps, or at the end of the current invocation in processors — rather than waiting for the next periodic log flush (up to 15 minutes). """ await self._tags._set_tag_value(self._declaration.attr_name, value, log=log)
[docs] async def delete(self, log: bool = False) -> None: """Delete this tag from the cloud. Removes the tag's entry from the aggregate channel. Prefer this over ``tag.set(None)`` to make the deletion intent explicit. """ await self._tags._delete_tag_value(self._declaration.attr_name, log=log)
[docs] async def clear(self) -> None: """Reset this tag back to its declared default value.""" await self.set(self.default)
[docs] def is_set(self) -> bool: """Return ``True`` when this tag currently has a concrete value.""" return self.get() is not NotSet
[docs] async def increment(self, amount: int | float = 1, log: bool = False) -> Any: """Increment a numeric tag and return the new value.""" self._validate_numeric("increment") current = self.get() if current is NotSet: raise ValueError(f"Cannot increment unset tag '{self.name}'.") new_value = current + amount await self.set(new_value, log=log) return new_value
[docs] async def decrement(self, amount: int | float = 1, log: bool = False) -> Any: """Decrement a numeric tag and return the new value.""" self._validate_numeric("decrement") current = self.get() if current is NotSet: raise ValueError(f"Cannot decrement unset tag '{self.name}'.") new_value = current - amount await self.set(new_value, log=log) return new_value
def _validate_numeric(self, operation: str) -> None: if self.tag_type not in self._NUMERIC_TAG_TYPES: raise TypeError( f"Cannot {operation} non-numeric tag '{self.name}' of type '{self.tag_type}'." ) def _compare(self, other: Any, comparator): return comparator(self.get(), other) def __repr__(self) -> str: return f"BoundTag(name={self.name!r}, value={self.get()!r})" def __copy__(self): return self def __deepcopy__(self, _memo): return self def __str__(self) -> str: return str(self.get()) def __bool__(self) -> bool: return bool(self.get()) def __int__(self) -> int: return int(self.get()) def __float__(self) -> float: return float(self.get()) def __eq__(self, other: Any) -> bool: return self.get() == other def __lt__(self, other: Any) -> bool: return self._compare(other, lambda a, b: a < b) def __le__(self, other: Any) -> bool: return self._compare(other, lambda a, b: a <= b) def __gt__(self, other: Any) -> bool: return self._compare(other, lambda a, b: a > b) def __ge__(self, other: Any) -> bool: return self._compare(other, lambda a, b: a >= b)
class _DeclaredTag:
    """Descriptor that ties a :class:`Tag` template to an attribute name.

    Accessed on the class it returns the template itself; accessed on an
    instance it returns a fresh :class:`BoundTag` for that instance.
    """

    def __init__(self, attr_name: str, template: "Tag"):
        self.attr_name = attr_name
        self.template = template

    @property
    def name(self) -> str:
        """str: The template's explicit name, else the attribute name."""
        return self.template.name or self.attr_name

    @overload
    def __get__(self, instance: None, owner: type["Tags"]) -> "Tag": ...

    @overload
    def __get__(self, instance: "Tags", owner: type["Tags"]) -> "BoundTag": ...

    def __get__(
        self, instance: "Tags | None", owner: type["Tags"]
    ) -> "Tag | BoundTag":
        if instance is None:
            return self.template
        return BoundTag(instance, self)

    def __set__(self, instance: "Tags", value: Any) -> None:
        """Handle ``tags.<attr> = value``.

        Assigning a :class:`BoundTag` is ignored. Any other value is passed
        to ``instance._set_tag_value``. If that call returns a coroutine
        (the method is ``async`` on :class:`Tags`), plain assignment cannot
        await it: the coroutine is closed and a ``TypeError`` is raised so
        the write is not silently dropped.

        Raises
        ------
        TypeError
            If the write would need to be awaited.
        """
        if isinstance(value, BoundTag):
            return

        import inspect  # local import: keeps the module-level imports untouched

        pending = instance._set_tag_value(self.attr_name, value)
        if inspect.iscoroutine(pending):
            # Close it explicitly so no "coroutine was never awaited"
            # warning is emitted, then surface the misuse to the caller.
            pending.close()
            raise TypeError(
                f"Cannot assign to tag '{self.name}' with '='; "
                f"use `await tags.{self.attr_name}.set(value)` instead."
            )
[docs] class Tags: """Base class for declarative tag definitions. Subclasses declare available tags as class attributes. Instances expose manager-backed :class:`BoundTag` proxies and may also mutate their available tag set at runtime via :meth:`add_tag` and :meth:`remove_tag`. """ __tag_declarations__: "dict[str, _DeclaredTag]" = dict() def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) declarations: "dict[str, _DeclaredTag]" = dict() for base in reversed(cls.__mro__[1:]): declarations.update(getattr(base, "__tag_declarations__", {})) for attr_name, value in list(cls.__dict__.items()): if not isinstance(value, Tag): continue value._declared_attr_name = attr_name declaration = _DeclaredTag(attr_name, value) declarations[attr_name] = declaration setattr(cls, attr_name, declaration) cls.__tag_declarations__ = declarations def __init__(self, app_key: str, tag_manager: TagsManager, config: Schema): self.config = config self._manager = tag_manager self.app_key = app_key # Keep runtime declaration changes isolated to this instance. self._tag_declarations = dict(self.__class__.__tag_declarations__) # Resolved targets for declared RemoteTags, keyed by attr_name. # Populated by `_resolve_remote_tags()`. self._remote_tag_targets: dict[str, dict[str, Any]] = {} # Per-tag scratch state for log-trigger evaluation (e.g. tracked # threshold sides on Number tags). Owned here so it survives # across writes without leaking between Tags instances that # share class-level Tag templates. self._trigger_states: dict[str, dict[str, Any]] = {}
[docs] async def setup(self): """Mutate this tag collection before it is bound to a manager.""" pass
async def _resolve_remote_tags(self): """Bind every declared :class:`RemoteTag` to its config-side ``TagRef``. Walks ``self.config`` collecting every ``TagRef`` element by ``reference_name``, then matches each declared :class:`RemoteTag` to exactly one entry. Raises :class:`ValueError` on missing or duplicate matches, and on duplicate ``reference_name`` values across the config. For RemoteTags with ``republish_locally=True``, registers a manager-level subscription that mirrors upstream changes into this app's own namespace under ``reference_name`` and seeds the local mirror with the current upstream value (if any). Cross-agent ``TagRef`` entries (where ``agent_id`` is set) are recorded but not subscribed to — see :meth:`RemoteTag._resolve_target` for the runtime behaviour. Only top-level ``TagRef`` elements on the schema are scanned for v1. """ from ..config import TagRef # local import to avoid module-load cycles from ..config import ( NotSet as ConfigNotSet, ) # distinct sentinel from tags.NotSet refs_by_name: dict[str, TagRef] = {} if self.config is not None: for elem in getattr(self.config, "_element_map", {}).values(): if not isinstance(elem, TagRef): continue # An optional TagRef left blank has reference_name unset — # skip it so optional RemoteTags fall through to their # declared default at read time. (Note: `ConfigNotSet` is # the config-module sentinel, distinct from `tags.NotSet`.) if elem.reference_name._value is ConfigNotSet: continue ref_name = elem.reference_name.value if ref_name in refs_by_name: raise ValueError( f"Duplicate TagRef reference_name {ref_name!r} in config." ) refs_by_name[ref_name] = elem # Always re-resolve from scratch — keeps the operation idempotent and # avoids stale state when a Tags instance is reused. 
self._remote_tag_targets = {} for declaration in self._tag_declarations.values(): template = declaration.template if not isinstance(template, RemoteTag): continue try: tagref = refs_by_name[template.reference_name] except KeyError as exc: if template.optional: # Optional RemoteTag with no filled-in TagRef — silently # skip; reads return the declared default. continue raise ValueError( f"RemoteTag {declaration.attr_name!r} references " f"reference_name={template.reference_name!r}, but no " f"TagRef with that reference_name was found in the config." ) from exc agent_id = tagref.agent_id.value or None target = { "agent_id": agent_id, "app_key": tagref.app_name.value, "tag_name": tagref.tag_name.value, } self._remote_tag_targets[declaration.attr_name] = target if ( template.republish_locally and self._manager is not None and not agent_id ): await self._install_remote_mirror(template, target) async def _install_remote_mirror( self, template: "RemoteTag", target: dict[str, Any] ) -> None: """Register the republish-locally subscription for a resolved RemoteTag. No-op on managers that do not support tag subscriptions (e.g. processor contexts). """ subscribe = getattr(self._manager, "subscribe_to_tag", None) if subscribe is None: return ref_name = template.reference_name local_app_key = self.app_key upstream_app_key = target["app_key"] upstream_tag_name = target["tag_name"] manager = self._manager async def _mirror(_key, value): await manager.set_tag(ref_name, value, app_key=local_app_key) subscribe(upstream_tag_name, _mirror, app_key=upstream_app_key) # Seed the local mirror with the current upstream value so other # consumers see something on first read instead of waiting for the # next upstream change. 
current = manager.get_tag( upstream_tag_name, default=NotSet, app_key=upstream_app_key ) if current is not NotSet and current is not None: await manager.set_tag(ref_name, current, app_key=local_app_key) @property def definitions(self) -> list[Tag]: """list[Tag]: The declared tag definitions for this instance.""" return [declaration.template for declaration in self._tag_declarations.values()]
[docs] def get_live_tag_keys(self) -> list[tuple[str | None, str]]: """Return ``(app_key, tag_name)`` pairs for every ``live=True`` tag. The application framework calls this after :meth:`setup` and :meth:`_resolve_remote_tags` to register the live-tag set with the tag manager, which republishes those values as one-shot messages each main-loop iteration. Unresolved optional :class:`RemoteTag` declarations are skipped. """ keys: list[tuple[str | None, str]] = [] for declaration in self._tag_declarations.values(): if not getattr(declaration.template, "live", False): continue try: app_key, key_name = declaration.template._resolve_target( self, declaration ) except _UnresolvedRemoteTag: continue keys.append((app_key, key_name)) return keys
@property def values(self) -> dict[str, Any]: """dict[str, Any]: The current manager-backed values for all declared tags.""" return { declaration.name: self._get_tag_value(attr_name) for attr_name, declaration in self._tag_declarations.items() if self._get_tag_value(attr_name) is not NotSet } def _get_declaration(self, name: str) -> _DeclaredTag | None: for attr_name, declaration in self._tag_declarations.items(): if attr_name == name or declaration.name == name: return declaration return None
[docs] def add_tag(self, name: str, tag: Tag) -> Tag: """Add a tag definition to this instance. This is primarily intended for config-dependent tag factories, where the available tag set needs to be customized before user code runs. """ if not isinstance(tag, Tag): raise TypeError("add_tag expects a Tag instance.") if self._get_declaration(name) is not None: raise ValueError(f"Tag '{name}' already exists.") declaration = _DeclaredTag(name, tag) if self._get_declaration(declaration.name) is not None: raise ValueError(f"Tag '{declaration.name}' already exists.") self._tag_declarations[name] = declaration return declaration.template
[docs] def remove_tag(self, name: str) -> None: """Remove a tag definition from this instance.""" declaration = self._get_declaration(name) if declaration is None: raise KeyError(name) del self._tag_declarations[declaration.attr_name]
def _get_tag_value(self, name: str) -> Any: declaration = self._get_declaration(name) if declaration is None: raise AttributeError(f"Unknown tag '{name}'") if self._manager is None: return declaration.template.default try: app_key, key_name = declaration.template._resolve_target(self, declaration) except _UnresolvedRemoteTag: return declaration.template.default value = self._manager.get_tag( key_name, default=declaration.template.default, app_key=app_key, ) return _coerce_tag_value(value, declaration.template.tag_type) async def _set_tag_value(self, name: str, value: Any, log: bool = False) -> None: declaration = self._get_declaration(name) if declaration is None: raise AttributeError(f"Unknown tag '{name}'") if self._manager is None: raise RuntimeError("Tags manager has not been registered.") try: app_key, key_name = declaration.template._resolve_target(self, declaration) except _UnresolvedRemoteTag: # Optional RemoteTag with no upstream — silently no-op so apps # don't need to branch on resolution state at every write site. return # Always evaluate the trigger so per-tag state (e.g. threshold # sides) stays consistent — even when ``log`` was already True # at the call site. Only the boolean result matters when log # was False. trigger_state = self._trigger_states.setdefault(name, {}) prev_value = self._get_tag_value(name) triggered = declaration.template._evaluate_log_trigger( prev_value, value, trigger_state ) if triggered: log = True await self._manager.set_tag(key_name, value, app_key=app_key, log=log) async def _delete_tag_value(self, name: str, log: bool = False) -> None: # Deletion is communicated upstream as ``value=None`` — the cloud # interprets it as "remove this key from the aggregate". await self._set_tag_value(name, None, log=log)
[docs] def get(self, name: str) -> BoundTag | None: """Return the bound runtime proxy for a tag, if it exists.""" return self.find_tag(name)
[docs] def find_tag(self, name: str) -> BoundTag | None: """Return the bound runtime proxy for a tag, if it exists.""" declaration = self._get_declaration(name) if declaration is None: return None return BoundTag(self, declaration)
[docs] def get_tag(self, name: str) -> BoundTag: """Return the bound runtime proxy for a tag. Raises ------ KeyError If the tag does not exist on this collection. """ tag = self.find_tag(name) if tag is None: raise KeyError(name) return tag
[docs] def get_definition(self, name: str) -> Tag | None: """Return the declared :class:`Tag` definition for a tag, if it exists.""" declaration = self._get_declaration(name) if declaration is None: return None return declaration.template
[docs] async def update(self, values: dict[str, Any]) -> None: """Update multiple tag values through their bound runtime proxies.""" for key, value in values.items(): tag = self.find_tag(key) if tag is not None: await tag.set(value)
[docs] def to_dict(self) -> dict[str, Any]: """Return the current manager-backed tag values.""" return self.values
[docs] def to_schema(self) -> dict[str, Any]: """Return the schema-style definitions for the current tag set.""" return { declaration.name: declaration.template.to_schema() for declaration in self._tag_declarations.values() }
def __iter__(self) -> Iterator[BoundTag]: for name in self._tag_declarations: tag = self.find_tag(name) if tag is not None: yield tag def __len__(self): return len(self._tag_declarations) def __getitem__(self, item: str) -> BoundTag: return self.get_tag(item) def __getattr__(self, name: str) -> BoundTag: tag = self.find_tag(name) if tag is None: raise AttributeError(name) return tag def __repr__(self) -> str: return f"{self.__class__.__name__}({list(self._tag_declarations)})"