Source code for tangl.core.dispatch

# tangl/core/dispatch.py
# language=markdown
"""
Default global dispatch registry and explicit hook pairs for core lifecycle events.

Hook pairs are exposed as explicit decorators and runners:

- creation:
  - ``on_create`` / ``do_create`` (pre-structuring, ``data -> data``)
  - ``on_init`` / ``do_init`` (post-init, ``caller -> None``)
- registry indexing:
  - ``on_add_item`` / ``do_add_item`` (``registry, item -> item``)
  - ``on_get_item`` / ``do_get_item`` (``registry, item -> item``)
  - ``on_remove_item`` / ``do_remove_item`` (``registry, item -> None``)
- graph relationships:
  - ``on_link`` / ``do_link`` (``caller, node -> None``)
  - ``on_unlink`` / ``do_unlink`` (``caller, node -> None``)

Context contract
----------------
Dispatch chaining is driven by the runtime context protocol used by
:class:`tangl.core.behavior.BehaviorRegistry`:

- ``ctx.get_authorities()`` contributes extra registries by layer.
- ``ctx.get_inline_behaviors()`` contributes one-off callables.

Callers normally pass ``_ctx`` to hook-aware APIs (for example ``Entity(..., _ctx=ctx)``,
``Entity.structure(..., _ctx=ctx)``, ``Registry.add(..., _ctx=ctx)``), or rely on ambient
context via ``using_ctx``.

Aggregation contracts
---------------------

- ``do_create`` folds hook results with :meth:`CallReceipt.merge_results`.
- ``do_add_item`` and ``do_get_item`` use :meth:`CallReceipt.last_result`.
- ``do_init``, ``do_remove_item``, ``do_link``, and ``do_unlink`` force receipt
  evaluation with :meth:`CallReceipt.gather_results`.

Example:
    >>> assert Entity(label="bar").label == "bar"
    >>> _ = on_init(func=lambda *, caller, ctx = None, **kwargs: setattr(caller, "label", "foo"))
    >>> item = Entity(
    ...     label="bar",
    ...     _ctx=SimpleNamespace(
    ...         get_authorities=lambda: [],
    ...         get_inline_behaviors=lambda: [],
    ...     ),
    ... )  # calls global dispatch by default
    >>> assert item.label == "foo"
    >>> q = BehaviorRegistry()
    >>> _ = q.register(task="init", dispatch_layer=DispatchLayer.APPLICATION,
    ...                func=lambda *, caller, ctx = None, **kwargs: setattr(caller, "label", "baz"))
    >>> item = Entity(
    ...     label="bar",
    ...     _ctx=SimpleNamespace(
    ...         get_authorities=lambda: [q],
    ...         get_inline_behaviors=lambda: [],
    ...     ),
    ... )
    >>> assert item.label == "baz"
    >>> dispatch.clear()  # always clean up global registries after using


Dispatch Layer Mapping
----------------------

- GLOBAL / core / ``core.dispatch`` / auditing and logging
- SYSTEM / vm / ``vm.dispatch`` / phase handlers and provisioning
- SYSTEM / service / ``service.dispatch`` / API and persistence
- APPLICATION / story / ``story.dispatch`` / content rendering and domain rules
- APPLICATION / mechanics / ``story.dispatch`` / story semantics extensions
- APPLICATION / discourse / ``story.dispatch`` / prose model extensions
- APPLICATION / media / ``story.dispatch`` / media extensions
- AUTHOR / world / ``world.dispatch`` / world-specific mechanics
- LOCAL / vm.frame / ``vm.frame.dispatch`` / one-off handlers

"""
from types import SimpleNamespace
from copy import deepcopy

from tangl.type_hints import UnstructuredData
from .entity import Entity
from .selector import Selector
from .registry import Registry
from .behavior import RuntimeCtx, BehaviorRegistry, DispatchLayer, CallReceipt
from .graph import GraphItem, Node

dispatch = BehaviorRegistry(label="global_dispatch", default_dispatch_layer=DispatchLayer.GLOBAL)

# Creation hooks
# --------------

[docs] def on_init(func, **kwargs): """Register an ``init`` hook on the global dispatch registry.""" return dispatch.register(func=func, task="init", **kwargs)
[docs] def do_init(*, caller: Entity, ctx: RuntimeCtx): """Run ``init`` hooks for a newly constructed caller.""" receipts = dispatch.execute_all( ctx=ctx, call_kwargs={'caller': caller}, selector=Selector(caller_kind=type(caller)), # only match caller-compatible behaviors task="init", ) CallReceipt.gather_results(*receipts) # force results to evaluate
[docs] def on_create(func, **kwargs): """Register a ``create`` hook on the global dispatch registry.""" return dispatch.register(func=func, task="create", **kwargs)
[docs] def do_create(*, data: UnstructuredData, ctx: RuntimeCtx): """Run ``create`` hooks and merge returned mapping updates into structuring input.""" receipts = dispatch.execute_all( ctx=ctx, call_kwargs={'data': data}, task="create", ) update = CallReceipt.merge_results(*receipts) if update: data = deepcopy(data) | update return data
# Indexing hooks # --------------
[docs] def on_add_item(func, **kwargs): """Register an ``add_item`` hook for registry insertion.""" return dispatch.register(func=func, task="add_item", **kwargs)
[docs] def do_add_item(registry: Registry, item: Entity, ctx: RuntimeCtx): """Run ``add_item`` hooks and return the final inserted entity. The last non-``None`` receipt result wins. If no hook returns a replacement, the original ``item`` is returned. """ receipts = dispatch.execute_all( ctx=ctx, call_kwargs={'registry': registry, 'item': item}, task="add_item", ) result = CallReceipt.last_result(*receipts) return result if result is not None else item
[docs] def on_get_item(func, **kwargs): """Register a ``get_item`` hook for registry lookup interception.""" return dispatch.register(func=func, task="get_item", **kwargs)
[docs] def do_get_item(registry: Registry, item: Entity, ctx: RuntimeCtx) -> Entity: """Run ``get_item`` hooks and return the final fetched entity.""" receipts = dispatch.execute_all( ctx=ctx, call_kwargs={'registry': registry, 'item': item}, task="get_item", ) result = CallReceipt.last_result(*receipts) return result if result is not None else item
[docs] def on_remove_item(func, **kwargs): """Register a ``remove_item`` hook for post-removal inspection.""" return dispatch.register(func=func, task="remove_item", **kwargs)
[docs] def do_remove_item(registry: Registry, item: Entity, ctx: RuntimeCtx): """Run ``remove_item`` hooks after registry removal.""" receipts = dispatch.execute_all( ctx=ctx, call_kwargs={'registry': registry, 'item': item}, task="remove_item", ) CallReceipt.gather_results(*receipts) # force receipt evaluation
# Graph hooks # --------------