diff --git a/param/__init__.py b/param/__init__.py index f2c481fa..5d47e96b 100644 --- a/param/__init__.py +++ b/param/__init__.py @@ -49,7 +49,7 @@ ParamOverrides, Undefined, get_logger ) from .parameterized import (batch_watch, output, script_repr, - discard_events, edit_constant) + discard_events, edit_constant, serializer) from .parameterized import shared_parameters from .parameterized import logging_level from .parameterized import DEBUG, VERBOSE, INFO, WARNING, ERROR, CRITICAL @@ -152,8 +152,7 @@ #: Top-level object to allow messaging not tied to a particular #: Parameterized object, as in 'param.main.warning("Invalid option")'. -main=Parameterized(name="main") - +main = Parameterized(name="main") # A global random seed (integer or rational) available for controlling # the behaviour of Parameterized objects with random state. diff --git a/param/_utils.py b/param/_utils.py index a858240c..e92f05b5 100644 --- a/param/_utils.py +++ b/param/_utils.py @@ -1,6 +1,5 @@ from __future__ import annotations -import collections import contextvars import datetime as dt import functools @@ -10,16 +9,17 @@ import re import sys import traceback +import typing as t import warnings from collections import OrderedDict, abc, defaultdict from contextlib import contextmanager -from numbers import Real from textwrap import dedent from threading import get_ident -from typing import TYPE_CHECKING, Callable, TypeVar +from typing import TYPE_CHECKING, Callable, Protocol, TypeVar, runtime_checkable if TYPE_CHECKING: from typing_extensions import Concatenate, ParamSpec + from param.parameterized import Parameter P = ParamSpec("P") R = TypeVar("R") @@ -65,8 +65,8 @@ class Skip(Exception): """Exception that allows skipping an update when resolving a reference.""" -def _deprecated(extra_msg="", warning_cat=ParamDeprecationWarning): - def decorator(func): +def _deprecated(extra_msg: str = "", warning_cat: type[Warning] = ParamDeprecationWarning): + def decorator(func: Callable[..., t.Any]) -> Callable[..., t.Any]: """Mark a function or method as deprecated. This internal decorator issues a warning when the decorated function @@ -96,7 +96,7 @@ def inner(*args, **kwargs): return decorator -def _deprecate_positional_args(func): +def _deprecate_positional_args(func: Callable[..., t.Any]) -> Callable[..., t.Any]: """Issue warnings for methods using deprecated positional arguments. This internal decorator warns when arguments after the `*` separator @@ -171,8 +171,8 @@ def wrapper(self, *args, **kwargs): return decorating_function -def _is_auto_name(class_name, instance_name): - return re.match('^'+class_name+'[0-9]{5}$', instance_name) +def _is_auto_name(class_name: str, instance_name: str) -> bool: + return bool(re.match(f'^{class_name}[0-9]{5}$', instance_name)) def _find_pname(pclass): @@ -182,12 +182,14 @@ def _find_pname(pclass): """ stack = traceback.extract_stack() for frame in stack: - match = re.match(r"^(\S+)\s*=\s*(param|pm)\." + pclass + r"\(", frame.line) + if frame.line is None: + continue + match = re.match(rf"^(\S+)\s*=\s*(param|pm)\.{pclass}\(", frame.line) if match: return match.group(1) -def _validate_error_prefix(parameter, attribute=None): +def _validate_error_prefix(parameter: Parameter, attribute: str | None = None) -> str: """ Generate an error prefix suitable for Parameters when they raise a validation error. @@ -376,10 +378,10 @@ def _hashable(x): part of the object has changed. Does not (currently) recursively replace mutable subobjects. """ - if isinstance(x, collections.abc.MutableSequence): + if isinstance(x, abc.MutableSequence): return tuple(x) - elif isinstance(x, collections.abc.MutableMapping): - return tuple([(k,v) for k,v in x.items()]) + elif isinstance(x, abc.MutableMapping): + return tuple([(k, v) for k, v in x.items()]) else: return x @@ -446,42 +448,62 @@ def named_objs(objlist, namesdict=None): """ return _named_objs(objlist, namesdict=namesdict) +from typing import SupportsFloat + -def _get_min_max_value(min, max, value=None, step=None): +def _get_min_max_value( + min: SupportsFloat | None, + max: SupportsFloat | None, + value: SupportsFloat | None = None, + step: SupportsFloat | None = None, +) -> tuple[float, float, float]: """Return min, max, value given input values with possible None.""" - # Either min and max need to be given, or value needs to be given + fmin = float(min) if min is not None else None + fmax = float(max) if max is not None else None + if value is None: - if min is None or max is None: - raise ValueError( - f'unable to infer range, value from: ({min}, {max}, {value})' - ) - diff = max - min - value = min + (diff / 2) - # Ensure that value has the same type as diff - if not isinstance(value, type(diff)): - value = min + (diff // 2) - else: # value is not None - if not isinstance(value, Real): - raise TypeError('expected a real number, got: %r' % value) - # Infer min/max from value - if value == 0: - # This gives (0, 1) of the correct type - vrange = (value, value + 1) - elif value > 0: - vrange = (-value, 3*value) - else: - vrange = (3*value, -value) - if min is None: - min = vrange[0] - if max is None: - max = vrange[1] + if fmin is None or fmax is None: + raise ValueError(f"unable to infer range, value from: ({min}, {max}, {value})") + fvalue = (fmin + fmax) / 2.0 + else: + fvalue = float(value) + if fmin is None or fmax is None: + if fvalue == 0.0: + low, high = 0.0, 1.0 + elif fvalue > 0.0: + low, high = -fvalue, 3.0 * fvalue + else: + low, high = 3.0 * fvalue, -fvalue + if fmin is None: + fmin = low + if fmax is None: + fmax = high + + # Safety: ensure bounds exist + if fmin is None or fmax is None: + raise RuntimeError("internal error: bounds not resolved") + + # Normalize so fmin <= fmax + if fmin > fmax: + fmin, fmax = fmax, fmin + + # Snap to step if requested if step is not None: - # ensure value is on a step - tick = int((value - min) / step) - value = min + tick * step - if not min <= value <= max: - raise ValueError(f'value must be between min and max (min={min}, value={value}, max={max})') - return min, max, value + fstep = abs(float(step)) + if fstep == 0.0: + raise ValueError("step must be non-zero") + ticks = round((fvalue - fmin) / fstep) # nearest tick; use math.floor for always-down + fvalue = fmin + ticks * fstep + # Clamp after snapping + if fvalue < fmin: + fvalue = fmin + if fvalue > fmax: + fvalue = fmax + + if not (fmin <= fvalue <= fmax): + raise ValueError(f"value must be between min and max (min={fmin}, value={fvalue}, max={fmax})") + + return fmin, fmax, fvalue def _deserialize_from_path(ext_to_routine, path, type_name): @@ -660,12 +682,13 @@ def exceptions_summarized(): except Exception: import sys etype, value, tb = sys.exc_info() - print(f"{etype.__name__}: {value}", file=sys.stderr) + if etype is not None: + print(f"{etype.__name__}: {value}", file=sys.stderr) def _in_ipython(): try: - get_ipython + get_ipython() # type: ignore[name-defined] return True except NameError: return False @@ -685,21 +708,27 @@ def async_executor(func): else: event_loop.run_until_complete(func()) +@runtime_checkable +class _HasTypes(Protocol): + @classmethod + def types(cls) -> abc.Iterable[type]: ... + class _GeneratorIsMeta(type): - def __instancecheck__(cls, inst): + def __instancecheck__(cls: type[_HasTypes], inst): return isinstance(inst, tuple(cls.types())) - def __subclasscheck__(cls, sub): + def __subclasscheck__(cls: type[_HasTypes], sub: type) -> bool: return issubclass(sub, tuple(cls.types())) - def __iter__(cls): + def __iter__(cls: type[_HasTypes]) -> abc.Iterator[type]: yield from cls.types() class _GeneratorIs(metaclass=_GeneratorIsMeta): @classmethod - def __iter__(cls): + def __iter__(cls: type[_HasTypes]) -> abc.Iterator[type]: yield from cls.types() + def gen_types(gen_func): """Decorate a generator function to support type checking. diff --git a/param/depends.py b/param/depends.py index 31e2df75..dceb37d2 100644 --- a/param/depends.py +++ b/param/depends.py @@ -1,10 +1,12 @@ from __future__ import annotations import inspect +import typing as t +from collections.abc import AsyncGenerator, Generator from collections import defaultdict from functools import wraps -from typing import TYPE_CHECKING, TypeVar, Callable, Protocol, TypedDict, overload +from typing import TYPE_CHECKING, Any, Awaitable, TypeVar, Callable, ParamSpec, Protocol, TypedDict, overload from .parameterized import ( Parameter, Parameterized, ParameterizedMetaclass, transform_reference, @@ -12,35 +14,62 @@ from ._utils import accept_arguments, iscoroutinefunction if TYPE_CHECKING: - CallableT = TypeVar("CallableT", bound=Callable) - Dependency = Parameter | str + Y = TypeVar("Y") + S = TypeVar("S") + T = TypeVar("T") - class DependencyInfo(TypedDict): - dependencies: tuple[Dependency, ...] - kw: dict[str, Dependency] - watch: bool - on_init: bool +P = ParamSpec("P") +R = TypeVar("R", covariant=True) + +Dependency = Parameter | str + +class DependencyInfo(TypedDict): + dependencies: tuple[Dependency, ...] + kw: dict[str, Dependency] + watch: bool + on_init: bool + +class DependsFunc(Protocol[P, R]): + _dinfo: DependencyInfo + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: ... - class DependsFunc(Protocol[CallableT]): - _dinfo: DependencyInfo - __call__: CallableT @overload def depends( *dependencies: str, watch: bool = ..., on_init: bool = ... -) -> Callable[[CallableT], DependsFunc[CallableT]]: +) -> Callable[[Callable[P, R]], DependsFunc[P, R]]: ... @overload def depends( *dependencies: Parameter, watch: bool = ..., on_init: bool = ..., **kw: Parameter -) -> Callable[[CallableT], DependsFunc[CallableT]]: +) -> Callable[[Callable[P, R]], DependsFunc[P, R]]: ... +@overload +def depends( + *dependencies: str, watch: bool = ..., on_init: bool = ... +) -> Callable[[Callable[P, Awaitable[R]]], DependsFunc[P, Awaitable[R]]]: ... + +@overload +def depends( + *dependencies: Parameter, watch: bool = ..., on_init: bool = ..., **kw: Parameter +) -> Callable[[Callable[P, Awaitable[R]]], DependsFunc[P, Awaitable[R]]]: ... + +@overload +def depends( + *dependencies: str, watch: bool = ..., on_init: bool = ... +)-> Callable[[Callable[P, Generator[Y, S, T]]], DependsFunc[P, Generator[Y, S, T]]]: ... + +@overload +def depends( + *dependencies: Parameter, watch: bool = ..., on_init: bool = ..., **kw: Parameter +)-> Callable[[Callable[P, AsyncGenerator[Y, S]]], DependsFunc[P, AsyncGenerator[Y, S]]]: ... + @accept_arguments def depends( - func: CallableT, /, *dependencies: Dependency, watch: bool = False, on_init: bool = False, **kw: Parameter -) -> Callable[[CallableT], DependsFunc[CallableT]]: + func: Callable[P, R], /, *dependencies: Dependency, watch: bool = False, on_init: bool = False, **kw: Dependency +) -> DependsFunc[P, R]: """ Annotates a function or Parameterized method to express its dependencies. @@ -67,22 +96,27 @@ def depends( if inspect.isgeneratorfunction(func): @wraps(func) - def _depends(*args, **kw): + def _depends_gen(*args, **kw): for val in func(*args, **kw): yield val + _depends = t.cast(Callable[P, R], _depends_gen) elif inspect.isasyncgenfunction(func): @wraps(func) - async def _depends(*args, **kw): + async def _depends_async_gen(*args, **kw): async for val in func(*args, **kw): yield val + _depends = t.cast(Callable[P, R], _depends_async_gen) elif iscoroutinefunction(func): + F = t.cast(Callable[P, Awaitable[R]], func) @wraps(func) - async def _depends(*args, **kw): - return await func(*args, **kw) + async def _depends_coro(*args, **kw): + return await F(*args, **kw) + _depends = t.cast(Callable[P, R], _depends_coro) else: @wraps(func) - def _depends(*args, **kw): + def _depends_sync(*args, **kw): return func(*args, **kw) + _depends = t.cast(Callable[P, R], _depends_sync) deps = list(dependencies)+list(kw.values()) string_specs = False @@ -115,40 +149,53 @@ def _depends(*args, **kw): 'or function is not supported when referencing ' 'parameters by name.') - if not string_specs and watch: # string_specs case handled elsewhere (later), in Parameterized.__init__ - if inspect.isgeneratorfunction(func): - def cb(*events): - args = (getattr(dep.owner, dep.name) for dep in dependencies) - dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in kw.items()} - for val in func(*args, **dep_kwargs): - yield val - elif inspect.isasyncgenfunction(func): - async def cb(*events): - args = (getattr(dep.owner, dep.name) for dep in dependencies) - dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in kw.items()} - async for val in func(*args, **dep_kwargs): - yield val - elif iscoroutinefunction(func): - async def cb(*events): - args = (getattr(dep.owner, dep.name) for dep in dependencies) - dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in kw.items()} - await func(*args, **dep_kwargs) - else: - def cb(*events): - args = (getattr(dep.owner, dep.name) for dep in dependencies) - dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in kw.items()} - return func(*args, **dep_kwargs) - - grouped = defaultdict(list) - for dep in deps: - grouped[id(dep.owner)].append(dep) - for group in grouped.values(): - group[0].owner.param.watch(cb, [dep.name for dep in group]) - _dinfo = getattr(func, '_dinfo', {}) _dinfo.update({'dependencies': dependencies, 'kw': kw, 'watch': watch, 'on_init': on_init}) - _depends._dinfo = _dinfo # type: ignore[attr-defined] + typed_depends = t.cast(DependsFunc[P, R], _depends) + typed_depends._dinfo = _dinfo # type: ignore[attr-defined] - return _depends + if string_specs or not watch: + # string_specs case handled elsewhere (later), in Parameterized.__init__ + return typed_depends + param_args = [dep for dep in dependencies if isinstance(dep, Parameter)] + param_kwargs = {n: dep for n, dep in kw.items() if isinstance(dep, Parameter)} + param_deps = list(param_args) + list(param_kwargs.values()) + if inspect.isgeneratorfunction(func): + def cb_gen(*events): + args: tuple[Any, ...] = tuple(getattr(dep.owner, dep.name) for dep in param_args if dep.name) + dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in param_kwargs.items() if dep.name} + func_gen = t.cast(Callable[P, Generator[Any, Any, Any]], func) + for val in func_gen(*args, **dep_kwargs): + yield val + cb = cb_gen + elif inspect.isasyncgenfunction(func): + async def cb_async_gen(*events): + args: tuple[Any, ...] = tuple(getattr(dep.owner, dep.name) for dep in param_args if dep.name) + dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in param_kwargs.items() if dep.name} + func_agen = t.cast(Callable[P, AsyncGenerator[Any, Any]], func) + async for val in func_agen(*args, **dep_kwargs): + yield val + cb = cb_async_gen + elif iscoroutinefunction(func): + async def cb_coro(*events): + args: tuple[Any, ...] = tuple(getattr(dep.owner, dep.name) for dep in param_args if dep.name) + dep_kwargs: dict[str, Any] = {n: getattr(dep.owner, dep.name) for n, dep in param_kwargs.items() if dep.name} + func_coro = t.cast(Callable[P, Awaitable[Any]], func) + await func_coro(*args, **dep_kwargs) + cb = cb_coro + else: + def cb_sync(*events): + args: tuple[Any, ...] = tuple(getattr(dep.owner, dep.name) for dep in param_args if dep.name) + dep_kwargs = {n: getattr(dep.owner, dep.name) for n, dep in param_kwargs.items() if dep.name} + return func(*args, **dep_kwargs) + cb = cb_sync + + grouped = defaultdict(list) + for dep in param_deps: + grouped[id(dep.owner)].append(dep) + for group in grouped.values(): + group[0].owner.param.watch(cb, [dep.name for dep in group]) + + return typed_depends diff --git a/param/ipython.py b/param/ipython.py index 7290fe05..df234711 100644 --- a/param/ipython.py +++ b/param/ipython.py @@ -101,8 +101,10 @@ def param_docstrings(self, info, max_col_len=100, only_changed=False): if (WARN_MISFORMATTED_DOCSTRINGS and not unindented.startswith("\n") and len(unindented.splitlines()) > 1): - param.main.warning("Multi-line docstring for %r is incorrectly formatted " - " (should start with newline)", name) + param.main.param.warning( + "Multi-line docstring for %r is incorrectly formatted " + " (should start with newline)", name + ) # Strip any starting newlines while unindented.startswith("\n"): unindented = unindented[1:] diff --git a/param/parameterized.py b/param/parameterized.py index 9d2494bf..bb1777f0 100644 --- a/param/parameterized.py +++ b/param/parameterized.py @@ -7,6 +7,7 @@ either alone (providing basic Parameter support) or with param's __init__.py (providing specialized Parameter types). """ +from __future__ import annotations import copy import datetime as dt @@ -17,19 +18,19 @@ import re import sys import types -import typing +import typing as t import warnings +from contextlib import contextmanager from inspect import getfullargspec from collections import defaultdict, namedtuple, OrderedDict -from collections.abc import Callable, Iterable +from collections.abc import Callable, Iterable, Generator, Mapping from functools import partial, wraps, reduce from itertools import chain from operator import itemgetter, attrgetter from types import FunctionType, MethodType -from typing import Any, Union, Literal # When python 3.9 support is dropped replace Union with | +from typing import Literal, TypeVar, Generic -from contextlib import contextmanager CRITICAL = 50 ERROR = 40 WARNING = 30 @@ -149,7 +150,7 @@ def transform_reference(arg): arg = transform(arg) return arg -def eval_function_with_deps(function): +def eval_function_with_deps(function: Callable[..., t.Any]) -> t.Any: """ Evaluate a function after resolving its dependencies. @@ -166,7 +167,7 @@ def eval_function_with_deps(function): kwargs = {n: getattr(dep.owner, dep.name) for n, dep in kw_deps.items()} return function(*args, **kwargs) -def resolve_value(value, recursive=True): +def resolve_value(value: t.Any, recursive: bool = True) -> t.Any: """Resolve the current value of a dynamic reference.""" if not recursive: pass @@ -186,11 +187,11 @@ def resolve_value(value, recursive=True): value = eval_function_with_deps(value) if is_gen: value = _to_async_gen(value) - elif isinstance(value, Parameter): + elif isinstance(value, Parameter) and value.name is not None: value = getattr(value.owner, value.name) return value -def resolve_ref(reference, recursive=False): +def resolve_ref(reference: t.Any, recursive: bool = False) -> list[Parameter]: """Resolve all parameters a dynamic reference depends on.""" if recursive: if isinstance(reference, (list, tuple, set)): @@ -208,7 +209,9 @@ def resolve_ref(reference, recursive=False): for arg in (args + kwargs): if isinstance(arg, str): owner = get_method_owner(reference) - if arg in owner.param: + if not isinstance(owner, Parameterized): + continue + elif arg in owner.param: arg = owner.param[arg] elif '.' in arg: path = arg.split('.') @@ -224,7 +227,7 @@ def resolve_ref(reference, recursive=False): return [reference] return [] -def _identity_hook(obj, val): +def _identity_hook(obj: t.Any, val: t.Any) -> t.Any: """To be removed when set_hook is removed.""" return val @@ -235,6 +238,8 @@ class _Undefined: simple None values. """ + __slots__ = () + def __bool__(self): # Haven't defined whether Undefined is falsy or truthy, # so to avoid subtle bugs raise an error when it @@ -245,8 +250,8 @@ def __repr__(self): return '' -Undefined = _Undefined() - +Undefined: t.Final[_Undefined] = _Undefined() +UndefinedType: t.TypeAlias = _Undefined @contextmanager def logging_level(level): @@ -494,7 +499,7 @@ def no_instance_params(cls): return cls -def _instantiate_param_obj(paramobj, owner=None): +def _instantiate_param_obj(paramobj: Parameter, owner: Parameterized | None = None) -> Parameter: """Return a Parameter object suitable for instantiation given the class's Parameter object.""" # Shallow-copy Parameter object without the watchers p = copy.copy(paramobj) @@ -509,10 +514,11 @@ def _instantiate_param_obj(paramobj, owner=None): v = getattr(p, s) if _is_mutable_container(v) and s != "default": setattr(p, s, copy.copy(v)) + return p -def _instantiated_parameter(parameterized, param): +def _instantiated_parameter(parameterized: Parameterized, param: Parameter): """ Given a Parameterized object and one of its class Parameter objects, return the appropriate Parameter object for this instance, instantiating @@ -521,6 +527,8 @@ def _instantiated_parameter(parameterized, param): if (getattr(parameterized._param__private, 'initialized', False) and param.per_instance and not getattr(type(parameterized)._param__private, 'disable_instance_params', False)): key = param.name + if key is None: + raise ValueError("Parameter name is not set") if key not in parameterized._param__private.params: parameterized._param__private.params[key] = _instantiate_param_obj(param, parameterized) @@ -652,7 +660,7 @@ def output(func, *output, **kw): def _output(*args,**kw): return func(*args,**kw) - _output._dinfo = _dinfo + _output._dinfo = _dinfo # type: ignore[attr-defined] return _output @@ -670,9 +678,13 @@ def _parse_dependency_spec(spec): assert spec.count(":")<=1 spec = spec.strip() m = re.match("(?P[^:]*):?(?P.*)", spec) + if m is None: + return None, None, None what = m.group('what') path = "."+m.group('path') m = re.match(r"(?P.*)(\.)(?P.*)", path) + if m is None: + return None, None, None obj = m.group('obj') attr = m.group("attr") return obj or None, attr, what or 'value' @@ -758,13 +770,15 @@ def _skip_event(*events, **kwargs): return True -def extract_dependencies(function): +def extract_dependencies(function: t.Callable[..., t.Any]) -> list[Parameter]: """Extract references from a method or function that declares the references.""" subparameters = list(function._dinfo['dependencies'])+list(function._dinfo['kw'].values()) params = [] for p in subparameters: if isinstance(p, str): owner = get_method_owner(function) + if not isinstance(owner, Parameterized): + continue *subps, p = p.split('.') for subp in subps: owner = getattr(owner, subp, None) @@ -784,21 +798,39 @@ def extract_dependencies(function): # Two callers at the module top level to support pickling. -async def _async_caller(*events, what='value', changed=None, callback=None, function=None): +async def _async_caller( + *events: Event, + what: str = 'value', + changed: t.Any = None, + callback: t.Callable[..., None] | None = None, + function: t.Callable[[], t.Awaitable[t.Any]] | None = None +) -> t.Any: if callback: callback(*events) - if not _skip_event or not _skip_event(*events, what=what, changed=changed): + if not _skip_event(*events, what=what, changed=changed) and function is not None: await function() -def _sync_caller(*events, what='value', changed=None, callback=None, function=None): +def _sync_caller( + *events: Event, + what: str = 'value', + changed: t.Any = None, + callback: t.Callable[..., None] | None = None, + function: t.Callable[[], t.Any] | None = None +) -> t.Any: if callback: callback(*events) - if not _skip_event(*events, what=what, changed=changed): + if not _skip_event(*events, what=what, changed=changed) and function is not None: return function() -def _m_caller(self, method_name, what='value', changed=None, callback=None): +def _m_caller( + self, + method_name: str, + what: str = 'value', + changed: t.Any = None, + callback: t.Callable[..., None] | None = None +) -> t.Callable[..., t.Any]: """ Wrap a method call adding support for scheduling a callback before it is executed and skipping events if a subobject has @@ -807,7 +839,7 @@ def _m_caller(self, method_name, what='value', changed=None, callback=None): function = getattr(self, method_name) _caller = _async_caller if iscoroutinefunction(function) else _sync_caller caller = partial(_caller, what=what, changed=changed, callback=callback, function=function) - caller._watcher_name = method_name + caller._watcher_name = method_name # type: ignore[attr-defined] return caller @@ -949,7 +981,7 @@ def __new__(mcs, classname, bases, classdict): # store the class's docstring in __classdoc if '__doc__' in classdict: - classdict['__classdoc']=classdict['__doc__'] + classdict['__classdoc'] = classdict['__doc__'] # when asking for help on Parameter *object*, return the doc slot classdict['__doc__'] = property(attrgetter('doc')) @@ -974,13 +1006,13 @@ def __new__(mcs, classname, bases, classdict): # No special handling for a __dict__ slot; should there be? return type.__new__(mcs, classname, bases, classdict) - def __getattribute__(mcs,name): - if name=='__doc__': + def __getattribute__(mcs, name: str) -> t.Any: + if name == '__doc__': # when asking for help on Parameter *class*, return the # stored class docstring - return type.__getattribute__(mcs,'__classdoc') + return type.__getattribute__(mcs, '__classdoc') else: - return type.__getattribute__(mcs,name) + return type.__getattribute__(mcs, name) _UDPATE_PARAMETER_SIGNATURE = _in_ipython() or (os.getenv("PARAM_PARAMETER_SIGNATURE", "false").lower() in ("1" , "true")) @@ -993,7 +1025,7 @@ class _ParameterBase(metaclass=ParameterMetaclass): """ @classmethod - def _modified_slots_defaults(cls): + def _modified_slots_defaults(cls) -> dict[str, t.Any]: defaults = cls._slot_defaults.copy() defaults['label'] = defaults.pop('_label') return defaults @@ -1056,7 +1088,22 @@ def _sorter(p): cls.__signature__ = new_sig -class Parameter(_ParameterBase): +T = t.TypeVar("T") +P = t.ParamSpec("P") +R = TypeVar("R", covariant=True) + +class ParameterKwargs(t.TypedDict, total=False): + doc: str | None + instantiate: bool + constant: bool + readonly: bool + pickle_default_value: bool + per_instance: bool + allow_refs: bool + nested_refs: bool + + +class Parameter(_ParameterBase, t.Generic[T]): """ An attribute descriptor for declaring parameters. @@ -1215,22 +1262,57 @@ class Foo(Bar): 'constant', 'pickle_default_value', 'watchers', 'owner'] - @typing.overload + allow_refs: bool + nested_refs: bool + precedence: float | None + default: t.Any + doc: str | None + _label: str | None + instantiate: bool + constant: bool + readonly: bool + pickle_default_value: bool + allow_None: bool + per_instance: bool + + @t.overload + def __init__(self) -> None: ... + + @t.overload def __init__( self, - default=None, *, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False - ): + default: t.Any = "", + *, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False + ) -> None: ... - @_deprecate_positional_args - def __init__(self, default=Undefined, *, doc=Undefined, # pylint: disable-msg=R0913 - label=Undefined, precedence=Undefined, - instantiate=Undefined, constant=Undefined, readonly=Undefined, - pickle_default_value=Undefined, allow_None=Undefined, - per_instance=Undefined, allow_refs=Undefined, nested_refs=Undefined): + def __init__( + self, + default: t.Any = Undefined, + *, + doc=Undefined, # pylint: disable-msg=R0913 + label=Undefined, + precedence=Undefined, + instantiate=Undefined, + constant=Undefined, + readonly=Undefined, + pickle_default_value=Undefined, + allow_None=Undefined, + per_instance=Undefined, + allow_refs=Undefined, + nested_refs=Undefined + ): """ Initialize a new Parameter object and store the supplied attributes. @@ -1311,24 +1393,24 @@ def __init__(self, default=Undefined, *, doc=Undefined, # pylint: disable-msg=R0 inheritance of Parameter slots (attributes) from the owning-class' class hierarchy (see ParameterizedMetaclass). """ - self.name = None - self.owner = None - self.allow_refs = allow_refs - self.nested_refs = nested_refs - self.precedence = precedence + self.name: str | None = None + self.owner: Parameterized | None = None + self.allow_refs = allow_refs # type: ignore[assignment] + self.nested_refs = nested_refs # type: ignore[assignment] + self.precedence = precedence # type: ignore[assignment] self.default = default - self.doc = doc + self.doc = doc # type: ignore[assignment] if constant is True or readonly is True: # readonly => constant self.constant = True else: - self.constant = constant - self.readonly = readonly - self._label = label + self.constant = constant # type: ignore[assignment] + self.readonly = readonly # type: ignore[assignment] + self._label = label # type: ignore[assignment] self._set_instantiate(instantiate) - self.pickle_default_value = pickle_default_value + self.pickle_default_value = pickle_default_value # type: ignore[assignment] self._set_allow_None(allow_None) self.watchers = {} - self.per_instance = per_instance + self.per_instance = per_instance # type: ignore[assignment] @classmethod def serialize(cls, value): @@ -1340,7 +1422,9 @@ def deserialize(cls, value): """Given a serializable Python value, return a value that the parameter can be set to.""" return value - def schema(self, safe=False, subset=None, mode='json'): + def schema( + self, safe: bool = False, subset: list[str] | None = None, mode: str = 'json' + ) -> dict[str, t.Any]: if mode not in self._serializers: raise KeyError(f'Mode {mode!r} not in available serialization formats {list(self._serializers.keys())!r}') return self._serializers[mode].param_schema(self.__class__.__name__, self, @@ -1393,42 +1477,42 @@ def rx(self): return reactive_ops(self) @property - def label(self): - if self.name and self._label is None: + def label(self) -> str: + if self._label is None: return label_formatter(self.name) else: return self._label @label.setter - def label(self, val): + def label(self, val: str): self._label = val - def _set_allow_None(self, allow_None): + def _set_allow_None(self, allow_None: bool | UndefinedType): # allow_None is set following these rules (last takes precedence): # 1. to False by default # 2. to the value provided in the constructor, if any # 3. to True if default is None if self.default is None: self.allow_None = True - elif allow_None is not Undefined: + elif isinstance(allow_None, bool): self.allow_None = allow_None else: - self.allow_None = self._slot_defaults['allow_None'] + self.allow_None = t.cast(bool, self._slot_defaults['allow_None']) - def _set_instantiate(self,instantiate): + def _set_instantiate(self, instantiate: bool | UndefinedType): """Constant parameters must be instantiated.""" # instantiate doesn't actually matter for read-only # parameters, since they can't be set even on a class. But # having this code avoids needless instantiation. if self.readonly: self.instantiate = False - elif instantiate is not Undefined: + elif isinstance(instantiate, bool): self.instantiate = instantiate else: # Default value - self.instantiate = self._slot_defaults['instantiate'] + self.instantiate = t.cast(bool, self._slot_defaults['instantiate']) - def __setattr__(self, attribute, value): + def __setattr__(self, attribute: str, value): if attribute == 'name': name = getattr(self, 'name', None) if name is not None and value != name: @@ -1453,7 +1537,9 @@ def __setattr__(self, attribute, value): if has_watcher and old is not NotImplemented: self._trigger_event(attribute, old, value) - def _trigger_event(self, attribute, old, new): + def _trigger_event(self, attribute: str, old: t.Any, new: t.Any): + if self.owner is None: + return event = Event(what=attribute, name=self.name, obj=None, cls=self.owner, old=old, new=new, type=None) for watcher in self.watchers[attribute]: @@ -1461,7 +1547,7 @@ def _trigger_event(self, attribute, old, new): if not self.owner.param._BATCH_WATCH: self.owner.param._batch_call_watchers() - def __getattribute__(self, key): + def __getattribute__(self, key: str) -> t.Any: """ Allow slot values to be Undefined in an "unbound" parameter, i.e. one that is not (yet) owned by a Parameterized object, in which case their @@ -1493,7 +1579,7 @@ def _update_state(self): values, after the slot values have been set in the inheritance procedure. """ - def __get__(self, obj, objtype): # pylint: disable-msg=W0613 + def __get__(self, obj: Parameterized | None, objtype: type[Parameterized] | None = None) -> T: # pylint: disable-msg=W0613 """ Return the value for this Parameter. @@ -1505,6 +1591,9 @@ def __get__(self, obj, objtype): # pylint: disable-msg=W0613 instance's value, if one has been set - otherwise produce the class's value (default). """ + if self.name is None: + raise ValueError("Parameter name is not set") + if obj is None: # e.g. when __get__ called for a Parameterized class result = self.default else: @@ -1517,7 +1606,7 @@ def __get__(self, obj, objtype): # pylint: disable-msg=W0613 return result @instance_descriptor - def __set__(self, obj, val): + def __set__(self, obj: Parameterized | None, val: T): """ Set the value for this Parameter. @@ -1543,12 +1632,15 @@ def __set__(self, obj, val): object stored in a constant or read-only Parameter (e.g. one item in a list). """ + if self.name is None: + return name = self.name + if obj is not None and self.allow_refs and obj._param__private.initialized: syncing = name in obj._param__private.syncing ref, deps, val, is_async = obj.param._resolve_ref(self, val) refs = obj._param__private.refs - if ref is not None: + if ref is not None and self.owner is not None: self.owner.param._update_ref(name, ref) elif name in refs and not syncing: del refs[name] @@ -1590,11 +1682,6 @@ def __set__(self, obj, val): _old = self.default self.default = val else: - # When setting a Parameter before calling super. - if not isinstance(obj._param__private, _InstancePrivate): - obj._param__private = _InstancePrivate( - explicit_no_refs=type(obj)._param__private.explicit_no_refs - ) _old = obj._param__private.values.get(name, self.default) obj._param__private.values[name] = val self._post_setter(obj, val) @@ -1675,32 +1762,33 @@ def _post_setter(self, obj, val): def __delete__(self,obj): raise TypeError("Cannot delete '%s': Parameters deletion not allowed." % self.name) - def _set_names(self, attrib_name): - if None not in (self.owner, self.name) and attrib_name != self.name: - raise AttributeError('The {} parameter {!r} has already been ' - 'assigned a name by the {} class, ' - 'could not assign new name {!r}. Parameters ' - 'may not be shared by multiple classes; ' - 'ensure that you create a new parameter ' - 'instance for each new class.'.format(type(self).__name__, self.name, - self.owner.name, attrib_name)) + def _set_names(self, attrib_name: str): + if self.owner is not None and self.name is not None and attrib_name != self.name: + raise AttributeError( + f'The {type(self).__name__} parameter {self.name!r} has already been ' + f'assigned a name by the {self.owner.name} class, ' + f'could not assign new name {attrib_name!r}. Parameters ' + 'may not be shared by multiple classes; ' + 'ensure that you create a new parameter ' + 'instance for each new class.' + ) self.name = attrib_name - def __getstate__(self): + def __getstate__(self) -> dict[str, t.Any]: """ All Parameters have slots, not a dict, so we have to support pickle and deepcopy ourselves. """ return {slot: getattr(self, slot) for slot in self.__class__._all_slots_} - def __setstate__(self,state): + def __setstate__(self, state: dict[str, t.Any]): # set values of __slots__ (instead of in non-existent __dict__) for k, v in state.items(): setattr(self, k, v) # Define one particular type of Parameter that is used in this file -class String(Parameter): +class String(Parameter[T]): r""" A String Parameter, with a default value and optional regular expression (regex) matching. @@ -1718,23 +1806,75 @@ def __init__(self, default="0.0.0.0", allow_None=False, **kwargs): _slot_defaults = dict(Parameter._slot_defaults, default="", regex=None) - @typing.overload + regex: str | re.Pattern[str] | None + + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: String[str], + default: str = "", + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: String[str | None], + default: str | None = None, + *, + regex: str | None = None, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: String[str | None], + default: str | None = "", + *, + regex: str | None = None, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( - self, - default="", *, regex=None, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False - ): + self: String[str | None], + default: str | None = "", + *, + regex: str | None = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False + ) -> None: ... @_deprecate_positional_args - def __init__(self, default=Undefined, *, regex=Undefined, **kwargs): + def __init__( + self, + default: t.Any = Undefined, + *, + regex: str | re.Pattern[str] | None | _Undefined = Undefined, + **kwargs: t.Any + ) -> None: super().__init__(default=default, **kwargs) - self.regex = regex + self.regex = regex # type: ignore[assignment] self._validate(self.default) - def _validate_regex(self, val, regex): + def _validate_regex(self, val: t.Any, regex: str | re.Pattern[str] | None): if (val is None and self.allow_None): return if regex is not None and re.match(regex, val) is None: @@ -1743,7 +1883,7 @@ def _validate_regex(self, val, regex): f'match regex {regex!r}.' ) - def _validate_value(self, val, allow_None): + def _validate_value(self, val: t.Any, allow_None: bool): if allow_None and val is None: return if not isinstance(val, str): @@ -1752,7 +1892,7 @@ def _validate_value(self, val, allow_None): f'not value of {type(val)}.' ) - def _validate(self, val): + def _validate(self, val: t.Any): self._validate_value(val, self.allow_None) self._validate_regex(val, self.regex) @@ -1815,7 +1955,7 @@ class Comparator: To use the Comparator simply call the is_equal function. """ - equalities = { + equalities: dict[type | tuple[type, ...] | Callable[[t.Any], bool], Callable[[t.Any, t.Any], bool]] = { numbers.Number: operator.eq, str: operator.eq, bytes: operator.eq, @@ -1830,17 +1970,18 @@ class Comparator: def is_equal(cls, obj1, obj2): equals = cls.equalities.copy() for gen, op in cls.gen_equalities.items(): - for t in gen(): - equals[t] = op + for v in gen(): + equals[v] = op for eq_type, eq in equals.items(): - try: - are_instances = isinstance(obj1, eq_type) and isinstance(obj2, eq_type) - except TypeError: - pass - else: - if are_instances: - return eq(obj1, obj2) + if isinstance(eq_type, type) or (isinstance(eq_type, tuple) and all(isinstance(t, type) for t in eq_type)): + try: + are_instances = isinstance(obj1, eq_type) and isinstance(obj2, eq_type) + except TypeError: + pass + else: + if are_instances: + return eq(obj1, obj2) if isinstance(eq_type, FunctionType) and eq_type(obj1) and eq_type(obj2): return eq(obj1, obj2) if isinstance(obj2, (list, set, tuple)): @@ -1903,13 +2044,14 @@ class or the instance as necessary. https://param.holoviz.org/user_guide/Parameters.html#parameterized-namespace """ - def __init__(self_, cls: type['Parameterized'], self: Union['Parameterized', None]=None): + def __init__(self_, cls: type[Parameterized], self: Parameterized | None = None): """ cls is the Parameterized class which is always set. self is the instance if set. """ self_.cls = cls self_.self = self + self_._depends = {"watch": []} @property def _BATCH_WATCH(self_): @@ -1957,7 +2099,7 @@ def watchers(self_, value): self_.self._param__private.watchers = value @property - def self_or_cls(self_) -> Union['Parameterized', type['Parameterized']]: + def self_or_cls(self_) -> Parameterized | type[Parameterized]: return self_.cls if self_.self is None else self_.self def __setstate__(self, state): @@ -2002,16 +2144,17 @@ def __dir__(self_): list[str]: A combined list of standard attributes and parameter names. """ - return super().__dir__() + list(self_._cls_parameters) + return list(super().__dir__()) + list(self_._cls_parameters) - def __iter__(self_): + def __iter__(self_) -> Generator[str, None, None]: """Iterate over the parameters on this object.""" - yield from self_._cls_parameters + params = iter(self_._cls_parameters) + yield from params def __contains__(self_, param): return param in self_._cls_parameters - def __getattr__(self_, attr): + def __getattr__(self_, attr: str) -> t.Any: """Handle attribute access for parameter objects. This method extends standard attribute access to support parameters @@ -2045,8 +2188,9 @@ def __getattr__(self_, attr): raise AttributeError(f"'{self_.cls.__name__}.param' object has no attribute {attr!r}") @as_uninitialized - def _set_name(self_, name): - self_.self.name = name + def _set_name(self_, name: str): + if self_.self is not None: + self_.self.name = name @as_uninitialized def _generate_name(self_): @@ -2093,9 +2237,14 @@ def _setup_params(self_, **params): ) pobj = objects.get(name) - if pobj is None or not pobj.allow_refs: + if pobj is None: + setattr(self, name, val) + continue + elif pobj.allow_refs: + setattr(self, name, val) + else: # Until Parameter.allow_refs=True by default we have to - # speculatively evaluate a values to check whether they + # speculatively evaluate values to check whether they # contain a reference and warn the user that the # behavior may change in future. if name not in self_.cls._param__private.explicit_no_refs: @@ -2127,7 +2276,9 @@ def _setup_params(self_, **params): setattr(self, name, resolved) return refs, deps - def _setup_refs(self_, refs): + def _setup_refs(self_, refs: Mapping[str, Iterable[t.Any]]): + if self_.self is None: + return groups = defaultdict(list) for pname, subrefs in refs.items(): for p in subrefs: @@ -2144,7 +2295,9 @@ def _setup_refs(self_, refs): owner.param._watch(self_._sync_refs, list(set(pnames)), precedence=-1) )) - def _update_ref(self_, name, ref): + def _update_ref(self_, name: str, ref: t.Any): + if self_.self is None: + return param_private = self_.self._param__private if name in param_private.async_refs: param_private.async_refs.pop(name).cancel() @@ -2158,6 +2311,8 @@ def _update_ref(self_, name, ref): self_.self._param__private.refs = refs def _sync_refs(self_, *events): + if self_.self is None: + return updates = {} for pname, ref in self_.self._param__private.refs.items(): # Skip updating value if dependency has not changed @@ -2175,7 +2330,7 @@ def _sync_refs(self_, *events): if new_val is Skip or new_val is Undefined: continue elif is_async: - async_executor(partial(self_._async_ref, pname, new_val)) + async_executor(partial(self_._async_ref, pname, t.cast(t.Awaitable[t.Any], new_val))) continue updates[pname] = new_val @@ -2184,7 +2339,7 @@ def _sync_refs(self_, *events): with _syncing(self_.self, updates): self_.update(updates) - def _resolve_ref(self_, pobj, value): + def _resolve_ref(self_, pobj: Parameter, value: t.Any): is_gen = inspect.isgeneratorfunction(value) is_async = iscoroutinefunction(value) or is_gen deps = resolve_ref(value, recursive=pobj.nested_refs) @@ -2195,12 +2350,14 @@ def _resolve_ref(self_, pobj, value): value = resolve_value(value, recursive=pobj.nested_refs) except Skip: value = Undefined - if is_async: + if is_async and pobj.name: async_executor(partial(self_._async_ref, pobj.name, value)) value = None return ref, deps, value, is_async - async def _async_ref(self_, pname, awaitable): + async def _async_ref(self_, pname: str, awaitable: t.Awaitable[t.Any]): + if self_.self is None: + return if not self_.self._param__private.initialized: async_executor(partial(self_._async_ref, pname, awaitable)) return @@ -2236,14 +2393,22 @@ def _changed(cls, event): """ return not Comparator.is_equal(event.old, event.new) - def _instantiate_param(self_, param_obj, dict_=None, key=None, deepcopy=True): + def _instantiate_param( + self_, + param_obj: Parameter, + dict_: dict[str, t.Any] | None = None, + key: str | None = None, + deepcopy: bool = True, + ): + self = self_.self + if self is None: + return # deepcopy or store a reference to reference param_obj.default into # self._param__private.values (or dict_ if supplied) under the # parameter's name (or key if supplied) instantiator = copy.deepcopy if deepcopy else lambda o: o - self = self_.self dict_ = dict_ or self._param__private.values - key = key or param_obj.name + key = key or t.cast(str, param_obj.name) if shared_parameters._share: param_key = (str(type(self)), param_obj.name) if param_key in shared_parameters._shared_cache: @@ -2263,10 +2428,12 @@ def _instantiate_param(self_, param_obj, dict_=None, key=None, deepcopy=True): # could instead have kept the same name new_object.param._generate_name() - def _update_deps(self_, attribute=None, init=False): + def _update_deps(self_, attribute: str | None = None, init: bool = False): obj = self_.self + if obj is None: + return init_methods = [] - for method, queued, on_init, constant, dynamic in type(obj).param._depends['watch']: + for method, queued, on_init, constant, dynamic in self_.cls.param._depends['watch']: # On initialization set up constant watchers; otherwise # clean up previous dynamic watchers for the updated attribute dynamic = [d for d in dynamic if attribute is None or d.spec.split(".")[0] == attribute] @@ -2297,7 +2464,9 @@ def _update_deps(self_, attribute=None, init=False): for m in init_methods: m() - def _resolve_dynamic_deps(self, obj, dynamic_dep, param_dep, attribute): + def _resolve_dynamic_deps( + self, obj: Parameterized, dynamic_dep: Parameter, param_dep: PInfo, attribute: str | None = None + ) -> tuple[list[str] | None, Callable[[], None] | None, str]: """ If a subobject whose parameters are being depended on changes we should only trigger events if the actual parameter values @@ -2314,7 +2483,8 @@ def _resolve_dynamic_deps(self, obj, dynamic_dep, param_dep, attribute): subobjs = [obj] for subpath in dynamic_dep.spec.split('.')[:-1]: subobj = getattr(subobj, subpath.split(':')[0], None) - subobjs.append(subobj) + if subobj is not None: + subobjs.append(subobj) dep_obj = param_dep.cls if param_dep.inst is None else param_dep.inst if dep_obj not in subobjs[:-1]: @@ -2323,12 +2493,13 @@ def _resolve_dynamic_deps(self, obj, dynamic_dep, param_dep, attribute): depth = subobjs.index(dep_obj) callback = None if depth > 0: - def callback(*events): + def cb(*events): """ If a subobject changes, we need to notify the main object to update the dependencies. """ obj.param._update_deps(attribute) + callback = cb p = '.'.join(dynamic_dep.spec.split(':')[0].split('.')[depth+1:]) if p == 'param': @@ -2343,7 +2514,14 @@ def callback(*events): return subparams, callback, what - def _watch_group(self_, obj, name, queued, group, attribute=None): + def _watch_group( + self_, + obj: Parameterized, + name: str, + queued: bool, + group: list[tuple[Parameter | None, PInfo]], + attribute: str | None = None, + ): """ Set up a watcher for a group of dependencies. @@ -2482,7 +2660,12 @@ def params(self_, parameter_name=None): # Bothmethods - def update(self_, arg=Undefined, /, **kwargs): + def update( + self_, + arg: dict[str, t.Any] | Iterable[tuple[str, t.Any]] | UndefinedType = Undefined, + /, + **kwargs: t.Any, + ) -> _ParametersRestorer: """ Update multiple parameters of this object or class before triggering events. @@ -2493,6 +2676,8 @@ def update(self_, arg=Undefined, /, **kwargs): Parameters ---------- + arg : dict or iterable or keyword arguments + The parameters to update, provided as a dictionary, iterable, or keyword arguments in `param=value` format. **kwargs : dict or iterable or keyword arguments The parameters to update, provided as a dictionary, iterable, or keyword arguments in `param=value` format. @@ -2539,23 +2724,29 @@ def update(self_, arg=Undefined, /, **kwargs): refs = {} if self_.self is not None: private = self_.self._param__private - params = list(kwargs if arg is Undefined else dict(arg, **kwargs)) - for pname in params: + base: dict[str, t.Any] = {} + if not isinstance(arg, UndefinedType): + base.update(arg) + base.update(kwargs) + for pname in base: if pname in refs: continue elif pname in private.refs: refs[pname] = private.refs[pname] elif pname in private.async_refs: refs[pname] = private.async_refs[pname] - restore = dict(self_._update(arg, **kwargs)) + restore = {**(self_._update(arg, **kwargs))} return _ParametersRestorer(parameters=self_, restore=restore, refs=refs) - def _update(self_, arg=Undefined, /, **kwargs): + def _update(self_, arg: dict[str, t.Any] | Iterable[tuple[str, t.Any]] | UndefinedType = Undefined, /, **kwargs: t.Any): BATCH_WATCH = self_._BATCH_WATCH self_._BATCH_WATCH = True self_or_cls = self_.self_or_cls - if arg is not Undefined: - kwargs = dict(arg, **kwargs) + + base: dict[str, t.Any] = {} + if not isinstance(arg, UndefinedType): + base.update(arg) + base.update(kwargs) trigger_params = [ k for k in kwargs @@ -2566,9 +2757,9 @@ def _update(self_, arg=Undefined, /, **kwargs): self_[tp]._mode = 'set' values = self_.values() - restore = {k: values[k] for k, v in kwargs.items() if k in values} + restore = {k: values[k] for k, v in base.items() if k in values} - for (k, v) in kwargs.items(): + for (k, v) in base.items(): if k not in self_: self_._BATCH_WATCH = False raise ValueError(f"{k!r} is not a parameter of {self_.cls.__name__}") @@ -2591,7 +2782,7 @@ def _update(self_, arg=Undefined, /, **kwargs): # PARAM3_DEPRECATION @_deprecated(extra_msg="Use instead `.param.update`", warning_cat=_ParamFutureWarning) - def set_param(self_, *args,**kwargs): + def set_param(self_, *args, **kwargs): """ For each param=value keyword argument, sets the corresponding parameter of this object or class to the given value. @@ -2638,7 +2829,7 @@ def _cls_parameters(self_): cls._param__private.params = paramdict return paramdict - def objects(self_, instance: Literal[True, False, 'existing']=True) -> dict[str, Parameter]: + def objects(self_, instance: Literal[True, False, 'existing'] = True) -> dict[str, Parameter]: """ Return the Parameters of this instance or class. @@ -2754,7 +2945,7 @@ def trigger(self_, *param_names: str) -> None: self_._events += events self_._state_watchers += watchers - def _update_event_type(self_, watcher, event, triggered): + def _update_event_type(self_, watcher: Watcher, event: Event, triggered: bool): """Return an updated Event object with the type field set appropriately.""" if triggered: event_type = 'triggered' @@ -2763,9 +2954,9 @@ def _update_event_type(self_, watcher, event, triggered): return Event(what=event.what, name=event.name, obj=event.obj, cls=event.cls, old=event.old, new=event.new, type=event_type) - def _execute_watcher(self, watcher, events): + def _execute_watcher(self, watcher: Watcher, events: Iterable[Event]): if watcher.mode == 'args': - args, kwargs = events, {} + args, kwargs = tuple(events), {} else: args, kwargs = (), {event.name: event.new for event in events} @@ -2783,7 +2974,7 @@ def _execute_watcher(self, watcher, events): except Skip: pass - def _call_watcher(self_, watcher, event): + def _call_watcher(self_, watcher: Watcher, event: Event): """Invoke the given watcher appropriately given an Event object.""" if self_._TRIGGER: pass @@ -2822,7 +3013,7 @@ def _batch_call_watchers(self_): # I've (MarcSkovMadsen) not been able to understand this. Its probably because I lack context. # Its not mentioned in the documentation. # The pytests do not make sense to me. - def set_dynamic_time_fn(self_,time_fn,sublistattr=None): + def set_dynamic_time_fn(self_, time_fn, sublistattr=None): """ Set time_fn for all Dynamic Parameters of this class or instance object that are currently being dynamically @@ -2841,7 +3032,7 @@ class or instance that contains an iterable collection of set_dynamic_time_fn() will be called for those, too. """ self_or_cls = self_.self_or_cls - self_or_cls._Dynamic_time_fn = time_fn + self_or_cls._Dynamic_time_fn = time_fn # type: ignore[attr-defined] if isinstance(self_or_cls,type): a = (None,self_or_cls) @@ -2863,7 +3054,7 @@ class or instance that contains an iterable collection of for obj in sublist: obj.param.set_dynamic_time_fn(time_fn,sublistattr) - def serialize_parameters(self_, subset: Union[Iterable[str], None]=None, mode='json'): + def serialize_parameters(self_, subset: Iterable[str] | None = None, mode: str = 'json'): """ Return the serialized parameters of the Parameterized object. @@ -2911,7 +3102,7 @@ def serialize_parameters(self_, subset: Union[Iterable[str], None]=None, mode='j serializer = Parameter._serializers[mode] return serializer.serialize_parameters(self_or_cls, subset=subset) - def serialize_value(self_, pname: str, mode: str='json'): + def serialize_value(self_, pname: str, mode: str = 'json'): """ Serialize the value of a specific parameter. @@ -2962,7 +3153,7 @@ def serialize_value(self_, pname: str, mode: str='json'): serializer = Parameter._serializers[mode] return serializer.serialize_parameter_value(self_or_cls, pname) - def deserialize_parameters(self_, serialization, subset: Union[Iterable[str], None]=None, mode: str='json') -> dict: + def deserialize_parameters(self_, serialization, subset: Iterable[str] | None = None, mode: str = 'json') -> dict: """ Deserialize the given serialized data. This data can be used to create a `Parameterized` object or update the parameters of an existing `Parameterized` object. @@ -3013,7 +3204,7 @@ def deserialize_parameters(self_, serialization, subset: Union[Iterable[str], No serializer = Parameter._serializers[mode] return serializer.deserialize_parameters(self_or_cls, serialization, subset=subset) - def deserialize_value(self_, pname: str, value, mode: str='json'): + def deserialize_value(self_, pname: str, value, mode: str = 'json'): """ Deserialize the value of a specific parameter. @@ -3066,7 +3257,7 @@ def deserialize_value(self_, pname: str, value, mode: str='json'): serializer = Parameter._serializers[mode] return serializer.deserialize_parameter_value(self_or_cls, pname, value) - def schema(self_, safe: bool=False, subset: Union[Iterable[str], None]=None, mode: str='json'): + def schema(self_, safe: bool = False, subset: Iterable[str] | None = None, mode: str = 'json'): """ Generate a schema for the parameters on this Parameterized object. @@ -3146,7 +3337,7 @@ def get_param_values(self_, onlychanged=False): vals = self_.values(onlychanged) return [(k, v) for k, v in vals.items()] - def values(self_, onlychanged: bool = False) -> dict[str, Any]: + def values(self_, onlychanged: bool = False) -> dict[str, t.Any]: """ Retrieve a dictionary of parameter names and their current values. @@ -3195,7 +3386,7 @@ def values(self_, onlychanged: bool = False) -> dict[str, Any]: # Please update the docstring with better description and examples # I've (MarcSkovMadsen) not been able to understand this. Its probably because I lack context. # Its not mentioned in the documentation or pytests - def force_new_dynamic_value(self_, name): # pylint: disable-msg=E0213 + def force_new_dynamic_value(self_, name: str): # pylint: disable-msg=E0213 """ Force a new value to be generated for the dynamic attribute name, and return it. @@ -3220,7 +3411,7 @@ def force_new_dynamic_value(self_, name): # pylint: disable-msg=E0213 else: return param_obj._force(slf, cls) - def get_value_generator(self_,name: str) -> Any: # pylint: disable-msg=E0213 + def get_value_generator(self_, name: str) -> t.Any: # pylint: disable-msg=E0213 """ Retrieve the value or value-generating object of a named parameter. @@ -3290,7 +3481,7 @@ def get_value_generator(self_,name: str) -> Any: # pylint: disable-msg=E0213 return value - def inspect_value(self_,name: str) -> Any: # pylint: disable-msg=E0213 + def inspect_value(self_, name: str) -> t.Any: # pylint: disable-msg=E0213 """ Inspect the current value of a parameter without modifying it. @@ -3471,7 +3662,7 @@ def outputs(self_) -> dict[str,tuple]: outputs[name] = (otype, method, idx) return outputs - def _spec_to_obj(self_, spec, dynamic=True, intermediate=True): + def _spec_to_obj(self_, spec: str, dynamic: bool = True, intermediate: bool = True): """ Resolve a dependency specification into lists of explicit parameter dependencies and dynamic dependencies. @@ -3517,7 +3708,7 @@ def _spec_to_obj(self_, spec, dynamic=True, intermediate=True): 'Parameterized constructor.' ) - src = _getattrr(self_.self_or_cls, obj[1::], None) + src: Parameterized | type[Parameterized] = _getattrr(self_.self_or_cls, obj[1::], None) if src is None: path = obj[1:].split('.') deps = [] @@ -3538,7 +3729,9 @@ def _spec_to_obj(self_, spec, dynamic=True, intermediate=True): return deps, [] if intermediate == 'only' else [DInfo(spec=spec)] cls, inst = (src, None) if isinstance(src, type) else (type(src), src) - if attr == 'param': + if attr is None: + raise AttributeError("Specification must include attribute.") + elif obj is not None and attr == 'param': deps, dynamic_deps = self_._spec_to_obj(obj[1:], dynamic, intermediate) for p in src.param: param_deps, param_dynamic_deps = src.param._spec_to_obj(p, dynamic, intermediate) @@ -3569,7 +3762,7 @@ def _spec_to_obj(self_, spec, dynamic=True, intermediate=True): deps.append(info) return deps, dynamic_deps - def _register_watcher(self_, action, watcher, what='value'): + def _register_watcher(self_, action: Literal['append', 'remove'], watcher: Watcher, what: str = 'value'): if self_.self is not None and not self_.self._param__private.initialized: raise RuntimeError( '(Un)registering a watcher on a partially initialized Parameterized instance ' @@ -3598,8 +3791,8 @@ def _register_watcher(self_, action, watcher, what='value'): def watch( self_, - fn, - parameter_names: Union[str, list[str]], + fn: Callable, + parameter_names: str | list[str] | tuple[str, ...], what: str = 'value', onlychanged: bool = True, queued: bool = False, @@ -3690,11 +3883,30 @@ def watch( "are reserved for internal Watchers.") return self_._watch(fn, parameter_names, what, onlychanged, queued, precedence) - def _watch(self_, fn, parameter_names, what='value', onlychanged=True, queued=False, precedence=-1): - parameter_names = tuple(parameter_names) if isinstance(parameter_names, list) else (parameter_names,) - watcher = Watcher(inst=self_.self, cls=self_.cls, fn=fn, mode='args', - onlychanged=onlychanged, parameter_names=parameter_names, - what=what, queued=queued, precedence=precedence) + def _watch( + self_, + fn: Callable, + parameter_names: str | list[str] | tuple[str, ...], + what: str = 'value', + onlychanged: bool = True, + queued: bool = False, + precedence: int = -1, + ) -> Watcher: + if isinstance(parameter_names, (list, tuple)): + parameter_names = tuple(parameter_names) + else: + parameter_names = (parameter_names,) + watcher = Watcher( + inst=self_.self, + cls=self_.cls, + fn=fn, + mode='args', + onlychanged=onlychanged, + parameter_names=parameter_names, + what=what, + queued=queued, + precedence=precedence, + ) self_._register_watcher('append', watcher, what) return watcher @@ -3753,7 +3965,7 @@ def unwatch(self_, watcher: Watcher) -> None: def watch_values( self_, fn: Callable, - parameter_names: Union[str, list[str]], + parameter_names: str | list[str] | tuple[str, ...], what: Literal["value"] = 'value', onlychanged: bool = True, queued: bool = False, @@ -3844,7 +4056,7 @@ def watch_values( "a positive precedence. Negative precedences " "are reserved for internal Watchers.") assert what == 'value' - if isinstance(parameter_names, list): + if isinstance(parameter_names, (list, tuple)): parameter_names = tuple(parameter_names) else: parameter_names = (parameter_names,) @@ -3870,7 +4082,7 @@ def defaults(self_): .. deprecated:: 1.12.0 Use instead `{k:v.default for k,v in p.param.objects().items()}` """ - self = self_.self + self = self_.self_or_cls d = {} for param_name, param in self.param.objects('existing').items(): if param.constant: @@ -3907,7 +4119,7 @@ def print_param_values(self_): .. deprecated:: 1.12.0 Use instead `for k,v in p.param.objects().items(): print(f"{p.__class__.name}.{k}={repr(v.default)}")` """ - self = self_.self + self = self_.self_or_cls for name, val in self.param.values().items(): print(f'{self.name}.{name} = {val}') @@ -4052,7 +4264,7 @@ def _state_push(self_): # CB: not storing the time_fn: assuming that doesn't # change. elif hasattr(g,'_state_push') and isinstance(g,Parameterized): - g._state_push() + g._state_push() # type: ignore[attr-defined] def _state_pop(self_): """ @@ -4063,19 +4275,19 @@ def _state_pop(self_): self = self_.self_or_cls if not isinstance(self, Parameterized): raise NotImplementedError('_state_pop is not implemented at the class level') - for pname, p in self.param.objects('existing').items(): + for pname, _ in self.param.objects('existing').items(): g = self.param.get_value_generator(pname) - if hasattr(g,'_Dynamic_last'): + if hasattr(g, '_Dynamic_last'): g._Dynamic_last = g._saved_Dynamic_last.pop() g._Dynamic_time = g._saved_Dynamic_time.pop() - elif hasattr(g,'_state_pop') and isinstance(g,Parameterized): - g._state_pop() + elif isinstance(g, Parameterized) and hasattr(g, '_state_pop'): + g._state_pop() # type: ignore[attr-defined] def pprint( self_, - imports: Union[list[str], None]=None, + imports: list[str] | None = None, prefix: str = " ", - unknown_value: str = "", + unknown_value: str | None = "", qualify: bool = False, separator: str = "" )->str: @@ -4150,8 +4362,14 @@ def pprint( @staticmethod @_recursive_repr() - def _pprint(self, imports=None, prefix=" ", unknown_value='', - qualify=False, separator=""): + def _pprint( + self, + imports: list[str] | None = None, + prefix: str = " ", + unknown_value: str | None = '', + qualify: bool = False, + separator: str = "", + ) -> str: if imports is None: imports = [] # would have been simpler to use a set from the start imports[:] = list(set(imports)) @@ -4173,7 +4391,7 @@ def _pprint(self, imports=None, prefix=" ", unknown_value='', posargs = spec.args[:-len(spec.defaults)] kwargs = dict(zip(spec.args[-len(spec.defaults):], spec.defaults)) else: - posargs, kwargs = args, [] + posargs, kwargs = args, {} parameters = self.param.objects('existing') ordering = sorted( @@ -4210,9 +4428,7 @@ def _pprint(self, imports=None, prefix=" ", unknown_value='', if k in posargs: # value will be unknown_value unless k is a parameter arglist.append(value) - elif (k in kwargs or - (hasattr(spec, 'varkw') and (spec.varkw is not None)) or - (hasattr(spec, 'keywords') and (spec.keywords is not None))): + elif k in kwargs or (spec.varkw is not None): # Explicit modified keywords or parameters in # precendence order (if **kwargs present) keywords.append(f'{k}={value}') @@ -4223,7 +4439,9 @@ def _pprint(self, imports=None, prefix=" ", unknown_value='', arguments = arglist + keywords + (['**%s' % spec.varargs] if spec.varargs else []) return qualifier + '{}({})'.format(self.__class__.__name__, (','+separator+prefix).join(arguments)) - +@t.dataclass_transform( + field_specifiers=(Parameter,), +) class ParameterizedMetaclass(type): """ The metaclass of Parameterized (and all its descendents). @@ -4248,7 +4466,7 @@ class ParameterizedMetaclass(type): used to find out if a class is abstract or not. """ - def __init__(mcs, name, bases, dict_): + def __init__(mcs, name: str, bases: tuple[type, ...], dict_: dict[str, t.Any]): """ Initialize the class object (not an instance of the class, but the class itself). @@ -4267,9 +4485,10 @@ def __init__(mcs, name, bases, dict_): explicit_no_refs |= set(base._param__private.explicit_no_refs) _param__private = _ClassPrivate(explicit_no_refs=list(explicit_no_refs)) - mcs._param__private = _param__private + mcs._param__private = PrivateNS(class_ns=_param__private) + param_ns = Parameters(mcs) + mcs.param = NS(param_ns) mcs.__set_name(name, dict_) - mcs._param__parameters = Parameters(mcs) # All objects (with their names) of type Parameter that are # defined in this class @@ -4297,7 +4516,7 @@ def __init__(mcs, name, bases, dict_): # Resolve dependencies in class hierarchy _inherited = [] for cls in classlist(mcs)[:-1][::-1]: - if not hasattr(cls, '_param__parameters'): + if not issubclass(cls, mcs): continue for dep in cls.param._depends['watch']: method = getattr(mcs, dep[0], None) @@ -4306,12 +4525,16 @@ def __init__(mcs, name, bases, dict_): and dinfo.get('watch')): _inherited.append(dep) - mcs.param._depends = {'watch': _inherited+_watch} + param_ns._depends = {'watch': _inherited+_watch} if docstring_signature: mcs.__class_docstring() - def __set_name(mcs, name, dict_): + @property + def __get_params(mcs) -> Parameters: + return mcs.param # type: ignore[attr-defined] + + def __set_name(mcs, name: str, dict_: dict[str, t.Any]): """ Give Parameterized classes a useful 'name' attribute that is by default the class name, unless a class in the hierarchy has defined @@ -4319,6 +4542,7 @@ def __set_name(mcs, name, dict_): that value is used to set the class name. """ name_param = dict_.get("name", None) + private = mcs.__get_private() if name_param is not None: if type(name_param) is not String: raise TypeError( @@ -4328,7 +4552,7 @@ def __set_name(mcs, name, dict_): ) if name_param.default: mcs.name = name_param.default - mcs._param__private.renamed = True + private.renamed = True else: mcs.name = name else: @@ -4349,10 +4573,10 @@ def __class_docstring(mcs): if not docstring_describe_params or not param_pager: return class_docstr = mcs.__doc__ if mcs.__doc__ else '' - description = param_pager(mcs) - mcs.__doc__ = class_docstr + '\n' + description + description = param_pager(mcs) or "" + mcs.__doc__ = f"{class_docstr}\n{description}" - def _initialize_parameter(mcs, param_name, param): + def _initialize_parameter(mcs, param_name: str, param: Parameter): # A Parameter has no way to find out the name a # Parameterized class has for it param._set_names(param_name) @@ -4377,11 +4601,14 @@ def __is_abstract(mcs): # runtime. Mangling follows description in # https://docs.python.org/2/tutorial/classes.html#private-variables-and-class-local-references try: - return getattr(mcs,'_%s__abstract'%mcs.__name__.lstrip("_")) + return getattr(mcs, f'_{mcs.__name__.lstrip("_")}__abstract') except AttributeError: return False - def __get_signature(mcs): + def __get_private(mcs) -> _ClassPrivate: + return mcs._param__private # type: ignore[attr-defined] + + def __get_signature(mcs) -> inspect.Signature | None: """ For classes with a constructor signature that matches the default Parameterized.__init__ signature (i.e. ``__init__(self, **params)``) @@ -4389,8 +4616,9 @@ def __get_signature(mcs): parameters. If the signature differs from the default the custom signature is returned. """ - if mcs._param__private.signature: - return mcs._param__private.signature + private = mcs.__get_private() + if private.signature: + return private.signature # allowed_signature must be the signature of Parameterized.__init__ # Inspecting `mcs.__init__` instead of `mcs` to avoid a recursion error if inspect.signature(mcs.__init__) != DEFAULT_SIGNATURE: @@ -4405,7 +4633,7 @@ def __get_signature(mcs): keyword_groups.append(keyword_group) keywords = [el for grp in reversed(keyword_groups) for el in grp] - mcs._param__private.signature = signature = inspect.Signature([ + private.signature = signature = inspect.Signature([ inspect.Parameter(k, inspect.Parameter.KEYWORD_ONLY) for k in keywords ]) @@ -4415,12 +4643,7 @@ def __get_signature(mcs): abstract = property(__is_abstract) - def _get_param(mcs): - return mcs._param__parameters - - param = property(_get_param) - - def __setattr__(mcs, attribute_name, value): + def __setattr__(mcs, attribute_name: str, value: t.Any): """Set an attribute, supporting special behavior for Parameters. If the attribute being set corresponds to a Parameter descriptor and the @@ -4445,8 +4668,8 @@ def __setattr__(mcs, attribute_name, value): if parameter and not isinstance(value,Parameter): if owning_class != mcs: parameter = copy.copy(parameter) - parameter.owner = mcs - type.__setattr__(mcs,attribute_name,parameter) + parameter.owner = mcs # type: ignore[attr-defined] + type.__setattr__(mcs, attribute_name, parameter) mcs.__dict__[attribute_name].__set__(None,value) else: @@ -4455,7 +4678,7 @@ def __setattr__(mcs, attribute_name, value): if isinstance(value,Parameter): mcs.__param_inheritance(attribute_name,value) - def __param_inheritance(mcs, param_name, param): + def __param_inheritance(mcs, param_name: str, param: Parameter): """ Look for Parameter values in superclasses of this Parameterized class. @@ -4518,6 +4741,7 @@ def __param_inheritance(mcs, param_name, param): callables, slot_values = {}, {} slot_overridden = False + private = mcs.__get_private() for slot in slots.keys(): # Search up the hierarchy until param.slot (which has to # be obtained using getattr(param,slot)) is not Undefined, @@ -4561,8 +4785,10 @@ def __param_inheritance(mcs, param_name, param): slot_values[slot] = default_val elif slot == 'allow_refs': # Track Parameters that explicitly declared no refs - explicit_no_refs = mcs._param__private.explicit_no_refs - if param.allow_refs is False: + explicit_no_refs = private.explicit_no_refs + if param.name is None: + pass + elif param.allow_refs is False: explicit_no_refs.append(param.name) elif param.allow_refs is True and param.name in explicit_no_refs: explicit_no_refs.remove(param.name) @@ -4623,7 +4849,7 @@ def __param_inheritance(mcs, param_name, param): msg += f'\nValidation failed with:\n{e}' raise RuntimeError(msg) from e - def get_param_descriptor(mcs,param_name): + def get_param_descriptor(mcs, param_name: str) -> tuple[Parameter | None, type[Parameterized] | None]: """ Goes up the class hierarchy (starting from the current class) looking for a Parameter class attribute param_name. As soon as @@ -4633,9 +4859,9 @@ def get_param_descriptor(mcs,param_name): classes = classlist(mcs) for c in classes[::-1]: attribute = c.__dict__.get(param_name) - if isinstance(attribute,Parameter): - return attribute,c - return None,None + if isinstance(attribute, Parameter): + return attribute, c + return None, None @@ -4647,9 +4873,16 @@ def get_param_descriptor(mcs,param_name): script_repr_suppress_defaults=True -def script_repr(val, imports=None, prefix="\n ", settings=[], - qualify=True, unknown_value=None, separator="\n", - show_imports=True): +def script_repr( + val, + imports: list[str] | None = None, + prefix: str = "\n ", + settings: list[str] = [], + qualify: bool = True, + unknown_value: str | None = None, + separator: str = "\n", + show_imports: bool = True, +) -> str: """ Variant of pprint() designed for generating a (nearly) runnable script. @@ -4691,8 +4924,15 @@ def script_repr(val, imports=None, prefix="\n ", settings=[], # PARAM2_DEPRECATION: Remove entirely unused settings argument -def pprint(val,imports=None, prefix="\n ", settings=[], - unknown_value='', qualify=False, separator=''): +def pprint( + val, + imports: list[str] | None = None, + prefix: str = "\n ", + settings: list[str] = [], + unknown_value: str | None = None, + qualify: bool = False, + separator: str = '', +) -> str: """ Pretty printed representation of a parameterized object that may be evaluated with eval. @@ -4740,21 +4980,22 @@ def pprint(val,imports=None, prefix="\n ", settings=[], if isinstance(val,type): rep = type_script_repr(val,imports,prefix,settings) - elif type(val) in script_repr_reg: rep = script_repr_reg[type(val)](val,imports,prefix,settings) - elif isinstance(val, _no_script_repr): rep = None - elif isinstance(val, Parameterized) or (type(val) is type and issubclass(val, Parameterized)): - rep=val.param.pprint(imports=imports, prefix=prefix+" ", - qualify=qualify, unknown_value=unknown_value, - separator=separator) + rep=val.param.pprint( + imports=imports, + prefix=prefix+" ", + qualify=qualify, + unknown_value=unknown_value, + separator=separator, + ) else: - rep=repr(val) + rep = repr(val) - return rep + return rep or "" # Registry for special handling for certain types in script_repr and pprint @@ -4959,6 +5200,14 @@ class _ClassPrivate: 'explicit_no_refs', ] + parameters_state: dict[str, t.Any] + disable_instance_params: bool + renamed: bool + params: dict[str, Parameter] + initialized: bool + signature: inspect.Signature | None + explicit_no_refs: list[str] + def __init__( self, parameters_state=None, @@ -5026,16 +5275,28 @@ class _InstancePrivate: 'explicit_no_refs', ] + initialized: bool + parameters_state: dict[str, t.Any] + dynamic_watchers: defaultdict[str, list[Watcher]] + params: dict[str, Parameter] + async_refs: dict[str, t.Any] + refs: dict[str, t.Any] + ref_watchers: list[tuple[tuple[str, ...], Watcher]] + syncing: set[str] + watchers: dict[str, dict[str, list[Watcher]]] + values: dict[str, t.Any] + explicit_no_refs: list[str] + def __init__( self, - initialized=False, - parameters_state=None, - dynamic_watchers=None, - refs=None, - params=None, - watchers=None, - values=None, - explicit_no_refs=None + initialized: bool = False, + parameters_state: dict[str, t.Any] | None = None, + dynamic_watchers: dict[str, list[Watcher]] | None = None, + refs: dict[str, t.Any] | None = None, + params: dict[str, Parameter] | None = None, + watchers: dict[str, dict[str, list[Watcher]]] | None = None, + values: dict[str, t.Any] | None = None, + explicit_no_refs: list[str] | None = None, ): self.initialized = initialized self.explicit_no_refs = [] if explicit_no_refs is None else explicit_no_refs @@ -5050,7 +5311,7 @@ def __init__( self.ref_watchers = [] self.async_refs = {} self.parameters_state = parameters_state - self.dynamic_watchers = defaultdict(list) if dynamic_watchers is None else dynamic_watchers + self.dynamic_watchers = defaultdict(list) if dynamic_watchers is None else defaultdict(list, dynamic_watchers) self.params = {} if params is None else params self.refs = {} if refs is None else refs self.watchers = {} if watchers is None else watchers @@ -5064,6 +5325,49 @@ def __setstate__(self, state): setattr(self, k, v) +class _HasPrivateStorage(t.Protocol): + + _param__private_storage: t.Optional["_InstancePrivate"] + +C = t.TypeVar("C") +PS = t.TypeVar("PS", bound=_HasPrivateStorage) + + +class NS: + + def __init__(self, class_ns: Parameters): + self.class_ns = class_ns + + def __get__(self, obj: Parameterized | None, objtype: type[Parameterized] | None = None) -> Parameters: + if obj is None: + return self.class_ns + ns = getattr(obj, "_param__parameters", None) + if ns is None: + ns = Parameters(self.class_ns.cls, self=obj) + setattr(obj, "_param__parameters", ns) + return ns + + +class PrivateNS: + + def __init__(self, class_ns: _ClassPrivate): + self.class_ns = class_ns + + @t.overload + def __get__(self, obj: None, objtype: type[PS]) -> _ClassPrivate: ... + @t.overload + def __get__(self, obj: PS, objtype: type[PS] | None = ...) -> _InstancePrivate: ... + + def __get__(self, obj: PS | None, objtype: type[PS] | None = None) -> _ClassPrivate | _InstancePrivate: + if obj is None: + return self.class_ns + ns = getattr(obj, "_param__private_storage", None) + if ns is None: + ns = _InstancePrivate(explicit_no_refs=self.class_ns.explicit_no_refs) + setattr(obj, "_param__private_storage", ns) + return ns + + class Parameterized(metaclass=ParameterizedMetaclass): """ A base class for creating Parameterized objects. @@ -5136,6 +5440,15 @@ class Parameterized(metaclass=ParameterizedMetaclass): name = String(default=None, constant=True, doc=""" String identifier for this object.""") + _param__private: t.ClassVar[PrivateNS] + param: t.ClassVar[NS] + _param__parameters: t.ClassVar[Parameters] + + if t.TYPE_CHECKING: + _param__private: _InstancePrivate + _param__parameters: Parameters + param: NS + def __init__(self, **params): """ Initialize a `Parameterized` instance with optional Parameter values. @@ -5169,16 +5482,6 @@ def __init__(self, **params): The `value` parameter is set to 15 for this instance, overriding the default. """ global object_count - - # Setting a Parameter value in an __init__ block before calling - # Parameterized.__init__ (via super() generally) already sets the - # _InstancePrivate namespace over the _ClassPrivate namespace - # (see Parameter.__set__) so we shouldn't override it here. - if not isinstance(self._param__private, _InstancePrivate): - self._param__private = _InstancePrivate( - explicit_no_refs=type(self)._param__private.explicit_no_refs - ) - # Skip generating a custom instance name when a class in the hierarchy # has overriden the default of the `name` Parameter. if self.param.name.default == self.__class__.__name__: @@ -5186,49 +5489,13 @@ def __init__(self, **params): refs, deps = self.param._setup_params(**params) object_count += 1 + self._param__private_storage: _InstancePrivate | None = None self._param__private.initialized = True self.param._setup_refs(deps) self.param._update_deps(init=True) self._param__private.refs = refs - @property - def param(self) -> Parameters: - """ - The `.param` namespace for `Parameterized` classes and instances. - - This namespace provides access to powerful methods and properties for managing - parameters in a `Parameterized` object. It includes utilities for adding parameters, - updating parameters, debugging, serialization, logging, and more. - - User Guide - ---------- - For more details on parameter objects and instances, see: - https://param.holoviz.org/user_guide/Parameters.html#parameter-objects-and-instances - - Examples - -------- - Basic usage of `.param` in a `Parameterized` class: - - >>> import param - >>> - >>> class MyClass(param.Parameterized): - ... value = param.Parameter() - >>> - >>> my_instance = MyClass(value=0) - - Access the `value` parameter of `my_instance`: - - >>> my_instance.param.value # the Parameter instance - - Note that this is different from the current `value` of `my_instance`: - - >>> my_instance.value # the current parameter value - 0 - - """ - return Parameters(self.__class__, self=self) - # 'Special' methods def __getstate__(self): @@ -5327,7 +5594,12 @@ class ParamOverrides(dict): # same name, so all attributes of this object should have names # starting with an underscore (_). - def __init__(self,overridden,dict_,allow_extra_keywords=False): + def __init__( + self, + overridden: Parameterized, + dict_: dict[str, t.Any], + allow_extra_keywords: bool = False, + ) -> None: """ If allow_extra_keywords is False, then all keys in the @@ -5429,7 +5701,21 @@ def _new_parameterized(cls): return Parameterized.__new__(cls) -class ParameterizedFunction(Parameterized): + +class _HasInstance(t.Protocol): + + @bothmethod + def instance(self_or_cls, *args: t.Any, **kwargs: t.Any) -> t.Self: ... + + +class ParameterizedFunctionMetaclass(ParameterizedMetaclass): + + def __call__(cls: type[_HasInstance], *args, **params) -> R: + inst = cls.instance() + inst.param._set_name(cls.__name__) + return inst.__call__(*args, **params) + +class ParameterizedFunction(Parameterized, Generic[P, R], metaclass=ParameterizedFunctionMetaclass): """ Acts like a Python function, but with arguments that are Parameters. @@ -5442,16 +5728,16 @@ class ParameterizedFunction(Parameterized): __abstract = True - def __str__(self): + def __str__(self) -> str: return self.__class__.__name__+"()" @bothmethod - def instance(self_or_cls,**params): + def instance(self_or_cls, **params) -> t.Self: """ Return an instance of this class, copying parameters from any existing instance provided. """ - if isinstance (self_or_cls,ParameterizedMetaclass): + if isinstance(self_or_cls, ParameterizedFunctionMetaclass): cls = self_or_cls else: p = params @@ -5460,19 +5746,15 @@ def instance(self_or_cls,**params): params.pop('name') cls = self_or_cls.__class__ - inst=Parameterized.__new__(cls) - Parameterized.__init__(inst,**params) - if 'name' in params: inst.__name__ = params['name'] - else: inst.__name__ = self_or_cls.name + inst = Parameterized.__new__(cls) # type: ignore[call-arg] + Parameterized.__init__(inst, **params) + if 'name' in params: + inst.__name__ = params['name'] + else: + inst.__name__ = self_or_cls.name return inst - def __new__(class_,*args,**params) -> Any: - # Create and __call__() an instance of this class. - inst = class_.instance() - inst.param._set_name(class_.__name__) - return inst.__call__(*args,**params) - - def __call__(self,*args,**kw): + def __call__(self, *args, **kw) -> R: raise NotImplementedError("Subclasses must implement __call__.") def __reduce__(self): @@ -5485,8 +5767,14 @@ def __reduce__(self): # __main__. Pretty obscure aspect of pickle.py... return (_new_parameterized,(self.__class__,),state) - def _pprint(self, imports=None, prefix="\n ",unknown_value='', - qualify=False, separator=""): + def _pprint( + self, + imports: list[str] | None = None, + prefix: str = "\n ", + unknown_value: str = '', + qualify: bool = False, + separator: str = "", + ) -> str: """Pretty-print the object with adjustments for instance representation. This method is similar to `self.param.pprint`, but replaces @@ -5510,28 +5798,31 @@ def _pprint(self, imports=None, prefix="\n ",unknown_value='', str The formatted string representation of the object. """ - r = self.param.pprint(imports,prefix, - unknown_value=unknown_value, - qualify=qualify,separator=separator) + r = self.param.pprint( + imports, prefix, + unknown_value=unknown_value, + qualify=qualify, + separator=separator + ) classname=self.__class__.__name__ - return r.replace(".%s("%classname,".%s.instance("%classname) + return r.replace(f".{classname}(", f".{classname}.instance(") class default_label_formatter(ParameterizedFunction): """Default formatter to turn parameter names into appropriate widget labels.""" - capitalize = Parameter(default=True, doc=""" + capitalize: Parameter[bool] = Parameter(default=True, doc=""" Whether or not the label should be capitalized.""") - replace_underscores = Parameter(default=True, doc=""" + replace_underscores: Parameter[bool] = Parameter(default=True, doc=""" Whether or not underscores should be replaced with spaces.""") - overrides = Parameter(default={}, doc=""" + overrides: Parameter[dict[str, str]] = Parameter(default={}, doc=""" Allows custom labels to be specified for specific parameter names using a dictionary where key is the parameter name and the value is the desired label.""") - def __call__(self, pname): + def __call__(self, pname: str) -> str: if pname in self.overrides: return self.overrides[pname] if self.replace_underscores: diff --git a/param/parameters.py b/param/parameters.py index d903abe7..f01b26db 100644 --- a/param/parameters.py +++ b/param/parameters.py @@ -15,8 +15,8 @@ parameter types (e.g. Number), and also imports the definition of Parameters and Parameterized classes. """ +from __future__ import annotations -import collections import copy import datetime as dt import glob @@ -26,16 +26,17 @@ import pathlib import re import sys -import typing +import typing as t import warnings from collections import OrderedDict -from collections.abc import Iterable +from collections.abc import Iterable, Mapping, Sequence from contextlib import contextmanager +from typing import Literal from .parameterized import ( - Parameterized, Parameter, ParameterizedFunction, ParamOverrides, String, - Undefined, get_logger, instance_descriptor, _dt_types, + T, Parameterized, Parameter, ParameterizedFunction, ParameterKwargs, ParamOverrides, + String, Undefined, UndefinedType, get_logger, instance_descriptor, _dt_types, _int_types, _identity_hook ) from ._utils import ( @@ -57,6 +58,11 @@ # Utilities #----------------------------------------------------------------------------- +if t.TYPE_CHECKING: + from typing_extensions import Literal + + K = t.TypeVar("K", default=t.Hashable) + V = t.TypeVar("V", default=t.Any) def param_union(*parameterizeds, warn=True): """ @@ -85,7 +91,7 @@ def param_union(*parameterizeds, warn=True): return d -def guess_param_types(**kwargs): +def guess_param_types(**kwargs) -> dict[str, Parameter]: """ Given a set of keyword literals, promote to the appropriate parameter type based on some simple heuristics. @@ -137,17 +143,23 @@ def guess_param_types(**kwargs): return params -def parameterized_class(name, params, bases=Parameterized): +def parameterized_class( + name: str, + params: dict[str, Parameter], + bases: type[Parameterized] | tuple[type[Parameterized], ...] = Parameterized +) -> type[Parameterized]: """ Dynamically create a parameterized class with the given name and the supplied parameters, inheriting from the specified base(s). """ - if not isinstance(bases, (list, tuple)): - bases=[bases] - return type(name, tuple(bases), params) + if isinstance(bases, type): + basecls = (bases,) + else: + basecls = tuple(bases) + return type(name, basecls, params) -def guess_bounds(params, **overrides): +def guess_bounds(params: dict[str, Parameter], **overrides: tuple[t.Any, t.Any]): """ Given a dictionary of Parameter instances, return a corresponding set of copies with the bounds appropriately set. @@ -168,7 +180,10 @@ def guess_bounds(params, **overrides): return guessed -def get_soft_bounds(bounds, softbounds): +def get_soft_bounds( + bounds: tuple[t.Any | None, t.Any | None] | None, + softbounds: tuple[t.Any | None, t.Any | None] | None +) -> tuple[t.Any | None, t.Any | None]: """ For each soft bound (upper and lower), if there is a defined bound (not equal to None) and does not exceed the hard bound, then it is @@ -286,12 +301,11 @@ class Time(Parameterized): forever = Infinity() - label= String(default='Time', doc=""" + label = String(default='Time', doc=""" The label given to the Time object. Can be used to convey more specific notions of time as appropriate. For instance, the label could be 'Simulation Time' or 'Duration'.""") - time_type = Parameter(default=int, constant=True, doc=""" Callable that Time will use to convert user-specified time values into the current time; all times will be of the resulting @@ -347,14 +361,12 @@ class Time(Parameterized): Typical values for the parameter are 'seconds' (the SI unit for time) or subdivisions thereof (e.g. 'milliseconds').""") - def __init__(self, **params): super().__init__(**params) self._time = self.time_type(0) self._exhausted = None self._pushed_state = [] - def __eq__(self, other): if not isinstance(other, Time): return False @@ -364,14 +376,11 @@ def __eq__(self, other): return False return True - def __ne__(self, other): return not (self == other) - def __iter__(self): return self - def __next__(self): timestep = self.time_type(self.timestep) @@ -400,6 +409,8 @@ def __call__(self, val=None, time_type=None): raise Exception("Please specify a value for the new time_type.") if time_type: type_param = self.param.objects('existing').get('time_type') + if type_param is None: + raise ValueError("time_type parameter not found") type_param.constant = False self.time_type = time_type type_param.constant = True @@ -408,28 +419,23 @@ def __call__(self, val=None, time_type=None): return self._time - def advance(self, val): self += val - def __iadd__(self, other): self._time = self._time + self.time_type(other) return self - def __isub__(self, other): self._time = self._time - self.time_type(other) return self - def __enter__(self): """Enter the context and push the current state.""" self._pushed_state.append((self._time, self.timestep, self.until)) self.in_context = True return self - def __exit__(self, exc, *args): """ Exit from the current context, restoring the previous state. @@ -451,7 +457,7 @@ def __exit__(self, exc, *args): #----------------------------------------------------------------------------- -class Dynamic(Parameter): +class Dynamic(Parameter[T]): """ Parameter whose value can be generated dynamically by a callable object. @@ -479,21 +485,37 @@ class Dynamic(Parameter): time_fn = Time() time_dependent = False - @typing.overload + @t.overload def __init__( - self, default=None, *, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False - ): + self: Dynamic[t.Any], + default: t.Any = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False + ) -> None: ... - def __init__(self, default=Undefined, **params): + @_deprecate_positional_args + def __init__( + self, + default: t.Any = Undefined, + *, + allow_None: bool | UndefinedType = Undefined, + **params: t.Unpack[ParameterKwargs] + ) -> None: """ Call the superclass's __init__ and set instantiate=True if the default is dynamic. """ - super().__init__(default=default, **params) + super().__init__(default=default, allow_None=allow_None, **params) if callable(self.default): self._set_instantiate(True) @@ -515,22 +537,21 @@ def _initialize_generator(self,gen,obj=None): gen._saved_Dynamic_time = [] - def __get__(self,obj,objtype): + def __get__(self, obj: Parameterized | None, objtype: type[Parameterized]) -> T: """ Call the superclass's __get__; if the result is not dynamic return that result, otherwise ask that result to produce a value and return it. """ - gen = super().__get__(obj,objtype) + gen = super().__get__(obj, objtype) if not hasattr(gen,'_Dynamic_last'): return gen else: return self._produce_value(gen) - @instance_descriptor - def __set__(self,obj,val): + def __set__(self, obj: Parameterized | type[Parameterized], val: T): """ Call the superclass's set and keep this parameter's instantiate value up to date (dynamic parameters @@ -544,8 +565,7 @@ def __set__(self,obj,val): if dynamic: self._initialize_generator(val,obj) if obj is None: self._set_instantiate(dynamic) - - def _produce_value(self,gen,force=False): + def _produce_value(self, gen, force: bool = False): """ Return a value from gen. @@ -578,16 +598,14 @@ def _produce_value(self,gen,force=False): return value - - def _value_is_dynamic(self,obj,objtype=None): + def _value_is_dynamic(self, obj: Parameterized | type[Parameterized], objtype: type[Parameterized] | None = None) -> bool: """ Return True if the parameter is actually dynamic (i.e. the value is being generated). """ return hasattr(super().__get__(obj,objtype),'_Dynamic_last') - - def _inspect(self,obj,objtype=None): + def _inspect(self, obj: Parameterized | type[Parameterized], objtype: type[Parameterized] | None = None) -> t.Any: """Return the last generated value for this parameter.""" gen=super().__get__(obj,objtype) @@ -596,8 +614,7 @@ def _inspect(self,obj,objtype=None): else: return gen - - def _force(self,obj,objtype=None): + def _force(self, obj: Parameterized | type[Parameterized], objtype: type[Parameterized] | None = None) -> t.Any: """Force a new value to be generated, and return it.""" gen=super().__get__(obj,objtype) @@ -623,8 +640,16 @@ def sig(self): _compute_set_hook = __compute_set_hook() +class NumberKwargs(ParameterKwargs): + bounds: tuple[t.Any | None, t.Any | None] | None + softbounds: tuple[t.Any | None, t.Any | None] | None + inclusive_bounds: tuple[bool, bool] + step: t.Any | None + set_hook: Callable | None + allow_None: bool -class Number(Dynamic): + +class Number(Dynamic[T]): """ A numeric Dynamic Parameter, with a default value and optional bounds. @@ -676,19 +701,74 @@ class Number(Dynamic): inclusive_bounds=(True,True), step=None, set_hook=_compute_set_hook, ) - @typing.overload + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: Number[float], + default: float = 0.0, + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Number[float | None], + default: float | None = 0.0, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Number[float | None], + default: float | None = None, + *, + allow_None: bool = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload def __init__( - self, - default=0.0, *, bounds=None, softbounds=None, inclusive_bounds=(True,True), step=None, set_hook=None, - allow_None=False, doc=None, label=None, precedence=None, instantiate=False, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False - ): + self: Number[float | None], + default: float | None = None, + *, + bounds: tuple[float | None, float | None] | None = None, + softbounds: tuple[float | None, float | None] | None = None, + inclusive_bounds: tuple[bool, bool] = (True, True), + step: float | int | None = None, + set_hook: Callable | None = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False + ) -> None: ... @_deprecate_positional_args - def __init__(self, default=Undefined, *, bounds=Undefined, softbounds=Undefined, - inclusive_bounds=Undefined, step=Undefined, set_hook=Undefined, **params): + def __init__( + self, + default: t.Any = Undefined, + *, + bounds: tuple[float | int | None, float | int | None] = Undefined, + softbounds: tuple[float | int | None, float | int | None] = Undefined, + inclusive_bounds: tuple[bool, bool] = Undefined, + step: float | int | None = Undefined, + set_hook: Callable = Undefined, + **params: t.Unpack[ParameterKwargs] + ) -> None: """ Initialize this parameter object and store the bounds. @@ -702,7 +782,7 @@ def __init__(self, default=Undefined, *, bounds=Undefined, softbounds=Undefined, self.step = step self._validate(self.default) - def __get__(self, obj, objtype): + def __get__(self, obj: Parameterized | None, objtype: type[Parameterized]) -> T: """Retrieve the value of the attribute, checking bounds if dynamically generated. Arguments @@ -725,7 +805,7 @@ def __get__(self, obj, objtype): self._validate(result) return result - def set_in_bounds(self,obj,val): + def set_in_bounds(self, obj: Parameterized | type[Parameterized], val: t.Any) -> None: """ Set to the given value, but cropped to be within the legal bounds. All objects are accepted, and no exceptions will be raised. See @@ -737,7 +817,7 @@ def set_in_bounds(self,obj,val): bounded_val = val super().__set__(obj, bounded_val) - def crop_to_bounds(self, val): + def crop_to_bounds(self, val: t.Any) -> t.Any: """ Return the given value cropped to be within the hard bounds for this parameter. @@ -777,7 +857,12 @@ def crop_to_bounds(self, val): return val - def _validate_bounds(self, val, bounds, inclusive_bounds): + def _validate_bounds( + self, + val: t.Any, + bounds: tuple[t.Any | None, t.Any | None] | None, + inclusive_bounds: tuple[bool, bool] + ) -> None: if bounds is None or (val is None and self.allow_None) or callable(val): return vmin, vmax = bounds @@ -810,7 +895,7 @@ def _validate_bounds(self, val, bounds, inclusive_bounds): f"{vmin}, not {val}." ) - def _validate_value(self, val, allow_None): + def _validate_value(self, val: t.Any, allow_None: bool) -> None: if (allow_None and val is None) or (callable(val) and not inspect.isgeneratorfunction(val)): return @@ -820,14 +905,14 @@ def _validate_value(self, val, allow_None): f"not {type(val)}." ) - def _validate_step(self, val, step): + def _validate_step(self, val: t.Any, step: t.Any) -> None: if step is not None and not _is_number(step): raise ValueError( f"{_validate_error_prefix(self, 'step')} can only be " f"None or a numeric value, not {type(step)}." ) - def _validate(self, val): + def _validate(self, val: t.Any) -> None: """ Check that the value is numeric and that it is within the hard bounds; if not, an exception is raised. @@ -839,20 +924,52 @@ def _validate(self, val): def get_soft_bounds(self): return get_soft_bounds(self.bounds, self.softbounds) - def __setstate__(self,state): + def __setstate__(self, state: dict[str, t.Any]) -> None: + # Pickling backward compatibility if 'step' not in state: state['step'] = None super().__setstate__(state) - -class Integer(Number): +class Integer(Number[T]): """Numeric Parameter required to be an Integer.""" _slot_defaults = dict(Number._slot_defaults, default=0) - def _validate_value(self, val, allow_None): + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: Integer[int, int], + default: int = 0, + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Integer[int | None, int | None], + default: int | None = 0, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Integer[int | None, int | None], + default: int | None = None, + *, + allow_None: bool = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + def _validate_value(self, val: t.Any, allow_None: bool) -> None: if callable(val): return @@ -865,7 +982,7 @@ def _validate_value(self, val, allow_None): f"not {type(val)}." ) - def _validate_step(self, val, step): + def _validate_step(self, val: t.Any, step: t.Any) -> None: if step is not None and not isinstance(step, int): raise ValueError( f"{_validate_error_prefix(self, 'step')} can only be " @@ -873,21 +990,68 @@ def _validate_step(self, val, step): ) -class Magnitude(Number): +class Magnitude(Number[T]): """Numeric Parameter required to be in the range [0.0-1.0].""" - _slot_defaults = dict(Number._slot_defaults, default=1.0, bounds=(0.0,1.0)) - - @typing.overload + _slot_defaults = dict(Number._slot_defaults, default=1.0, bounds=(0.0, 1.0)) + + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: Magnitude[float], + default: float = 1.0, + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Magnitude[float | None], + default: float | None = 1.0, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Magnitude[float | None], + default: float | None = None, + *, + allow_None: bool = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=1.0, *, bounds=(0.0, 1.0), softbounds=None, inclusive_bounds=(True,True), step=None, set_hook=None, - allow_None=False, doc=None, label=None, precedence=None, instantiate=False, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False - ): + default: float = 1.0, + *, + bounds: tuple[float, float] = (0.0, 1.0), + softbounds: tuple[float | None, float | None] | None = None, + inclusive_bounds: tuple[bool, bool] = (True, True), + step: float | int | None = None, + set_hook: Callable | None = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False + ) -> None: ... + @_deprecate_positional_args def __init__(self, default=Undefined, *, bounds=Undefined, softbounds=Undefined, inclusive_bounds=Undefined, step=Undefined, set_hook=Undefined, **params): super().__init__( @@ -896,25 +1060,62 @@ def __init__(self, default=Undefined, *, bounds=Undefined, softbounds=Undefined, ) -class Date(Number): +class Date(Number[T]): """Date parameter of datetime or date type.""" _slot_defaults = dict(Number._slot_defaults, default=None) - @typing.overload + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: Date[dt.datetime | dt.date], + default: dt.datetime | dt.date | None = None, + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Date[dt.datetime | dt.date | None], + default: dt.datetime | dt.date | None = None, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=None, *, bounds=None, softbounds=None, inclusive_bounds=(True,True), step=None, set_hook=None, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False + default=None, + *, + bounds: tuple[t.Any | None, t.Any | None] | None = None, + softbounds: tuple[t.Any | None, t.Any | None] | None = None, + inclusive_bounds: tuple[bool, bool] = (True, True), + step: t.Any | None = None, + set_hook: Callable | None = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False ): ... - def __init__(self, default=Undefined, **kwargs): + @_deprecate_positional_args + def __init__(self, default=Undefined, **kwargs: t.Unpack[NumberKwargs]): super().__init__(default=default, **kwargs) - def _validate_value(self, val, allow_None): + def _validate_value(self, val: t.Any, allow_None: bool) -> None: """ Check that the value is numeric and that it is within the hard bounds; if not, an exception is raised. @@ -928,20 +1129,20 @@ def _validate_value(self, val, allow_None): f"date types, not {type(val)}." ) - def _validate_step(self, val, step): + def _validate_step(self, val: t.Any, step: t.Any) -> None: if step is not None and not isinstance(step, _dt_types): raise ValueError( f"{_validate_error_prefix(self, 'step')} can only be None, " f"a datetime or date type, not {type(step)}." ) - def _validate_bounds(self, val, bounds, inclusive_bounds): + def _validate_bounds(self, val: t.Any, bounds: tuple[t.Any | None, t.Any | None] | None, inclusive_bounds: tuple[bool, bool]) -> None: val = _to_datetime(val) bounds = None if bounds is None else map(_to_datetime, bounds) return super()._validate_bounds(val, bounds, inclusive_bounds) @classmethod - def serialize(cls, value): + def serialize(cls, value: dt.datetime | dt.date | None) -> str | None: if value is None: return None if not isinstance(value, (dt.datetime, dt.date)): # i.e np.datetime64 @@ -949,28 +1150,65 @@ def serialize(cls, value): return value.strftime("%Y-%m-%dT%H:%M:%S.%f") @classmethod - def deserialize(cls, value): + def deserialize(cls, value: str | None) -> dt.datetime | None: if value == 'null' or value is None: return None return dt.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f") -class CalendarDate(Number): +class CalendarDate(Number[T]): """Parameter specifically allowing dates (not datetimes).""" _slot_defaults = dict(Number._slot_defaults, default=None) - @typing.overload + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: CalendarDate[dt.date, dt.date], + default: dt.date | None = None, + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: CalendarDate[dt.date | None, dt.date | None], + default: dt.date | None = None, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[NumberKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=None, *, bounds=None, softbounds=None, inclusive_bounds=(True,True), step=None, set_hook=None, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False + default=None, + *, + bounds: tuple[t.Any | None, t.Any | None] | None = None, + softbounds: tuple[t.Any | None, t.Any | None] | None = None, + inclusive_bounds: tuple[bool, bool] = (True, True), + step: dt.date | None = None, + set_hook: Callable | None = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False ): ... - def __init__(self, default=Undefined, **kwargs): + @_deprecate_positional_args + def __init__(self, default=Undefined, **kwargs: t.Unpack[NumberKwargs]): super().__init__(default=default, **kwargs) def _validate_value(self, val, allow_None): @@ -986,7 +1224,7 @@ def _validate_value(self, val, allow_None): f"{_validate_error_prefix(self)} only takes date types." ) - def _validate_step(self, val, step): + def _validate_step(self, val: t.Any, step: t.Any) -> None: if step is not None and not isinstance(step, dt.date): raise ValueError( f"{_validate_error_prefix(self, 'step')} can only be None or " @@ -994,13 +1232,13 @@ def _validate_step(self, val, step): ) @classmethod - def serialize(cls, value): + def serialize(cls, value: dt.date | None) -> str | None: if value is None: return None return value.strftime("%Y-%m-%d") @classmethod - def deserialize(cls, value): + def deserialize(cls, value: str | None) -> dt.date | None: if value == 'null' or value is None: return None return dt.datetime.strptime(value, "%Y-%m-%d").date() @@ -1009,18 +1247,58 @@ def deserialize(cls, value): # Boolean #----------------------------------------------------------------------------- -class Boolean(Parameter): +class Boolean(Parameter[T]): """Binary or tristate Boolean Parameter.""" _slot_defaults = dict(Parameter._slot_defaults, default=False) - @typing.overload + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: Boolean[bool], + default: bool = False, + *, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Boolean[bool | None], + default: bool | None = False, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Boolean[bool | None], + default: bool | None = None, + *, + allow_None: bool = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=False, *, - allow_None=False, doc=None, label=None, precedence=None, instantiate=False, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False + default=False, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False ): ... @@ -1029,7 +1307,7 @@ def __init__(self, default=Undefined, **params): super().__init__(default=default, **params) self._validate(self.default) - def _validate_value(self, val, allow_None): + def _validate_value(self, val: t.Any, allow_None: bool) -> None: if allow_None: if not isinstance(val, bool) and val is not None: raise ValueError( @@ -1042,7 +1320,7 @@ def _validate_value(self, val, allow_None): f"not {val!r}." ) - def _validate(self, val): + def _validate(self, val: t.Any) -> None: self._validate_value(val, self.allow_None) @@ -1067,18 +1345,26 @@ class Event(Boolean): # value change is then what triggers the watcher callbacks. __slots__ = ['_autotrigger_value', '_mode', '_autotrigger_reset_value'] - @typing.overload + @t.overload def __init__( self, - default=False, *, - allow_None=False, doc=None, label=None, precedence=None, instantiate=False, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False + default=False, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False ): ... @_deprecate_positional_args - def __init__(self,default=False,**params): + def __init__(self, default=False, **params): self._autotrigger_value = True self._autotrigger_reset_value = False self._mode = 'set-reset' @@ -1133,38 +1419,82 @@ def sig(self): _compute_length_of_default = __compute_length_of_default() -class Tuple(Parameter): +class Tuple(Parameter[T]): """A tuple Parameter (e.g. ('a',7.6,[3,5])) with a fixed tuple length.""" __slots__ = ['length'] _slot_defaults = dict(Parameter._slot_defaults, default=(0,0), length=_compute_length_of_default) - @typing.overload + if t.TYPE_CHECKING: + + @t.overload + def __init__( + self: Tuple[tuple[t.Any, ...]], + default: tuple[t.Any, ...] = (0, 0), + *, + length: int | None = None, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Tuple[tuple[t.Any, ...] | None], + default: tuple[t.Any, ...] | None = (0, 0), + *, + length: int | None = None, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Tuple[tuple[t.Any, ...] | None], + default: tuple[t.Any, ...] | None = None, + *, + length: int | None = None, + allow_None: bool = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( - self, - default=(0,0), *, length=None, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False + self: Tuple[tuple[t.Any, ...]], + default: tuple[t.Any, ...] = (0, 0), + *, + length: int | None = None, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + allow_None: bool = False, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False ): ... @_deprecate_positional_args - def __init__(self, default=Undefined, *, length=Undefined, **params): + def __init__(self, default: t.Any = Undefined, *, length = Undefined, allow_None = Undefined, **params: t.Unpack[ParameterKwargs]): """ Initialize a tuple parameter with a fixed length (number of elements). The length is determined by the initial default value, if any, and must be supplied explicitly otherwise. The length is not allowed to change after instantiation. """ - super().__init__(default=default, **params) + super().__init__(default=default, allow_None=allow_None, **params) if length is Undefined and self.default is None: raise ValueError( f"{_validate_error_prefix(self, 'length')} must be " "specified if no default is supplied." ) - elif default is not Undefined and default: + elif not isinstance(default, UndefinedType) and default: self.length = len(default) else: self.length = length @@ -1207,9 +1537,42 @@ def deserialize(cls, value): return tuple(value) # As JSON has no tuple representation -class NumericTuple(Tuple): +class NumericTuple(Tuple[T]): """A numeric tuple Parameter (e.g. (4.5,7.6,3)) with a fixed tuple length.""" + if t.TYPE_CHECKING: + @t.overload + def __init__( + self: NumericTuple[tuple[float, ...]], + default: tuple[float, ...] = (0.0, 0.0), + *, + length: int | None = None, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: NumericTuple[tuple[float, ...] | None], + default: tuple[float, ...] | None = (0.0, 0.0), + *, + length: int | None = None, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: NumericTuple[tuple[float, ...] | None], + default: tuple[float, ...] | None = None, + *, + length: int | None = None, + allow_None: bool = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + def _validate_value(self, val, allow_None): super()._validate_value(val, allow_None) if allow_None and val is None: @@ -1228,7 +1591,7 @@ class XYCoordinates(NumericTuple): _slot_defaults = dict(NumericTuple._slot_defaults, default=(0.0, 0.0)) - @typing.overload + @t.overload def __init__( self, default=(0.0, 0.0), *, length=None, @@ -1252,24 +1615,33 @@ class Range(NumericTuple): inclusive_bounds=(True,True), softbounds=None, step=None ) - @typing.overload + bounds: tuple[t.Any, t.Any] | None + inclusive_bounds: tuple[bool, bool] + softbounds: tuple[t.Any, t.Any] | None + step: t.Any | None + + @t.overload def __init__( self, - default=None, *, bounds=None, softbounds=None, inclusive_bounds=(True,True), step=None, length=None, - doc=None, label=None, precedence=None, instantiate=False, constant=False, - readonly=False, pickle_default_value=True, allow_None=False, per_instance=True, - allow_refs=False, nested_refs=False + default=None, + *, + bounds: tuple[t.Any, t.Any] | None = None, + softbounds: tuple[t.Any, t.Any] | None = None, + inclusive_bounds: tuple[bool, bool] = (True, True), + step: t.Any | None = None, + length: int | None = None, + **kwargs: t.Unpack[ParameterKwargs] ): ... @_deprecate_positional_args def __init__(self, default=Undefined, *, bounds=Undefined, softbounds=Undefined, inclusive_bounds=Undefined, step=Undefined, **params): - self.bounds = bounds - self.inclusive_bounds = inclusive_bounds - self.softbounds = softbounds - self.step = step - super().__init__(default=default,length=2,**params) + self.bounds = bounds # type: ignore[attr-defined] + self.inclusive_bounds = inclusive_bounds # type: ignore[attr-defined] + self.softbounds = softbounds # type: ignore[attr-defined] + self.step = step # type: ignore[attr-defined] + super().__init__(default=default, length=2, **params) def _validate(self, val): super()._validate(val) @@ -1483,7 +1855,7 @@ class Callable(Parameter): 2.4, so instantiate must be False for those values. """ - @typing.overload + @t.overload def __init__( self, default=None, *, @@ -1538,7 +1910,10 @@ class Composite(Parameter): __slots__ = ['attribs', 'objtype'] - @typing.overload + attribs: list[str] + objtype: type[Parameterized] + + @t.overload def __init__( self, *, attribs=None, @@ -1553,9 +1928,9 @@ def __init__(self, *, attribs=Undefined, **kw): if attribs is Undefined: attribs = [] super().__init__(default=Undefined, **kw) - self.attribs = attribs + self.attribs = attribs # type: ignore[attr-defined] - def __get__(self, obj, objtype): + def __get__(self, obj: Parameterized | None, objtype: type[Parameterized]) -> T: """Return the values of all the attribs, as a list.""" if obj is None: return [getattr(objtype, a) for a in self.attribs] @@ -1585,7 +1960,7 @@ def _post_setter(self, obj, val): # Selector #----------------------------------------------------------------------------- -class SelectorBase(Parameter): +class SelectorBase(Parameter[T]): """ Parameter whose value must be chosen from a list of possibilities. @@ -1606,7 +1981,7 @@ class ListProxy(list): items). """ - def __init__(self, iterable, parameter=None): + def __init__(self, iterable, parameter: Selector): super().__init__(iterable) self._parameter = parameter @@ -1749,7 +2124,7 @@ def update(self, objects, **items): objects = objects.items() if isinstance(objects, dict) else objects with self._trigger(): for i, o in enumerate(objects): - if not isinstance(o, collections.abc.Sequence): + if not isinstance(o, Sequence): raise TypeError( f'cannot convert dictionary update sequence element #{i} to a sequence' ) @@ -1851,7 +2226,12 @@ class Selector(SelectorBase, _SignatureSelector): __slots__ = ['_objects', 'compute_default_fn', 'check_on_set', 'names'] - @typing.overload + _objects: list[t.Any] + compute_default_fn: t.Callable[[], t.Any] | None + check_on_set: bool + names: Mapping[str, t.Any] + + @t.overload def __init__( self, *, objects=[], default=None, instantiate=False, compute_default_fn=None, @@ -1869,7 +2249,7 @@ def __init__(self, *, objects=Undefined, default=Undefined, instantiate=Undefine allow_None=Undefined, empty_default=False, **params): autodefault = Undefined - if objects is not Undefined and objects: + if not isinstance(objects, UndefinedType) and objects: if isinstance(objects, dict): autodefault = list(objects.values())[0] elif isinstance(objects, list): @@ -1878,13 +2258,12 @@ def __init__(self, *, objects=Undefined, default=Undefined, instantiate=Undefine default = autodefault if (not empty_default and default is Undefined) else default self.objects = objects - self.compute_default_fn = compute_default_fn - self.check_on_set = check_on_set + self.compute_default_fn = compute_default_fn # type: ignore[attr-defined] + self.check_on_set = check_on_set # type: ignore[attr-defined] - super().__init__( - default=default, instantiate=instantiate, **params) + super().__init__(default=default, instantiate=instantiate, **params) # Required as Parameter sets allow_None=True if default is None - if allow_None is Undefined: + if isinstance(allow_None, UndefinedType): self.allow_None = self._slot_defaults['allow_None'] else: self.allow_None = allow_None @@ -1902,7 +2281,7 @@ def objects(self): @objects.setter def objects(self, objects): - if isinstance(objects, collections.abc.Mapping): + if isinstance(objects, Mapping): self.names = objects self._objects = list(objects.values()) else: @@ -1974,7 +2353,7 @@ class ObjectSelector(Selector): historical reasons. """ - @typing.overload + @t.overload def __init__( self, default=None, *, objects=[], instantiate=False, compute_default_fn=None, @@ -1999,7 +2378,7 @@ class FileSelector(Selector): Selector._slot_defaults, path="", ) - @typing.overload + @t.overload def __init__( self, default=None, *, path="", objects=[], instantiate=False, compute_default_fn=None, @@ -2047,7 +2426,7 @@ class ListSelector(Selector): a list of possible objects. """ - @typing.overload + @t.overload def __init__( self, default=None, *, objects=[], instantiate=False, compute_default_fn=None, @@ -2108,7 +2487,7 @@ class MultiFileSelector(ListSelector): Selector._slot_defaults, path="", ) - @typing.overload + @t.overload def __init__( self, default=None, *, path="", objects=[], compute_default_fn=None, @@ -2145,7 +2524,7 @@ def get_range(self): return _abbreviate_paths(self.path,super().get_range()) -class ClassSelector(SelectorBase): +class ClassSelector(SelectorBase[T]): """ Parameter allowing selection of either a subclass or an instance of a class or tuple of classes. By default, requires an instance, but if is_instance=False, accepts a class instead. @@ -2157,28 +2536,65 @@ class ClassSelector(SelectorBase): _slot_defaults = dict(SelectorBase._slot_defaults, instantiate=True, is_instance=True) - @typing.overload + instantiate: bool + is_instance: bool + + if t.TYPE_CHECKING: + @t.overload + def __init__( + self: ClassSelector[T], + *, + class_: type[T] | tuple[type[T], ...], + default: T | None = None, + instantiate: bool = True, + is_instance: bool = True, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: ClassSelector[T], + *, + class_: type[T] | tuple[type[T], ...], + default: T | None = None, + instantiate: bool = True, + is_instance: bool = True, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - *, class_, default=None, instantiate=True, is_instance=True, - allow_None=False, doc=None, label=None, precedence=None, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False + *, + class_: type[T], + default: T | None = None, + instantiate: bool = True, + is_instance: bool = True, + allow_None: bool = False, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + constant: bool = False, + readonly: bool = False, ): ... @_deprecate_positional_args def __init__(self, *, class_, default=Undefined, instantiate=Undefined, is_instance=Undefined, **params): self.class_ = class_ - self.is_instance = is_instance - super().__init__(default=default,instantiate=instantiate,**params) + self.is_instance = is_instance # type: ignore + super().__init__(default=default, instantiate=instantiate, **params) self._validate(self.default) def _validate(self, val): super()._validate(val) self._validate_class_(val, self.class_, self.is_instance) - def _validate_class_(self, val, class_, is_instance): + def _validate_class_(self, val: t.Any, class_: type[T] | tuple[type[T], ...], is_instance: bool): if (val is None and self.allow_None): return if (is_instance and isinstance(val, class_)) or (not is_instance and issubclass(val, class_)): @@ -2214,19 +2630,50 @@ def get_range(self): return d -class Dict(ClassSelector): +class Dict(ClassSelector[T]): """Parameter whose value is a dictionary.""" - @typing.overload + if t.TYPE_CHECKING: + @t.overload + def __init__( + self: Dict[dict[K, V]], + *, + default: dict[K, V] | Literal[Undefined] = Undefined, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Dict[dict[K, V] | None], + *, + default: dict[K, V] | None = None, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=None, *, is_instance=True, - allow_None=False, doc=None, label=None, precedence=None, instantiate=True, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False + default=Undefined, *, + is_instance: bool = True, + allow_None: bool = False, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = True, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False, ): ... + @_deprecate_positional_args def __init__(self, default=Undefined, **params): super().__init__(default=default, class_=dict, **params) @@ -2234,7 +2681,7 @@ def __init__(self, default=Undefined, **params): class Array(ClassSelector): """Parameter whose value is a numpy array.""" - @typing.overload + @t.overload def __init__( self, default=None, *, is_instance=True, @@ -2293,7 +2740,7 @@ class DataFrame(ClassSelector): ClassSelector._slot_defaults, rows=None, columns=None, ordered=None ) - @typing.overload + @t.overload def __init__( self, default=None, *, rows=None, columns=None, ordered=None, is_instance=True, @@ -2409,7 +2856,7 @@ class Series(ClassSelector): ClassSelector._slot_defaults, rows=None, allow_None=False ) - @typing.overload + @t.overload def __init__( self, default=None, *, rows=None, allow_None=False, is_instance=True, @@ -2453,7 +2900,7 @@ def _validate(self, val): # List #----------------------------------------------------------------------------- -class List(Parameter): +class List(Parameter[T]): """ Parameter whose value is a list of objects, usually of a specified type. @@ -2474,19 +2921,61 @@ class List(Parameter): instantiate=True, default=[], is_instance=True, ) - @typing.overload + if t.TYPE_CHECKING: + @t.overload + def __init__( + self: List[list[T]], + default: list[T] = [], + *, + item_type: type[T] | tuple[type[T], ...], + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: List[list[T] | None], + default: list[T] | None = None, + *, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=[], *, class_=None, item_type=None, instantiate=True, bounds=(0, None), - is_instance=True, allow_None=False, doc=None, label=None, precedence=None, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False + default=[], + class_: type[T] | tuple[type[T], ...] | None = None, + item_type: type[T] | tuple[type[T], ...] | None = None, + instantiate: bool = True, + bounds: tuple[int, int] = (0, None), + is_instance: bool = True, + allow_None: bool = False, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False, ): ... @_deprecate_positional_args - def __init__(self, default=Undefined, *, class_=Undefined, item_type=Undefined, - instantiate=Undefined, bounds=Undefined, is_instance=Undefined, **params): + def __init__( + self, + default=Undefined, *, + class_: type[T] = Undefined, + item_type: type[T] = Undefined, + instantiate: bool = Undefined, + bounds: tuple[int, int] = Undefined, + is_instance: bool = Undefined, + **params + ): if class_ is not Undefined: # PARAM3_DEPRECATION warnings.warn( @@ -2713,7 +3202,6 @@ class Path(Parameter): check_exists: boolean, default=True If True (default) the path must exist on instantiation and set, otherwise the path can optionally exist. - """ __slots__ = ['search_paths', 'check_exists'] @@ -2722,13 +3210,48 @@ class Path(Parameter): Parameter._slot_defaults, check_exists=True, ) - @typing.overload + if t.TYPE_CHECKING: + @t.overload + def __init__( + self: Path[pathlib.PurePath | str], + default: pathlib.PurePath | None = None, + *, + search_paths: list[str] | None = None, + check_exists: bool = True, + allow_None: Literal[False] = False, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload + def __init__( + self: Path[pathlib.PurePath | str | None], + default: pathlib.PurePath | str | None = None, + *, + search_paths: list[str] | None = None, + check_exists: bool = True, + allow_None: Literal[True] = True, + **kwargs: t.Unpack[ParameterKwargs] + ) -> None: + ... + + @t.overload def __init__( self, - default=None, *, search_paths=None, check_exists=True, - allow_None=False, doc=None, label=None, precedence=None, instantiate=False, - constant=False, readonly=False, pickle_default_value=True, per_instance=True, - allow_refs=False, nested_refs=False + default=None, *, + search_paths: list[str] | None = None, + check_exists: bool = True, + allow_None: bool = False, + doc: str | None = None, + label: str | None = None, + precedence: float | None = None, + instantiate: bool = False, + constant: bool = False, + readonly: bool = False, + pickle_default_value: bool = True, + per_instance: bool = True, + allow_refs: bool = False, + nested_refs: bool = False, ): ... @@ -2760,9 +3283,9 @@ def _validate(self, val): if self.check_exists: raise OSError(e.args[0]) from None - def __get__(self, obj, objtype): + def __get__(self, obj: Parameterized | None, objtype: type[Parameterized]) -> T: """Return an absolute, normalized path (see resolve_path).""" - raw_path = super().__get__(obj,objtype) + raw_path = super().__get__(obj, objtype) if raw_path is None: path = None else: @@ -2880,7 +3403,7 @@ class Color(Parameter): _slot_defaults = dict(Parameter._slot_defaults, allow_named=True) - @typing.overload + @t.overload def __init__( self, default=None, *, allow_named=True, @@ -2944,8 +3467,7 @@ class Bytes(Parameter): Parameter._slot_defaults, default=b"", regex=None, allow_None=False, ) - - @typing.overload + @t.overload def __init__( self, default=b"", *, regex=None, allow_None=False, diff --git a/param/reactive.py b/param/reactive.py index cab240b3..5730dd7d 100644 --- a/param/reactive.py +++ b/param/reactive.py @@ -110,13 +110,13 @@ class Wrapper(Parameterized): """Helper class to allow updating literal values easily.""" - object = Parameter(allow_refs=False) + object: Parameter[Any] = Parameter(allow_refs=False) class GenWrapper(Parameterized): """Helper class to allow streaming from generator functions.""" - object = Parameter(allow_refs=True) + object: Parameter[Any] = Parameter(allow_refs=True) class Trigger(Parameterized): @@ -132,11 +132,11 @@ def __init__(self, parameters=None, internal=False, **params): class Resolver(Parameterized): """Helper class to allow (recursively) resolving references.""" - object = Parameter(allow_refs=True) + object: Parameter[Any] = Parameter(allow_refs=True) recursive = Boolean(default=False) - value = Parameter() + value: Parameter[Any] = Parameter() def __init__(self, **params): self._watchers = [] @@ -174,7 +174,7 @@ def _update_refs(self, refs): class NestedResolver(Resolver): - object = Parameter(allow_refs=True, nested_refs=True) + object: Parameter[Any] = Parameter(allow_refs=True, nested_refs=True) class reactive_ops: