Source code for tangl.story.fabula.materializer

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Mapping
from uuid import UUID

from tangl.core import EntityTemplate, GraphFactory, GraphItem, Selector, TemplateRegistry
from tangl.media.media_creators.media_spec import MediaSpec
from tangl.media.media_resource import MediaDep
from tangl.vm.ctx import VmPhaseCtx
from tangl.vm.runtime.frame import PhaseCtx
from tangl.vm.provision.materialization import resolve_story_materialize_hook
from tangl.vm import (
    Affordance,
    Blocker,
    Dependency,
    Fanout,
    ProvisionOffer,
    ProvisionPolicy,
    Requirement,
    Resolver,
    TraversableNode,
    assert_traversal_contracts,
)
from tangl.vm.provision import MaterializeRole, attach_child, materialize_template_entity
from tangl.vm.provision.provisioner import _next_provision_uid

from ..concepts import Actor, Location, Role, Setting
from ..episode import Action, Block, MenuBlock, Scene
from ..story_graph import StoryGraph
from .types import (
    GraphInitializationError,
    InitMode,
    InitReport,
    ResolutionError,
    ResolutionFailureReason,
    StoryInitResult,
    UnresolvedDependency,
)


@dataclass(slots=True)
class _MaterializationState:
    graph: StoryGraph
    template_registry: TemplateRegistry
    report: InitReport
    entry_template_ids: list[str] = field(default_factory=list)
    source_map: dict[str, Any] = field(default_factory=dict)
    codec_state: dict[str, Any] = field(default_factory=dict)
    codec_id: str | None = None
    bundle_id: str | None = None

[docs] class StoryMaterializer: """StoryMaterializer() Story-policy helper for runtime graph wiring and compatibility delegation. Why ---- ``World.create_story(...)`` now owns graph creation directly by layering over :class:`~tangl.vm.TraversableGraphFactory`. ``StoryMaterializer`` remains as the focused helper that applies story-specific topology, eager prelink policy, and runtime materialization hooks on top of that lower-layer graph creation. Key Features ------------ * Wires story-specific topology onto an already-materialized graph. * Supports eager prelink/report passes once generic graph creation completes. * Preserves template-to-entity lineage for runtime scope lookup. * Finalizes scene contracts and wires node destinations after instantiation. * Uses vm resolver semantics instead of reimplementing provisioning logic. API --- - :meth:`create_story` delegates to a bound world for compatibility. - :meth:`make_state` builds story wiring state from direct runtime authorities. """ def story_materialize_template( self, template: EntityTemplate, _ctx: VmPhaseCtx | None = None, ) -> GraphItem: """Materialize one story template payload with resolver-stable uid rules.""" return template.materialize(uid=_next_provision_uid(_ctx=_ctx)) def story_post_materialize( self, *, template: EntityTemplate | None, entity: Any, role: MaterializeRole | str = MaterializeRole.PROVISION_LEAF, _ctx: VmPhaseCtx | None = None, ) -> None: """Finalize runtime story contracts for one newly attached entity.""" if isinstance(role, str): role = MaterializeRole(role) graph = entity.graph if isinstance(entity, GraphItem) else None if graph is None and _ctx is not None: graph = _ctx.graph if not isinstance(graph, StoryGraph): return if isinstance(template, EntityTemplate) and isinstance(entity, GraphItem): graph.record_runtime_template(entity, template) if not isinstance(entity, TraversableNode): return if graph.is_runtime_wired_node(entity): graph.wired_node_ids.add(entity.uid) return if isinstance(template, EntityTemplate) and self._template_is_container(template): self._ensure_runtime_container_entry( graph=graph, template=template, container=entity, _ctx=_ctx, ) state = self._runtime_state(graph=graph) self._run_runtime_topology_passes(nodes=[entity], state=state) def preview_requirement_contract( self, *, requirement: Requirement, offer: ProvisionOffer, graph: Any, _ctx: VmPhaseCtx | None = None, ) -> list[Blocker]: """Return story-level selected-path blockers for one preview offer.""" if not isinstance(graph, StoryGraph): return [] template = offer.candidate target_ctx = offer.target_ctx if not isinstance(template, EntityTemplate) or not isinstance(target_ctx, str) or not target_ctx: return [] blockers: list[Blocker] = [] build_segments = list(offer.build_plan or ()) parent_paths = self._parent_prefix_paths(target_ctx) missing_paths = parent_paths[-len(build_segments) :] if build_segments else [] for segment_path in missing_paths: segment_template = self._find_template_for_path(graph=graph, reference=segment_path) if not isinstance(segment_template, EntityTemplate): continue blockers.extend( self._preview_immediate_hard_dependencies( graph=graph, template=segment_template, request_ctx_path=segment_path, _ctx=_ctx, ) ) if blockers: return blockers blockers.extend( self._preview_immediate_hard_dependencies( graph=graph, template=template, request_ctx_path=target_ctx, _ctx=_ctx, ) ) return blockers
[docs] def create_story( self, *, story_label: str, init_mode: InitMode, freeze_shape: bool = False, world: object | None = None, ) -> StoryInitResult: if world is not None: create_story = getattr(world, "create_story", None) if callable(create_story): return create_story( story_label, init_mode=init_mode, freeze_shape=freeze_shape, ) msg = "StoryMaterializer.create_story now requires a World authority" raise ValueError(msg)
def make_state( self, *, graph: StoryGraph, mode: InitMode, template_registry: TemplateRegistry | None = None, entry_template_ids: list[str] | None = None, source_map: dict[str, Any] | None = None, codec_state: dict[str, Any] | None = None, codec_id: str | None = None, bundle_id: str | None = None, ) -> _MaterializationState: """Build a story wiring/prelink state from direct runtime authority.""" resolved_registry = template_registry or self._template_registry_for_graph(graph) if bundle_id is None: bundle_id = getattr(resolved_registry, "label", None) state = _MaterializationState( graph=graph, template_registry=resolved_registry, report=InitReport(mode=mode), entry_template_ids=list(entry_template_ids or []), source_map=dict(source_map or {}), codec_state=dict(codec_state or {}), codec_id=codec_id, bundle_id=bundle_id, ) return state def _run_topology_passes(self, *, state: _MaterializationState) -> None: nodes = self._unwired_traversable_nodes(state=state) self._finalize_scene_contracts(state=state) self._wire_role_and_setting_dependencies(nodes=nodes, state=state) self._wire_menu_fanouts(nodes=nodes, state=state) self._wire_block_actions(nodes=nodes, state=state) self._wire_media_dependencies(nodes=nodes, state=state) self._mark_nodes_wired(nodes=nodes, state=state) @staticmethod def _recount_materialized(*, state: _MaterializationState) -> None: state.report.materialized_counts.clear() for entity in state.graph.values(): if isinstance(entity, GraphItem): state.report.bump_materialized(entity.__class__.__name__) def _run_prelink_passes(self, *, state: _MaterializationState) -> None: if state.report.mode is not InitMode.EAGER: return dependencies = self._sorted_dependencies(state=state) self._prelink_dependencies(dependencies=dependencies, state=state) self._project_action_successors_from_dependencies(dependencies=dependencies) if state.graph.frozen_shape: fanouts = self._sorted_fanouts(state=state) self._prelink_fanouts(fanouts=fanouts, state=state) self._project_prelinked_menu_actions_for_menus( menus=self._sorted_menu_blocks(state=state), state=state, ) self._verify_prelinked_story_graph(state=state) self._raise_on_unresolved_hard_dependencies(state=state) @staticmethod def _build_story_init_result(*, state: _MaterializationState) -> StoryInitResult: graph = state.graph return StoryInitResult( graph=graph, report=state.report, entry_ids=graph.initial_cursor_ids, source_map=dict(state.source_map), codec_state=dict(state.codec_state), codec_id=state.codec_id, ) @staticmethod def _template_registry_for_graph(graph: StoryGraph) -> TemplateRegistry: registry = getattr(graph, "factory", None) if isinstance(registry, TemplateRegistry): return registry if isinstance(registry, GraphFactory) and isinstance(registry.templates, TemplateRegistry): return registry.templates return TemplateRegistry(label="story_runtime_templates") def _runtime_state(self, *, graph: StoryGraph, mode: InitMode = InitMode.LAZY) -> _MaterializationState: world = getattr(graph, "world", None) return self.make_state( graph=graph, mode=mode, template_registry=self._template_registry_for_graph(graph), entry_template_ids=list(getattr(world, "entry_template_ids", []) or []), source_map=dict(getattr(world, "source_map", {}) or {}), codec_state=dict(getattr(world, "codec_state", {}) or {}), codec_id=getattr(world, "codec_id", None), bundle_id=getattr(self._template_registry_for_graph(graph), "label", None), ) def _run_runtime_topology_passes( self, *, nodes: list[TraversableNode], state: _MaterializationState, ) -> None: unwired: list[TraversableNode] = [] for node in nodes: if state.graph.is_runtime_wired_node(node): state.graph.wired_node_ids.add(node.uid) continue unwired.append(node) if not unwired: return self._wire_role_and_setting_dependencies(nodes=unwired, state=state) self._wire_menu_fanouts(nodes=unwired, state=state) self._wire_block_actions(nodes=unwired, state=state) self._wire_media_dependencies(nodes=unwired, state=state) self._mark_nodes_wired(nodes=unwired, state=state) def _template_child_templates(self, template: EntityTemplate) -> list[EntityTemplate]: members = getattr(template, "members", None) if not callable(members): return [] children = [ member for member in members() if isinstance(member, EntityTemplate) and getattr(member, "parent", None) is template and member.has_payload_kind(TraversableNode) ] return sorted(children, key=self._template_depth) def _template_is_container(self, template: EntityTemplate) -> bool: return bool(self._template_child_templates(template)) def _entry_template_for_container(self, template: EntityTemplate) -> EntityTemplate | None: children = self._template_child_templates(template) if not children: return None for tag_name in ("start", "entry"): for child in children: if child.has_tags({tag_name}): return child for child in children: payload = getattr(child, "payload", None) child_locals = getattr(payload, "locals", None) or {} if isinstance(child_locals, dict) and ( child_locals.get("is_start") or child_locals.get("start_at") ): return child for child in children: label = child.get_label() short_label = label.rsplit(".", 1)[-1] if "." in label else label if short_label.lower() == "start": return child return children[0] @staticmethod def _parent_prefix_paths(path: str) -> list[str]: parts = [segment for segment in path.split(".") if segment] return [".".join(parts[: idx + 1]) for idx in range(len(parts) - 1)] def _find_template_for_path( self, *, graph: StoryGraph, reference: str, ) -> EntityTemplate | None: world = getattr(graph, "world", None) find_template = getattr(world, "find_template", None) if callable(find_template): found = find_template(reference) if isinstance(found, EntityTemplate): return found registry = self._template_registry_for_graph(graph) found = registry.find_one(Selector(has_identifier=reference)) if isinstance(found, EntityTemplate): return found found = registry.find_one(Selector(label=reference)) if isinstance(found, EntityTemplate): return found return None def _find_runtime_entity( self, *, graph: StoryGraph, reference: str, kind: type[GraphItem] | None = None, ) -> GraphItem | None: selectors = [ Selector(has_kind=kind, has_path=reference) if kind is not None else Selector(has_path=reference), Selector(has_kind=kind, has_identifier=reference) if kind is not None else Selector(has_identifier=reference), Selector(has_kind=kind, label=reference) if kind is not None else Selector(label=reference), ] for selector in selectors: found = graph.find_one(selector) if isinstance(found, GraphItem): return found template = self._find_template_for_path(graph=graph, reference=reference) if isinstance(template, EntityTemplate): selector = ( Selector(has_kind=kind, templ_hash=template.content_hash()) if kind is not None else Selector(templ_hash=template.content_hash()) ) found = graph.find_one(selector) if isinstance(found, GraphItem): return found return None def _find_existing_child_for_template( self, *, graph: StoryGraph, container: TraversableNode, template: EntityTemplate, ) -> TraversableNode | None: children = getattr(container, "children", None) if not callable(children): return None for child in children(): if not isinstance(child, TraversableNode): continue if getattr(child, "templ_hash", None) == template.content_hash(): return child path = getattr(child, "path", None) if isinstance(path, str) and path == template.get_label(): return child return None def _ensure_runtime_container_entry( self, *, graph: StoryGraph, template: EntityTemplate, container: TraversableNode, _ctx: VmPhaseCtx | None = None, ) -> TraversableNode | None: entry_template = self._entry_template_for_container(template) if entry_template is None: return None entry_node = self._find_existing_child_for_template( graph=graph, container=container, template=entry_template, ) if entry_node is None: story_materialize = resolve_story_materialize_hook(_ctx) entry_node = materialize_template_entity( entry_template, _ctx=_ctx, role=MaterializeRole.PROVISION_LEAF, story_materialize=story_materialize, ) if not isinstance(entry_node, TraversableNode): return None graph.add(entry_node, _ctx=_ctx) attach_child(container, entry_node) graph.record_runtime_template(entry_node, entry_template) container.finalize_container_contract() self.story_post_materialize( template=entry_template, entity=entry_node, role=MaterializeRole.PROVISION_LEAF, _ctx=_ctx, ) return entry_node def _make_preview_requirement_ctx( self, *, graph: StoryGraph, request_ctx_path: str, _ctx: VmPhaseCtx | None = None, ) -> PhaseCtx: if _ctx is not None: return _ctx.derive( cursor_id=None, graph=graph, meta_overrides={"request_ctx_path": request_ctx_path}, ) return PhaseCtx( graph=graph, cursor_id=None, meta={"request_ctx_path": request_ctx_path}, ) def _preview_immediate_hard_dependencies( self, *, graph: StoryGraph, template: EntityTemplate, request_ctx_path: str, _ctx: VmPhaseCtx | None = None, ) -> list[Blocker]: payload = getattr(template, "payload", None) if not isinstance(payload, TraversableNode): return [] ctx = self._make_preview_requirement_ctx( graph=graph, request_ctx_path=request_ctx_path, _ctx=_ctx, ) resolver = Resolver.from_ctx(ctx) blockers: list[Blocker] = [] for label, dep_requirement in self._iter_immediate_hard_requirements(payload): preview = resolver.preview_requirement(dep_requirement, _ctx=ctx) if preview.viable: continue blockers.append( Blocker( reason="immediate_dependency_unresolvable", context={ "target_ctx": request_ctx_path, "template": template.get_label(), "dependency_label": label, "dependency": self._serialize_selector(dep_requirement), "blockers": [ {"reason": blocker.reason, "context": dict(blocker.context or {})} for blocker in preview.blockers ], }, ) ) return blockers def _iter_immediate_hard_requirements( self, node: TraversableNode, ) -> list[tuple[str, Requirement]]: requirements: list[tuple[str, Requirement]] = [] if isinstance(node, (Scene, Block)): requirements.extend( self._requirements_from_specs( specs=node.roles, provider_kind=Actor, ref_key="actor_ref", templ_ref_key="actor_template_ref", ) ) requirements.extend( self._requirements_from_specs( specs=node.settings, provider_kind=Location, ref_key="location_ref", templ_ref_key="location_template_ref", ) ) if isinstance(node, Block): requirements.extend(self._media_requirements_from_specs(node=node)) return requirements def _requirements_from_specs( self, *, specs: list[dict[str, Any]], provider_kind: type[GraphItem], ref_key: str, templ_ref_key: str, ) -> list[tuple[str, Requirement]]: requirements: list[tuple[str, Requirement]] = [] for index, spec in enumerate(specs): if not isinstance(spec, dict) or not bool(spec.get("hard", True)): continue label = self._coerce_str(spec.get("label")) or f"dep_{index}" identifier = self._coerce_str(spec.get(ref_key) or spec.get(templ_ref_key)) requirement_kwargs: dict[str, Any] = { "has_kind": provider_kind, "provision_policy": self._resolve_policy( spec.get("policy") or spec.get("requirement_policy") ), "hard_requirement": True, } if identifier is not None: requirement_kwargs["has_identifier"] = identifier requirement_kwargs["authored_path"] = identifier requirement_kwargs["is_qualified"] = self._is_qualified_path(identifier) requirements.append((label, Requirement(**requirement_kwargs))) return requirements def _media_requirements_from_specs(self, *, node: Block) -> list[tuple[str, Requirement]]: requirements: list[tuple[str, Requirement]] = [] for index, spec in enumerate(node.media): if not isinstance(spec, dict) or not bool(spec.get("hard", False)): continue source_kind = self._media_source_kind(spec) label = self._coerce_str(spec.get("label")) or f"media_{node.get_label()}_{index}" payload: dict[str, Any] = { "label": label, "hard": True, "scope": self._coerce_str(spec.get("scope")), "media_role": self._coerce_str(spec.get("media_role")), } if source_kind == "inventory": payload["media_id"] = self._coerce_str(spec.get("name")) elif source_kind == "potential": payload["media_spec"] = spec.get("spec") else: continue requirement = MediaDep._pre_resolve(payload).get("requirement") if isinstance(requirement, Requirement): requirements.append((label, requirement)) return requirements @staticmethod def _template_depth(templ: Any) -> tuple[int, int, str]: depth = 0 current = getattr(templ, "parent", None) while current is not None: depth += 1 current = getattr(current, "parent", None) seq = getattr(templ, "seq", 0) label = templ.get_label() if hasattr(templ, "get_label") else "" return depth, seq, label def _resolve_entry_templates( self, *, template_registry: TemplateRegistry, entry_template_ids: list[str], ) -> list[Any]: templates = [] for identifier in entry_template_ids: templ = template_registry.find_one( Selector(has_identifier=identifier), ) if templ is None: templ = template_registry.find_one(Selector(label=identifier)) if templ is not None: templates.append(templ) return templates def _finalize_scene_contracts(self, *, state: _MaterializationState) -> None: for scene in Selector(has_kind=Scene).filter(state.graph.values()): scene.finalize_container_contract() def _unwired_traversable_nodes(self, *, state: _MaterializationState) -> list[TraversableNode]: nodes = sorted( Selector(has_kind=TraversableNode).filter(state.graph.values()), key=self._order_key, ) return [node for node in nodes if node.uid not in state.graph.wired_node_ids] def _wire_role_and_setting_dependencies( self, *, nodes: list[TraversableNode], state: _MaterializationState, ) -> None: for node in nodes: if isinstance(node, Scene): self._wire_dependencies_for_specs( source=node, specs=node.roles, dependency_kind=Role, provider_kind=Actor, ref_key="actor_ref", templ_ref_key="actor_template_ref", state=state, ) self._wire_dependencies_for_specs( source=node, specs=node.settings, dependency_kind=Setting, provider_kind=Location, ref_key="location_ref", templ_ref_key="location_template_ref", state=state, ) if isinstance(node, Block): self._wire_dependencies_for_specs( source=node, specs=node.roles, dependency_kind=Role, provider_kind=Actor, ref_key="actor_ref", templ_ref_key="actor_template_ref", state=state, ) self._wire_dependencies_for_specs( source=node, specs=node.settings, dependency_kind=Setting, provider_kind=Location, ref_key="location_ref", templ_ref_key="location_template_ref", state=state, ) def _wire_menu_fanouts( self, *, nodes: list[TraversableNode], state: _MaterializationState, ) -> None: for node in nodes: if isinstance(node, MenuBlock): self._wire_menu_fanout_for_block(node=node, state=state) def _wire_block_actions( self, *, nodes: list[TraversableNode], state: _MaterializationState, ) -> None: for node in nodes: if isinstance(node, Block): self._wire_actions_for_block(node=node, specs=node.redirects, state=state) self._wire_actions_for_block(node=node, specs=node.continues, state=state) self._wire_actions_for_block(node=node, specs=node.actions, state=state) def _wire_media_dependencies( self, *, nodes: list[TraversableNode], state: _MaterializationState, ) -> None: for node in nodes: if isinstance(node, Block): self._wire_media_for_block(node=node, state=state) @staticmethod def _mark_nodes_wired( *, nodes: list[TraversableNode], state: _MaterializationState, ) -> None: for node in nodes: state.graph.wired_node_ids.add(node.uid) def _wire_media_for_block( self, *, node: Block, state: _MaterializationState, ) -> None: for index, spec in enumerate(node.media): if not isinstance(spec, dict): continue source_kind = self._media_source_kind(spec) spec.setdefault("source_kind", source_kind) if source_kind == "potential": raw_spec = spec.get("spec") spec.setdefault("script_spec", raw_spec) spec.setdefault("realized_spec", None) spec.setdefault("final_spec", None) spec.setdefault("fallback_text", self._coerce_str(spec.get("text"))) try: media_spec = MediaSpec.from_authoring(raw_spec) except (TypeError, ValueError) as exc: spec["spec_error"] = str(exc) state.report.warnings.append( f"Skipped inline media spec on block {node.get_label()!r}: {exc}" ) continue dep = MediaDep( registry=state.graph, label=self._coerce_str(spec.get("label")) or f"media_{node.get_label()}_{index}", predecessor_id=node.uid, media_spec=media_spec, media_role=self._coerce_str(spec.get("media_role")), caption=self._coerce_str(spec.get("text") or spec.get("caption")), scope=self._coerce_str(spec.get("scope")) or "story", hard=bool(spec.get("hard", False)), script_spec=dict(raw_spec) if isinstance(raw_spec, dict) else None, ) spec["dependency_id"] = dep.uid continue if source_kind != "inventory": continue media_id = self._coerce_str(spec.get("name")) if not media_id: continue dep = MediaDep( registry=state.graph, label=self._coerce_str(spec.get("label")) or f"media_{node.get_label()}_{index}", predecessor_id=node.uid, media_id=media_id, media_role=self._coerce_str(spec.get("media_role")), caption=self._coerce_str(spec.get("text") or spec.get("caption")), scope=self._coerce_str(spec.get("scope")), hard=bool(spec.get("hard", False)), ) spec["dependency_id"] = dep.uid spec.setdefault("fallback_text", self._coerce_str(spec.get("text"))) def _wire_actions_for_block( self, *, node: Block, specs: list[dict[str, Any]], state: _MaterializationState, ) -> None: for index, spec in enumerate(specs): authored_successor_ref = self._coerce_str(spec.get("authored_successor_ref")) successor_ref = self._coerce_str(spec.get("successor_ref")) successor_is_absolute = bool(spec.get("successor_is_absolute", False)) if successor_ref is None: successor_ref = self._coerce_str( spec.get("successor") or spec.get("next") or spec.get("target_ref") or spec.get("target_node") ) if not successor_ref: msg = ( f"Block '{node.get_label()}' action[{index}] is missing successor " "(expected one of: successor, next, successor_ref, target_ref, target_node)" ) raise ValueError(msg) activation = self._coerce_str(spec.get("trigger") or spec.get("activation")) trigger_phase = Action.trigger_phase_from_activation(activation) # Authored ``conditions``/``effects`` on actions, continues, and # redirects must reach the edge as availability predicates and # effects; without this they are silently dropped and the edge # gates/mutates nothing. def _as_exprs(value: Any) -> list[dict[str, Any]]: if not isinstance(value, list): return [] out: list[dict[str, Any]] = [] for item in value: if isinstance(item, str): out.append({"expr": item}) elif isinstance(item, dict): out.append(dict(item)) return out action = Action( registry=state.graph, label=spec.get("label") or f"action_{node.label}_{index}", predecessor_id=node.uid, text=self._coerce_str(spec.get("text") or spec.get("content") or spec.get("label")) or "", successor_ref=successor_ref, activation=activation, predicate=self._coerce_str(spec.get("predicate")), availability=[ *_as_exprs(spec.get("availability")), *_as_exprs(spec.get("conditions")), ], effects=_as_exprs(spec.get("effects")), payload=spec.get("payload"), accepts=spec.get("accepts") or spec.get("payload_schema"), ui_hints=( spec.get("ui_hints") or spec.get("ui_hint") or spec.get("hints") or spec.get("presentation_hints") ), trigger_phase=trigger_phase, ) target = self._find_runtime_entity( graph=state.graph, reference=successor_ref, kind=TraversableNode, ) if isinstance(target, TraversableNode): action.set_successor(target) continue requirement = Requirement( has_kind=TraversableNode, has_identifier=successor_ref, authored_path=authored_successor_ref or successor_ref, is_qualified=self._is_qualified_path(authored_successor_ref or successor_ref), is_absolute=successor_is_absolute, provision_policy=ProvisionPolicy.ANY, hard_requirement=True, ) if state.report.mode is InitMode.LAZY: self._validate_lazy_destination( state=state, source=node, action=action, authored_ref=authored_successor_ref or successor_ref, canonical_ref=successor_ref, requirement=requirement, ) Dependency( registry=state.graph, label="destination", predecessor_id=action.uid, requirement=requirement, ) if state.report.mode is InitMode.LAZY: state.report.warnings.append( f"{state.report.mode.value.upper()} init left action destination unresolved; " f"action={action.get_label()!r}, expected={successor_ref!r}" ) def _wire_menu_fanout_for_block( self, *, node: MenuBlock, state: _MaterializationState, ) -> None: for index, selector_spec in enumerate(MenuBlock.normalize_menu_selectors(node.menu_items)): requirement_data = dict(selector_spec) requirement_data.setdefault("has_kind", TraversableNode) requirement = Requirement( hard_requirement=False, **requirement_data, ) Fanout( registry=state.graph, label=f"fanout_{node.get_label()}_{index}", predecessor_id=node.uid, requirement=requirement, tags={"dynamic", "fanout", "menu"}, ) def _wire_dependencies_for_specs( self, *, source: TraversableNode, specs: list[dict[str, Any]], dependency_kind: type[Dependency], provider_kind: type[GraphItem], ref_key: str, templ_ref_key: str, state: _MaterializationState, ) -> None: for index, spec in enumerate(specs): label = self._coerce_str(spec.get("label")) or f"dep_{index}" identifier = self._coerce_str(spec.get(ref_key) or spec.get(templ_ref_key)) policy = self._resolve_policy(spec.get("policy") or spec.get("requirement_policy")) hard = bool(spec.get("hard", True)) requirement_kwargs: dict[str, Any] = { "has_kind": provider_kind, "provision_policy": policy, "hard_requirement": hard, } if identifier is not None: requirement_kwargs["has_identifier"] = identifier requirement_kwargs["authored_path"] = identifier requirement_kwargs["is_qualified"] = self._is_qualified_path(identifier) requirement = Requirement(**requirement_kwargs) dep = dependency_kind( registry=state.graph, label=label, predecessor_id=source.uid, requirement=requirement, ) if identifier: candidate = self._find_runtime_entity( graph=state.graph, reference=identifier, kind=provider_kind, ) if isinstance(candidate, provider_kind): dep.set_provider(candidate) @staticmethod def _media_source_kind(spec: Mapping[str, Any]) -> str: if spec.get("url") is not None: return "url" if spec.get("data") is not None: return "data" if spec.get("name"): return "inventory" if spec.get("spec") is not None: return "potential" return "legacy" def _prelink_dependencies( self, *, dependencies: list[Dependency], state: _MaterializationState, ) -> None: for dep in dependencies: ctx = self._make_prelink_ctx(state=state, cursor_id=dep.predecessor_id) resolver = Resolver.from_ctx(ctx) was_satisfied = dep.satisfied resolved = resolver.resolve_dependency(dep, allow_stubs=False, _ctx=ctx) if resolved and dep.satisfied: if not was_satisfied: state.report.bump_prelinked("resolved") else: state.report.bump_prelinked("unresolved") unresolved = UnresolvedDependency( dependency_id=dep.uid, source_id=dep.predecessor_id, label=dep.label, identifier=self._requirement_identifier(dep.requirement), hard_requirement=dep.requirement.hard_requirement, ) if dep.requirement.hard_requirement: state.report.unresolved_hard.append(unresolved) else: state.report.unresolved_soft.append(unresolved) def _project_action_successors_from_dependencies( self, *, dependencies: list[Dependency], ) -> None: for dep in dependencies: predecessor = dep.predecessor if isinstance(predecessor, Action) and isinstance(dep.successor, TraversableNode): predecessor.set_successor(dep.successor) def _prelink_fanouts( self, *, fanouts: list[Fanout], state: _MaterializationState, ) -> None: for fanout in fanouts: ctx = self._make_prelink_ctx(state=state, cursor_id=fanout.predecessor_id) resolver = Resolver.from_ctx(ctx) resolver.resolve_fanout(fanout, _ctx=ctx) state.report.bump_prelinked("fanout_resolved") def _project_prelinked_menu_actions_for_menus( self, *, menus: list[MenuBlock], state: _MaterializationState, ) -> None: for menu in menus: self._project_prelinked_menu_actions(menu=menu, state=state) @staticmethod def _verify_prelinked_story_graph(*, state: _MaterializationState) -> None: assert_traversal_contracts(state.graph) @staticmethod def _raise_on_unresolved_hard_dependencies(*, state: _MaterializationState) -> None: if state.report.unresolved_hard: raise GraphInitializationError(state.report) def _sorted_dependencies(self, *, state: _MaterializationState) -> list[Dependency]: return sorted( Selector(has_kind=Dependency).filter(state.graph.values()), key=self._order_key, ) def _sorted_fanouts(self, *, state: _MaterializationState) -> list[Fanout]: return sorted( Selector(has_kind=Fanout).filter(state.graph.values()), key=self._order_key, ) def _sorted_menu_blocks(self, *, state: _MaterializationState) -> list[MenuBlock]: return sorted( Selector(has_kind=MenuBlock).filter(state.graph.values()), key=self._order_key, ) @staticmethod def _make_prelink_ctx( *, state: _MaterializationState, cursor_id: UUID | None, ) -> PhaseCtx: return PhaseCtx( graph=state.graph, cursor_id=cursor_id, ) def _project_prelinked_menu_actions( self, *, menu: MenuBlock, state: _MaterializationState, ) -> None: graph = state.graph for edge in list(menu.edges_out(Selector(has_kind=Action, trigger_phase=None))): tags = getattr(edge, "tags", set()) or set() if {"dynamic", "fanout", "menu"}.issubset(tags): graph.remove(edge.uid) affordances = [ affordance for affordance in menu.edges_out(Selector(has_kind=Affordance)) if {"dynamic", "fanout"}.issubset(getattr(affordance, "tags", set()) or set()) ] for index, affordance in enumerate(affordances): provider = affordance.successor or affordance.provider if provider is None: continue Action( registry=graph, label=f"menu_{menu.get_label()}_{index}", predecessor_id=menu.uid, successor_id=provider.uid, text=MenuBlock.action_text_for(provider), tags={"dynamic", "fanout", "menu"}, ) @staticmethod def _coerce_str(value: Any) -> str | None: if value is None: return None return str(value) @staticmethod def _order_key(item: GraphItem) -> tuple[int, int, str, str, str]: """Return deterministic ordering key for graph items with optional seq.""" seq = getattr(item, "seq", None) has_seq = isinstance(seq, int) label = item.get_label() if hasattr(item, "get_label") else "" return ( 0 if has_seq else 1, int(seq) if has_seq else 0, item.__class__.__name__, str(label), str(item.uid), ) @staticmethod def _resolve_policy(value: Any) -> ProvisionPolicy: if isinstance(value, ProvisionPolicy): return value if isinstance(value, str): key = value.upper() if key in ProvisionPolicy.__members__: return ProvisionPolicy[key] return ProvisionPolicy.ANY @staticmethod def _requirement_identifier(requirement: Requirement) -> str | None: extra = requirement.__pydantic_extra__ or {} value = extra.get("has_identifier") return str(value) if value is not None else None def _validate_lazy_destination( self, *, state: _MaterializationState, source: TraversableNode, action: Action, authored_ref: str | None, canonical_ref: str, requirement: Requirement, ) -> None: ctx = PhaseCtx( graph=state.graph, cursor_id=source.uid, ) resolver = Resolver.from_ctx(ctx) offers = resolver.inspect_template_dependency_offers(requirement, _ctx=ctx) candidates = self._unique_template_candidates(offers) if len(candidates) == 1: return reason = ( ResolutionFailureReason.NO_TEMPLATE if len(candidates) == 0 else ResolutionFailureReason.AMBIGUOUS_TEMPLATE ) raise ResolutionError( source_node_id=source.uid, source_node_label=source.get_label(), action_id=action.uid, action_label=action.get_label(), authored_ref=authored_ref, canonical_ref=canonical_ref, reason=reason, selector=self._serialize_selector(requirement), world_id=self._world_id(state), bundle_id=self._bundle_id(state), ) @staticmethod def _unique_template_candidates(offers: list[ProvisionOffer]) -> list[EntityTemplate]: candidates: list[EntityTemplate] = [] seen: set[UUID] = set() for offer in offers: candidate = offer.candidate if not isinstance(candidate, EntityTemplate): continue candidate_uid = candidate.uid if candidate_uid in seen: continue seen.add(candidate_uid) candidates.append(candidate) return candidates @staticmethod def _serialize_selector(requirement: Requirement) -> dict[str, Any]: selector: dict[str, Any] = {} extra = requirement.__pydantic_extra__ or {} has_kind = extra.get("has_kind") if isinstance(has_kind, type): selector["has_kind"] = f"{has_kind.__module__}.{has_kind.__name__}" elif has_kind is not None: selector["has_kind"] = str(has_kind) has_identifier = extra.get("has_identifier") if has_identifier is not None: selector["has_identifier"] = str(has_identifier) has_tags = extra.get("has_tags") if isinstance(has_tags, (set, tuple, list)): selector["has_tags"] = [str(tag) for tag in has_tags] elif has_tags is not None: selector["has_tags"] = str(has_tags) selector["authored_path"] = requirement.authored_path selector["is_qualified"] = requirement.is_qualified selector["is_absolute"] = requirement.is_absolute selector["provision_policy"] = requirement.provision_policy.name return selector @staticmethod def _world_id(state: _MaterializationState) -> str | None: world = getattr(state.graph, "world", None) label = getattr(world, "label", None) if isinstance(label, str) and label: return label return None @staticmethod def _bundle_id(state: _MaterializationState) -> str | None: label = state.bundle_id if isinstance(label, str) and label: return label return None @staticmethod def _qualify_successor_ref(*, successor_ref: str | None, source: TraversableNode) -> str | None: # Legacy helper retained while other non-action callers migrate. # Compiler canonicalization is authoritative for action destinations. if not successor_ref: return None if StoryMaterializer._is_qualified_path(successor_ref): return successor_ref parent = source.parent parent_label = getattr(parent, "label", None) if parent_label: return f"{parent_label}.{successor_ref}" return successor_ref @staticmethod def _is_qualified_path(path: str | None) -> bool: return isinstance(path, str) and "." in path