from dataclasses import dataclass
import logging
from typing import Any, Iterable, Iterator, Optional, TypeAlias, Union, Self
from uuid import UUID
from tangl.core import (
Edge,
Entity,
EntityGroup,
EntityTemplate,
Priority,
RegistryAware,
TemplateRegistry,
Node,
Selector,
)
from ..dispatch import on_provision
from ..ctx import VmPhaseCtx
from ..runtime.causality import CausalityMode
from ..traversable import TraversableNode
from .materialization import (
MaterializeRole,
attach_child,
materialize_template_entity,
resolve_story_materialize_hook,
resolve_story_post_materialize_hook,
resolve_story_preview_requirement_hook,
)
from .matching import annotate_offer_specificity, summarize_offer
from .preview import Blocker, ViabilityResult
from .provisioner import (
ProvisionOffer,
FindProvisioner,
TemplateProvisioner,
TokenProvisioner,
InlineTemplateProvisioner,
StubProvisioner,
UpdateCloneProvisioner,
ProvisionPolicy,
_template_hash_value,
)
from .requirement import (
Requirement,
PT,
Dependency,
Affordance,
Fanout,
stamp_requirement_resolution,
)
from .scope import (
build_plan,
prefix_paths,
resolve_target_path,
scope_distance,
split_path,
target_context_candidates,
)
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class _StructuralChainPlan:
target_ctx: str | None
build_segments: tuple[str, ...] = ()
@property
def requires_chain(self) -> bool:
return bool(self.build_segments)
@property
def parent_path(self) -> str | None:
if not isinstance(self.target_ctx, str) or not self.target_ctx:
return None
parent_parts = split_path(self.target_ctx)[:-1]
if not parent_parts:
return None
return ".".join(parent_parts)
[docs]
@dataclass
class Resolver:
"""Resolver(location_entity_groups=(), template_scope_groups=())
Gather, rank, preview, and bind provisioning offers for frontier
requirements.
Why
----
Provisioning is where runtime graph gaps become concrete providers. The
resolver centralizes offer discovery and binding so story-layer code can
define policy without reimplementing the selection mechanics.
Key Features
------------
* Reads candidate providers from ordered entity groups and template scopes.
* Merges FIND, CREATE, TOKEN, inline-template, and synthesized
UPDATE/CLONE offers into one ranked stream.
* Supports preview mode through blocker diagnostics and optional stub
linkage.
* Writes selected providers and decision metadata back onto dependency
requirements.
API
---
- :meth:`from_ctx` creates a resolver from a compatible vm context.
- :meth:`gather_offers` assembles ranked candidates for one requirement.
- :meth:`resolve_requirement` and :meth:`resolve_dependency` bind concrete
providers.
- :meth:`resolve_frontier_node` resolves all open dependencies for one
frontier node.
- :meth:`preview_requirement` reports viability without mutating topology.
"""
# order of the groups indicates 'distance' from the requirement carrier,
# so resolver can be created per frontier node and then re-used multiple
# times to resolve its requirements.
# Pushes entire 'scope' discussion into however the frontier wants to define
# it and provides a working default with a single group, single distance.
location_entity_groups: Iterable[Iterable[Any]] = ()
template_scope_groups: Iterable[TemplateRegistry] = ()
# Legacy aliases kept for backward compatibility.
entity_groups: Iterable[Iterable[Any]] = ()
template_groups: Iterable[TemplateRegistry] = ()
def __post_init__(self) -> None:
if not self.location_entity_groups and self.entity_groups:
self.location_entity_groups = self.entity_groups
if not self.template_scope_groups and self.template_groups:
self.template_scope_groups = self.template_groups
[docs]
@classmethod
def from_ctx(cls, ctx: VmPhaseCtx) -> Self:
"""Build a resolver from the entity and template groups exposed by ``ctx``."""
if not isinstance(ctx, VmPhaseCtx):
raise TypeError(
"Resolver.from_ctx() requires a VmPhaseCtx with "
"get_location_entity_groups() and get_template_scope_groups()",
)
return cls(
location_entity_groups=ctx.get_location_entity_groups(),
template_scope_groups=ctx.get_template_scope_groups(),
)
@staticmethod
def _selector_identifier(requirement: Requirement) -> str | None:
extra = requirement.__pydantic_extra__ or {}
value = extra.get("has_identifier")
if isinstance(value, str) and value:
return value
return None
@staticmethod
def _is_episode_requirement(requirement: Requirement) -> bool:
extra = requirement.__pydantic_extra__ or {}
kind = extra.get("has_kind")
return isinstance(kind, type) and issubclass(kind, TraversableNode)
@staticmethod
def _request_ctx_path(_ctx: VmPhaseCtx | None) -> str:
if _ctx is None:
return ""
meta = dict(_ctx.meta or {})
request_ctx_path = meta.get("request_ctx_path")
if isinstance(request_ctx_path, str) and request_ctx_path:
return request_ctx_path
cursor = _ctx.cursor
if cursor is None:
return ""
path = getattr(cursor, "path", None)
if isinstance(path, str) and path:
return path
label = getattr(cursor, "label", None)
if isinstance(label, str):
return label
return ""
@classmethod
def _stubs_allowed(cls, *, allow_stubs: bool, _ctx: VmPhaseCtx | None = None) -> bool:
if allow_stubs:
return True
if _ctx is None:
return False
return _ctx.causality_mode == CausalityMode.HARD_DIRTY
@staticmethod
def _materialize_node(
template: EntityTemplate,
*,
_ctx: VmPhaseCtx | None = None,
role: MaterializeRole | str = MaterializeRole.PROVISION_LEAF,
story_materialize: Any = None,
) -> Entity:
return materialize_template_entity(
template,
_ctx=_ctx,
role=role,
story_materialize=story_materialize,
)
def _resolve_target_path_for_requirement(
self,
requirement: Requirement,
*,
_ctx: VmPhaseCtx | None = None,
) -> str | None:
return resolve_target_path(
identifier=self._selector_identifier(requirement),
request_ctx=self._request_ctx_path(_ctx),
authored_path=requirement.authored_path,
is_qualified=requirement.is_qualified,
is_absolute=requirement.is_absolute,
)
def _template_offers_for_requirement(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
request_ctx = self._request_ctx_path(_ctx)
graph = _ctx.graph if _ctx is not None else None
story_materialize = resolve_story_materialize_hook(_ctx)
provisioner = TemplateProvisioner(
registries=self.template_scope_groups,
request_ctx=request_ctx,
graph=graph,
story_materialize=story_materialize,
materialize_node=materialize_template_entity,
)
return list(provisioner.get_dependency_offers(requirement))
def _token_offers_for_requirement(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
if _ctx is None:
return []
catalogs = list(_ctx.get_token_catalogs(requirement=requirement))
if not catalogs:
return []
return list(TokenProvisioner(catalogs=catalogs).get_dependency_offers(requirement))
def _media_inventory_offers_for_requirement(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
if _ctx is None:
return []
inventories = list(_ctx.get_media_inventories(requirement=requirement))
if not inventories:
return []
from tangl.media.media_resource.media_provisioning import MediaInventoryProvisioner
return list(MediaInventoryProvisioner(inventories=inventories).get_dependency_offers(requirement))
def _media_spec_offers_for_requirement(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
from tangl.media.media_resource.media_provisioning import MediaSpecProvisioner
return list(
MediaSpecProvisioner(graph=_ctx.graph if _ctx is not None else None).get_dependency_offers(
requirement,
_ctx=_ctx,
)
)
def inspect_template_dependency_offers(
self,
requirement: Requirement[PT],
*,
allow_stubs: bool = False,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
"""Return template-only dependency offers using resolver matching semantics."""
offers = self._template_offers_for_requirement(requirement, _ctx=_ctx)
offers = [annotate_offer_specificity(requirement, offer) for offer in offers]
stubs_allowed = self._stubs_allowed(allow_stubs=allow_stubs, _ctx=_ctx)
def _allowed(offer: ProvisionOffer) -> bool:
if offer.policy & ProvisionPolicy.STUB:
return stubs_allowed
return bool(offer.policy & requirement.provision_policy)
offers = [offer for offer in offers if _allowed(offer)]
offers.sort(key=lambda v: v.sort_key())
return offers
def _iter_templates(self) -> Iterator[EntityTemplate]:
for registry in self.template_scope_groups:
if not isinstance(registry, TemplateRegistry):
continue
yield from registry.values()
def _existing_offers_for_requirement(
self,
requirement: Requirement[PT],
) -> list[ProvisionOffer]:
offers: list[ProvisionOffer] = []
# If there are more than 20 groups, the distance-based priorities will slip
# from the NORMAL tier to LATE, maybe just collapse everything after a few scopes
# out but not global into "far away" tier and then include globals as the last group.
for i, entity_group in enumerate(self.location_entity_groups):
offers.extend(FindProvisioner(values=entity_group, distance=i).get_dependency_offers(requirement))
return offers
def _inline_template_offers_for_requirement(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
return list(
InlineTemplateProvisioner(
materialize_node=materialize_template_entity,
story_materialize=resolve_story_materialize_hook(_ctx),
).iter_dependency_offers(requirement)
)
def _discover_requirement_offers(
self,
requirement: Requirement[PT],
*,
preferred_offers: Iterable[ProvisionOffer] = (),
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
offers: list[ProvisionOffer] = list(preferred_offers or [])
offers.extend(self._existing_offers_for_requirement(requirement))
offers.extend(self._template_offers_for_requirement(requirement, _ctx=_ctx))
offers.extend(self._token_offers_for_requirement(requirement, _ctx=_ctx))
offers.extend(self._media_inventory_offers_for_requirement(requirement, _ctx=_ctx))
offers.extend(self._media_spec_offers_for_requirement(requirement, _ctx=_ctx))
offers.extend(self._inline_template_offers_for_requirement(requirement, _ctx=_ctx))
offers.extend(UpdateCloneProvisioner.get_dependency_offers(requirement=requirement, offers=offers))
return offers
def _discover_fanout_offers(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
offers = self._existing_offers_for_requirement(requirement)
offers.extend(self._template_offers_for_requirement(requirement, _ctx=_ctx))
return offers
@staticmethod
def _apply_resolve_override(
requirement: Requirement[PT],
offers: list[ProvisionOffer],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
if _ctx is None:
return offers
from tangl.vm.dispatch import do_resolve
try:
resolved_offers = do_resolve(requirement=requirement, offers=offers, ctx=_ctx)
except TypeError as exc:
requirement.resolution_reason = "override_invalid"
requirement.resolution_meta = {"error": str(exc)}
return offers
if resolved_offers is None:
return offers
return list(resolved_offers)
@staticmethod
def _annotate_offers(
requirement: Requirement[PT],
offers: Iterable[ProvisionOffer],
) -> list[ProvisionOffer]:
return [annotate_offer_specificity(requirement, offer) for offer in offers]
def _filter_requirement_offers(
self,
requirement: Requirement[PT],
offers: list[ProvisionOffer],
*,
allow_stubs: bool = False,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
stubs_allowed = self._stubs_allowed(allow_stubs=allow_stubs, _ctx=_ctx)
def _allowed(offer: ProvisionOffer) -> bool:
if offer.policy & ProvisionPolicy.STUB:
return stubs_allowed
return bool(offer.policy & requirement.provision_policy)
filtered = [offer for offer in offers if _allowed(offer)]
if stubs_allowed and not filtered:
filtered.extend(StubProvisioner.get_dependency_offers(requirement))
return filtered
@staticmethod
def _filter_fanout_offers(offers: list[ProvisionOffer]) -> list[ProvisionOffer]:
def _allowed(offer: ProvisionOffer) -> bool:
if offer.policy & ProvisionPolicy.STUB:
return False
if offer.policy & ProvisionPolicy.TOKEN:
return False
if offer.policy & ProvisionPolicy.UPDATE:
return False
if offer.policy & ProvisionPolicy.CLONE:
return False
return bool(offer.policy & (ProvisionPolicy.EXISTING | ProvisionPolicy.CREATE))
return [offer for offer in offers if _allowed(offer)]
@staticmethod
def _sort_offers(offers: list[ProvisionOffer]) -> list[ProvisionOffer]:
offers.sort(key=lambda v: v.sort_key())
return offers
@staticmethod
def _dedupe_offers_by_candidate_target(offers: list[ProvisionOffer]) -> list[ProvisionOffer]:
deduped: list[ProvisionOffer] = []
seen_keys: set[tuple[UUID, str | None]] = set()
for offer in offers:
candidate = offer.candidate
candidate_uid = getattr(candidate, "uid", None)
if not isinstance(candidate_uid, UUID):
deduped.append(offer)
continue
key = (candidate_uid, offer.target_ctx)
if key in seen_keys:
continue
seen_keys.add(key)
deduped.append(offer)
return deduped
[docs]
def gather_offers(
self,
requirement: Requirement[PT],
*,
allow_stubs: bool = False,
preferred_offers: Iterable[ProvisionOffer] = (),
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
"""Return ranked offers that are currently admissible for ``requirement``.
Existing providers, template-driven candidates, token catalogs, inline
templates, synthesized UPDATE or CLONE offers, and optional dispatch
overrides all participate in this one ordered result set.
"""
offers = self._discover_requirement_offers(
requirement,
preferred_offers=preferred_offers,
_ctx=_ctx,
)
offers = self._apply_resolve_override(requirement, offers, _ctx=_ctx)
offers = self._annotate_offers(requirement, offers)
offers = self._filter_requirement_offers(
requirement,
offers,
allow_stubs=allow_stubs,
_ctx=_ctx,
)
return self._sort_offers(offers)
@staticmethod
def _iter_local_affordance_providers(frontier: Node | None) -> Iterable[RegistryAware]:
"""Yield unique providers from affordances pushed out of ``frontier``.
Contract: frontier is affordance ``predecessor`` and provider/resource is
affordance ``successor``.
"""
if frontier is None:
return
seen_ids: set[UUID] = set()
if hasattr(frontier, "edges_out"):
affordances = frontier.edges_out(Selector(has_kind=Affordance))
else:
graph = getattr(frontier, "graph", None)
if graph is None:
return
affordances = graph.find_edges(Selector(has_kind=Affordance, predecessor=frontier))
for affordance in affordances:
provider = affordance.successor
if provider is None:
# Backward compatibility for stale serialized affordances.
provider = affordance.provider
if not isinstance(provider, RegistryAware):
continue
if provider.uid in seen_ids:
continue
seen_ids.add(provider.uid)
yield provider
def _linked_affordance_offers(
self,
*,
requirement: Requirement[PT],
frontier: Node | None,
) -> list[ProvisionOffer]:
"""Build EXISTING offers from already-linked local affordance providers."""
offers: list[ProvisionOffer] = []
for provider in self._iter_local_affordance_providers(frontier):
try:
if not requirement.satisfied_by(provider):
continue
except Exception as exc:
logger.debug(
"requirement.satisfied_by failed for requirement=%s provider=%s",
requirement,
provider,
exc_info=exc,
)
continue
offers.append(
ProvisionOffer(
origin_id="LinkedAffordance",
policy=ProvisionPolicy.EXISTING,
priority=Priority.EARLY,
distance_from_caller=0,
candidate=provider,
callback=lambda *_, _provider=provider, **__: _provider,
)
)
return offers
@staticmethod
def _clone_requirement(requirement: Requirement[PT]) -> Requirement[PT]:
clone = requirement.model_copy(deep=True)
clone.provider_id = None
clone.unsatisfiable = None
clone.unambiguously_resolved = None
clone.selected_offer_policy = None
clone.resolved_step = None
clone.resolved_cursor_id = None
clone.resolution_reason = None
clone.resolution_meta = None
return clone
@staticmethod
def _fanout_edge_tag(fanout: Fanout[Any]) -> str:
return f"fanout:{fanout.uid}"
@classmethod
def _fanout_affordance_tags(cls, fanout: Fanout[Any]) -> set[str]:
return {"dynamic", "fanout", cls._fanout_edge_tag(fanout)}
def _iter_fanout_affordances(self, fanout: Fanout[Any]) -> Iterator[Affordance[Any]]:
source = fanout.predecessor
if source is None or fanout.registry is None:
return
marker = self._fanout_edge_tag(fanout)
for affordance in source.edges_out(Selector(has_kind=Affordance)):
tags = getattr(affordance, "tags", set()) or set()
if marker not in tags:
continue
yield affordance
def _clear_fanout_affordances(
self,
fanout: Fanout[Any],
*,
_ctx: VmPhaseCtx | None = None,
) -> None:
graph = fanout.registry
if graph is None:
return
for affordance in list(self._iter_fanout_affordances(fanout)):
graph.remove(affordance.uid, _ctx=_ctx)
def gather_fanout_offers(
self,
requirement: Requirement[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[ProvisionOffer]:
"""Return all admissible EXISTING/CREATE offers for ``requirement``."""
offers = self._discover_fanout_offers(requirement, _ctx=_ctx)
offers = self._apply_resolve_override(requirement, offers, _ctx=_ctx)
offers = self._annotate_offers(requirement, offers)
offers = self._filter_fanout_offers(offers)
offers = self._sort_offers(offers)
return self._dedupe_offers_by_candidate_target(offers)
def _accept_fanout_offer(
self,
*,
requirement: Requirement[PT],
offer: ProvisionOffer,
graph: Any,
_ctx: VmPhaseCtx | None = None,
) -> PT | None:
def _existing_instance_for_template() -> PT | None:
candidate = offer.candidate
if not isinstance(candidate, EntityTemplate):
return None
candidate_label = candidate.get_label()
candidate_hash = _template_hash_value(candidate)
for entity_group in self.location_entity_groups:
for existing in entity_group:
if not isinstance(existing, RegistryAware):
continue
if existing.registry is not graph:
continue
existing_path = getattr(existing, "path", None)
if isinstance(existing_path, str) and existing_path == candidate_label:
return existing
if getattr(existing, "templ_hash", None) == candidate_hash:
return existing
return None
target_ctx = offer.target_ctx
if isinstance(target_ctx, str) and target_ctx:
existing = self._find_existing_path_node(graph, target_ctx)
if isinstance(existing, RegistryAware):
try:
if requirement.satisfied_by(existing):
return existing
except Exception as exc:
logger.debug(
"fanout existing-path reuse failed for requirement=%s target_ctx=%s",
requirement,
target_ctx,
exc_info=exc,
)
if offer.policy & ProvisionPolicy.CREATE:
existing = _existing_instance_for_template()
if existing is not None:
try:
if requirement.satisfied_by(existing):
return existing
except Exception:
pass
return None
provider = offer.callback(_ctx=_ctx)
if provider is None:
return None
if not isinstance(provider, RegistryAware):
raise TypeError("Fanout providers must be RegistryAware")
parent = self._execute_build_chain(
requirement=requirement,
offer=offer,
graph=graph,
_ctx=_ctx,
)
provider = self._attach_and_finalize_provider(
requirement=requirement,
offer=offer,
provider=provider,
graph=graph,
parent=parent,
_ctx=_ctx,
)
requirement._validate_satisfied_by(provider)
return provider
def resolve_fanout(
self,
fanout: Fanout[PT],
*,
_ctx: VmPhaseCtx | None = None,
) -> list[PT]:
"""Resolve and publish every eligible provider for ``fanout``."""
graph = fanout.registry
if graph is None:
raise ValueError("Fanout must be bound to a registry before resolution")
offers = self.gather_fanout_offers(fanout.requirement, _ctx=_ctx)
providers: list[PT] = []
accepted_offers: list[ProvisionOffer] = []
seen_provider_ids: set[UUID] = set()
for offer in offers:
try:
provider = self._accept_fanout_offer(
requirement=fanout.requirement,
offer=offer,
graph=graph,
_ctx=_ctx,
)
except (TypeError, ValueError, RuntimeError) as exc:
logger.debug(
"Skipping invalid fanout offer origin=%s for fanout=%s",
offer.origin_id,
fanout,
exc_info=exc,
)
continue
if provider is None:
continue
if provider.uid in seen_provider_ids:
continue
seen_provider_ids.add(provider.uid)
providers.append(provider)
accepted_offers.append(offer)
fanout.set_providers(providers, _ctx=_ctx)
self._clear_fanout_affordances(fanout, _ctx=_ctx)
source = fanout.predecessor
if source is not None:
affordance_tags = self._fanout_affordance_tags(fanout)
for provider in providers:
affordance = Affordance(
registry=graph,
predecessor_id=source.uid,
requirement=self._clone_requirement(fanout.requirement),
tags=set(affordance_tags),
)
affordance.set_provider(provider, _ctx=_ctx)
requirement = fanout.requirement
stamp_requirement_resolution(requirement, _ctx=_ctx)
requirement.unsatisfiable = False
requirement.unambiguously_resolved = len(providers) == 1
requirement.selected_offer_policy = accepted_offers[0].policy if accepted_offers else None
if providers:
requirement.resolution_reason = "resolved"
else:
requirement.resolution_reason = "provider_none" if offers else "no_offers"
requirement.resolution_meta = {
"provider_ids": [str(provider.uid) for provider in providers],
"selected": [summarize_offer(offer) for offer in accepted_offers[:5]],
"alternatives": [summarize_offer(offer) for offer in offers[:5]],
}
return providers
def _resolve_requirement_offer(
self,
requirement: Requirement[PT],
*,
allow_stubs: bool = False,
preferred_offers: Iterable[ProvisionOffer] = (),
_ctx: VmPhaseCtx | None = None,
) -> tuple[Optional[PT], Optional[ProvisionOffer], list[ProvisionOffer]]:
offers = self.gather_offers(
requirement,
allow_stubs=allow_stubs,
preferred_offers=preferred_offers,
_ctx=_ctx,
)
if len(offers) == 0:
# No valid offers available
requirement.unsatisfiable = True
requirement.unambiguously_resolved = None
requirement.selected_offer_policy = None
requirement.resolved_step = None
requirement.resolved_cursor_id = None
if requirement.resolution_reason is None:
requirement.resolution_reason = "no_offers"
requirement.resolution_meta = requirement.resolution_meta or {
"alternatives": [],
}
return None, None, offers
else:
requirement.unsatisfiable = False
requirement.unambiguously_resolved = len(offers) == 1
for selected_offer in offers:
requirement.selected_offer_policy = selected_offer.policy
requirement.resolution_reason = "resolved"
requirement.resolution_meta = {
"selected": summarize_offer(selected_offer),
"alternatives": [summarize_offer(offer) for offer in offers[:5]],
}
provider = selected_offer.callback(_ctx=_ctx)
if provider is not None:
return provider, selected_offer, offers
requirement.unsatisfiable = True
requirement.selected_offer_policy = None
requirement.resolution_reason = "provider_none"
return None, None, offers
[docs]
def resolve_requirement(
self,
requirement: Requirement[PT],
*,
allow_stubs: bool = False,
preferred_offers: Iterable[ProvisionOffer] = (),
_ctx: VmPhaseCtx | None = None,
) -> Optional[PT]:
"""Resolve and materialize one provider for ``requirement`` if possible."""
provider, _, _ = self._resolve_requirement_offer(
requirement=requirement,
allow_stubs=allow_stubs,
preferred_offers=preferred_offers,
_ctx=_ctx,
)
return provider
@staticmethod
def _matches_identifier(candidate: Any, identifier: str) -> bool:
has_identifier = getattr(candidate, "has_identifier", None)
if callable(has_identifier):
try:
return bool(has_identifier(identifier))
except (TypeError, ValueError, AttributeError):
return False
return getattr(candidate, "label", None) == identifier
def _find_existing_path_node(self, graph: Any, path: str) -> Any | None:
segments = split_path(path)
if not segments:
return None
current = None
for index, segment in enumerate(segments):
if index == 0:
current = self._find_top_level_node(graph, segment)
else:
current = self._find_child_node(current, segment)
if current is None:
return None
return current
def _find_top_level_node(self, graph: Any, segment: str) -> Any | None:
values = getattr(graph, "values", None)
if not callable(values):
return None
for candidate in values():
if getattr(candidate, "parent", None) is not None:
continue
if self._matches_identifier(candidate, segment):
return candidate
return None
def _find_child_node(self, parent: Any, segment: str) -> Any | None:
if parent is None:
return None
children = getattr(parent, "children", None)
if callable(children):
for candidate in children():
if self._matches_identifier(candidate, segment):
return candidate
return None
members = getattr(parent, "members", None)
if callable(members):
for candidate in members():
if self._matches_identifier(candidate, segment):
return candidate
return None
return None
def _find_structural_template(self, *, identifier: str, target_ctx: str) -> EntityTemplate | None:
best: tuple[int, int, EntityTemplate] | None = None
for template in self._iter_templates():
if not template.has_identifier(identifier):
continue
if not template.has_payload_kind(TraversableNode):
continue
if not template.admitted_to(target_ctx):
continue
key = (
scope_distance(template.admission_scope, target_ctx),
int(getattr(template, "seq", 0)),
)
if best is None or key < (best[0], best[1]):
best = (key[0], key[1], template)
if best is None:
return None
return best[2]
def _chain_plan_for_offer(
self,
requirement: Requirement,
*,
offer: ProvisionOffer,
_ctx: VmPhaseCtx | None = None,
) -> _StructuralChainPlan:
offer_target_ctx = offer.target_ctx
target_ctx = (
offer_target_ctx
if isinstance(offer_target_ctx, str) and offer_target_ctx
else self._resolve_target_path_for_requirement(requirement, _ctx=_ctx)
)
return _StructuralChainPlan(
target_ctx=target_ctx,
build_segments=tuple(offer.build_plan or ()),
)
@staticmethod
def _chain_plan_for_target_ctx(target_ctx: str | None, *, graph: Any) -> _StructuralChainPlan:
return _StructuralChainPlan(
target_ctx=target_ctx,
build_segments=tuple(build_plan(target_ctx, graph)),
)
def _chain_paths_resolvable(
self,
*,
build_segments: list[str],
target_ctx: str,
graph: Any,
) -> bool:
if not build_segments:
return True
if graph is None:
return False
target_prefix_paths = prefix_paths(target_ctx)[:-1]
prefix_segments = split_path(target_ctx)[:-1]
if not prefix_segments:
return False
segment_index = 0
plan_index = 0
while segment_index < len(prefix_segments) and plan_index < len(build_segments):
if prefix_segments[segment_index] == build_segments[plan_index]:
segment_path = target_prefix_paths[segment_index]
if self._find_existing_path_node(graph, segment_path) is None:
template = self._find_structural_template(
identifier=segment_path,
target_ctx=target_ctx,
)
if template is None:
return False
plan_index += 1
segment_index += 1
return plan_index == len(build_segments)
def _chain_plan_is_resolvable(
self,
plan: _StructuralChainPlan,
*,
graph: Any,
) -> bool:
if not plan.requires_chain:
return True
if graph is None or plan.target_ctx is None:
return False
return self._chain_paths_resolvable(
build_segments=list(plan.build_segments),
target_ctx=plan.target_ctx,
graph=graph,
)
def _parent_for_chain_plan(
self,
plan: _StructuralChainPlan,
*,
graph: Any,
) -> Any | None:
if not plan.parent_path:
return None
return self._find_existing_path_node(graph, plan.parent_path)
def _find_chain_segment_node(
self,
*,
graph: Any,
current: Any | None,
segment: str,
) -> Any | None:
if current is None:
return self._find_top_level_node(graph, segment)
return self._find_child_node(current, segment)
def _materialize_structural_segment(
self,
*,
segment_path: str,
target_ctx: str,
graph: Any,
_ctx: VmPhaseCtx | None = None,
) -> tuple[RegistryAware, EntityTemplate]:
template = self._find_structural_template(
identifier=segment_path,
target_ctx=target_ctx,
)
if template is None:
raise ValueError(f"Missing structural template for segment {segment_path!r}")
created = self._materialize_node(
template,
_ctx=_ctx,
role=MaterializeRole.PROVISION_INTERMEDIATE,
)
if not isinstance(created, RegistryAware):
raise TypeError("Structural template materialization must yield RegistryAware nodes")
if created.registry is not graph:
graph.add(created, _ctx=_ctx)
return created, template
@staticmethod
def _offer_materializes_runtime_entity(offer: ProvisionOffer) -> bool:
return bool(
offer.policy & (ProvisionPolicy.CREATE | ProvisionPolicy.CLONE)
) or isinstance(offer.candidate, EntityTemplate)
@staticmethod
def _offer_template(offer: ProvisionOffer) -> EntityTemplate | None:
candidate = offer.candidate
if isinstance(candidate, EntityTemplate):
return candidate
return None
@staticmethod
def _request_ctx_for_node(node: Any, fallback: str | None = None) -> str | None:
path = getattr(node, "path", None)
if isinstance(path, str) and path:
return path
label = getattr(node, "label", None)
if isinstance(label, str) and label:
return label
if isinstance(fallback, str) and fallback:
return fallback
return None
def _make_node_ctx(
self,
*,
graph: Any,
node: RegistryAware,
_ctx: VmPhaseCtx | None = None,
request_ctx_path: str | None = None,
) -> VmPhaseCtx:
if _ctx is None:
raise TypeError(
"Nested runtime validation requires a typed context with derive()",
)
meta_overrides = None
if isinstance(request_ctx_path, str) and request_ctx_path:
meta_overrides = {"request_ctx_path": request_ctx_path}
return _ctx.derive(
cursor_id=node.uid,
graph=graph,
meta_overrides=meta_overrides,
)
def _post_materialize_entity(
self,
*,
entity: RegistryAware,
template: EntityTemplate | None,
role: MaterializeRole,
_ctx: VmPhaseCtx | None = None,
) -> None:
story_post_materialize = resolve_story_post_materialize_hook(_ctx)
if callable(story_post_materialize):
story_post_materialize(
template=template,
entity=entity,
role=role,
_ctx=_ctx,
)
def _validate_runtime_entity(
self,
*,
entity: RegistryAware,
graph: Any,
_ctx: VmPhaseCtx | None = None,
request_ctx_path: str | None = None,
) -> bool:
if not isinstance(entity, TraversableNode):
return True
node_ctx = self._make_node_ctx(
graph=graph,
node=entity,
_ctx=_ctx,
request_ctx_path=self._request_ctx_for_node(entity, fallback=request_ctx_path),
)
return Resolver.from_ctx(node_ctx).resolve_frontier_node(
entity,
allow_stubs=self._stubs_allowed(allow_stubs=False, _ctx=_ctx),
_ctx=node_ctx,
)
def _existing_target_provider(
self,
*,
requirement: Requirement,
offer: ProvisionOffer,
graph: Any,
) -> PT | None:
target_ctx = offer.target_ctx
if not isinstance(target_ctx, str) or not target_ctx:
return None
existing = self._find_existing_path_node(graph, target_ctx)
if not isinstance(existing, RegistryAware):
return None
try:
if requirement.satisfied_by(existing):
return existing
except Exception as exc:
logger.debug(
"existing target reuse failed for requirement=%s target_ctx=%s",
requirement,
target_ctx,
exc_info=exc,
)
return None
def _attach_and_finalize_provider(
self,
*,
requirement: Requirement[PT],
offer: ProvisionOffer,
provider: PT,
graph: Any,
parent: Any | None,
_ctx: VmPhaseCtx | None = None,
) -> PT:
reused = self._existing_target_provider(
requirement=requirement,
offer=offer,
graph=graph,
)
if reused is not None:
provider = reused
if not isinstance(provider, RegistryAware):
raise TypeError("Provisioned providers must be RegistryAware")
if provider.registry is not graph:
graph.add(provider, _ctx=_ctx)
if getattr(provider, "parent", None) is not parent:
attach_child(parent, provider)
if self._offer_materializes_runtime_entity(offer):
self._post_materialize_entity(
entity=provider,
template=self._offer_template(offer),
role=MaterializeRole.PROVISION_LEAF,
_ctx=_ctx,
)
if not self._validate_runtime_entity(
entity=provider,
graph=graph,
_ctx=_ctx,
request_ctx_path=offer.target_ctx,
):
raise RuntimeError("Provisioned provider failed runtime validation")
return provider
def _execute_build_chain(
self,
*,
requirement: Requirement,
offer: ProvisionOffer,
graph: Any,
_ctx: VmPhaseCtx | None = None,
) -> Any | None:
plan = self._chain_plan_for_offer(requirement, offer=offer, _ctx=_ctx)
if not plan.requires_chain:
return self._parent_for_chain_plan(plan, graph=graph)
if plan.target_ctx is None:
raise ValueError("Cannot execute structural chain without a target path")
if not self._chain_plan_is_resolvable(plan, graph=graph):
raise ValueError("structural build chain is not resolvable")
segments = split_path(plan.target_ctx)
parent_segments = segments[:-1]
if not parent_segments:
return None
current: Any | None = None
for index, segment in enumerate(parent_segments):
existing = self._find_chain_segment_node(
graph=graph,
current=current,
segment=segment,
)
if existing is not None:
current = existing
continue
created, template = self._materialize_structural_segment(
segment_path=".".join(segments[: index + 1]),
target_ctx=plan.target_ctx,
graph=graph,
_ctx=_ctx,
)
attach_child(current, created)
self._post_materialize_entity(
entity=created,
template=template,
role=MaterializeRole.PROVISION_INTERMEDIATE,
_ctx=_ctx,
)
if not self._validate_runtime_entity(
entity=created,
graph=graph,
_ctx=_ctx,
request_ctx_path=".".join(segments[: index + 1]),
):
raise RuntimeError("Structural chain segment failed runtime validation")
current = created
return current
def _bind_dependency_provider(
self,
*,
dependency: Dependency,
provider: PT,
offer: ProvisionOffer,
parent: Any | None = None,
_ctx: VmPhaseCtx | None = None,
) -> bool:
provider = self._attach_and_finalize_provider(
requirement=dependency.requirement,
offer=offer,
provider=provider,
graph=dependency.registry,
parent=parent,
_ctx=_ctx,
)
dependency.set_provider(provider, _ctx=_ctx)
return True
[docs]
def preview_requirement(
self,
requirement: Requirement,
*,
allow_stubs: bool = False,
preferred_offers: Iterable[ProvisionOffer] = (),
max_depth: int = 8,
_ctx: VmPhaseCtx | None = None,
) -> ViabilityResult:
"""Return a non-mutating viability preview for one requirement."""
# depth/visited are reserved for future recursive chain variants.
_ = max_depth
offers = self.gather_offers(
requirement,
allow_stubs=allow_stubs,
preferred_offers=preferred_offers,
_ctx=_ctx,
)
graph = _ctx.graph if _ctx is not None else None
preview = self._preview_viable_offer(
requirement=requirement,
offers=offers,
graph=graph,
_ctx=_ctx,
)
if preview is not None:
return preview
blockers = self._diagnose_blockers(requirement=requirement, _ctx=_ctx)
return ViabilityResult(viable=False, chain=[], scope_distance=0, blockers=blockers)
def _preview_viable_offer(
self,
*,
requirement: Requirement,
offers: Iterable[ProvisionOffer],
graph: Any,
_ctx: VmPhaseCtx | None = None,
) -> ViabilityResult | None:
story_blockers: list[Blocker] = []
preview_requirement = resolve_story_preview_requirement_hook(_ctx)
for offer in offers:
plan = self._chain_plan_for_offer(requirement, offer=offer, _ctx=_ctx)
if not self._chain_plan_is_resolvable(plan, graph=graph):
continue
if callable(preview_requirement):
blockers = preview_requirement(
requirement=requirement,
offer=offer,
graph=graph,
_ctx=_ctx,
)
if blockers:
story_blockers.extend(list(blockers))
continue
return ViabilityResult(
viable=True,
chain=list(plan.build_segments),
scope_distance=offer.scope_distance,
blockers=[],
)
if story_blockers:
return ViabilityResult(
viable=False,
chain=[],
scope_distance=0,
blockers=story_blockers,
)
return None
@staticmethod
def _blocker_context(
*,
identifier: str | None,
target_ctxs: list[str],
include_candidates: bool = False,
**extra: Any,
) -> dict[str, Any]:
context: dict[str, Any] = {
"identifier": identifier,
"target_ctx": target_ctxs[0] if target_ctxs else None,
}
if include_candidates:
context["target_ctx_candidates"] = list(target_ctxs)
context.update(extra)
return context
def _blocker(
self,
reason: str,
*,
identifier: str | None,
target_ctxs: list[str],
include_candidates: bool = False,
**context: Any,
) -> Blocker:
return Blocker(
reason=reason,
context=self._blocker_context(
identifier=identifier,
target_ctxs=target_ctxs,
include_candidates=include_candidates,
**context,
),
)
def _target_contexts_for_requirement(
self,
requirement: Requirement,
*,
_ctx: VmPhaseCtx | None = None,
) -> list[str]:
identifier = self._selector_identifier(requirement)
target_ctxs = target_context_candidates(
identifier=identifier,
request_ctx=self._request_ctx_path(_ctx),
authored_path=requirement.authored_path,
is_qualified=requirement.is_qualified,
is_absolute=requirement.is_absolute,
)
if not target_ctxs:
target_ctx = self._resolve_target_path_for_requirement(requirement, _ctx=_ctx)
if target_ctx is not None:
target_ctxs = [target_ctx]
return target_ctxs
@staticmethod
def _identity_template_candidates(
requirement: Requirement,
templates: list[EntityTemplate],
) -> list[EntityTemplate]:
return [
template
for template in templates
if TemplateProvisioner._matches_non_identifier_criteria(requirement, template)
and TemplateProvisioner._matches_template_identity(requirement, template)
]
@staticmethod
def _kind_template_candidates(
requirement: Requirement,
templates: list[EntityTemplate],
) -> list[EntityTemplate]:
kind = (requirement.__pydantic_extra__ or {}).get("has_kind")
if not isinstance(kind, type):
return []
return [template for template in templates if template.has_kind(kind)]
@staticmethod
def _admitted_template_candidates(
identity_candidates: list[EntityTemplate],
target_ctxs: list[str],
) -> list[tuple[EntityTemplate, str]]:
return [
(template, target_ctx)
for template in identity_candidates
for target_ctx in target_ctxs
if template.admitted_to(target_ctx)
]
def _diagnose_identity_blockers(
self,
*,
requirement: Requirement,
templates: list[EntityTemplate],
target_ctxs: list[str],
) -> list[Blocker] | None:
identifier = self._selector_identifier(requirement)
if self._identity_template_candidates(requirement, templates):
return None
if self._kind_template_candidates(requirement, templates):
return [
self._blocker(
"name_mismatch",
identifier=identifier,
target_ctxs=target_ctxs,
)
]
return [
self._blocker(
"no_template",
identifier=identifier,
target_ctxs=target_ctxs,
)
]
def _diagnose_scope_blockers(
self,
*,
requirement: Requirement,
identity_candidates: list[EntityTemplate],
target_ctxs: list[str],
) -> list[Blocker] | None:
if self._admitted_template_candidates(identity_candidates, target_ctxs):
return None
return [
self._blocker(
"scope_rejected",
identifier=self._selector_identifier(requirement),
target_ctxs=target_ctxs,
include_candidates=True,
scopes=[template.admission_scope for template in identity_candidates],
)
]
def _diagnose_episode_scope_policy_blockers(
self,
*,
requirement: Requirement,
admitted_candidates: list[tuple[EntityTemplate, str]],
target_ctxs: list[str],
_ctx: VmPhaseCtx | None = None,
) -> list[Blocker] | None:
if not self._is_episode_requirement(requirement):
return None
identifier = self._selector_identifier(requirement)
distances = [
scope_distance(template.admission_scope, target_ctx)
for template, target_ctx in admitted_candidates
]
if not requirement.is_qualified and distances and min(distances) > 0:
return [
self._blocker(
"scope_rejected",
identifier=identifier,
target_ctxs=target_ctxs,
include_candidates=True,
distances=distances,
policy="unqualified_episode_requires_distance_0",
)
]
if not requirement.is_qualified or not target_ctxs:
return None
graph = _ctx.graph if _ctx is not None else None
unresolved: list[dict[str, Any]] = []
for _template, target_ctx in admitted_candidates:
plan = self._chain_plan_for_target_ctx(target_ctx, graph=graph)
if self._chain_plan_is_resolvable(plan, graph=graph):
continue
unresolved.append(
{
"target_ctx": target_ctx,
"chain": list(plan.build_segments),
}
)
if unresolved and len(unresolved) == len(admitted_candidates):
return [
self._blocker(
"chain_unresolvable",
identifier=identifier,
target_ctxs=target_ctxs,
include_candidates=True,
chains=unresolved,
)
]
return None
def _diagnose_blockers(
self,
*,
requirement: Requirement,
_ctx: VmPhaseCtx | None = None,
) -> list[Blocker]:
target_ctxs = self._target_contexts_for_requirement(requirement, _ctx=_ctx)
templates = list(self._iter_templates())
blockers = self._diagnose_identity_blockers(
requirement=requirement,
templates=templates,
target_ctxs=target_ctxs,
)
if blockers is not None:
return blockers
identity_candidates = self._identity_template_candidates(requirement, templates)
blockers = self._diagnose_scope_blockers(
requirement=requirement,
identity_candidates=identity_candidates,
target_ctxs=target_ctxs,
)
if blockers is not None:
return blockers
admitted_candidates = self._admitted_template_candidates(identity_candidates, target_ctxs)
blockers = self._diagnose_episode_scope_policy_blockers(
requirement=requirement,
admitted_candidates=admitted_candidates,
target_ctxs=target_ctxs,
_ctx=_ctx,
)
if blockers is not None:
return blockers
return [
self._blocker(
"no_template",
identifier=self._selector_identifier(requirement),
target_ctxs=target_ctxs,
include_candidates=True,
)
]
[docs]
def resolve_dependency(
self,
dependency: Dependency[PT],
*,
allow_stubs: bool = False,
_ctx: VmPhaseCtx | None = None,
) -> bool:
"""Resolve one dependency edge and bind its provider into the graph."""
preferred_offers = self._linked_affordance_offers(
requirement=dependency.requirement,
frontier=dependency.predecessor,
)
provider, selected_offer, _offers = self._resolve_requirement_offer(
requirement=dependency.requirement,
allow_stubs=allow_stubs,
preferred_offers=preferred_offers,
_ctx=_ctx,
)
if provider is None:
return False
if selected_offer is None:
dependency.requirement.resolution_reason = "provider_none"
return False
if selected_offer.policy & ProvisionPolicy.STUB:
stubs_allowed = self._stubs_allowed(allow_stubs=allow_stubs, _ctx=_ctx)
if not stubs_allowed:
raise AssertionError(
"Invariant violation: STUB offer selected while stubs are not allowed"
)
# Debug/preview mode: STUB linking intentionally bypasses requirement
# predicate fidelity to keep traversal running under illegal traversal.
if not isinstance(provider, RegistryAware):
raise TypeError(
"STUB provider must be RegistryAware for dependency linkage"
)
if provider.registry is not dependency.registry:
dependency.registry.add(provider, _ctx=_ctx)
dependency.requirement.provider_id = provider.uid
stamp_requirement_resolution(dependency.requirement, _ctx=_ctx)
dependency.requirement.resolution_reason = "stub_link_resolved"
if _ctx is not None:
_ctx.escalate_to_hard_dirty("stub_link_accepted", step_id=str(dependency.uid))
Edge.set_successor(dependency, provider, _ctx=_ctx)
return True
parent = None
try:
parent = self._execute_build_chain(
requirement=dependency.requirement,
offer=selected_offer,
graph=dependency.registry,
_ctx=_ctx,
)
self._bind_dependency_provider(
dependency=dependency,
provider=provider,
offer=selected_offer,
parent=parent,
_ctx=_ctx,
)
except (ValueError, TypeError, RuntimeError):
dependency.requirement.resolution_reason = "provider_rejected"
return False
if dependency.requirement.resolution_reason is None:
dependency.requirement.resolution_reason = "resolved"
return True
[docs]
def resolve_frontier_node(
self,
node: Node,
*,
allow_stubs: bool = False,
_ctx: VmPhaseCtx | None = None,
) -> bool:
"""Resolve all open dependencies on ``node`` and verify traversal viability."""
# Note this is not unsatisfied deps, it's anyone without a provider
# satisfied could mean not a hard req
open_deps = list(node.edges_out(Selector(has_kind=Dependency, provider=None)))
for dep in open_deps:
self.resolve_dependency(dep, allow_stubs=allow_stubs, _ctx=_ctx)
graph = getattr(node, "graph", None)
if not bool(getattr(graph, "frozen_shape", False)):
for fanout in list(node.edges_out(Selector(has_kind=Fanout))):
self.resolve_fanout(fanout, _ctx=_ctx)
# Find unsat blockers with no provider and a hard-requirement
unsatisfied_deps = node.edges_out(Selector(has_kind=Dependency, satisfied=False))
if next(unsatisfied_deps, None) is not None:
return False
# Containers must have a reachable sink from their source.
if isinstance(node, TraversableNode) and node.is_container:
source = node.source
sink = node.sink
if source is None or sink is None:
return False
ns = None
if _ctx is not None and hasattr(_ctx, "get_ns"):
ns = dict(_ctx.get_ns(source))
if not node.has_forward_progress(source, ns=ns):
return False
return True
# Register the resolution process with dispatch so it will be invoked from the phase bus
@on_provision
def provision_node(caller: Node, *, ctx, allow_stubs: bool = False):
Resolver.from_ctx(ctx).resolve_frontier_node(
node=caller,
allow_stubs=allow_stubs,
_ctx=ctx,
)