Source code for pydoover.utils.utils

import asyncio
import json
import logging
import re

from collections.abc import MutableMapping

from functools import wraps
from typing import Any

log = logging.getLogger(__name__)


def sanitize_display_name(name: str) -> str:
    name = name.replace(" ", "_")
    return re.sub(r"[^0-9a-zA-Z_]", "", name).lower()


[docs] def map_reading(in_val, output_values, raw_readings=[4, 20], ignore_below=3): """Map a reading to a value in a range""" if in_val < ignore_below: return None ## Choose the value set to map between lower_val_ind = 0 found = False for i in range(0, len(raw_readings)): if in_val <= raw_readings[i]: lower_val_ind = i - 1 found = True break if not found: lower_val_ind = len(raw_readings) - 2 # Figure out how 'wide' each range is inSpan = raw_readings[lower_val_ind + 1] - raw_readings[lower_val_ind] outSpan = output_values[lower_val_ind + 1] - output_values[lower_val_ind] # Convert the left range into a 0-1 range (float) valueScaled = float(in_val - raw_readings[lower_val_ind]) / float(inSpan) # Convert the 0-1 range into a value in the right range. return output_values[lower_val_ind] + (valueScaled * outSpan)
[docs] def find_object_with_key(obj: dict[Any, Any], key_to_find: str) -> Any | None: """Iteratively searches through a dictionary (JSON object) and returns the value for the specified key. Parameters ---------- obj : dict The JSON object (dictionary) to search through. key_to_find : str The key to search for. Returns ------- Any The object containing the key, or None if the key is not found. """ stack = [obj] while stack: current = stack.pop() if isinstance(current, dict): if key_to_find in current: return current[key_to_find] for key in current: stack.append(current[key]) return None
[docs] def find_path_to_key(obj: dict[Any, Any], key_to_find: str) -> str | None: """Iteratively searches through a dictionary (JSON object) and returns the path to the specified key. Parameters ---------- obj : dict The JSON object (dictionary) to search through. key_to_find : str The key to search for. Returns ------- str, optional The path to the key, or None if the key is not found. """ stack = [{"current": obj, "path": ""}] while stack: current_entry = stack.pop() current = current_entry["current"] path = current_entry["path"] if isinstance(current, dict): if key_to_find in current: return f"{path}.{key_to_find}" if path else key_to_find for key in current: new_path = f"{path}.{key}" if path else key stack.append({"current": current[key], "path": new_path}) return None
def get_is_async(is_async: bool | None = None) -> bool: if is_async is not None: return is_async try: asyncio.get_running_loop() except RuntimeError: return False else: return True
[docs] def maybe_async(): """Wrapper to allow both a sync and async variation on the same function to provide a unified interface to the user. This is useful when writing library functions for both a sync and async context. It assumes you have a variation of your function with the same signature suffixed with `_async`. Examples -------- A simple example:: class MyClass: @maybe_async() def my_function(self, value: str): print("This is the sync version of my_function") return "sync result" async def my_function_async(self, value: str): print("This is the async version of my_function") return "async result" If the user was running an asynchronous main loop, `my_function` would be invoked as follows:: class MyApp(Application): async def main_loop(self): result = await obj.my_function("test") print(result) # This would print "async result" However, if the main loop was synchronous, it would invoke the sync version of the function:: class MyApp(Application): def main_loop(self): result = self.my_function("test") print(result) # This would print "sync result" """ def wrapper(func): @wraps(func) def inner(*args, **kwargs): # args[0] is self is_async_context = getattr(args[0], "_is_async", False) # allow them to specify if they want to run the sync version of the function (default very much not) force_sync = kwargs.pop("run_sync", False) if is_async_context is True and force_sync is False: # we're in an async context, check if we have an async variety of the function to run... try: alternative = getattr(args[0], f"{func.__name__}_async") return alternative(*args[1:], **kwargs) except AttributeError: # we don't have a corresponding async method, just use the sync one. pass return func(*args, **kwargs) return inner return wrapper
[docs] def wrap_try_except(func, *args, **kwargs): """Wrapper function to catch exceptions and log them. This does not propagate the exception.""" try: return func(*args, **kwargs) except Exception as e: log.exception(f"Error in {func.__name__}: {e}", exc_info=e)
[docs] async def wrap_try_except_async(func, *args, **kwargs): """Wrapper function to catch exceptions in an async function and log them. This does not propagate the exception.""" try: return await func(*args, **kwargs) except Exception as e: log.exception(f"Error in {func.__name__}: {e}", exc_info=e)
[docs] async def call_maybe_async( func, *args, as_task: bool = False, in_executor: bool = True, **kwargs ): """Helper function to call a function that may be either synchronous or asynchronous. Parameters ---------- func : callable The function to call, which may be synchronous or asynchronous. *args Arguments to pass to the function. as_task : bool If True, the function will be called as an asyncio task. Default is False. in_executor : bool If True, the function will be run in an executor if it is not a coroutine function. Default is True. **kwargs Any kwargs to pass to the function. """ # print(f"call_maybe_async: func={func}, as_task={as_task}, in_executor={in_executor is True and not asyncio.iscoroutinefunction(func)}") if asyncio.iscoroutinefunction(func): # coro = wrap_try_except_async(func, *args, **kwargs) coro = func(*args, **kwargs) if as_task: # assign it to a variable for weak ref task = asyncio.create_task(coro) return task else: return await coro elif in_executor: loop = asyncio.get_running_loop() # run_in_executor doesn't support kwargs if kwargs: log.warning("kwargs are not supported when calling via executor") # this is a little bit of a hack, but essentially we're creating an async function that # is called with await func to allow for both running as a task and in an executor. # future = loop.run_in_executor(None, wrap_try_except, func, *args) future = loop.run_in_executor(None, func, *args, **kwargs) if as_task: return future return await future else: # return wrap_try_except(func, catch_exception, *args, **kwargs) return func(*args, **kwargs)
[docs] def on_change(callback, name=None): """A decorator that triggers a callback when the output of the decorated function changes. The callback is called with four arguments: - new_result: The new output of the function. - old_result: The previous output (or None if this is the first call). - is_first: A boolean indicating if this is the first time the function has returned a value. - change_detector_name: The optional name identifying this change detector. :param callback: The callback function to trigger, or a string indicating the name of an instance attribute. :param name: An optional name for this change_detector_name. Examples --------- A simple usage example:: class MyClass: def __init__(self): self.last = None def my_callback(self, new_result, old_result, is_first, change_detector_name): if is_first: print(f"{change_detector_name} has returned a value for the first time: {new_result}") else: print(f"{change_detector_name} has changed from {old_result} to {new_result}") @on_change("my_callback", name="my_function") def my_function(self): import random return random.randint(0, 100) """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): detector_name = name or func.__name__ # Check if we're dealing with an instance method. if args and hasattr(args[0], "__dict__"): instance = args[0] state_attr = f"__on_change_state_{detector_name}" # Retrieve state from the instance, or initialize it if not present. state = getattr( instance, state_attr, {"last_result": None, "has_value": False} ) else: # For non-instance functions, fallback to closure state. # (You could also raise an error if you only expect instance methods.) state = wrapper.__dict__.setdefault( "state", {"last_result": None, "has_value": False} ) result = func(*args, **kwargs) is_first = not state["has_value"] # Trigger the callback if it's the first call or if the result has changed. if is_first or result != state["last_result"]: # Determine which callback to use. if isinstance(callback, str): if not args: raise ValueError( "Expected a 'self' argument when using a string callback." ) cb = getattr(args[0], callback, None) if cb is None or not callable(cb): raise ValueError( f"Attribute '{callback}' is not a callable of the instance." ) else: cb = callback cb( result, state["last_result"] if state["has_value"] else None, is_first, detector_name, ) state["last_result"] = result state["has_value"] = True # Save the updated state back to the instance (or closure, as applicable). if args and hasattr(args[0], "__dict__"): setattr(instance, state_attr, state) else: wrapper.__dict__["state"] = state return result return wrapper return decorator
class CaseInsensitiveDict(MutableMapping): def __init__(self, data=None, **kwargs): self._store = dict() if data is None: data = {} self.update(data, **kwargs) def copy(self): return CaseInsensitiveDict(self._store) def to_dict(self): return { k: v.to_dict() if isinstance(v, CaseInsensitiveDict) else v for k, v in self.items() } @classmethod def from_dict(cls, data): t = { k: CaseInsensitiveDict.from_dict(v) if isinstance(v, dict) else v for k, v in data.items() } return cls(t) def __len__(self): return len(self._store) def __iter__(self): return iter(self._store) def __setitem__(self, key, value): self._store[key.lower()] = value def __getitem__(self, key): return self._store[key.lower()] def __delitem__(self, key): del self._store[key.lower()] def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self._store) class CaseInsensitiveDictEncoder(json.JSONEncoder): # might not need this def default(self, o: Any) -> Any: if isinstance(o, CaseInsensitiveDict): return o.to_dict() # Let the base class default method raise the TypeError return super().default(o) class LogFormatter(logging.Formatter): grey = "\x1b[38;20m" yellow = "\x1b[33;20m" red = "\x1b[31;20m" bold_red = "\x1b[31;1m" reset = "\x1b[0m" format = ( "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)" ) FORMATS = { logging.DEBUG: grey + format + reset, logging.INFO: grey + format + reset, logging.WARNING: yellow + format + reset, logging.ERROR: red + format + reset, logging.CRITICAL: bold_red + format + reset, } def format(self, record): log_fmt = self.FORMATS.get(record.levelno) formatter = logging.Formatter(log_fmt) return formatter.format(record) def setup_logging( debug: bool, formatter: logging.Formatter = None, filters: logging.Filter | list[logging.Filter] = None, ): if debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) formatter = formatter or LogFormatter() handler = logging.StreamHandler() handler.setFormatter(formatter) if filters: if isinstance(filters, logging.Filter): filters = [filters] for filter in filters: handler.addFilter(filter) logging.getLogger().handlers.clear() logging.getLogger().addHandler(handler)