Source code for tangl.story.fabula.compiler

from __future__ import annotations

from dataclasses import dataclass, field
from importlib import import_module
from typing import Any

from tangl.core import Entity, EntityTemplate, Selector, TemplateRegistry
from tangl.core.template import TemplateGroup
from tangl.ir.story_ir import StoryScript
from tangl.vm import TraversableNode

from ..concepts import Actor, Location
from ..episode import Action, Block, MenuBlock, Scene
from .types import AuthoredRef, CompileIssue, CompileSeverity, JsonValue


# Issue codes placed on ``CompileIssue.code`` by the compile-time checks in
# this module. The ``compile:`` prefix is part of the emitted value.
ISSUE_DUPLICATE_LABEL = "compile:duplicate_label"
ISSUE_DANGLING_SUCCESSOR_REF = "compile:dangling_successor_ref"
ISSUE_DANGLING_ACTOR_REF = "compile:dangling_actor_ref"
ISSUE_DANGLING_LOCATION_REF = "compile:dangling_location_ref"
ISSUE_EMPTY_ENTRY_RESOLUTION = "compile:empty_entry_resolution"

# Allowed ``details`` keys per issue code. Keep this close to the compiler
# helpers so the JSON-like payload shape stays explicit and testable.
# ``_sanitize_issue_details`` drops any key that is not listed for a code;
# codes without an entry here are passed through unfiltered.
_COMPILE_ISSUE_DETAIL_KEYS: dict[str, tuple[str, ...]] = {
    ISSUE_DUPLICATE_LABEL: ("normalized_label", "occurrences"),
    ISSUE_DANGLING_SUCCESSOR_REF: ("field", "authored_ref", "canonical_ref"),
    ISSUE_DANGLING_ACTOR_REF: ("reference_key", "missing_ref"),
    ISSUE_DANGLING_LOCATION_REF: ("reference_key", "missing_ref"),
    ISSUE_EMPTY_ENTRY_RESOLUTION: ("requested_entry_ids", "resolution_strategy"),
}


@dataclass(slots=True)
class _DeclaredTemplate:
    """One template declaration recorded while a story is being compiled.

    Instances are created by ``_CompileCollector.add_declaration`` and are
    consulted afterwards for duplicate-label detection and for deciding
    whether a reference has a matching declaration.
    """

    # Label the template was registered under (already scoped by the caller).
    template_label: str
    # Text form of the payload's own ``get_label()`` result; ``None`` when the
    # payload has no such method or the result is empty.
    payload_label: str | None
    # Concrete class of the payload; compared with ``issubclass`` when a
    # reference requires a particular kind.
    payload_kind: type[Entity]
    # Authored location of the declaration, or ``None`` when nothing is known.
    source_ref: AuthoredRef | None


@dataclass(slots=True)
class _PendingDiagnostic:
    """A reference check that is deferred until all declarations are known.

    Created by ``_CompileCollector.add_pending``. It only becomes a
    ``CompileIssue`` if, once collection is finished, the first entry of
    ``related_identifiers`` has no matching declaration of the required kind.
    """

    # Issue code to report if the reference turns out to be unresolved.
    code: str
    # Human-readable label of the referencing item, if one could be derived.
    subject_label: str | None
    # Authored location of the reference, or ``None`` when nothing is known.
    source_ref: AuthoredRef | None
    # Identifiers the diagnostic is about; the first one is the one checked.
    related_identifiers: list[str] = field(default_factory=list)
    # Extra JSON-like payload, already filtered to the keys allowed for ``code``.
    details: dict[str, JsonValue] = field(default_factory=dict)


@dataclass(slots=True)
class _CompileCollector:
    """Compiler-internal declaration and reference tracker.

    This collector is intentionally ephemeral. It exists only during
    compilation so validation can reason about normalized labels before lookup
    behavior hides collisions. Only ``CompileIssue`` records persist on the
    returned bundle.
    """

    default_source_path: str | None = None
    default_story_key: str | None = None
    declarations: list[_DeclaredTemplate] = field(default_factory=list)
    declarations_by_template_label: dict[str, list[_DeclaredTemplate]] = field(default_factory=dict)
    pending: list[_PendingDiagnostic] = field(default_factory=list)

    @classmethod
    def from_source_map(cls, source_map: dict[str, Any] | None) -> "_CompileCollector":
        if not isinstance(source_map, dict):
            return cls()
        refs = source_map.get("__source_files__")
        if isinstance(refs, list):
            if len(refs) != 1:
                return cls()
            first = refs[0]
        elif refs is not None:
            first = refs
        else:
            return cls()
        if isinstance(first, dict):
            return cls(
                default_source_path=_coerce_text(first.get("path")),
                default_story_key=_coerce_text(first.get("story_key")),
            )
        return cls(
            default_source_path=_coerce_text(getattr(first, "path", None)),
            default_story_key=_coerce_text(getattr(first, "story_key", None)),
        )

    def build_source_ref(
        self,
        *,
        authored_path: str | None,
        label: str | None,
        note: str | None = None,
    ) -> AuthoredRef | None:
        source_ref = AuthoredRef(
            path=self.default_source_path,
            story_key=self.default_story_key,
            authored_path=authored_path,
            label=label,
            note=note,
        )
        if any(
            (
                source_ref.path,
                source_ref.story_key,
                source_ref.authored_path,
                source_ref.label,
                source_ref.note,
            )
        ):
            return source_ref
        return None

    def add_declaration(
        self,
        *,
        template_label: str,
        payload: Entity,
        authored_path: str,
    ) -> None:
        declared = _DeclaredTemplate(
            template_label=template_label,
            payload_label=_coerce_text(payload.get_label() if hasattr(payload, "get_label") else None),
            payload_kind=payload.__class__,
            source_ref=self.build_source_ref(
                authored_path=authored_path,
                label=template_label,
            ),
        )
        self.declarations.append(declared)
        self.declarations_by_template_label.setdefault(template_label, []).append(declared)

    def add_pending(
        self,
        *,
        code: str,
        subject_label: str | None,
        authored_path: str,
        related_identifiers: list[str] | None = None,
        details: dict[str, JsonValue] | None = None,
    ) -> None:
        self.pending.append(
            _PendingDiagnostic(
                code=code,
                subject_label=subject_label,
                source_ref=self.build_source_ref(
                    authored_path=authored_path,
                    label=subject_label,
                ),
                related_identifiers=list(related_identifiers or []),
                details=_sanitize_issue_details(code, details or {}),
            )
        )

    def has_candidate(self, identifier: str, *, kind: type[Entity]) -> bool:
        for declaration in self.declarations:
            if not issubclass(declaration.payload_kind, kind):
                continue
            if identifier == declaration.template_label:
                return True
            if declaration.payload_label and identifier == declaration.payload_label:
                return True
        return False

    def build_issues(
        self,
        *,
        story_label: str,
        entry_template_ids: list[str],
        resolution_strategy: str,
    ) -> list[CompileIssue]:
        issues: list[CompileIssue] = []
        issues.extend(self._build_duplicate_issues())
        issues.extend(self._build_pending_issues())
        entry_issue = self._build_entry_issue(
            story_label=story_label,
            entry_template_ids=entry_template_ids,
            resolution_strategy=resolution_strategy,
        )
        if entry_issue is not None:
            issues.append(entry_issue)
        return sorted(issues, key=_compile_issue_sort_key)

    def _build_duplicate_issues(self) -> list[CompileIssue]:
        issues: list[CompileIssue] = []
        for template_label, declarations in self.declarations_by_template_label.items():
            if len(declarations) < 2:
                continue
            occurrences = [
                source_ref.authored_path
                for declaration in declarations
                if (source_ref := declaration.source_ref) is not None and source_ref.authored_path
            ]
            issue = CompileIssue(
                code=ISSUE_DUPLICATE_LABEL,
                severity=CompileSeverity.ERROR,
                message=(
                    f"Compile label {template_label!r} was declared more than once "
                    "in the compiled bundle namespace."
                ),
                subject_label=template_label,
                source_ref=declarations[0].source_ref,
                details=_sanitize_issue_details(
                    ISSUE_DUPLICATE_LABEL,
                    {
                    "normalized_label": template_label,
                    "occurrences": occurrences,
                    },
                ),
            )
            issues.append(issue)
        return issues

    def _build_pending_issues(self) -> list[CompileIssue]:
        issues: list[CompileIssue] = []
        for pending in self.pending:
            missing_identifier = next(iter(pending.related_identifiers), None)
            if missing_identifier is None:
                continue
            required_kind = _required_kind_for_issue_code(pending.code)
            if self.has_candidate(missing_identifier, kind=required_kind):
                continue
            issues.append(
                CompileIssue(
                    code=pending.code,
                    severity=CompileSeverity.ERROR,
                    message=_message_for_pending_issue(
                        code=pending.code,
                        subject_label=pending.subject_label,
                        identifier=missing_identifier,
                    ),
                    subject_label=pending.subject_label,
                    source_ref=pending.source_ref,
                    related_identifiers=list(pending.related_identifiers),
                    details=dict(pending.details),
                )
            )
        return issues

    def _build_entry_issue(
        self,
        *,
        story_label: str,
        entry_template_ids: list[str],
        resolution_strategy: str,
    ) -> CompileIssue | None:
        if entry_template_ids and any(
            self.has_candidate(identifier, kind=TraversableNode)
            for identifier in entry_template_ids
        ):
            return None

        authored_path = "metadata.start_at" if resolution_strategy == "metadata.start_at" else "metadata"
        return CompileIssue(
            code=ISSUE_EMPTY_ENTRY_RESOLUTION,
            severity=CompileSeverity.ERROR,
            message=(
                f"Story {story_label!r} resolved no usable entry templates during compile "
                "normalization."
            ),
            subject_label=story_label,
            source_ref=self.build_source_ref(
                authored_path=authored_path,
                label=story_label,
            ),
            details=_sanitize_issue_details(
                ISSUE_EMPTY_ENTRY_RESOLUTION,
                {
                    "requested_entry_ids": list(entry_template_ids),
                    "resolution_strategy": resolution_strategy,
                },
            ),
        )


def _compile_issue_sort_key(issue: CompileIssue) -> tuple[str, str, str, str, str]:
    """Return the ordering key used to sort compile issues deterministically.

    Issues are ordered by source path, authored path, subject label, code and
    message; missing values sort as the empty string.
    """
    ref = issue.source_ref
    if ref is None:
        path_text = ""
        authored_text = ""
    else:
        path_text = _coerce_text(ref.path) or ""
        authored_text = _coerce_text(ref.authored_path) or ""
    subject_text = issue.subject_label or ""
    return (path_text, authored_text, subject_text, issue.code, issue.message)


def _message_for_pending_issue(*, code: str, subject_label: str | None, identifier: str) -> str:
    """Render the human-readable message for an unresolved reference.

    The three dangling-reference codes get a dedicated sentence; any other
    code falls back to a generic description. A missing subject label is
    shown as ``'unknown'``.
    """
    subject = subject_label if subject_label else "unknown"
    if code == ISSUE_DANGLING_SUCCESSOR_REF:
        text = f"Successor reference {identifier!r} from {subject!r} does not resolve in this bundle."
    elif code == ISSUE_DANGLING_ACTOR_REF:
        text = f"Actor reference {identifier!r} from {subject!r} does not resolve in this bundle."
    elif code == ISSUE_DANGLING_LOCATION_REF:
        text = f"Location reference {identifier!r} from {subject!r} does not resolve in this bundle."
    else:
        text = f"Compile diagnostic {code!r} for {subject!r} references {identifier!r}."
    return text


def _required_kind_for_issue_code(code: str) -> type[Entity]:
    """Return the payload class a declaration must have to satisfy a reference of ``code``.

    Unknown codes accept any ``Entity``.
    """
    kind_by_code = (
        (ISSUE_DANGLING_SUCCESSOR_REF, TraversableNode),
        (ISSUE_DANGLING_ACTOR_REF, Actor),
        (ISSUE_DANGLING_LOCATION_REF, Location),
    )
    for known_code, required_kind in kind_by_code:
        if code == known_code:
            return required_kind
    return Entity


def _sanitize_issue_details(code: str, details: dict[str, JsonValue]) -> dict[str, JsonValue]:
    """Return a copy of ``details`` restricted to the keys allowed for ``code``.

    Keys are emitted in the order given by ``_COMPILE_ISSUE_DETAIL_KEYS``.
    Codes with no (or an empty) whitelist get an unfiltered shallow copy.
    """
    whitelist = _COMPILE_ISSUE_DETAIL_KEYS.get(code)
    if whitelist:
        return {name: details[name] for name in whitelist if name in details}
    return dict(details)


def _authored_item_path(prefix: str, index: int, label: str | None) -> str:
    if label:
        return f"{prefix}[{index}].{label}"
    return f"{prefix}[{index}]"


def _coerce_text(value: Any) -> str | None:
    if value is None:
        return None
    text = str(value)
    return text or None


[docs] @dataclass(slots=True) class StoryTemplateBundle: """StoryTemplateBundle() Canonical output of :class:`StoryCompiler`. Why ---- Separating compilation from materialization lets one validated script bundle produce many independent story graphs without reparsing authored input. Key Features ------------ * Carries the validated :class:`~tangl.core.TemplateRegistry` tree used by the materializer. * Preserves ``metadata``, story ``locals``, source mapping, and codec state alongside the template hierarchy. * Records ``entry_template_ids`` so materialization can resolve the graph's initial cursor positions deterministically. * Stores structured compiler diagnostics for cheap bundle-local integrity problems without forcing materialization first. API --- - :attr:`metadata` stores story-level metadata used by runtime setup. - :attr:`locals` stores authored top-level namespace values. - :attr:`template_registry` contains the validated template hierarchy. - :attr:`entry_template_ids` lists the template ids used for initial cursor resolution. - :attr:`issues` stores structured compile diagnostics for later inspection. - :attr:`source_map`, :attr:`codec_state`, and :attr:`codec_id` preserve compile-time provenance and codec context. """ metadata: dict[str, Any] locals: dict[str, Any] template_registry: TemplateRegistry entry_template_ids: list[str] source_map: dict[str, Any] codec_state: dict[str, Any] codec_id: str | None issues: list[CompileIssue] = field(default_factory=list)
class StoryCompiler:
    """StoryCompiler()

    Validate and normalize authored story script data into a
    :class:`StoryTemplateBundle`.

    Why
    ----
    Authored story scripts are intentionally lightweight. The compiler turns
    that loose authoring shape into a typed, scoped template tree that runtime
    materialization and provisioning can trust.

    Key Features
    ------------
    * Accepts raw dicts or validated :class:`~tangl.ir.story_ir.StoryScript`
      instances.
    * Builds scene and block template hierarchy used by runtime scope
      matching.
    * Canonicalizes action references so authored shorthand and qualified
      references resolve into a stable form.
    * Resolves authored ``kind`` references during compilation, falling back
      to the section's default kind when an override cannot be mapped or
      imported.

    API
    ---
    - :meth:`compile` is the supported public entry point.
    """

    @staticmethod
    def validate_ir(script_data: dict[str, Any]) -> StoryScript:
        """Validate raw script data against the near-native IR schema.

        Use this when authored near-native YAML should be linted explicitly.
        Compilation itself accepts runtime-ready dicts directly so codecs are
        not forced through the at-rest IR model.
        """
        return StoryScript.model_validate(script_data)

    def compile(
        self,
        script_data: dict[str, Any] | StoryScript,
        *,
        source_map: dict[str, Any] | None = None,
        codec_state: dict[str, Any] | None = None,
        codec_id: str | None = None,
    ) -> StoryTemplateBundle:
        """Compile authored story data into a reusable template bundle.

        Accepts raw script dictionaries or validated
        :class:`~tangl.ir.story_ir.StoryScript` objects. Raw dicts are
        compiled directly. Use :meth:`validate_ir` separately when authored
        near-native data should be linted against the IR schema.

        The top-level ``templates``, ``actors`` and ``locations`` sections are
        compiled first, then every scene with its nested templates and blocks.
        Actions, continues and redirects that carry no successor are pointed
        at the next block of the same scene. Diagnostics gathered along the
        way are returned on the bundle's ``issues``.
        """
        if isinstance(script_data, StoryScript):
            data = script_data.model_dump(by_alias=True, exclude_none=True)
            label = script_data.label
        else:
            data = dict(script_data)
            label = str(data.get("label") or "story")

        metadata = dict(data.get("metadata") or {})
        locals_ns = dict(data.get("globals") or data.get("locals") or {})
        collector = _CompileCollector.from_source_map(source_map)

        registry = TemplateRegistry(label=f"{label}_templates")
        root = TemplateGroup(
            label=label,
            payload=Entity(label=label),
            registry=registry,
        )

        # Top-level sections, each with the kind used when an item names none.
        for section_key, section_kind in (
            ("templates", TraversableNode),
            ("actors", Actor),
            ("locations", Location),
        ):
            self._compile_section(
                parent=root,
                items=data.get(section_key),
                fallback_kind=section_kind,
                collector=collector,
                authored_path_prefix=section_key,
            )

        scenes = self._normalize_mapping(data.get("scenes"))
        root_scene_labels = {scene_label for _scene_index, scene_label, _scene_data in scenes}
        successor_fields = ("actions", "continues", "redirects")

        for scene_index, scene_label, scene_data in scenes:
            scene_authored_path = _authored_item_path("scenes", scene_index, scene_label)
            scene_payload = self._build_payload(
                kind=self._resolve_kind(
                    scene_data.get("kind"),
                    fallback=Scene,
                ),
                payload={
                    **scene_data,
                    "label": scene_data.get("label") or scene_label,
                    "title": scene_data.get("title") or scene_data.get("text") or "",
                    "roles": self._normalize_list(scene_data.get("roles")),
                    "settings": self._normalize_list(scene_data.get("settings")),
                },
                default_label=scene_label,
            )
            collector.add_declaration(
                template_label=scene_label,
                payload=scene_payload,
                authored_path=scene_authored_path,
            )
            scene_templ = TemplateGroup(
                label=scene_label,
                payload=scene_payload,
                registry=registry,
            )
            root.add_child(scene_templ)

            self._collect_role_and_setting_issues(
                collector=collector,
                payload=scene_payload,
                source_label=scene_label,
                authored_path=scene_authored_path,
            )

            self._compile_section(
                parent=scene_templ,
                items=scene_data.get("templates"),
                fallback_kind=TraversableNode,
                collector=collector,
                authored_path_prefix=f"{scene_authored_path}.templates",
            )

            blocks = self._normalize_mapping(scene_data.get("blocks"))
            for block_index, block_label, block_data in blocks:
                block_authored_path = _authored_item_path(
                    f"{scene_authored_path}.blocks",
                    block_index,
                    block_label,
                )
                qualified_label = f"{scene_label}.{block_label}"

                # Canonical specs per field, in the fixed order
                # actions -> continues -> redirects.
                successor_specs = {
                    field_name: self._canonicalize_action_specs(
                        self._normalize_list(block_data.get(field_name)),
                        scene_label=scene_label,
                        root_scene_labels=root_scene_labels,
                    )
                    for field_name in successor_fields
                }

                # Specs without any successor fall through to the next block
                # of this scene (when there is one) and are marked as inferred.
                next_qualified = self._next_block_label(blocks, block_index, scene_label)
                if next_qualified is not None:
                    for spec_list in successor_specs.values():
                        for spec in spec_list:
                            if not spec.get("successor_ref"):
                                spec["successor_ref"] = next_qualified
                                spec["successor_is_absolute"] = False
                                spec["successor_is_inferred"] = True

                for field_name, specs in successor_specs.items():
                    self._collect_successor_issues(
                        collector=collector,
                        specs=specs,
                        source_label=qualified_label,
                        authored_path_prefix=f"{block_authored_path}.{field_name}",
                        field_name=field_name,
                    )

                block_payload = self._build_payload(
                    kind=self._resolve_kind(
                        block_data.get("kind") or block_data.get("block_cls"),
                        fallback=Block,
                    ),
                    payload={
                        **block_data,
                        "label": block_data.get("label") or block_label,
                        **successor_specs,
                        "roles": self._normalize_list(block_data.get("roles")),
                        "settings": self._normalize_list(block_data.get("settings")),
                        "media": self._normalize_list(block_data.get("media")),
                    },
                    default_label=block_label,
                )
                collector.add_declaration(
                    template_label=qualified_label,
                    payload=block_payload,
                    authored_path=block_authored_path,
                )
                block_templ = TemplateGroup(
                    label=qualified_label,
                    payload=block_payload,
                    registry=registry,
                )
                scene_templ.add_child(block_templ)

                self._collect_role_and_setting_issues(
                    collector=collector,
                    payload=block_payload,
                    source_label=qualified_label,
                    authored_path=block_authored_path,
                )

                self._compile_section(
                    parent=block_templ,
                    items=block_data.get("templates"),
                    fallback_kind=TraversableNode,
                    collector=collector,
                    authored_path_prefix=f"{block_authored_path}.templates",
                )

        entry_template_ids, resolution_strategy = self._resolve_entry_template_ids(
            metadata=metadata,
            registry=registry,
        )
        issues = collector.build_issues(
            story_label=label,
            entry_template_ids=entry_template_ids,
            resolution_strategy=resolution_strategy,
        )

        return StoryTemplateBundle(
            metadata=metadata,
            locals=locals_ns,
            template_registry=registry,
            entry_template_ids=entry_template_ids,
            issues=issues,
            source_map=source_map or {},
            codec_state=codec_state or {},
            codec_id=codec_id,
        )

    def _compile_section(
        self,
        *,
        parent: TemplateGroup,
        items: Any,
        fallback_kind: type[Entity],
        collector: _CompileCollector,
        authored_path_prefix: str,
    ) -> None:
        """Compile one authored section (and its nested ``templates``) under ``parent``.

        Each item becomes a ``TemplateGroup`` child of ``parent`` and is
        recorded on ``collector``. Items directly under the root keep their
        bare label; deeper items are prefixed with the parent's label.
        """
        entries = self._normalize_mapping(items)
        if not entries:
            return

        # Invariant for the whole section, so looked up once.
        parent_label = parent.get_label()
        parent_is_root = getattr(parent, "parent", None) is None
        registry = parent.registry

        for item_index, label, item_data in entries:
            scoped_label = label if parent_is_root else f"{parent_label}.{label}"
            item_authored_path = _authored_item_path(
                authored_path_prefix,
                item_index,
                label,
            )
            payload = self._build_payload(
                kind=self._resolve_kind(
                    item_data.get("kind"),
                    fallback=fallback_kind,
                ),
                payload={**item_data, "label": item_data.get("label") or label},
                default_label=label,
            )
            collector.add_declaration(
                template_label=scoped_label,
                payload=payload,
                authored_path=item_authored_path,
            )
            templ = TemplateGroup(
                label=scoped_label,
                payload=payload,
                registry=registry,
            )
            parent.add_child(templ)
            self._compile_section(
                parent=templ,
                items=item_data.get("templates"),
                fallback_kind=fallback_kind,
                collector=collector,
                authored_path_prefix=f"{item_authored_path}.templates",
            )

    @staticmethod
    def _as_payload_dict(item: Any) -> dict[str, Any]:
        """Return a fresh dict for one authored item.

        Dicts are shallow-copied. Anything else is converted through its
        ``model_dump()`` method when it has one, and becomes an empty dict
        otherwise.
        """
        if isinstance(item, dict):
            return dict(item)
        return dict(getattr(item, "model_dump", lambda **_: {})())

    @staticmethod
    def _normalize_mapping(value: Any) -> list[tuple[int, str, dict[str, Any]]]:
        """Normalize a labelled section into ``(index, label, payload)`` triples.

        ``value`` may be a mapping of label to item or a sequence of items.
        For mappings the key is the label and is also stored as the payload's
        ``"label"`` unless the payload already has one. Sequence items without
        a label receive a generated ``_anon_<n>`` label and are flagged with
        ``"_is_anonymous"``. Falsy input yields an empty list.
        """
        if not value:
            return []

        items: list[tuple[int, str, dict[str, Any]]] = []
        if isinstance(value, dict):
            for index, (label, data) in enumerate(value.items()):
                payload = StoryCompiler._as_payload_dict(data)
                payload.setdefault("label", label)
                items.append((index, str(label), payload))
            return items

        anon_counter = 0
        for index, item in enumerate(value):
            payload = StoryCompiler._as_payload_dict(item)
            label = payload.get("label")
            if not label:
                label = f"_anon_{anon_counter}"
                anon_counter += 1
                payload["label"] = label
                payload["_is_anonymous"] = True
            items.append((index, str(label), payload))
        return items

    @staticmethod
    def _normalize_list(value: Any) -> list[dict[str, Any]]:
        """Normalize a section into a plain list of payload dicts.

        Mappings contribute one payload per entry, with the key stored as
        ``"label"`` unless already present. Sequences contribute one payload
        per item and are not given labels. Falsy input yields an empty list.
        """
        if not value:
            return []

        if isinstance(value, dict):
            out: list[dict[str, Any]] = []
            for label, data in value.items():
                payload = StoryCompiler._as_payload_dict(data)
                payload.setdefault("label", label)
                out.append(payload)
            return out

        return [StoryCompiler._as_payload_dict(item) for item in value]

    @staticmethod
    def _canonicalize_action_specs(
        specs: list[dict[str, Any]],
        *,
        scene_label: str,
        root_scene_labels: set[str],
    ) -> list[dict[str, Any]]:
        """Return canonical action specs for one scene.

        Part A policy: when a bare successor token collides with a root scene
        label, it is treated as an absolute scene destination by design.

        Each returned spec is a copy. The authored target (taken from
        ``authored_successor_ref``, ``successor_ref`` or one of the aliases
        ``successor`` / ``next`` / ``target_ref`` / ``target_node``) is kept
        under ``"authored_successor_ref"``. ``"successor_ref"`` is rewritten
        so that dotted references stay as they are, root scene labels stay
        bare and are flagged absolute, and any other bare token is qualified
        with ``scene_label``.
        """
        normalized: list[dict[str, Any]] = []
        for spec in specs:
            payload = dict(spec)

            authored = payload.get("authored_successor_ref")
            if not (isinstance(authored, str) and authored):
                authored = payload.get("successor_ref")
            if authored is None:
                authored = (
                    payload.get("successor")
                    or payload.get("next")
                    or payload.get("target_ref")
                    or payload.get("target_node")
                )
            if isinstance(authored, str) and authored:
                payload["authored_successor_ref"] = authored

            canonical = payload.get("successor_ref")
            if not (isinstance(canonical, str) and canonical):
                canonical = authored
            if isinstance(canonical, str) and canonical:
                if "." in canonical:
                    payload["successor_ref"] = canonical
                    payload["successor_is_absolute"] = False
                elif canonical in root_scene_labels:
                    payload["successor_ref"] = canonical
                    payload["successor_is_absolute"] = True
                else:
                    payload["successor_ref"] = f"{scene_label}.{canonical}"
                    payload["successor_is_absolute"] = False

            normalized.append(payload)
        return normalized

    @staticmethod
    def _collect_successor_issues(
        *,
        collector: _CompileCollector,
        specs: list[dict[str, Any]],
        source_label: str,
        authored_path_prefix: str,
        field_name: str,
    ) -> None:
        """Queue a dangling-successor check for every spec that names a successor."""
        for index, spec in enumerate(specs):
            canonical_ref = _coerce_text(spec.get("successor_ref"))
            if not canonical_ref:
                continue
            authored_ref = _coerce_text(spec.get("authored_successor_ref")) or canonical_ref
            subject_label = StoryCompiler._diagnostic_subject_label(
                source_label=source_label,
                spec=spec,
                field_name=field_name,
                index=index,
            )
            collector.add_pending(
                code=ISSUE_DANGLING_SUCCESSOR_REF,
                subject_label=subject_label,
                authored_path=f"{authored_path_prefix}[{index}]",
                related_identifiers=[canonical_ref],
                details={
                    "field": field_name,
                    "authored_ref": authored_ref,
                    "canonical_ref": canonical_ref,
                },
            )

    @staticmethod
    def _collect_role_and_setting_issues(
        *,
        collector: _CompileCollector,
        payload: Entity,
        source_label: str,
        authored_path: str,
    ) -> None:
        """Queue actor and location reference checks for one scene or block payload."""
        # NOTE(review): ``_build_payload`` can hand back a plain
        # ``TraversableNode`` fallback; whether that type exposes ``roles`` /
        # ``settings`` is not visible from this module, so they are read
        # defensively instead of risking an ``AttributeError`` mid-compile.
        for field_name, issue_code, reference_keys in (
            ("roles", ISSUE_DANGLING_ACTOR_REF, ("actor_ref", "actor_template_ref")),
            ("settings", ISSUE_DANGLING_LOCATION_REF, ("location_ref", "location_template_ref")),
        ):
            StoryCompiler._collect_provider_ref_issues(
                collector=collector,
                specs=getattr(payload, field_name, None) or [],
                source_label=source_label,
                authored_path_prefix=f"{authored_path}.{field_name}",
                field_name=field_name,
                issue_code=issue_code,
                reference_keys=reference_keys,
            )

    @staticmethod
    def _collect_provider_ref_issues(
        *,
        collector: _CompileCollector,
        specs: list[dict[str, Any]],
        source_label: str,
        authored_path_prefix: str,
        field_name: str,
        issue_code: str,
        reference_keys: tuple[str, str],
    ) -> None:
        """Queue a reference check for every spec that names a provider.

        The first of ``reference_keys`` with a non-empty value in a spec
        supplies the identifier; specs with none are skipped.
        """
        for index, spec in enumerate(specs):
            reference_key, identifier = StoryCompiler._first_reference(spec, *reference_keys)
            if not identifier:
                continue
            subject_label = StoryCompiler._diagnostic_subject_label(
                source_label=source_label,
                spec=spec,
                field_name=field_name,
                index=index,
            )
            collector.add_pending(
                code=issue_code,
                subject_label=subject_label,
                authored_path=f"{authored_path_prefix}[{index}]",
                related_identifiers=[identifier],
                details={
                    "reference_key": reference_key,
                    "missing_ref": identifier,
                },
            )

    @staticmethod
    def _first_reference(spec: dict[str, Any], *keys: str) -> tuple[str, str | None]:
        """Return ``(key, value)`` for the first of ``keys`` with a non-empty value.

        When none has a value, the first key is returned together with
        ``None``.
        """
        for key in keys:
            value = _coerce_text(spec.get(key))
            if value:
                return key, value
        return keys[0], None

    @staticmethod
    def _diagnostic_subject_label(
        *,
        source_label: str,
        spec: dict[str, Any],
        field_name: str,
        index: int,
    ) -> str:
        """Return a label for a spec: its own label if set, else ``field[index]``, under ``source_label``."""
        explicit_label = _coerce_text(spec.get("label"))
        if explicit_label:
            return f"{source_label}.{explicit_label}"
        return f"{source_label}.{field_name}[{index}]"

    @staticmethod
    def _resolve_entry_template_ids(
        *,
        metadata: dict[str, Any],
        registry: TemplateRegistry,
    ) -> tuple[list[str], str]:
        """Resolve compile-time entry template ids using authored priority rules.

        Returns the ids together with the name of the rule that produced
        them. Rules are tried in this order:

        1. ``metadata["start_at"]`` as a non-empty string or list;
        2. a block template tagged ``start``, then one tagged ``entry``;
        3. a block whose payload ``locals`` set ``is_start`` or ``start_at``;
        4. a block whose (unqualified) label is ``start``, case-insensitively;
        5. the first block the registry finds.

        When nothing matches, ``([], "none")`` is returned.
        """
        start_at = metadata.get("start_at")
        if isinstance(start_at, str) and start_at:
            return [start_at], "metadata.start_at"
        if isinstance(start_at, list):
            # ``None`` entries (e.g. an empty YAML list item) are skipped;
            # stringifying them would request a template literally named
            # "None".
            values = [
                text
                for entry in start_at
                if entry is not None and (text := str(entry))
            ]
            if values:
                return values, "metadata.start_at"

        block_templates = [
            template
            for template in registry.values()
            if hasattr(template, "has_payload_kind") and template.has_payload_kind(Block)
        ]

        for tag_name in ("start", "entry"):
            for template in block_templates:
                if template.has_tags({tag_name}):
                    return [template.get_label()], f"tag:{tag_name}"

        for template in block_templates:
            payload = getattr(template, "payload", None)
            if payload is None:
                continue
            block_locals = getattr(payload, "locals", None) or {}
            if isinstance(block_locals, dict) and (
                block_locals.get("is_start") or block_locals.get("start_at")
            ):
                return [template.get_label()], "locals:start"

        for template in block_templates:
            label = template.get_label()
            short_label = label.rsplit(".", 1)[-1] if "." in label else label
            if short_label.lower() == "start":
                return [label], "label:start"

        first_block = registry.find_one(Selector(has_payload_kind=Block))
        if first_block is not None:
            return [first_block.get_label()], "first_block"

        return [], "none"

    @staticmethod
    def _next_block_label(
        blocks: list[tuple[int, str, dict[str, Any]]],
        current_index: int,
        scene_label: str,
    ) -> str | None:
        """Return the qualified label of the block after ``current_index``, or ``None`` at the end."""
        next_index = current_index + 1
        if next_index >= len(blocks):
            return None
        return f"{scene_label}.{blocks[next_index][1]}"

    def _resolve_kind(self, raw_kind: Any, *, fallback: type[Entity]) -> type[Entity]:
        """Turn an authored ``kind`` value into a payload class.

        Classes and names known to :meth:`_map_external_kind` map to the
        corresponding story class. Other ``Entity`` subclasses are used as
        they are, and dotted strings are imported. Anything that cannot be
        resolved yields ``fallback``.
        """
        if isinstance(raw_kind, type):
            mapped = self._map_external_kind(raw_kind.__name__, fallback=fallback)
            if mapped is not fallback or raw_kind is fallback:
                return mapped
            if issubclass(raw_kind, Entity):
                return raw_kind
            return fallback

        if isinstance(raw_kind, str):
            mapped = self._map_external_kind(raw_kind.split(".")[-1], fallback=fallback)
            if mapped is not fallback:
                return mapped
            try:
                module_name, class_name = raw_kind.rsplit(".", 1)
                # NOTE(review): this imports a module named by authored story
                # data, which executes that module's top-level code. Confirm
                # that scripts reaching the compiler are trusted, or restrict
                # the importable namespace.
                cls = getattr(import_module(module_name), class_name)
                if isinstance(cls, type):
                    mapped = self._map_external_kind(cls.__name__, fallback=fallback)
                    if mapped is not fallback:
                        return mapped
                    if issubclass(cls, Entity):
                        return cls
            except Exception:
                # Deliberate best-effort: an unresolvable ``kind`` (including
                # a bare name without a module part) falls back silently.
                return fallback

        return fallback

    @staticmethod
    def _map_external_kind(kind_name: str, *, fallback: type[Entity]) -> type[Entity]:
        """Map a bare class name to the story class used for it, or return ``fallback``."""
        mapping: dict[str, type[Entity]] = {
            "Actor": Actor,
            "Location": Location,
            "Role": Actor,
            "Setting": Location,
            "Scene": Scene,
            "Block": Block,
            "MenuBlock": MenuBlock,
            "Action": Action,
            "Node": TraversableNode,
            "TraversableNode": TraversableNode,
        }
        return mapping.get(kind_name, fallback)

    @staticmethod
    def _build_payload(kind: type[Entity], payload: dict[str, Any], default_label: str) -> Entity:
        """Instantiate ``kind`` from an authored payload, tolerating loose input.

        ``effects`` entries are normalized to dicts (strings become
        ``{"expr": ...}``; other types are dropped), action aliases are
        mapped onto their canonical fields, and keys that are not in the
        kind's ``model_fields`` are discarded. If construction fails, a bare
        ``TraversableNode`` carrying the label and any authored ``locals`` is
        returned instead.
        """
        payload = dict(payload)

        if isinstance(payload.get("effects"), list):
            normalized_effects: list[dict[str, Any]] = []
            for effect in payload["effects"]:
                if isinstance(effect, str):
                    normalized_effects.append({"expr": effect})
                elif isinstance(effect, dict):
                    normalized_effects.append(dict(effect))
            payload["effects"] = normalized_effects

        if kind is Action:
            if payload.get("successor_ref") is None:
                mapped_ref = (
                    payload.get("successor")
                    or payload.get("next")
                    or payload.get("target_ref")
                    or payload.get("target_node")
                )
                if mapped_ref is not None:
                    payload["successor_ref"] = mapped_ref
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # listing; confirm this ``content`` -> ``text`` alias is meant to
            # apply to actions only.
            if not payload.get("text") and payload.get("content"):
                payload["text"] = payload.get("content")

        if issubclass(kind, Block) and payload.get("_is_anonymous"):
            payload["is_anonymous"] = True

        allowed = set(getattr(kind, "model_fields", {}).keys())
        filtered = {k: v for k, v in payload.items() if k in allowed}
        filtered.setdefault("label", payload.get("label") or default_label)

        try:
            return kind(**filtered)
        except Exception:
            # Deliberate best-effort: keep compiling with a minimal node so
            # the rest of the bundle can still be validated.
            fallback = TraversableNode(label=filtered.get("label", default_label))
            if "locals" in payload and isinstance(payload["locals"], dict):
                fallback.locals.update(payload["locals"])
            return fallback