Source code for pydoover.ui.declarative

from __future__ import annotations

import copy
import json
import re
from pathlib import Path
from typing import Any, Generic, Self, TypeVar, overload, TYPE_CHECKING

from ..config import Schema
from ..tags import BoundTag, NotSet, Tag, Tags

if TYPE_CHECKING:
    from .interaction import Interaction


_TAG_TYPE_MAP = {
    "number": "number",
    "integer": "number",
    "float": "number",
    "string": "string",
    "boolean": "boolean",
    "bool": "boolean",
    "array": "array",
    "list": "array",
    "object": "object",
    "dict": "object",
}
_SKIP_BIND_ATTRS = {"parent", "_manager"}


class _MissingDefault:
    def __copy__(self):
        return self

    def __deepcopy__(self, _memo):
        return self


_MISSING = _MissingDefault()
ElementT = TypeVar("ElementT")


class UITagBinding:
    def __init__(
        self,
        tag_name: str,
        tag_type: str | None = None,
        default_value: Any = _MISSING,
        app_nested: bool = True,
        live: bool = False,
    ):
        self.tag_name = tag_name
        self.tag_type = _TAG_TYPE_MAP.get(tag_type, tag_type) if tag_type else tag_type
        self.default_value = default_value
        self.app_nested = app_nested
        # Mirror of the underlying tag's ``live=True`` flag, captured at
        # binding time so consumers (Series/Variable to_dict) can propagate
        # it onto the emitted UI element without re-resolving the Tag.
        self.live = live

    def __copy__(self):
        return type(self)(
            self.tag_name,
            tag_type=self.tag_type,
            default_value=self.default_value,
            live=self.live,
        )

    def __deepcopy__(self, memo):
        return type(self)(
            copy.deepcopy(self.tag_name, memo),
            tag_type=copy.deepcopy(self.tag_type, memo),
            default_value=copy.deepcopy(self.default_value, memo),
            live=self.live,
        )

    def to_lookup(self) -> str:
        result = "$tag."
        if self.app_nested:
            result += "app()."
        result += str(self.tag_name)
        if self.tag_type is not None:
            result += f":{self.tag_type}"
        if not _is_missing_default(self.default_value):
            if self.tag_type is None:
                result += ":string"
            result += f":{json.dumps(self.default_value, separators=(',', ':'))}"
        return result


class _DeclaredElement(Generic[ElementT]):
    def __init__(self, attr_name: str, template: ElementT):
        self.attr_name = attr_name
        self.template = template

    @overload
    def __get__(self, instance: None, owner: type["UI"]) -> ElementT: ...

    @overload
    def __get__(self, instance: "UI", owner: type["UI"]) -> ElementT: ...

    def __get__(self, instance: "UI | None", owner: type["UI"]) -> ElementT:
        if instance is None:
            return self.template
        return instance._elements[self.attr_name]

    def __set__(self, instance: "UI", value: ElementT) -> None:
        from .element import Element

        if not isinstance(value, Element):
            raise TypeError("ui.UI attributes must be Element instances.")
        instance._elements[self.attr_name] = value


[docs] class UI: __ui_declarations__: dict[str, _DeclaredElement] = dict() def __init_subclass__( cls, display_name: str = "$config.app().APP_DISPLAY_NAME", hidden: bool | str = "$config.app().hidden:boolean:false", position: int | str = "$config.app().dv_app_position:number:100", default_open: bool | str = "$config.app().dv_app_default_open:boolean", # default to None icon: str | None = None, colour: str | None = None, **kwargs, ): super().__init_subclass__(**kwargs) from .element import Element declarations: dict[str, _DeclaredElement] = dict() for base in reversed(cls.__mro__[1:]): declarations.update(getattr(base, "__ui_declarations__", {})) for attr_name, value in list(cls.__dict__.items()): if not isinstance(value, Element): continue declaration = _DeclaredElement(attr_name, value) declarations[attr_name] = declaration setattr(cls, attr_name, declaration) cls.__ui_declarations__ = declarations if isinstance(hidden, str): if not hidden.startswith("$"): raise ValueError( "If `hidden` is a `str` it must start with `$` to represent a variable." ) hidden = f"{hidden}:boolean:false" if isinstance(position, str) and not position.startswith("$"): if not position.startswith("$"): raise ValueError( "If `position` is a `str` it must start with `$` to represent a variable." ) position = f"{position}:number:100" cls._display_name = display_name cls._hidden = hidden cls._position = position cls._default_open = default_open cls._icon = icon cls._colour = colour def __init__(self, config: Schema, tags: Tags, app_key: str): self.config = config self.tags = tags self.app_key = app_key self._elements: dict[str, Any] = dict( (name, copy.deepcopy(declaration.template)) for name, declaration in self.__class__.__ui_declarations__.items() ) @property def is_static(self): return self.setup.__func__ is UI.setup
[docs] async def setup(self): """Mutate this UI instance before it is bound and installed.""" pass
def to_schema(self, resolve_config: bool = True): schema = { "displayString": self._display_name, "hidden": self._hidden, "position": self._position, "icon": self._icon, "colour": self._colour, "defaultOpen": self._default_open, "type": "uiApplication", "name": "$config.app().APP_KEY", "children": {e.name: e.to_dict() for e in self._elements.values()}, } schema = normalize_ui_value(schema) if resolve_config and self.config is not None: schema = _resolve_config_refs(schema, self.config) return schema def export(self, fp: Path, app_name: str): if fp.exists(): data = json.loads(fp.read_text()) else: data = {} schema = self.to_schema(resolve_config=False) try: data[app_name]["ui_schema"] = schema except KeyError: data[app_name] = {"ui_schema": schema} fp.write_text(json.dumps(data, indent=4)) @property def children(self) -> list[Any]: return list(self._elements.values()) def get_interactions(self) -> dict[str, "Interaction"]: result: dict[str, "Interaction"] = {} self._collect_interactions(self._elements.values(), result) return result @staticmethod def _collect_interactions(elements, result: dict[str, "Interaction"]): from .interaction import Interaction for e in elements: if isinstance(e, Interaction): result[e.name] = e children = getattr(e, "_children", None) if children is not None: UI._collect_interactions(children.values(), result) def to_elements(self) -> list[Any]: return self.children def add_element(self, element: Any) -> Any: from .element import Element if not isinstance(element, Element): raise TypeError("add_element expects an Element instance.") self._elements[element.name] = element setattr(self, element.name, element) return element def remove_element(self, name: str) -> None: try: del self._elements[name] except KeyError as exc: raise KeyError(name) from exc self.__dict__.pop(name, None) def bind_tags(self, tags: Tags | None) -> Self: visited: set[int] = set() for element in self._elements.values(): _bind_value(element, tags=tags, visited=visited) return self def __getattr__(self, name: str) -> Any: try: return self._elements[name] except KeyError as exc: raise AttributeError(name) from exc
def tag_ref( tag: BoundTag | Tag | UITagBinding | str, tag_type: str | None = None, default_value: Any = _MISSING, ) -> UITagBinding: if isinstance(tag, UITagBinding): return copy.deepcopy(tag) if isinstance(tag, BoundTag): return _binding_from_bound_tag(tag) if isinstance(tag, Tag): binding = _binding_from_tag(tag) if tag_type is None and _is_missing_default(default_value): return binding return UITagBinding( binding.tag_name, tag_type=tag_type or binding.tag_type, default_value=( binding.default_value if _is_missing_default(default_value) else default_value ), live=binding.live, ) return UITagBinding(tag, tag_type=tag_type, default_value=default_value) bind_tag = tag_ref def _value_is_live(value: Any) -> bool: """Whether *value* is a tag reference whose underlying tag is ``live=True``. Used by Series/Variable to propagate the tag's live flag onto the emitted UI element so the customer-site can offer Live Mode without a second declaration. """ return isinstance(value, (UITagBinding, BoundTag, Tag)) and bool( getattr(value, "live", False) ) def is_tag_reference(value: Any) -> bool: if isinstance(value, (UITagBinding, BoundTag, Tag)): return True if isinstance(value, dict): return any(is_tag_reference(v) for v in value.values()) if isinstance(value, (list, tuple, set)): return any(is_tag_reference(v) for v in value) return False def normalize_ui_value(value: Any, field_name: str | None = None) -> Any: if isinstance(value, UITagBinding): if field_name == "name": raise ValueError("UI field 'name' cannot reference a tag.") return value.to_lookup() if isinstance(value, BoundTag): return normalize_ui_value(_binding_from_bound_tag(value), field_name=field_name) if isinstance(value, Tag): return normalize_ui_value(_binding_from_tag(value), field_name=field_name) if isinstance(value, dict): return { key: normalize_ui_value(item, field_name=key) for key, item in value.items() } if isinstance(value, list): return [normalize_ui_value(item) for item in value] if isinstance(value, tuple): return tuple(normalize_ui_value(item) for item in value) if isinstance(value, set): return {normalize_ui_value(item) for item in value} return value def _bind_value(value: Any, tags: Tags | None, visited: set[int]) -> Any: if isinstance(value, UITagBinding): return value if isinstance(value, BoundTag): return _binding_from_bound_tag(value) if isinstance(value, Tag): return _binding_from_declared_tag(value, tags) if isinstance(value, list): for index, item in enumerate(value): value[index] = _bind_value(item, tags=tags, visited=visited) return value if isinstance(value, dict): for key, item in list(value.items()): value[key] = _bind_value(item, tags=tags, visited=visited) return value if isinstance(value, tuple): return tuple(_bind_value(item, tags=tags, visited=visited) for item in value) if isinstance(value, set): return {_bind_value(item, tags=tags, visited=visited) for item in value} if not hasattr(value, "__dict__"): return value obj_id = id(value) if obj_id in visited: return value visited.add(obj_id) for attr_name, attr_value in vars(value).items(): if attr_name in _SKIP_BIND_ATTRS or callable(attr_value): continue setattr(value, attr_name, _bind_value(attr_value, tags=tags, visited=visited)) return value def _binding_from_bound_tag(tag: BoundTag) -> UITagBinding: return UITagBinding( tag_name=_qualify_tag_name(tag.name, getattr(tag._tags, "_app_key", None)), tag_type=tag.tag_type, default_value=_normalize_default(tag.default), live=bool(getattr(tag, "live", False)), ) def _binding_from_declared_tag(tag: Tag, tags: Tags | None) -> UITagBinding: tag_name = _get_tag_name(tag) if tags is None: raise ValueError( f"UI tag reference '{tag_name}' requires application tags to be configured." ) resolved = tags.get_definition(tag_name) if resolved is None: raise ValueError( f"UI tag reference '{tag_name}' is not available in the resolved application tags." ) return _binding_from_tag(resolved, app_key=getattr(tags, "_app_key", None)) def _binding_from_tag(tag: Tag, app_key: str | None = None) -> UITagBinding: return UITagBinding( tag_name=_qualify_tag_name(_get_tag_name(tag), app_key), tag_type=tag.tag_type, default_value=_normalize_default(tag.default), live=bool(getattr(tag, "live", False)), ) def _get_tag_name(tag: Tag) -> str: tag_name = tag.name or getattr(tag, "_declared_attr_name", None) if not tag_name: raise ValueError("Unable to resolve a name for UI tag reference.") return tag_name def _qualify_tag_name(tag_name: str, app_key: str | None) -> str: if not app_key or tag_name.startswith(f"{app_key}."): return tag_name return f"{app_key}.{tag_name}" def _normalize_default(value: Any) -> Any: if value is NotSet: return _MISSING return value def _is_missing_default(value: Any) -> bool: return value is _MISSING or isinstance(value, _MissingDefault) _CONFIG_REF_RE = re.compile(r"\$config\.app\(\)\.(\w+)(?::(\w+))?(?::(.+))?") def _resolve_config_refs(obj: Any, config: Schema) -> Any: """Recursively resolve $config.app().KEY[:type[:default]] references using the Schema.""" if isinstance(obj, dict): return {k: _resolve_config_refs(v, config) for k, v in obj.items()} if isinstance(obj, list): return [_resolve_config_refs(v, config) for v in obj] if isinstance(obj, str) and "$config.app()." in obj: return _resolve_single_config_ref(obj, config) return obj def _resolve_single_config_ref(value: str, config: Schema) -> Any: """Resolve a single $config.app().KEY[:type[:default]] reference.""" match = _CONFIG_REF_RE.fullmatch(value) if not match: return value key, type_hint, default = match.groups() # Lookup is keyed by the element's `_name` (e.g. "dv_app_position"), which # may differ from the Python attribute on the Schema (e.g. `position`). # Prefer `_element_map` (keyed by `_name`); fall back to attribute access # for elements injected dynamically via `_inject_deployment_config`. element = None try: element = config._element_map.get(key) except AttributeError: pass if element is None: try: element = getattr(config, key) except AttributeError: element = None try: raw = element.value if element is not None else None except (ValueError, AttributeError): raw = None if raw is None: raw = default if raw is None: return None if type_hint == "boolean": if isinstance(raw, bool): return raw return str(raw).lower() in ("true", "1", "yes") if type_hint == "number": try: return float(raw) if "." in str(raw) else int(raw) except (ValueError, TypeError): return raw if type_hint == "string": return str(raw) return raw