# Source code for tangl.core.behavior

# tangl/core/behavior.py
"""Behavior dispatch primitives and receipt aggregation for core.

This module provides:

- behavior metadata and invocation wrappers (:class:`Behavior`);
- registry composition and execution plumbing (:class:`BehaviorRegistry`);
- execution receipts and aggregation helpers (:class:`CallReceipt`);
- shared ordering enums (:class:`Priority`, :class:`DispatchLayer`).

Higher layers supply policy by deciding which registries are active in context.
Core only defines deterministic selection, ordering, and result folding.
"""

from __future__ import annotations
import itertools
from typing import (
    Any,
    Callable,
    Protocol,
    Iterator,
    Iterable,
    Self,
    ClassVar,
    runtime_checkable,
    Mapping,
    Type,
    Optional,
)
from enum import IntEnum, Enum
from collections import ChainMap
from inspect import isclass
import logging

from pydantic import ConfigDict, model_validator, SkipValidation

from tangl.type_hints import Tag
from .entity import Entity
from .registry import Registry, RegistryAware
from .record import HasOrder, Record
from .selector import Selector
from .ctx import DispatchCtx

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Threshold is WARNING, so the ``logger.debug`` calls in this module are
# suppressed unless the level is lowered elsewhere.
logger.setLevel(logging.WARNING)

class Priority(IntEnum):
    """Relative execution order of behaviors inside one dispatch layer.

    Members are plain integers, so they compare and sort numerically:
    the smaller the value, the earlier the behavior runs.
    """

    FIRST = 0     # smallest value: sorts ahead of every other member
    EARLY = 25
    NORMAL = 50   # midpoint of the scale
    LATE = 75
    LAST = 100    # largest value: sorts behind every other member


class DispatchLayer(IntEnum):
    """Coarse ordering of registry layers, applied before per-layer priority.

    Layers sort numerically from ``GLOBAL`` (first) to ``LOCAL`` (last).
    """

    GLOBAL = 0
    SYSTEM = 1
    APPLICATION = 2
    AUTHOR = 3
    USER = 4
    # LOCAL deliberately has the highest value: it sorts _later_ in execution
    # order so that local behaviors can observe and aggregate global ones.
    LOCAL = 5


@runtime_checkable
class RuntimeCtx(DispatchCtx, Protocol):
    """Structural type for the context object handed to behavior chains.

    Core dispatch needs only the two accessors declared here: one for
    registries and one for inline behaviors. Contexts defined by higher
    layers are free to carry extra fields and helper methods.
    """

    def get_authorities(self) -> Iterable[BehaviorRegistry]:
        """Return the behavior registries this context contributes to dispatch."""
        ...

    def get_inline_behaviors(self) -> Iterable[Behavior | Callable[..., Any]]:
        """Return ad-hoc behaviors or plain callables to run alongside registries."""
        ...


class AggregationMode(Enum):
    """Strategies for folding a sequence of receipts into one result.

    Each value is the name of the matching aggregation helper.
    """

    # Early-exit, first wins
    FIRST = "first_result"
    # Composite result
    LAST = "last_result"
    # Same value as LAST, so the enum machinery treats PIPE as an alias of it.
    PIPE = LAST
    # Validation gate
    ALL_TRUE = "all_true"
    # Collect all
    GATHER = "gather_results"
    # Flatten/combine, later wins
    MERGE = "merge_results"


[docs] class CallReceipt(Record): """ **Aggregation Modes:** Receipt aggregation or folding summarizes dispatch traces into a concrete result or list of results. One key detail is that behaviors that return a None result are tracked with a receipt for audit, but do not participate in result reduction. Several generic aggregators are implemented as class functions on Receipt (handling for collections of receipts). Additional aggregators can be introduced at other layers. Supported aggregation helpers: - ``first_result``: first non-``None`` result (single winner). - ``last_result``: last non-``None`` result (override pattern). - ``all_true``: all non-``None`` results are truthy (validation gates). - ``gather_results``: collect all non-``None`` results. - ``merge_results``: flatten lists or merge dicts (later entries override). Examples: >>> receipts = [ CallReceipt(result=None), ... CallReceipt(result=1), ... CallReceipt(result=0), ... CallReceipt(result=None) ] >>> CallReceipt.gather_results(*receipts) [1, 0] >>> CallReceipt.first_result(*receipts) 1 >>> CallReceipt.last_result(*receipts) 0 >>> CallReceipt.all_true(*receipts) False >>> CallReceipt.merge_results(CallReceipt(result=[1,2,3]), ... CallReceipt(result=[4,5,6])) # flattens [1, 2, 3, 4, 5, 6] >>> dict( CallReceipt.merge_results(CallReceipt(result={'a': 'foo'}), ... CallReceipt(result={'b': 'bar'}), ... CallReceipt(result={'a': 'baz'})) ) # late overrides {'a': 'baz', 'b': 'bar'} """ model_config = ConfigDict(arbitrary_types_allowed=True) # carries arbitrary types in callbacks and context, so don't serialize guard_unstructure: ClassVar[bool] = True result: Any = None callback: Callable[[Any], Any] = None args: tuple[Any, ...] 
= None kwargs: dict[str, Any] = None ctx: SkipValidation[RuntimeCtx] = None @model_validator(mode='after') def _either_result_or_cb_specified(self) -> Self: """Require exactly one of ``result`` or ``callback``.""" value = sum(['callback' in self.model_fields_set, 'result' in self.model_fields_set]) if value != 1: raise ValueError("Exactly one of 'callback' or 'result' should be specified") return self def resolve(self, *args, **kwargs) -> Any: """Resolve deferred callback receipts once and cache the result.""" if self.result is None and self.callback is not None: self.force_set('args', args) # by-pass frozen self.force_set('kwargs', kwargs) result = self.callback(*self.args, ctx=self.ctx, **self.kwargs) self.force_set('result', result) return self.result # Aggregation functions # todo: force resolve any deferred receipts? otherwise they don't count as None's @classmethod def iter_results(cls, *receipts) -> Iterator[Any]: """Yield non-``None`` receipt results in order.""" return (receipt.result for receipt in receipts if receipt.result is not None) @classmethod def gather_results(cls, *receipts) -> list[Any]: """Collect non-``None`` receipt results in order.""" return list(cls.iter_results(*receipts)) @classmethod def first_result(cls, *receipts: Self): """Return first non-``None`` receipt result, or ``None``.""" return next(cls.iter_results(*receipts), None) @classmethod def last_result(cls, *receipts: Self): """Return last non-``None`` receipt result, or ``None``.""" return next(cls.iter_results(*reversed(receipts)), None) @classmethod def all_true(cls, *receipts: Self): """Return ``True`` when all non-``None`` results are truthy.""" return all([bool(r) for r in cls.iter_results(*receipts)]) @classmethod def all_truthy(cls, *receipts: Self): """Legacy alias for :meth:`all_true`.""" return cls.all_true(*receipts) @classmethod def merge_results(cls, *receipts: Self) -> list[Any] | Mapping[Any, Any]: """Merge homogeneous results (lists/dicts) or return gathered mixed 
results.""" results = cls.gather_results(*receipts) if all( isinstance(r, list) for r in results ): return list( itertools.chain.from_iterable(results) ) elif all( isinstance(r, dict) for r in results ): return ChainMap(*reversed(results)) # later dict values override earlier ones return results @classmethod def aggregate(cls, mode: AggregationMode, *receipts: Self): """Dispatch aggregation by :class:`AggregationMode`.""" match mode: case AggregationMode.FIRST: return cls.first_result(*receipts) case AggregationMode.LAST: return cls.last_result(*receipts) case AggregationMode.ALL_TRUE: return cls.all_true(*receipts) case AggregationMode.GATHER: return cls.gather_results(*receipts) case AggregationMode.MERGE: return cls.merge_results(*receipts) case _: raise ValueError(f"Unknown aggregation mode: {mode}")
class Behavior(RegistryAware, HasOrder, Entity):
    """
    A registrable wrapper around one callable, carrying the metadata used to
    select it (``task``, ``wants_caller_kind``) and to order it
    (``dispatch_layer``, ``priority``).

    Calling a behavior runs the wrapped function and returns a
    :class:`CallReceipt`; :meth:`defer` returns a receipt that runs it later.

    Example:
        >>> b = Behavior(func=lambda *nums, **kwargs: sum(nums))
        >>> receipt = b(1, 2, 3)
        >>> f"sum{receipt.args}={receipt.result}"
        'sum(1, 2, 3)=6'
        >>> deferred = b.defer()
        >>> assert deferred.result is None
        >>> deferred.resolve(4, 5, 6)
        15
        >>> f"sum{deferred.args}={deferred.result}"
        'sum(4, 5, 6)=15'
        >>> c = Behavior(func=lambda *_, **__: True, wants_caller_kind=Entity)
        >>> assert Selector(caller_kind=Entity).matches(c) and not Selector(caller_kind=dict).matches(c)
    """
    # todo: method type introspection was in v37, but complicated and underutilized?
    #       - detect class methods as caller hint
    #       - detect inst methods as caller hint and bind func to caller dynamically
    #       - detect instance funcs and bind source

    func: Callable = lambda *_, **__: True
    task: Tag = None
    priority: int = Priority.NORMAL
    dispatch_layer: int = DispatchLayer.LOCAL
    wants_caller_kind: Type[Any] | None = None
    wants_exact_kind: bool = True  # disallow caller-kind subclasses

    def caller_kind(self, kind: Type[Any]) -> bool:
        """Return whether this behavior accepts a caller of ``kind``.

        A behavior without ``wants_caller_kind`` accepts anything. Otherwise
        ``kind`` must be a class that is the wanted class itself, or a
        subclass of it when ``wants_exact_kind`` is off.
        """
        logger.debug("checking caller kind against wants_caller_kind")
        wanted = self.wants_caller_kind
        if wanted is None:
            return True
        if not isclass(kind):
            return False
        if kind is wanted:
            return True
        # Subclasses only qualify when exact matching is switched off.
        return (not self.wants_exact_kind) and issubclass(kind, wanted)

    def __call__(self, *args, ctx: RuntimeCtx = None, **kwargs) -> CallReceipt:
        """Invoke behavior function and return a resolved :class:`CallReceipt`.

        The function receives ``*args``, ``ctx`` as a keyword, and
        ``**kwargs``; the same values are recorded on the receipt.
        """
        # todo: could do some introspection here, if the func wants caller, etc., check
        #       for default call args/kwargs in ctx
        origin = self.uid
        outcome = self.func(*args, ctx=ctx, **kwargs)
        return CallReceipt(
            origin_id=origin,
            result=outcome,
            args=args,
            kwargs=kwargs,
            ctx=ctx,
        )

    def defer(self, ctx: RuntimeCtx = None) -> CallReceipt:
        """Return a deferred receipt that resolves the callback later."""
        return CallReceipt(origin_id=self.uid, ctx=ctx, callback=self.func)

    @property
    def sort_key(self):
        """
        Tuple used to order behaviors for execution.

        Sorts by:
        - layer: global -> local
        - priority: low -> high
        - wants_exact_kind: ``False`` then ``True``
        - registration seq: earlier -> later
        """
        return (
            self.dispatch_layer,
            self.priority,
            self.wants_exact_kind,
            self.seq,
        )
class BehaviorRegistry(Registry[Behavior]):
    """
    Registry of :class:`Behavior` objects with helpers to register callables
    and to execute the matching behaviors in sorted order.

    Example:
        >>> br = BehaviorRegistry()
        >>> f = br.register(lambda *nums, **kwargs: sum(nums), task="sum")
        >>> g = br.register(lambda *args, **kwargs: ''.join([str(a) for a in args]),
        ...                 task="join", priority=Priority.EARLY)
        >>> next( br.execute_all(task="sum", call_args=(1, 2, 3)) ).result
        6
        >>> next( br.execute_all(task="join", call_args=('a', 'b', 'c')) ).result
        'abc'
        >>> CallReceipt.gather_results( *br.execute_all(call_args=(1, 2, 3)) )
        ... # join triggers first even tho registered last, b/c lower priority
        ['123', 6]
    """

    default_task: Tag = None
    default_priority: Priority = Priority.NORMAL
    default_dispatch_layer: DispatchLayer = DispatchLayer.APPLICATION

    def register(self, func: Callable | None = None, **kwargs):
        """Register behavior function(s), supporting both direct and decorator use.

        Called with ``func`` it registers immediately and returns ``func``;
        called without, it returns a decorator that does the same. Missing
        ``task``/``priority``/``dispatch_layer`` keywords fall back to the
        registry defaults.
        """
        def _register(target: Callable) -> Callable:
            # Registry defaults first; explicit keyword arguments win.
            payload = {
                "task": self.default_task,
                "priority": self.default_priority,
                "dispatch_layer": self.default_dispatch_layer,
                **kwargs,
            }
            behavior = Behavior(func=target, **payload)
            # Leave a back-reference on the callable itself.
            target._behavior = behavior
            self.add(behavior)
            return target

        return _register if func is None else _register(func)

    @classmethod
    def _get_receipts(cls, behaviors, *, call_args, call_kwargs, ctx) -> Iterator[CallReceipt]:
        """Yield receipts for each behavior call with normalized args/kwargs."""
        positional = call_args or ()
        keyword = call_kwargs or {}
        for behavior in behaviors:
            yield behavior(*positional, ctx=ctx, **keyword)

    # It would be nice to include aggregator here, but it makes type checking a pain
    def execute_all(self, *,
                    call_args: tuple[Any, ...] = None,
                    call_kwargs: dict[str, Any] = None,
                    ctx: RuntimeCtx = None,
                    task: Tag = None,
                    selector: Selector = None,
                    inline_behaviors: Iterable[Behavior | Callable[..., Any]] = None
                    ) -> Iterator[CallReceipt]:
        """
        Execute all behaviors matching selector in sorted order.

        Args:
            call_args: Positional arguments for behavior functions
            call_kwargs: Keyword arguments for behavior functions
            ctx: Runtime context (optional)
            task: Task tag to filter behaviors (convenience)
            selector: Additional selection criteria
            inline_behaviors: Additional behaviors/callables to execute

        Yields:
            CallReceipt for each executed behavior in sort order
        """
        # Same as the chained form, with this registry as the only explicit source.
        return self.chain_execute_all(
            self,
            call_args=call_args,
            call_kwargs=call_kwargs,
            ctx=ctx,
            task=task,
            selector=selector,
            inline_behaviors=inline_behaviors,
        )

    def dispatch(
        self,
        *call_args: Any,
        ctx: RuntimeCtx = None,
        task: Tag = None,
        selector: Selector = None,
        extra_handlers: Iterable[Behavior | Callable[..., Any]] = None,
        **call_kwargs: Any,
    ) -> list[CallReceipt]:
        """Legacy alias returning a materialized receipt list."""
        # Empty ``*call_args`` / ``**call_kwargs`` are forwarded as ``None``.
        receipts = self.execute_all(
            call_args=call_args or None,
            call_kwargs=call_kwargs or None,
            ctx=ctx,
            task=task,
            selector=selector,
            inline_behaviors=extra_handlers,
        )
        return list(receipts)

    @classmethod
    def _wrap_inline(
        cls,
        behaviors: Iterable[Behavior | Callable[..., Any]],
        *,
        task: Tag = "inline",
    ) -> BehaviorRegistry:
        """Wrap ad-hoc inline callables/behaviors into a temporary local registry.

        Raises:
            TypeError: if an item is neither a :class:`Behavior` nor callable.
        """
        local = cls(default_dispatch_layer=DispatchLayer.LOCAL)
        for item in behaviors:
            # Behavior must be tested first: it is itself callable.
            if isinstance(item, Behavior):
                # Keep original task/layer metadata without rebinding ownership.
                local.members[item.uid] = item
            elif callable(item):
                local.register(func=item, task=task)
            else:
                raise TypeError(f"Expected Behavior or callable, got {type(item)!r}")
        return local

    @classmethod
    def _ctx_authorities(cls, ctx: Any) -> Iterable[BehaviorRegistry]:
        """Return the registries offered by ``ctx.get_authorities()``, or ``()``."""
        provider = getattr(ctx, "get_authorities", None)
        if not callable(provider):
            return ()
        return provider() or ()

    @classmethod
    def chain_execute_all(
        cls,
        *registries: BehaviorRegistry,
        call_args: Optional[tuple[Any, ...]] = None,
        call_kwargs: Optional[dict[str, Any]] = None,
        ctx: Optional[RuntimeCtx] = None,
        task: Optional[Tag] = None,
        selector: Optional[Selector] = None,
        inline_behaviors: Optional[Iterable[Behavior | Callable[..., Any]]] = None,
    ) -> Iterator[CallReceipt]:
        """Execute behaviors across multiple registries plus context-provided sources.

        Registry sources are assembled in this order:

        1. Explicit ``registries`` arguments.
        2. ``ctx.get_authorities()`` when available.
        3. Inline callables from ``ctx.get_inline_behaviors()`` and ``inline_behaviors``.

        Registries are deduplicated by object identity, then behaviors are
        filtered and sorted by :attr:`Behavior.sort_key`.
        """
        # Plain inline callables are registered under the requested task.
        inline_task = task or "inline"

        sources = list(registries)
        if ctx is not None:
            sources.extend(cls._ctx_authorities(ctx))
            ctx_inline_getter = getattr(ctx, "get_inline_behaviors", None)
            if callable(ctx_inline_getter):
                ctx_inline = ctx_inline_getter() or ()
                if ctx_inline:
                    sources.append(cls._wrap_inline(ctx_inline, task=inline_task))
        if inline_behaviors:
            sources.append(cls._wrap_inline(inline_behaviors, task=inline_task))

        # Deduplicate by identity; the first occurrence keeps its position.
        by_identity: dict[int, BehaviorRegistry] = {}
        for source in sources:
            by_identity.setdefault(id(source), source)
        unique_sources = list(by_identity.values())

        if task is not None:
            selector = (selector or Selector()).with_criteria(task=task)

        matched = cls.chain_find_all(
            *unique_sources,
            selector=selector,
            sort_key=lambda v: v.sort_key,
        )
        return cls._get_receipts(matched, call_args=call_args, call_kwargs=call_kwargs, ctx=ctx)

    @classmethod
    def chain_execute(
        cls,
        *registries: BehaviorRegistry,
        call_args: Optional[tuple[Any, ...]] = None,
        call_kwargs: Optional[dict[str, Any]] = None,
        ctx: Optional[RuntimeCtx] = None,
        task: Optional[Tag] = None,
        selector: Optional[Selector] = None,
        inline_behaviors: Optional[Iterable[Behavior | Callable[..., Any]]] = None,
    ) -> Iterator[CallReceipt]:
        """Backwards-compatible alias for :meth:`chain_execute_all`."""
        return cls.chain_execute_all(
            *registries,
            call_args=call_args,
            call_kwargs=call_kwargs,
            ctx=ctx,
            task=task,
            selector=selector,
            inline_behaviors=inline_behaviors,
        )