Source code for tangl.vm.provision.provisioner

from __future__ import annotations
from enum import Flag, auto
from typing import Any, ClassVar, Callable, Protocol, Iterable, Iterator, TYPE_CHECKING, Mapping
from dataclasses import dataclass
from uuid import UUID, uuid4

from pydantic import ConfigDict, SkipValidation

from tangl.core import (
    Entity,
    Record,
    EntityTemplate,
    Node,
    Selector,
    Priority,
    TemplateRegistry,
    TokenCatalog,
)
from ..traversable import TraversableNode

from .scope import (
    build_plan,
    leaf_identifier,
    scope_distance,
    target_context_candidates,
)

if TYPE_CHECKING:
    from .requirement import Requirement, Affordance


[docs] class ProvisionPolicy(Flag): """Bitflag describing allowed or selected provisioning offer kinds. Requirement instances use these flags to restrict acceptable offer types, while :class:`ProvisionOffer` instances use them to declare what kind of action they represent. """ # For offers only STUB = auto() # debug/preview linkage that suspends requirement fidelity TOKEN = auto() # indicate offer is for a token # Offer may include ONE of these, req may include multiple EXISTING = auto() UPDATE = auto() # find + update CREATE = auto() CLONE = auto() # create + update # for requirements only ANY = EXISTING | UPDATE | CREATE def __int__(self): # should be monotonic, stub is lowest # (create | token) should probably be cheaper than create alone? return self.value
[docs] class ProvisionOffer(Record): """Deferred, ranked candidate for satisfying one requirement. Why ---- Resolution gathers many possible candidates before committing to any single one. ``ProvisionOffer`` keeps the selection process pure by storing a lazy callback instead of materializing providers eagerly. """ # todo: seems like we want to attach the accepted offer to the requirement or # requirement carrier, maybe exclude the callback and serialize as just the # origin, policy, priority? Or just track the accepted-offer-id in the # requirement? model_config = ConfigDict(arbitrary_types_allowed=True) # has arbitrary types, don't allow serialization guard_unstructure: ClassVar[bool] = True policy: ProvisionPolicy # but not ANY callback: Callable priority: int = Priority.NORMAL distance_from_caller: int = 999 specificity: int = 0 exact_kind_match: bool = False scope_distance: int = 0 build_plan: list[str] | None = None target_ctx: str | None = None candidate: Any = None def sort_key(self): from .matching import offer_sort_key return offer_sort_key(self)
[docs] class Provisioner(Protocol): """Structural protocol implemented by offer-producing provisioning helpers.""" def get_dependency_offers(self, requirement: Requirement) -> Iterable[ProvisionOffer]: ... def get_affordance_offers(self, node: Node) -> Iterable[ProvisionOffer]: ...
def _next_provision_uid(*, _ctx: Any = None) -> UUID: """Return a deterministic uid when vm context RNG is available.""" if _ctx is not None: get_random = getattr(_ctx, "get_random", None) if callable(get_random): rng = get_random() if hasattr(rng, "getrandbits"): return UUID(int=rng.getrandbits(128), version=4) return uuid4() def _template_hash_value(template: EntityTemplate) -> bytes: content_hash = template.content_hash() if not isinstance(content_hash, (bytes, bytearray, memoryview)): raise TypeError("EntityTemplate.content_hash() must return bytes-like data") return bytes(content_hash)
[docs] @dataclass class FindProvisioner: """Offer EXISTING providers already present in local entity groups.""" values: SkipValidation[Iterable[Entity]] # current graph, don't copy on create distance: int = 0 def get_dependency_offers(self, requirement: Requirement) -> Iterator[ProvisionOffer]: candidates = (value for value in self.values if requirement.satisfied_by(value)) for c in candidates: yield ProvisionOffer( origin_id = "FindProvisioner", policy = ProvisionPolicy.EXISTING, priority = Priority.NORMAL, distance_from_caller=self.distance, candidate=c, callback = lambda *_, _c=c, **__: _c # need to freeze ref to _this_ c ) def get_affordance_offers(self, node: Node) -> Iterator[ProvisionOffer]: from .requirement import Affordance candidates = Selector(has_kind=Affordance, satisfied_by=node).filter(self.values) for c in candidates: yield ProvisionOffer( origin_id = "FindProvisioner", policy = ProvisionPolicy.EXISTING, priority = Priority.NORMAL, distance_from_caller=self.distance, candidate=c, callback = lambda *_, _c=c, **__: _c # need to freeze ref to _this_ c )
[docs] @dataclass class TemplateProvisioner: """Offer CREATE candidates from template registries using scope matching.""" registries: SkipValidation[Iterable[TemplateRegistry]] = () request_ctx: str = "" graph: Any | None = None story_materialize: Callable[[EntityTemplate, Any], Entity] | None = None materialize_node: Callable[..., Entity] | None = None @staticmethod def _selector_identifier(requirement: "Requirement") -> str | None: extra = requirement.__pydantic_extra__ or {} value = extra.get("has_identifier") return str(value) if isinstance(value, str) and value else None @staticmethod def _is_episode_requirement(requirement: "Requirement") -> bool: extra = requirement.__pydantic_extra__ or {} kind = extra.get("has_kind") return isinstance(kind, type) and issubclass(kind, TraversableNode) @staticmethod def _matches_non_identifier_criteria(requirement: "Requirement", candidate: Any) -> bool: criteria = dict(requirement.__pydantic_extra__ or {}) criteria.pop("has_identifier", None) selector = Selector(predicate=requirement.predicate, **criteria) return selector.matches(candidate) @staticmethod def _selector_without_identifier(requirement: "Requirement") -> Selector: criteria = dict(requirement.__pydantic_extra__ or {}) criteria.pop("has_identifier", None) return Selector(predicate=requirement.predicate, **criteria) @staticmethod def _matches_scope(candidate: EntityTemplate, target_ctx: str) -> bool: return Selector(admitted_to=target_ctx).matches(candidate) @classmethod def _matches_template_identity( cls, requirement: "Requirement", candidate: EntityTemplate, ) -> bool: identifier = cls._selector_identifier(requirement) if identifier is None: return True if candidate.has_identifier(identifier): return True # Explicit dotted identifiers are treated as strict identity requests: # no leaf fallback, so authored absolute/qualified paths cannot silently # collapse to shared leaf-only template labels. if "." in identifier: return False leaf = leaf_identifier(identifier) if leaf is None: return False return candidate.has_identifier(leaf) def _materialize_template(self, template: EntityTemplate, *, _ctx: Any = None) -> Entity: if not callable(self.materialize_node): raise RuntimeError( "TemplateProvisioner requires materialize_node for consistent " "story materialization semantics; instantiate through Resolver " "or inject materialize_node explicitly." ) return self.materialize_node( template, _ctx=_ctx, role="provision_leaf", story_materialize=self.story_materialize, ) def get_dependency_offers(self, requirement: Requirement) -> Iterator[ProvisionOffer]: identifier = self._selector_identifier(requirement) is_episode_requirement = self._is_episode_requirement(requirement) selector = self._selector_without_identifier(requirement) registries = [registry for registry in self.registries if isinstance(registry, TemplateRegistry)] if not registries: return seen_pairs: set[tuple[UUID, str]] = set() if isinstance(identifier, str) and identifier: target_contexts = target_context_candidates( identifier=identifier, request_ctx=self.request_ctx, authored_path=requirement.authored_path, is_qualified=requirement.is_qualified, is_absolute=requirement.is_absolute, ) else: target_contexts = [self.request_ctx or ""] for target_ctx in target_contexts: ranked_candidates = TemplateRegistry.chain_find_all( *registries, selector=selector, sort_key=lambda template: scope_distance(template.admission_scope, target_ctx), ) for candidate in ranked_candidates: if not self._matches_template_identity(requirement, candidate): continue if not self._matches_scope(candidate, target_ctx): continue distance = scope_distance(candidate.admission_scope, target_ctx) if is_episode_requirement and (not requirement.is_qualified) and distance > 0: continue pair = (candidate.uid, target_ctx) if pair in seen_pairs: continue seen_pairs.add(pair) chain = None if is_episode_requirement and requirement.is_qualified: chain = build_plan(target_ctx, self.graph) yield ProvisionOffer( origin_id="TemplateProvisioner", policy=ProvisionPolicy.CREATE, priority=Priority.NORMAL, scope_distance=distance, build_plan=chain, target_ctx=target_ctx, candidate=candidate, callback=lambda *_, _c=candidate, **kwargs: self._materialize_template( _c, _ctx=kwargs.get("_ctx"), ), )
# Not sure what affordance providers look like in template form?
[docs] @dataclass class InlineTemplateProvisioner: """Offer inline requirement templates as normal CREATE candidates.""" materialize_node: Callable[..., Entity] | None = None story_materialize: Callable[[EntityTemplate, Any], Entity] | None = None def iter_dependency_offers(self, requirement: Requirement) -> Iterable[ProvisionOffer]: if requirement.fallback_templ is not None: return [ProvisionOffer( origin_id=requirement.fallback_templ.get_label(), policy=ProvisionPolicy.CREATE, callback=lambda *_, _t=requirement.fallback_templ, **kwargs: self._materialize_inline( _t, _ctx=kwargs.get("_ctx"), ), priority=Priority.LATE, distance_from_caller=0, candidate=requirement.fallback_templ, )] return [] def _materialize_inline(self, template: EntityTemplate, *, _ctx: Any = None) -> Entity: if not callable(self.materialize_node): raise RuntimeError( "InlineTemplateProvisioner requires materialize_node for consistent " "story materialization semantics; instantiate through Resolver " "or inject materialize_node explicitly." ) return self.materialize_node( template, _ctx=_ctx, role="provision_leaf", story_materialize=self.story_materialize, ) @classmethod def get_dependency_offers(cls, requirement: Requirement) -> Iterable[ProvisionOffer]: raise NotImplementedError( "InlineTemplateProvisioner.get_dependency_offers() classmethod shim " "was removed; use Resolver.gather_offers() or " "InlineTemplateProvisioner(...).iter_dependency_offers(requirement)." )
# Can't have a fallback affordance, that's just a structure that's in scope?
[docs] class StubProvisioner: """Preview-oriented provisioner that synthesizes minimal matching entities.""" HIGH_COST_PRIORITY = Priority.LAST + 10_000 @classmethod def _extract_selector_value(cls, requirement: Requirement, key: str): extra = requirement.__pydantic_extra__ or {} return extra.get(key) @classmethod def _synthesize_entity(cls, requirement: Requirement) -> Entity | None: kind = cls._extract_selector_value(requirement, "has_kind") or Entity if not isinstance(kind, type) or not issubclass(kind, Entity): kind = Entity kwargs: dict = {} identifier = cls._extract_selector_value(requirement, "has_identifier") label = cls._extract_selector_value(requirement, "label") tags = cls._extract_selector_value(requirement, "has_tags") if isinstance(identifier, str): kwargs["label"] = identifier elif isinstance(label, str): kwargs["label"] = label if isinstance(tags, (set, list, tuple)): kwargs["tags"] = set(tags) elif isinstance(tags, str): kwargs["tags"] = {tags} try: candidate = kind(**kwargs) except Exception: try: candidate = kind() except Exception: return None if "label" in kwargs and hasattr(candidate, "label"): candidate.label = kwargs["label"] if "tags" in kwargs and hasattr(candidate, "tags"): candidate.tags = kwargs["tags"] if requirement.satisfied_by(candidate): return candidate return candidate @classmethod def get_dependency_offers(cls, requirement: Requirement) -> Iterable[ProvisionOffer]: return [ProvisionOffer( origin_id="StubProvisioner", policy=ProvisionPolicy.STUB, priority=cls.HIGH_COST_PRIORITY, distance_from_caller=999_999, callback=lambda *_, _req=requirement, **__: cls._synthesize_entity(_req), )]
[docs] @dataclass class TokenProvisioner: """Offer CREATE token providers from singleton token catalogs.""" catalogs: SkipValidation[Iterable[TokenCatalog]] = () @staticmethod def _selector(requirement: "Requirement") -> Selector: criteria = dict(requirement.__pydantic_extra__ or {}) # ``label`` is used as the desired token-node label at materialization time. # Do not apply it to singleton candidate filtering. criteria.pop("label", None) return Selector(predicate=requirement.predicate, **criteria) @staticmethod def _explicit_label(requirement: "Requirement") -> str | None: value = (requirement.__pydantic_extra__ or {}).get("label") if isinstance(value, str) and value: return value return None def get_dependency_offers(self, requirement: Requirement) -> Iterable[ProvisionOffer]: selector = self._selector(requirement) label = self._explicit_label(requirement) catalogs = [catalog for catalog in self.catalogs if isinstance(catalog, TokenCatalog)] if not catalogs: return catalogs_by_type = {catalog.wst: catalog for catalog in catalogs} def _catalog_for_instance(instance: Any) -> TokenCatalog | None: instance_type = type(instance) exact = catalogs_by_type.get(instance_type) if exact is not None: return exact # Fallback for mixed catalog hierarchies: nearest ancestor catalog. for ancestor in instance_type.__mro__[1:]: catalog = catalogs_by_type.get(ancestor) if catalog is not None: return catalog return None for instance in TokenCatalog.chain_find_all(*catalogs, selector=selector): catalog = _catalog_for_instance(instance) if catalog is None: continue origin = f"TokenProvisioner:{catalog.wst.__name__}:{instance.get_label()}" yield ProvisionOffer( origin_id=origin, policy=ProvisionPolicy.CREATE | ProvisionPolicy.TOKEN, priority=Priority.EARLY, scope_distance=0, candidate=instance, callback=lambda *_, _catalog=catalog, _inst=instance, _label=label, **kwargs: _catalog.materialize_one( _inst, uid=_next_provision_uid(_ctx=kwargs.get("_ctx")), label=_label, ), ) def get_affordance_offers(self, node: Node) -> Iterable[ProvisionOffer]: _ = node return []
[docs] class UpdateCloneProvisioner: """Synthesizes deferred UPDATE/CLONE offers from selected FIND/CREATE offers. This provisioner never executes upstream callbacks while constructing offers. It selects the best FIND and CREATE candidates by sort key and emits deferred composite callbacks that sub-accept only when the composite offer itself is accepted by the resolver. """ _REFERENCE_SELECTOR_KEYS: ClassVar[tuple[str, ...]] = ( "reference_selector", "reference", "reference_req", ) _TEMPLATE_SELECTOR_KEYS: ClassVar[tuple[str, ...]] = ( "update_template_selector", "template_selector", "update_selector", ) _STRIP_UPDATE_KEYS: ClassVar[set[str]] = { "kind", "uid", "registry", "registry_id", "_registry", } @classmethod def _coerce_selector(cls, value: Any) -> Selector | None: if value is None: return None if isinstance(value, Selector): return value if isinstance(value, Mapping): return Selector(**dict(value)) return None @classmethod def _selector_from_requirement( cls, requirement: "Requirement", *, field_name: str, fallback_keys: tuple[str, ...], ) -> Selector | None: selector = cls._coerce_selector(getattr(requirement, field_name, None)) if selector is not None: return selector extra = requirement.__pydantic_extra__ or {} for key in fallback_keys: selector = cls._coerce_selector(extra.get(key)) if selector is not None: return selector return None @staticmethod def _offer_matches_selector(offer: ProvisionOffer, selector: Selector) -> bool: candidate = offer.candidate if candidate is None: return False try: return selector.matches(candidate) except (TypeError, ValueError): return False @classmethod def _best_offer(cls, offers: Iterable[ProvisionOffer]) -> ProvisionOffer | None: values = list(offers) if not values: return None values.sort(key=lambda offer: offer.sort_key()) return values[0] @classmethod def _sanitize_updates(cls, value: Mapping[str, Any]) -> dict[str, Any]: return { key: val for key, val in dict(value).items() if key not in cls._STRIP_UPDATE_KEYS } @classmethod def _extract_update_payload( cls, offer: ProvisionOffer, *, _ctx: Any = None, ) -> dict[str, Any]: created = offer.callback(_ctx=_ctx) if isinstance(created, EntityTemplate): return cls._sanitize_updates(created.payload.unstructure()) if isinstance(created, Entity): return cls._sanitize_updates(created.unstructure()) if isinstance(created, Mapping): return cls._sanitize_updates(created) candidate = offer.candidate if isinstance(candidate, EntityTemplate): return cls._sanitize_updates(candidate.payload.unstructure()) return {} @staticmethod def _apply_updates_in_place(reference: Any, updates: Mapping[str, Any]) -> Any: if not updates: return reference if hasattr(reference, "update_attrs"): reference.update_attrs(**dict(updates)) return reference for key, value in dict(updates).items(): if hasattr(reference, key): setattr(reference, key, value) return reference @staticmethod def _clone_with_updates( reference: Entity, updates: Mapping[str, Any], *, _ctx: Any = None, ) -> Entity: clone = reference.evolve( uid=_next_provision_uid(_ctx=_ctx), **dict(updates), ) clone.templ_hash = reference.templ_hash return clone @classmethod def _make_offer( cls, *, policy: ProvisionPolicy, find_offer: ProvisionOffer, create_offer: ProvisionOffer, ) -> ProvisionOffer: def _accept(*_, _ctx=None, **__) -> Any: reference = find_offer.callback(_ctx=_ctx) if reference is None: return None if not isinstance(reference, Entity): raise TypeError("UPDATE/CLONE offers require Entity references") updates = cls._extract_update_payload(create_offer, _ctx=_ctx) if policy & ProvisionPolicy.UPDATE: return cls._apply_updates_in_place(reference, updates) if policy & ProvisionPolicy.CLONE: return cls._clone_with_updates(reference, updates, _ctx=_ctx) return None distance = max( find_offer.distance_from_caller, create_offer.distance_from_caller, ) specificity = max( find_offer.specificity, create_offer.specificity, ) priority = max( find_offer.priority, create_offer.priority, ) return ProvisionOffer( origin_id=f"UpdateCloneProvisioner:{policy.name.lower()}", policy=policy, callback=_accept, priority=priority, distance_from_caller=distance, specificity=specificity, candidate=find_offer.candidate, ) @classmethod def get_dependency_offers( cls, requirement: "Requirement", offers: Iterable[ProvisionOffer], ) -> list[ProvisionOffer]: wants_update = bool(requirement.provision_policy & ProvisionPolicy.UPDATE) wants_clone = bool(requirement.provision_policy & ProvisionPolicy.CLONE) if not (wants_update or wants_clone): return [] reference_selector = cls._selector_from_requirement( requirement, field_name="reference_selector", fallback_keys=cls._REFERENCE_SELECTOR_KEYS, ) template_selector = cls._selector_from_requirement( requirement, field_name="update_template_selector", fallback_keys=cls._TEMPLATE_SELECTOR_KEYS, ) if reference_selector is None or template_selector is None: return [] find_offers = [ offer for offer in offers if (offer.policy & ProvisionPolicy.EXISTING) and cls._offer_matches_selector(offer, reference_selector) ] create_offers = [ offer for offer in offers if (offer.policy & ProvisionPolicy.CREATE) and not (offer.policy & ProvisionPolicy.TOKEN) and cls._offer_matches_selector(offer, template_selector) ] best_find = cls._best_offer(find_offers) best_create = cls._best_offer(create_offers) if best_find is None or best_create is None: return [] synthesized: list[ProvisionOffer] = [] if wants_update: synthesized.append( cls._make_offer( policy=ProvisionPolicy.UPDATE, find_offer=best_find, create_offer=best_create, ) ) if wants_clone: synthesized.append( cls._make_offer( policy=ProvisionPolicy.CLONE, find_offer=best_find, create_offer=best_create, ) ) return synthesized
# Backward-compatible alias while feature is still evolving. CloneProvisioner = UpdateCloneProvisioner