Source code for tangl.vm.provision.resolver

from dataclasses import dataclass
import logging
from typing import Any, Iterable, Iterator, Optional, TypeAlias, Union, Self
from uuid import UUID

from tangl.core import (
    Edge,
    Entity,
    EntityGroup,
    EntityTemplate,
    Priority,
    RegistryAware,
    TemplateRegistry,
    Node,
    Selector,
)
from ..dispatch import on_provision
from ..ctx import VmPhaseCtx
from ..runtime.causality import CausalityMode
from ..traversable import TraversableNode
from .materialization import (
    MaterializeRole,
    attach_child,
    materialize_template_entity,
    resolve_story_materialize_hook,
    resolve_story_post_materialize_hook,
    resolve_story_preview_requirement_hook,
)
from .matching import annotate_offer_specificity, summarize_offer
from .preview import Blocker, ViabilityResult
from .provisioner import (
    ProvisionOffer,
    FindProvisioner,
    TemplateProvisioner,
    TokenProvisioner,
    InlineTemplateProvisioner,
    StubProvisioner,
    UpdateCloneProvisioner,
    ProvisionPolicy,
    _template_hash_value,
)
from .requirement import (
    Requirement,
    PT,
    Dependency,
    Affordance,
    Fanout,
    stamp_requirement_resolution,
)
from .scope import (
    build_plan,
    prefix_paths,
    resolve_target_path,
    scope_distance,
    split_path,
    target_context_candidates,
)

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class _StructuralChainPlan:
    target_ctx: str | None
    build_segments: tuple[str, ...] = ()

    @property
    def requires_chain(self) -> bool:
        return bool(self.build_segments)

    @property
    def parent_path(self) -> str | None:
        if not isinstance(self.target_ctx, str) or not self.target_ctx:
            return None
        parent_parts = split_path(self.target_ctx)[:-1]
        if not parent_parts:
            return None
        return ".".join(parent_parts)


[docs] @dataclass class Resolver: """Resolver(location_entity_groups=(), template_scope_groups=()) Gather, rank, preview, and bind provisioning offers for frontier requirements. Why ---- Provisioning is where runtime graph gaps become concrete providers. The resolver centralizes offer discovery and binding so story-layer code can define policy without reimplementing the selection mechanics. Key Features ------------ * Reads candidate providers from ordered entity groups and template scopes. * Merges FIND, CREATE, TOKEN, inline-template, and synthesized UPDATE/CLONE offers into one ranked stream. * Supports preview mode through blocker diagnostics and optional stub linkage. * Writes selected providers and decision metadata back onto dependency requirements. API --- - :meth:`from_ctx` creates a resolver from a compatible vm context. - :meth:`gather_offers` assembles ranked candidates for one requirement. - :meth:`resolve_requirement` and :meth:`resolve_dependency` bind concrete providers. - :meth:`resolve_frontier_node` resolves all open dependencies for one frontier node. - :meth:`preview_requirement` reports viability without mutating topology. """ # order of the groups indicates 'distance' from the requirement carrier, # so resolver can be created per frontier node and then re-used multiple # times to resolve its requirements. # Pushes entire 'scope' discussion into however the frontier wants to define # it and provides a working default with a single group, single distance. location_entity_groups: Iterable[Iterable[Any]] = () template_scope_groups: Iterable[TemplateRegistry] = () # Legacy aliases kept for backward compatibility. entity_groups: Iterable[Iterable[Any]] = () template_groups: Iterable[TemplateRegistry] = () def __post_init__(self) -> None: if not self.location_entity_groups and self.entity_groups: self.location_entity_groups = self.entity_groups if not self.template_scope_groups and self.template_groups: self.template_scope_groups = self.template_groups
[docs] @classmethod def from_ctx(cls, ctx: VmPhaseCtx) -> Self: """Build a resolver from the entity and template groups exposed by ``ctx``.""" if not isinstance(ctx, VmPhaseCtx): raise TypeError( "Resolver.from_ctx() requires a VmPhaseCtx with " "get_location_entity_groups() and get_template_scope_groups()", ) return cls( location_entity_groups=ctx.get_location_entity_groups(), template_scope_groups=ctx.get_template_scope_groups(), )
@staticmethod def _selector_identifier(requirement: Requirement) -> str | None: extra = requirement.__pydantic_extra__ or {} value = extra.get("has_identifier") if isinstance(value, str) and value: return value return None @staticmethod def _is_episode_requirement(requirement: Requirement) -> bool: extra = requirement.__pydantic_extra__ or {} kind = extra.get("has_kind") return isinstance(kind, type) and issubclass(kind, TraversableNode) @staticmethod def _request_ctx_path(_ctx: VmPhaseCtx | None) -> str: if _ctx is None: return "" meta = dict(_ctx.meta or {}) request_ctx_path = meta.get("request_ctx_path") if isinstance(request_ctx_path, str) and request_ctx_path: return request_ctx_path cursor = _ctx.cursor if cursor is None: return "" path = getattr(cursor, "path", None) if isinstance(path, str) and path: return path label = getattr(cursor, "label", None) if isinstance(label, str): return label return "" @classmethod def _stubs_allowed(cls, *, allow_stubs: bool, _ctx: VmPhaseCtx | None = None) -> bool: if allow_stubs: return True if _ctx is None: return False return _ctx.causality_mode == CausalityMode.HARD_DIRTY @staticmethod def _materialize_node( template: EntityTemplate, *, _ctx: VmPhaseCtx | None = None, role: MaterializeRole | str = MaterializeRole.PROVISION_LEAF, story_materialize: Any = None, ) -> Entity: return materialize_template_entity( template, _ctx=_ctx, role=role, story_materialize=story_materialize, ) def _resolve_target_path_for_requirement( self, requirement: Requirement, *, _ctx: VmPhaseCtx | None = None, ) -> str | None: return resolve_target_path( identifier=self._selector_identifier(requirement), request_ctx=self._request_ctx_path(_ctx), authored_path=requirement.authored_path, is_qualified=requirement.is_qualified, is_absolute=requirement.is_absolute, ) def _template_offers_for_requirement( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: request_ctx = self._request_ctx_path(_ctx) graph = _ctx.graph if _ctx is not None else None story_materialize = resolve_story_materialize_hook(_ctx) provisioner = TemplateProvisioner( registries=self.template_scope_groups, request_ctx=request_ctx, graph=graph, story_materialize=story_materialize, materialize_node=materialize_template_entity, ) return list(provisioner.get_dependency_offers(requirement)) def _token_offers_for_requirement( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: if _ctx is None: return [] catalogs = list(_ctx.get_token_catalogs(requirement=requirement)) if not catalogs: return [] return list(TokenProvisioner(catalogs=catalogs).get_dependency_offers(requirement)) def _media_inventory_offers_for_requirement( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: if _ctx is None: return [] inventories = list(_ctx.get_media_inventories(requirement=requirement)) if not inventories: return [] from tangl.media.media_resource.media_provisioning import MediaInventoryProvisioner return list(MediaInventoryProvisioner(inventories=inventories).get_dependency_offers(requirement)) def _media_spec_offers_for_requirement( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: from tangl.media.media_resource.media_provisioning import MediaSpecProvisioner return list( MediaSpecProvisioner(graph=_ctx.graph if _ctx is not None else None).get_dependency_offers( requirement, _ctx=_ctx, ) ) def inspect_template_dependency_offers( self, requirement: Requirement[PT], *, allow_stubs: bool = False, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: """Return template-only dependency offers using resolver matching semantics.""" offers = self._template_offers_for_requirement(requirement, _ctx=_ctx) offers = [annotate_offer_specificity(requirement, offer) for offer in offers] stubs_allowed = self._stubs_allowed(allow_stubs=allow_stubs, _ctx=_ctx) def _allowed(offer: ProvisionOffer) -> bool: if offer.policy & ProvisionPolicy.STUB: return stubs_allowed return bool(offer.policy & requirement.provision_policy) offers = [offer for offer in offers if _allowed(offer)] offers.sort(key=lambda v: v.sort_key()) return offers def _iter_templates(self) -> Iterator[EntityTemplate]: for registry in self.template_scope_groups: if not isinstance(registry, TemplateRegistry): continue yield from registry.values() def _existing_offers_for_requirement( self, requirement: Requirement[PT], ) -> list[ProvisionOffer]: offers: list[ProvisionOffer] = [] # If there are more than 20 groups, the distance-based priorities will slip # from the NORMAL tier to LATE, maybe just collapse everything after a few scopes # out but not global into "far away" tier and then include globals as the last group. for i, entity_group in enumerate(self.location_entity_groups): offers.extend(FindProvisioner(values=entity_group, distance=i).get_dependency_offers(requirement)) return offers def _inline_template_offers_for_requirement( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: return list( InlineTemplateProvisioner( materialize_node=materialize_template_entity, story_materialize=resolve_story_materialize_hook(_ctx), ).iter_dependency_offers(requirement) ) def _discover_requirement_offers( self, requirement: Requirement[PT], *, preferred_offers: Iterable[ProvisionOffer] = (), _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: offers: list[ProvisionOffer] = list(preferred_offers or []) offers.extend(self._existing_offers_for_requirement(requirement)) offers.extend(self._template_offers_for_requirement(requirement, _ctx=_ctx)) offers.extend(self._token_offers_for_requirement(requirement, _ctx=_ctx)) offers.extend(self._media_inventory_offers_for_requirement(requirement, _ctx=_ctx)) offers.extend(self._media_spec_offers_for_requirement(requirement, _ctx=_ctx)) offers.extend(self._inline_template_offers_for_requirement(requirement, _ctx=_ctx)) offers.extend(UpdateCloneProvisioner.get_dependency_offers(requirement=requirement, offers=offers)) return offers def _discover_fanout_offers( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: offers = self._existing_offers_for_requirement(requirement) offers.extend(self._template_offers_for_requirement(requirement, _ctx=_ctx)) return offers @staticmethod def _apply_resolve_override( requirement: Requirement[PT], offers: list[ProvisionOffer], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: if _ctx is None: return offers from tangl.vm.dispatch import do_resolve try: resolved_offers = do_resolve(requirement=requirement, offers=offers, ctx=_ctx) except TypeError as exc: requirement.resolution_reason = "override_invalid" requirement.resolution_meta = {"error": str(exc)} return offers if resolved_offers is None: return offers return list(resolved_offers) @staticmethod def _annotate_offers( requirement: Requirement[PT], offers: Iterable[ProvisionOffer], ) -> list[ProvisionOffer]: return [annotate_offer_specificity(requirement, offer) for offer in offers] def _filter_requirement_offers( self, requirement: Requirement[PT], offers: list[ProvisionOffer], *, allow_stubs: bool = False, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: stubs_allowed = self._stubs_allowed(allow_stubs=allow_stubs, _ctx=_ctx) def _allowed(offer: ProvisionOffer) -> bool: if offer.policy & ProvisionPolicy.STUB: return stubs_allowed return bool(offer.policy & requirement.provision_policy) filtered = [offer for offer in offers if _allowed(offer)] if stubs_allowed and not filtered: filtered.extend(StubProvisioner.get_dependency_offers(requirement)) return filtered @staticmethod def _filter_fanout_offers(offers: list[ProvisionOffer]) -> list[ProvisionOffer]: def _allowed(offer: ProvisionOffer) -> bool: if offer.policy & ProvisionPolicy.STUB: return False if offer.policy & ProvisionPolicy.TOKEN: return False if offer.policy & ProvisionPolicy.UPDATE: return False if offer.policy & ProvisionPolicy.CLONE: return False return bool(offer.policy & (ProvisionPolicy.EXISTING | ProvisionPolicy.CREATE)) return [offer for offer in offers if _allowed(offer)] @staticmethod def _sort_offers(offers: list[ProvisionOffer]) -> list[ProvisionOffer]: offers.sort(key=lambda v: v.sort_key()) return offers @staticmethod def _dedupe_offers_by_candidate_target(offers: list[ProvisionOffer]) -> list[ProvisionOffer]: deduped: list[ProvisionOffer] = [] seen_keys: set[tuple[UUID, str | None]] = set() for offer in offers: candidate = offer.candidate candidate_uid = getattr(candidate, "uid", None) if not isinstance(candidate_uid, UUID): deduped.append(offer) continue key = (candidate_uid, offer.target_ctx) if key in seen_keys: continue seen_keys.add(key) deduped.append(offer) return deduped
[docs] def gather_offers( self, requirement: Requirement[PT], *, allow_stubs: bool = False, preferred_offers: Iterable[ProvisionOffer] = (), _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: """Return ranked offers that are currently admissible for ``requirement``. Existing providers, template-driven candidates, token catalogs, inline templates, synthesized UPDATE or CLONE offers, and optional dispatch overrides all participate in this one ordered result set. """ offers = self._discover_requirement_offers( requirement, preferred_offers=preferred_offers, _ctx=_ctx, ) offers = self._apply_resolve_override(requirement, offers, _ctx=_ctx) offers = self._annotate_offers(requirement, offers) offers = self._filter_requirement_offers( requirement, offers, allow_stubs=allow_stubs, _ctx=_ctx, ) return self._sort_offers(offers)
@staticmethod def _iter_local_affordance_providers(frontier: Node | None) -> Iterable[RegistryAware]: """Yield unique providers from affordances pushed out of ``frontier``. Contract: frontier is affordance ``predecessor`` and provider/resource is affordance ``successor``. """ if frontier is None: return seen_ids: set[UUID] = set() if hasattr(frontier, "edges_out"): affordances = frontier.edges_out(Selector(has_kind=Affordance)) else: graph = getattr(frontier, "graph", None) if graph is None: return affordances = graph.find_edges(Selector(has_kind=Affordance, predecessor=frontier)) for affordance in affordances: provider = affordance.successor if provider is None: # Backward compatibility for stale serialized affordances. provider = affordance.provider if not isinstance(provider, RegistryAware): continue if provider.uid in seen_ids: continue seen_ids.add(provider.uid) yield provider def _linked_affordance_offers( self, *, requirement: Requirement[PT], frontier: Node | None, ) -> list[ProvisionOffer]: """Build EXISTING offers from already-linked local affordance providers.""" offers: list[ProvisionOffer] = [] for provider in self._iter_local_affordance_providers(frontier): try: if not requirement.satisfied_by(provider): continue except Exception as exc: logger.debug( "requirement.satisfied_by failed for requirement=%s provider=%s", requirement, provider, exc_info=exc, ) continue offers.append( ProvisionOffer( origin_id="LinkedAffordance", policy=ProvisionPolicy.EXISTING, priority=Priority.EARLY, distance_from_caller=0, candidate=provider, callback=lambda *_, _provider=provider, **__: _provider, ) ) return offers @staticmethod def _clone_requirement(requirement: Requirement[PT]) -> Requirement[PT]: clone = requirement.model_copy(deep=True) clone.provider_id = None clone.unsatisfiable = None clone.unambiguously_resolved = None clone.selected_offer_policy = None clone.resolved_step = None clone.resolved_cursor_id = None clone.resolution_reason = None clone.resolution_meta = None return clone @staticmethod def _fanout_edge_tag(fanout: Fanout[Any]) -> str: return f"fanout:{fanout.uid}" @classmethod def _fanout_affordance_tags(cls, fanout: Fanout[Any]) -> set[str]: return {"dynamic", "fanout", cls._fanout_edge_tag(fanout)} def _iter_fanout_affordances(self, fanout: Fanout[Any]) -> Iterator[Affordance[Any]]: source = fanout.predecessor if source is None or fanout.registry is None: return marker = self._fanout_edge_tag(fanout) for affordance in source.edges_out(Selector(has_kind=Affordance)): tags = getattr(affordance, "tags", set()) or set() if marker not in tags: continue yield affordance def _clear_fanout_affordances( self, fanout: Fanout[Any], *, _ctx: VmPhaseCtx | None = None, ) -> None: graph = fanout.registry if graph is None: return for affordance in list(self._iter_fanout_affordances(fanout)): graph.remove(affordance.uid, _ctx=_ctx) def gather_fanout_offers( self, requirement: Requirement[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[ProvisionOffer]: """Return all admissible EXISTING/CREATE offers for ``requirement``.""" offers = self._discover_fanout_offers(requirement, _ctx=_ctx) offers = self._apply_resolve_override(requirement, offers, _ctx=_ctx) offers = self._annotate_offers(requirement, offers) offers = self._filter_fanout_offers(offers) offers = self._sort_offers(offers) return self._dedupe_offers_by_candidate_target(offers) def _accept_fanout_offer( self, *, requirement: Requirement[PT], offer: ProvisionOffer, graph: Any, _ctx: VmPhaseCtx | None = None, ) -> PT | None: def _existing_instance_for_template() -> PT | None: candidate = offer.candidate if not isinstance(candidate, EntityTemplate): return None candidate_label = candidate.get_label() candidate_hash = _template_hash_value(candidate) for entity_group in self.location_entity_groups: for existing in entity_group: if not isinstance(existing, RegistryAware): continue if existing.registry is not graph: continue existing_path = getattr(existing, "path", None) if isinstance(existing_path, str) and existing_path == candidate_label: return existing if getattr(existing, "templ_hash", None) == candidate_hash: return existing return None target_ctx = offer.target_ctx if isinstance(target_ctx, str) and target_ctx: existing = self._find_existing_path_node(graph, target_ctx) if isinstance(existing, RegistryAware): try: if requirement.satisfied_by(existing): return existing except Exception as exc: logger.debug( "fanout existing-path reuse failed for requirement=%s target_ctx=%s", requirement, target_ctx, exc_info=exc, ) if offer.policy & ProvisionPolicy.CREATE: existing = _existing_instance_for_template() if existing is not None: try: if requirement.satisfied_by(existing): return existing except Exception: pass return None provider = offer.callback(_ctx=_ctx) if provider is None: return None if not isinstance(provider, RegistryAware): raise TypeError("Fanout providers must be RegistryAware") parent = self._execute_build_chain( requirement=requirement, offer=offer, graph=graph, _ctx=_ctx, ) provider = self._attach_and_finalize_provider( requirement=requirement, offer=offer, provider=provider, graph=graph, parent=parent, _ctx=_ctx, ) requirement._validate_satisfied_by(provider) return provider def resolve_fanout( self, fanout: Fanout[PT], *, _ctx: VmPhaseCtx | None = None, ) -> list[PT]: """Resolve and publish every eligible provider for ``fanout``.""" graph = fanout.registry if graph is None: raise ValueError("Fanout must be bound to a registry before resolution") offers = self.gather_fanout_offers(fanout.requirement, _ctx=_ctx) providers: list[PT] = [] accepted_offers: list[ProvisionOffer] = [] seen_provider_ids: set[UUID] = set() for offer in offers: try: provider = self._accept_fanout_offer( requirement=fanout.requirement, offer=offer, graph=graph, _ctx=_ctx, ) except (TypeError, ValueError, RuntimeError) as exc: logger.debug( "Skipping invalid fanout offer origin=%s for fanout=%s", offer.origin_id, fanout, exc_info=exc, ) continue if provider is None: continue if provider.uid in seen_provider_ids: continue seen_provider_ids.add(provider.uid) providers.append(provider) accepted_offers.append(offer) fanout.set_providers(providers, _ctx=_ctx) self._clear_fanout_affordances(fanout, _ctx=_ctx) source = fanout.predecessor if source is not None: affordance_tags = self._fanout_affordance_tags(fanout) for provider in providers: affordance = Affordance( registry=graph, predecessor_id=source.uid, requirement=self._clone_requirement(fanout.requirement), tags=set(affordance_tags), ) affordance.set_provider(provider, _ctx=_ctx) requirement = fanout.requirement stamp_requirement_resolution(requirement, _ctx=_ctx) requirement.unsatisfiable = False requirement.unambiguously_resolved = len(providers) == 1 requirement.selected_offer_policy = accepted_offers[0].policy if accepted_offers else None if providers: requirement.resolution_reason = "resolved" else: requirement.resolution_reason = "provider_none" if offers else "no_offers" requirement.resolution_meta = { "provider_ids": [str(provider.uid) for provider in providers], "selected": [summarize_offer(offer) for offer in accepted_offers[:5]], "alternatives": [summarize_offer(offer) for offer in offers[:5]], } return providers def _resolve_requirement_offer( self, requirement: Requirement[PT], *, allow_stubs: bool = False, preferred_offers: Iterable[ProvisionOffer] = (), _ctx: VmPhaseCtx | None = None, ) -> tuple[Optional[PT], Optional[ProvisionOffer], list[ProvisionOffer]]: offers = self.gather_offers( requirement, allow_stubs=allow_stubs, preferred_offers=preferred_offers, _ctx=_ctx, ) if len(offers) == 0: # No valid offers available requirement.unsatisfiable = True requirement.unambiguously_resolved = None requirement.selected_offer_policy = None requirement.resolved_step = None requirement.resolved_cursor_id = None if requirement.resolution_reason is None: requirement.resolution_reason = "no_offers" requirement.resolution_meta = requirement.resolution_meta or { "alternatives": [], } return None, None, offers else: requirement.unsatisfiable = False requirement.unambiguously_resolved = len(offers) == 1 for selected_offer in offers: requirement.selected_offer_policy = selected_offer.policy requirement.resolution_reason = "resolved" requirement.resolution_meta = { "selected": summarize_offer(selected_offer), "alternatives": [summarize_offer(offer) for offer in offers[:5]], } provider = selected_offer.callback(_ctx=_ctx) if provider is not None: return provider, selected_offer, offers requirement.unsatisfiable = True requirement.selected_offer_policy = None requirement.resolution_reason = "provider_none" return None, None, offers
[docs] def resolve_requirement( self, requirement: Requirement[PT], *, allow_stubs: bool = False, preferred_offers: Iterable[ProvisionOffer] = (), _ctx: VmPhaseCtx | None = None, ) -> Optional[PT]: """Resolve and materialize one provider for ``requirement`` if possible.""" provider, _, _ = self._resolve_requirement_offer( requirement=requirement, allow_stubs=allow_stubs, preferred_offers=preferred_offers, _ctx=_ctx, ) return provider
@staticmethod def _matches_identifier(candidate: Any, identifier: str) -> bool: has_identifier = getattr(candidate, "has_identifier", None) if callable(has_identifier): try: return bool(has_identifier(identifier)) except (TypeError, ValueError, AttributeError): return False return getattr(candidate, "label", None) == identifier def _find_existing_path_node(self, graph: Any, path: str) -> Any | None: segments = split_path(path) if not segments: return None current = None for index, segment in enumerate(segments): if index == 0: current = self._find_top_level_node(graph, segment) else: current = self._find_child_node(current, segment) if current is None: return None return current def _find_top_level_node(self, graph: Any, segment: str) -> Any | None: values = getattr(graph, "values", None) if not callable(values): return None for candidate in values(): if getattr(candidate, "parent", None) is not None: continue if self._matches_identifier(candidate, segment): return candidate return None def _find_child_node(self, parent: Any, segment: str) -> Any | None: if parent is None: return None children = getattr(parent, "children", None) if callable(children): for candidate in children(): if self._matches_identifier(candidate, segment): return candidate return None members = getattr(parent, "members", None) if callable(members): for candidate in members(): if self._matches_identifier(candidate, segment): return candidate return None return None def _find_structural_template(self, *, identifier: str, target_ctx: str) -> EntityTemplate | None: best: tuple[int, int, EntityTemplate] | None = None for template in self._iter_templates(): if not template.has_identifier(identifier): continue if not template.has_payload_kind(TraversableNode): continue if not template.admitted_to(target_ctx): continue key = ( scope_distance(template.admission_scope, target_ctx), int(getattr(template, "seq", 0)), ) if best is None or key < (best[0], best[1]): best = (key[0], key[1], template) if best is None: return None return best[2] def _chain_plan_for_offer( self, requirement: Requirement, *, offer: ProvisionOffer, _ctx: VmPhaseCtx | None = None, ) -> _StructuralChainPlan: offer_target_ctx = offer.target_ctx target_ctx = ( offer_target_ctx if isinstance(offer_target_ctx, str) and offer_target_ctx else self._resolve_target_path_for_requirement(requirement, _ctx=_ctx) ) return _StructuralChainPlan( target_ctx=target_ctx, build_segments=tuple(offer.build_plan or ()), ) @staticmethod def _chain_plan_for_target_ctx(target_ctx: str | None, *, graph: Any) -> _StructuralChainPlan: return _StructuralChainPlan( target_ctx=target_ctx, build_segments=tuple(build_plan(target_ctx, graph)), ) def _chain_paths_resolvable( self, *, build_segments: list[str], target_ctx: str, graph: Any, ) -> bool: if not build_segments: return True if graph is None: return False target_prefix_paths = prefix_paths(target_ctx)[:-1] prefix_segments = split_path(target_ctx)[:-1] if not prefix_segments: return False segment_index = 0 plan_index = 0 while segment_index < len(prefix_segments) and plan_index < len(build_segments): if prefix_segments[segment_index] == build_segments[plan_index]: segment_path = target_prefix_paths[segment_index] if self._find_existing_path_node(graph, segment_path) is None: template = self._find_structural_template( identifier=segment_path, target_ctx=target_ctx, ) if template is None: return False plan_index += 1 segment_index += 1 return plan_index == len(build_segments) def _chain_plan_is_resolvable( self, plan: _StructuralChainPlan, *, graph: Any, ) -> bool: if not plan.requires_chain: return True if graph is None or plan.target_ctx is None: return False return self._chain_paths_resolvable( build_segments=list(plan.build_segments), target_ctx=plan.target_ctx, graph=graph, ) def _parent_for_chain_plan( self, plan: _StructuralChainPlan, *, graph: Any, ) -> Any | None: if not plan.parent_path: return None return self._find_existing_path_node(graph, plan.parent_path) def _find_chain_segment_node( self, *, graph: Any, current: Any | None, segment: str, ) -> Any | None: if current is None: return self._find_top_level_node(graph, segment) return self._find_child_node(current, segment) def _materialize_structural_segment( self, *, segment_path: str, target_ctx: str, graph: Any, _ctx: VmPhaseCtx | None = None, ) -> tuple[RegistryAware, EntityTemplate]: template = self._find_structural_template( identifier=segment_path, target_ctx=target_ctx, ) if template is None: raise ValueError(f"Missing structural template for segment {segment_path!r}") created = self._materialize_node( template, _ctx=_ctx, role=MaterializeRole.PROVISION_INTERMEDIATE, ) if not isinstance(created, RegistryAware): raise TypeError("Structural template materialization must yield RegistryAware nodes") if created.registry is not graph: graph.add(created, _ctx=_ctx) return created, template @staticmethod def _offer_materializes_runtime_entity(offer: ProvisionOffer) -> bool: return bool( offer.policy & (ProvisionPolicy.CREATE | ProvisionPolicy.CLONE) ) or isinstance(offer.candidate, EntityTemplate) @staticmethod def _offer_template(offer: ProvisionOffer) -> EntityTemplate | None: candidate = offer.candidate if isinstance(candidate, EntityTemplate): return candidate return None @staticmethod def _request_ctx_for_node(node: Any, fallback: str | None = None) -> str | None: path = getattr(node, "path", None) if isinstance(path, str) and path: return path label = getattr(node, "label", None) if isinstance(label, str) and label: return label if isinstance(fallback, str) and fallback: return fallback return None def _make_node_ctx( self, *, graph: Any, node: RegistryAware, _ctx: VmPhaseCtx | None = None, request_ctx_path: str | None = None, ) -> VmPhaseCtx: if _ctx is None: raise TypeError( "Nested runtime validation requires a typed context with derive()", ) meta_overrides = None if isinstance(request_ctx_path, str) and request_ctx_path: meta_overrides = {"request_ctx_path": request_ctx_path} return _ctx.derive( cursor_id=node.uid, graph=graph, meta_overrides=meta_overrides, ) def _post_materialize_entity( self, *, entity: RegistryAware, template: EntityTemplate | None, role: MaterializeRole, _ctx: VmPhaseCtx | None = None, ) -> None: story_post_materialize = resolve_story_post_materialize_hook(_ctx) if callable(story_post_materialize): story_post_materialize( template=template, entity=entity, role=role, _ctx=_ctx, ) def _validate_runtime_entity( self, *, entity: RegistryAware, graph: Any, _ctx: VmPhaseCtx | None = None, request_ctx_path: str | None = None, ) -> bool: if not isinstance(entity, TraversableNode): return True node_ctx = self._make_node_ctx( graph=graph, node=entity, _ctx=_ctx, request_ctx_path=self._request_ctx_for_node(entity, fallback=request_ctx_path), ) return Resolver.from_ctx(node_ctx).resolve_frontier_node( entity, allow_stubs=self._stubs_allowed(allow_stubs=False, _ctx=_ctx), _ctx=node_ctx, ) def _existing_target_provider( self, *, requirement: Requirement, offer: ProvisionOffer, graph: Any, ) -> PT | None: target_ctx = offer.target_ctx if not isinstance(target_ctx, str) or not target_ctx: return None existing = self._find_existing_path_node(graph, target_ctx) if not isinstance(existing, RegistryAware): return None try: if requirement.satisfied_by(existing): return existing except Exception as exc: logger.debug( "existing target reuse failed for requirement=%s target_ctx=%s", requirement, target_ctx, exc_info=exc, ) return None def _attach_and_finalize_provider( self, *, requirement: Requirement[PT], offer: ProvisionOffer, provider: PT, graph: Any, parent: Any | None, _ctx: VmPhaseCtx | None = None, ) -> PT: reused = self._existing_target_provider( requirement=requirement, offer=offer, graph=graph, ) if reused is not None: provider = reused if not isinstance(provider, RegistryAware): raise TypeError("Provisioned providers must be RegistryAware") if provider.registry is not graph: graph.add(provider, _ctx=_ctx) if getattr(provider, "parent", None) is not parent: attach_child(parent, provider) if self._offer_materializes_runtime_entity(offer): self._post_materialize_entity( entity=provider, template=self._offer_template(offer), role=MaterializeRole.PROVISION_LEAF, _ctx=_ctx, ) if not self._validate_runtime_entity( entity=provider, graph=graph, _ctx=_ctx, request_ctx_path=offer.target_ctx, ): raise RuntimeError("Provisioned provider failed runtime validation") return provider def _execute_build_chain( self, *, requirement: Requirement, offer: ProvisionOffer, graph: Any, _ctx: VmPhaseCtx | None = None, ) -> Any | None: plan = self._chain_plan_for_offer(requirement, offer=offer, _ctx=_ctx) if not plan.requires_chain: return self._parent_for_chain_plan(plan, graph=graph) if plan.target_ctx is None: raise ValueError("Cannot execute structural chain without a target path") if not self._chain_plan_is_resolvable(plan, graph=graph): raise ValueError("structural build chain is not resolvable") segments = split_path(plan.target_ctx) parent_segments = segments[:-1] if not parent_segments: return None current: Any | None = None for index, segment in enumerate(parent_segments): existing = self._find_chain_segment_node( graph=graph, current=current, segment=segment, ) if existing is not None: current = existing continue created, template = self._materialize_structural_segment( segment_path=".".join(segments[: index + 1]), target_ctx=plan.target_ctx, graph=graph, _ctx=_ctx, ) attach_child(current, created) self._post_materialize_entity( entity=created, template=template, role=MaterializeRole.PROVISION_INTERMEDIATE, _ctx=_ctx, ) if not self._validate_runtime_entity( entity=created, graph=graph, _ctx=_ctx, request_ctx_path=".".join(segments[: index + 1]), ): raise RuntimeError("Structural chain segment failed runtime validation") current = created return current def _bind_dependency_provider( self, *, dependency: Dependency, provider: PT, offer: ProvisionOffer, parent: Any | None = None, _ctx: VmPhaseCtx | None = None, ) -> bool: provider = self._attach_and_finalize_provider( requirement=dependency.requirement, offer=offer, provider=provider, graph=dependency.registry, parent=parent, _ctx=_ctx, ) dependency.set_provider(provider, _ctx=_ctx) return True
[docs] def preview_requirement( self, requirement: Requirement, *, allow_stubs: bool = False, preferred_offers: Iterable[ProvisionOffer] = (), max_depth: int = 8, _ctx: VmPhaseCtx | None = None, ) -> ViabilityResult: """Return a non-mutating viability preview for one requirement.""" # depth/visited are reserved for future recursive chain variants. _ = max_depth offers = self.gather_offers( requirement, allow_stubs=allow_stubs, preferred_offers=preferred_offers, _ctx=_ctx, ) graph = _ctx.graph if _ctx is not None else None preview = self._preview_viable_offer( requirement=requirement, offers=offers, graph=graph, _ctx=_ctx, ) if preview is not None: return preview blockers = self._diagnose_blockers(requirement=requirement, _ctx=_ctx) return ViabilityResult(viable=False, chain=[], scope_distance=0, blockers=blockers)
@staticmethod def _dependency_blocker( dependency: Dependency, *, preview: ViabilityResult, ) -> Blocker: return Blocker( reason="dependency_unprovisionable", context={ "dependency_id": str(dependency.uid), "requirement": repr(dependency.requirement), "blockers": [ { "reason": blocker.reason, "context": dict(blocker.context), } for blocker in preview.blockers ], }, ) def preview_frontier_node( self, node: Node, *, allow_stubs: bool = False, _ctx: VmPhaseCtx | None = None, ) -> ViabilityResult: """Return non-mutating provisionability for one frontier node.""" for dependency in node.edges_out(Selector(has_kind=Dependency)): if dependency.satisfied: continue if not dependency.requirement.hard_requirement: continue preview = self.preview_requirement( dependency.requirement, allow_stubs=allow_stubs, _ctx=_ctx, ) if not preview.viable: return ViabilityResult( viable=False, chain=list(preview.chain), scope_distance=preview.scope_distance, blockers=[self._dependency_blocker(dependency, preview=preview)], ) if isinstance(node, TraversableNode) and node.is_container: if not node.enterable(ctx=_ctx): return ViabilityResult( viable=False, blockers=[ Blocker( reason="entry_unavailable", context={"node_id": str(node.uid)}, ) ], ) target = node.resolve_entry(ctx=_ctx) target_ctx = _ctx.derive(cursor_id=target.uid) if _ctx is not None else None target_resolver = Resolver.from_ctx(target_ctx) if target_ctx is not None else self return target_resolver.preview_frontier_node( target, allow_stubs=allow_stubs, _ctx=target_ctx, ) return ViabilityResult(viable=True) def _preview_viable_offer( self, *, requirement: Requirement, offers: Iterable[ProvisionOffer], graph: Any, _ctx: VmPhaseCtx | None = None, ) -> ViabilityResult | None: story_blockers: list[Blocker] = [] preview_requirement = resolve_story_preview_requirement_hook(_ctx) for offer in offers: plan = self._chain_plan_for_offer(requirement, offer=offer, _ctx=_ctx) if not self._chain_plan_is_resolvable(plan, graph=graph): continue if callable(preview_requirement): blockers = preview_requirement( requirement=requirement, offer=offer, graph=graph, _ctx=_ctx, ) if blockers: story_blockers.extend(list(blockers)) continue return ViabilityResult( viable=True, chain=list(plan.build_segments), scope_distance=offer.scope_distance, blockers=[], ) if story_blockers: return ViabilityResult( viable=False, chain=[], scope_distance=0, blockers=story_blockers, ) return None @staticmethod def _blocker_context( *, identifier: str | None, target_ctxs: list[str], include_candidates: bool = False, **extra: Any, ) -> dict[str, Any]: context: dict[str, Any] = { "identifier": identifier, "target_ctx": target_ctxs[0] if target_ctxs else None, } if include_candidates: context["target_ctx_candidates"] = list(target_ctxs) context.update(extra) return context def _blocker( self, reason: str, *, identifier: str | None, target_ctxs: list[str], include_candidates: bool = False, **context: Any, ) -> Blocker: return Blocker( reason=reason, context=self._blocker_context( identifier=identifier, target_ctxs=target_ctxs, include_candidates=include_candidates, **context, ), ) def _target_contexts_for_requirement( self, requirement: Requirement, *, _ctx: VmPhaseCtx | None = None, ) -> list[str]: identifier = self._selector_identifier(requirement) target_ctxs = target_context_candidates( identifier=identifier, request_ctx=self._request_ctx_path(_ctx), authored_path=requirement.authored_path, is_qualified=requirement.is_qualified, is_absolute=requirement.is_absolute, ) if not target_ctxs: target_ctx = self._resolve_target_path_for_requirement(requirement, _ctx=_ctx) if target_ctx is not None: target_ctxs = [target_ctx] return target_ctxs @staticmethod def _identity_template_candidates( requirement: Requirement, templates: list[EntityTemplate], ) -> list[EntityTemplate]: return [ template for template in templates if TemplateProvisioner._matches_non_identifier_criteria(requirement, template) and TemplateProvisioner._matches_template_identity(requirement, template) ] @staticmethod def _kind_template_candidates( requirement: Requirement, templates: list[EntityTemplate], ) -> list[EntityTemplate]: kind = (requirement.__pydantic_extra__ or {}).get("has_kind") if not isinstance(kind, type): return [] return [template for template in templates if template.has_kind(kind)] @staticmethod def _admitted_template_candidates( identity_candidates: list[EntityTemplate], target_ctxs: list[str], ) -> list[tuple[EntityTemplate, str]]: return [ (template, target_ctx) for template in identity_candidates for target_ctx in target_ctxs if template.admitted_to(target_ctx) ] def _diagnose_identity_blockers( self, *, requirement: Requirement, templates: list[EntityTemplate], target_ctxs: list[str], ) -> list[Blocker] | None: identifier = self._selector_identifier(requirement) if self._identity_template_candidates(requirement, templates): return None if self._kind_template_candidates(requirement, templates): return [ self._blocker( "name_mismatch", identifier=identifier, target_ctxs=target_ctxs, ) ] return [ self._blocker( "no_template", identifier=identifier, target_ctxs=target_ctxs, ) ] def _diagnose_scope_blockers( self, *, requirement: Requirement, identity_candidates: list[EntityTemplate], target_ctxs: list[str], ) -> list[Blocker] | None: if self._admitted_template_candidates(identity_candidates, target_ctxs): return None return [ self._blocker( "scope_rejected", identifier=self._selector_identifier(requirement), target_ctxs=target_ctxs, include_candidates=True, scopes=[template.admission_scope for template in identity_candidates], ) ] def _diagnose_episode_scope_policy_blockers( self, *, requirement: Requirement, admitted_candidates: list[tuple[EntityTemplate, str]], target_ctxs: list[str], _ctx: VmPhaseCtx | None = None, ) -> list[Blocker] | None: if not self._is_episode_requirement(requirement): return None identifier = self._selector_identifier(requirement) distances = [ scope_distance(template.admission_scope, target_ctx) for template, target_ctx in admitted_candidates ] if not requirement.is_qualified and distances and min(distances) > 0: return [ self._blocker( "scope_rejected", identifier=identifier, target_ctxs=target_ctxs, include_candidates=True, distances=distances, policy="unqualified_episode_requires_distance_0", ) ] if not requirement.is_qualified or not target_ctxs: return None graph = _ctx.graph if _ctx is not None else None unresolved: list[dict[str, Any]] = [] for _template, target_ctx in admitted_candidates: plan = self._chain_plan_for_target_ctx(target_ctx, graph=graph) if self._chain_plan_is_resolvable(plan, graph=graph): continue unresolved.append( { "target_ctx": target_ctx, "chain": list(plan.build_segments), } ) if unresolved and len(unresolved) == len(admitted_candidates): return [ self._blocker( "chain_unresolvable", identifier=identifier, target_ctxs=target_ctxs, include_candidates=True, chains=unresolved, ) ] return None def _diagnose_blockers( self, *, requirement: Requirement, _ctx: VmPhaseCtx | None = None, ) -> list[Blocker]: target_ctxs = self._target_contexts_for_requirement(requirement, _ctx=_ctx) templates = list(self._iter_templates()) blockers = self._diagnose_identity_blockers( requirement=requirement, templates=templates, target_ctxs=target_ctxs, ) if blockers is not None: return blockers identity_candidates = self._identity_template_candidates(requirement, templates) blockers = self._diagnose_scope_blockers( requirement=requirement, identity_candidates=identity_candidates, target_ctxs=target_ctxs, ) if blockers is not None: return blockers admitted_candidates = self._admitted_template_candidates(identity_candidates, target_ctxs) blockers = self._diagnose_episode_scope_policy_blockers( requirement=requirement, admitted_candidates=admitted_candidates, target_ctxs=target_ctxs, _ctx=_ctx, ) if blockers is not None: return blockers return [ self._blocker( "no_template", identifier=self._selector_identifier(requirement), target_ctxs=target_ctxs, include_candidates=True, ) ]
[docs] def resolve_dependency( self, dependency: Dependency[PT], *, allow_stubs: bool = False, _ctx: VmPhaseCtx | None = None, ) -> bool: """Resolve one dependency edge and bind its provider into the graph.""" preferred_offers = self._linked_affordance_offers( requirement=dependency.requirement, frontier=dependency.predecessor, ) provider, selected_offer, _offers = self._resolve_requirement_offer( requirement=dependency.requirement, allow_stubs=allow_stubs, preferred_offers=preferred_offers, _ctx=_ctx, ) if provider is None: return False if selected_offer is None: dependency.requirement.resolution_reason = "provider_none" return False if selected_offer.policy & ProvisionPolicy.STUB: stubs_allowed = self._stubs_allowed(allow_stubs=allow_stubs, _ctx=_ctx) if not stubs_allowed: raise AssertionError( "Invariant violation: STUB offer selected while stubs are not allowed" ) # Debug/preview mode: STUB linking intentionally bypasses requirement # predicate fidelity to keep traversal running under illegal traversal. if not isinstance(provider, RegistryAware): raise TypeError( "STUB provider must be RegistryAware for dependency linkage" ) if provider.registry is not dependency.registry: dependency.registry.add(provider, _ctx=_ctx) dependency.requirement.provider_id = provider.uid stamp_requirement_resolution(dependency.requirement, _ctx=_ctx) dependency.requirement.resolution_reason = "stub_link_resolved" if _ctx is not None: _ctx.escalate_to_hard_dirty("stub_link_accepted", step_id=str(dependency.uid)) Edge.set_successor(dependency, provider, _ctx=_ctx) return True parent = None try: parent = self._execute_build_chain( requirement=dependency.requirement, offer=selected_offer, graph=dependency.registry, _ctx=_ctx, ) self._bind_dependency_provider( dependency=dependency, provider=provider, offer=selected_offer, parent=parent, _ctx=_ctx, ) except (ValueError, TypeError, RuntimeError): dependency.requirement.resolution_reason = "provider_rejected" return False if dependency.requirement.resolution_reason is None: dependency.requirement.resolution_reason = "resolved" return True
[docs] def resolve_frontier_node( self, node: Node, *, allow_stubs: bool = False, _ctx: VmPhaseCtx | None = None, ) -> bool: """Resolve all open dependencies on ``node`` and verify traversal viability.""" # Note this is not unsatisfied deps, it's anyone without a provider # satisfied could mean not a hard req open_deps = list(node.edges_out(Selector(has_kind=Dependency, provider=None))) for dep in open_deps: self.resolve_dependency(dep, allow_stubs=allow_stubs, _ctx=_ctx) graph = getattr(node, "graph", None) if not bool(getattr(graph, "frozen_shape", False)): for fanout in list(node.edges_out(Selector(has_kind=Fanout))): self.resolve_fanout(fanout, _ctx=_ctx) # Find unsat blockers with no provider and a hard-requirement unsatisfied_deps = node.edges_out(Selector(has_kind=Dependency, satisfied=False)) if next(unsatisfied_deps, None) is not None: return False # Container frontier viability delegates to the active entry target. # Whole-container route-to-sink analysis is a static-analysis concern, # not a runtime availability gate. if isinstance(node, TraversableNode) and node.is_container: if not node.enterable(ctx=_ctx): return False target = node.resolve_entry(ctx=_ctx) target_ctx = _ctx.derive(cursor_id=target.uid) if _ctx is not None else None target_resolver = Resolver.from_ctx(target_ctx) if target_ctx is not None else self preview = target_resolver.preview_frontier_node( target, allow_stubs=allow_stubs, _ctx=target_ctx, ) if not preview.viable: return False return True
# Register the resolution process with dispatch so it will be invoked from the phase bus @on_provision def provision_node(caller: Node, *, ctx, allow_stubs: bool = False): Resolver.from_ctx(ctx).resolve_frontier_node( node=caller, allow_stubs=allow_stubs, _ctx=ctx, )