from __future__ import annotations
from collections.abc import Iterable
from typing import Any
from uuid import UUID
from pydantic import Field, model_validator
from tangl.core import EntityTemplate, Selector, Singleton, TemplateRegistry, TokenCatalog
from tangl.media import get_system_resource_manager
from tangl.media.media_resource import MediaInventory
from tangl.media.story_media import get_story_resource_manager
from tangl.vm import TraversableGraphFactory, TraversableNode
from tangl.vm.ctx import VmPhaseCtx
from ..story_graph import StoryGraph
from .compiler import StoryCompiler
from .materializer import StoryMaterializer
from .types import InitMode, StoryInitResult
def _copy_bundle_value(value: Any) -> Any:
if isinstance(value, dict):
return dict(value)
if isinstance(value, list):
return list(value)
return value
def _template_depth(templ: Any) -> tuple[int, int, str]:
depth = 0
current = getattr(templ, "parent", None)
while current is not None:
depth += 1
current = getattr(current, "parent", None)
seq = getattr(templ, "seq", 0)
label = templ.get_label() if hasattr(templ, "get_label") else ""
return depth, seq, label
def _registry_from_values(values: Iterable[Any]) -> TemplateRegistry | None:
found: TemplateRegistry | None = None
for item in values:
registry = getattr(item, "registry", None)
if not isinstance(registry, TemplateRegistry):
continue
if found is None:
found = registry
continue
if found is not registry:
return None
return found
def _coerce_template_registry_item(value: Any) -> TemplateRegistry | None:
if isinstance(value, TemplateRegistry):
return value
nested = getattr(value, "template_registry", None)
if isinstance(nested, TemplateRegistry):
return nested
if isinstance(value, (str, bytes, dict)) or not isinstance(value, Iterable):
return None
return _registry_from_values(value)
def _coerce_template_registries(value: Any) -> list[TemplateRegistry]:
if value is None:
return []
get_scope_groups = getattr(value, "get_template_scope_groups", None)
raw = get_scope_groups(caller=None, graph=None) if callable(get_scope_groups) else value
if raw is None:
return []
if isinstance(raw, TemplateRegistry):
values = [raw]
elif isinstance(raw, (str, bytes, dict)) or not isinstance(raw, Iterable):
values = [raw]
else:
values = list(raw)
registries: list[TemplateRegistry] = []
seen_ids: set[int] = set()
for item in values:
registry = _coerce_template_registry_item(item)
if registry is None:
continue
registry_id = id(registry)
if registry_id in seen_ids:
continue
seen_ids.add(registry_id)
registries.append(registry)
return registries
def _coerce_token_catalogs(
provider: Any,
*,
caller: Any,
requirement: Any = None,
graph: Any = None,
) -> list[TokenCatalog]:
if provider is None:
return []
get_catalogs = getattr(provider, "get_token_catalogs", None)
if callable(get_catalogs):
raw = get_catalogs(caller=caller, requirement=requirement, graph=graph)
else:
get_tokenizable = getattr(provider, "get_tokenizable", None)
raw = get_tokenizable() if callable(get_tokenizable) else None
if raw is None or isinstance(raw, (str, bytes, dict)):
return []
values = list(raw) if isinstance(raw, Iterable) else [raw]
catalogs: list[TokenCatalog] = []
seen_ids: set[int] = set()
for item in values:
catalog: TokenCatalog | None
if isinstance(item, TokenCatalog):
catalog = item
elif isinstance(item, type) and issubclass(item, Singleton):
catalog = TokenCatalog(wst=item)
else:
catalog = None
if catalog is None:
continue
catalog_id = id(catalog)
if catalog_id in seen_ids:
continue
seen_ids.add(catalog_id)
catalogs.append(catalog)
return catalogs
def _extend_media_inventories(
inventories: list[MediaInventory],
*,
provider: Any,
caller: Any,
requirement: Any = None,
graph: Any = None,
scope: str | None = None,
seen_registry_ids: set[int],
) -> None:
if provider is None:
return
get_inventories = getattr(provider, "get_media_inventories", None)
raw = get_inventories(caller=caller, requirement=requirement, graph=graph) if callable(get_inventories) else provider
if raw is None:
return
if isinstance(raw, (str, bytes, dict)) or not isinstance(raw, Iterable):
values = [raw]
else:
values = list(raw)
for value in values:
inventory = MediaInventory.from_provider(value, scope=scope)
if inventory is None:
continue
registry_id = id(inventory.registry)
if registry_id in seen_registry_ids:
continue
seen_registry_ids.add(registry_id)
inventories.append(inventory)
class _TemplateSubset:
"""Read-only filtered template view for VM lazy seed materialization."""
def __init__(self, registry: TemplateRegistry, selected_ids: set[UUID]) -> None:
self.registry = registry
self._selected_ids = selected_ids
self.label = f"{registry.label}.seed"
def _values(self) -> list[Any]:
return [
value
for value in self.registry.values()
if getattr(value, "uid", None) in self._selected_ids
]
def values(self) -> list[Any]:
return self._values()
def find_all(
self,
selector: Selector | None = None,
*,
sort_key=None,
) -> list[Any]:
values = self._values()
if selector is not None:
values = list(selector.filter(values))
if sort_key is not None:
values = sorted(values, key=sort_key)
return values
def find_one(
self,
selector: Selector | None = None,
*,
sort_key=None,
) -> Any | None:
matches = self.find_all(selector, sort_key=sort_key)
return matches[0] if matches else None
[docs]
class World(TraversableGraphFactory):
"""World(label: str)
Unitary story authority over templates, runtime graph creation, and world
adjunct providers.
Why
----
``World`` is the canonical story-layer authority object. It owns the story
template bundle, runtime behavior authorities, and compatible accessors for
loader/service surfaces while delegating generic graph materialization to
:class:`~tangl.vm.TraversableGraphFactory`.
Key Features
------------
* Acts as the singleton shape and behavior authority for
:class:`~tangl.story.StoryGraph`.
* Preserves bundle-derived metadata, locals, entries, and compile
provenance directly on the world.
* Keeps compatibility alias views for one phase of the world cutover.
API
---
- :meth:`create_story` is the public story initialization entry point.
- :meth:`get_authorities` exposes world-owned dispatch registries.
- :meth:`get_story_info_projector` returns the world-owned projector when
present.
- :meth:`find_template` and :meth:`find_templates` resolve directly against
the world's template registry.
"""
graph_type: type[StoryGraph] = StoryGraph
bundle: Any | None = Field(default=None, exclude=True)
metadata: dict[str, Any] = Field(default_factory=dict)
locals: dict[str, Any] = Field(default_factory=dict)
entry_template_ids: list[str] = Field(default_factory=list)
source_map: dict[str, Any] = Field(default_factory=dict)
codec_state: dict[str, Any] = Field(default_factory=dict)
codec_id: str | None = None
issues: list[Any] = Field(default_factory=list)
extra_template_registries: list[TemplateRegistry] = Field(default_factory=list, exclude=True)
assets: Any | None = None
resources: Any | None = None
class_registry: dict[str, Any] = Field(default_factory=dict)
modules: list[Any] = Field(default_factory=list)
extra_authorities: list[Any] = Field(default_factory=list)
story_info_projector: Any | None = None
@model_validator(mode="before")
@classmethod
def _coerce_bundle_authority(cls, data: Any) -> Any:
if not isinstance(data, dict):
return data
payload = dict(data)
extra_template_registries = list(payload.get("extra_template_registries") or [])
bundle = payload.get("bundle")
bundle_registry = getattr(bundle, "template_registry", None)
if isinstance(bundle_registry, TemplateRegistry):
templates = payload.get("templates")
if templates is None:
payload["templates"] = bundle_registry
elif not isinstance(templates, TemplateRegistry):
raise TypeError("World.templates must be a TemplateRegistry")
bundle_defaults = {
"metadata": getattr(bundle, "metadata", None),
"locals": getattr(bundle, "locals", None),
"entry_template_ids": getattr(bundle, "entry_template_ids", None),
"source_map": getattr(bundle, "source_map", None),
"codec_state": getattr(bundle, "codec_state", None),
"codec_id": getattr(bundle, "codec_id", None),
"issues": getattr(bundle, "issues", None),
}
for field_name, value in bundle_defaults.items():
if field_name in payload:
continue
if value is None:
continue
payload[field_name] = _copy_bundle_value(value)
if extra_template_registries:
deduped: list[TemplateRegistry] = []
seen_ids: set[int] = set()
for registry in extra_template_registries:
registry_id = id(registry)
if registry_id in seen_ids:
continue
seen_ids.add(registry_id)
deduped.append(registry)
payload["extra_template_registries"] = deduped
return payload
def get_authorities(self) -> list[object]:
"""Return world-owned behavior authorities with stable declaration order."""
authorities: list[object] = []
for authority in [self.dispatch, *self.extra_authorities]:
if authority is None or authority in authorities:
continue
authorities.append(authority)
return authorities
def get_story_info_projector(self) -> Any | None:
"""Return the world-owned story-info projector when present."""
return self.story_info_projector
def get_entry_cursor(self, graph: StoryGraph) -> Any | None:
"""Return the default story entry, falling back to generic VM rules."""
entry_templates = self._resolve_story_entry_templates()
if entry_templates:
entry_nodes = self._entry_nodes_for_templates(
graph=graph,
entry_templates=entry_templates,
)
if entry_nodes:
return entry_nodes[0]
return super().get_entry_cursor(graph)
def get_template_scope_groups(
self,
*,
caller: Any = None,
graph: Any = None,
) -> list[TemplateRegistry]:
"""Return authoritative template registries for runtime scope discovery."""
_ = caller, graph
registries = [self.templates, *self.extra_template_registries]
deduped: list[TemplateRegistry] = []
seen_ids: set[int] = set()
for registry in registries:
registry_id = id(registry)
if registry_id in seen_ids:
continue
seen_ids.add(registry_id)
deduped.append(registry)
return deduped
def get_token_catalogs(
self,
*,
caller: Any = None,
requirement: Any = None,
graph: Any = None,
) -> list[TokenCatalog]:
catalogs = _coerce_token_catalogs(
self.assets,
caller=caller,
requirement=requirement,
graph=graph,
)
if catalogs:
return catalogs
return list(self._provide_token_catalogs())
def get_media_inventories(
self,
*,
caller: Any = None,
requirement: Any = None,
graph: Any = None,
) -> list[MediaInventory]:
"""Return optional world-authoritative media inventories."""
story_manager = None
if graph is not None:
story_manager = getattr(graph, "story_resources", None)
if story_manager is None and getattr(graph, "story_id", None) is not None:
story_manager = get_story_resource_manager(graph.story_id, create=False)
if story_manager is not None:
graph.story_resources = story_manager
inventories: list[MediaInventory] = []
seen_registry_ids: set[int] = set()
_extend_media_inventories(
inventories,
provider=graph,
caller=caller,
requirement=requirement,
graph=graph,
scope="story",
seen_registry_ids=seen_registry_ids,
)
_extend_media_inventories(
inventories,
provider=story_manager,
caller=caller,
requirement=requirement,
graph=graph,
scope="story",
seen_registry_ids=seen_registry_ids,
)
_extend_media_inventories(
inventories,
provider=self.resources,
caller=caller,
requirement=requirement,
graph=graph,
scope="world",
seen_registry_ids=seen_registry_ids,
)
_extend_media_inventories(
inventories,
provider=self.assets,
caller=caller,
requirement=requirement,
graph=graph,
scope="world",
seen_registry_ids=seen_registry_ids,
)
_extend_media_inventories(
inventories,
provider=get_system_resource_manager(),
caller=caller,
requirement=requirement,
graph=graph,
scope="sys",
seen_registry_ids=seen_registry_ids,
)
return inventories
def find_template(self, reference: str) -> Any | None:
"""Find one template by selector, uid, identifier, or label."""
if reference is None:
return None
if isinstance(reference, UUID):
return self.templates.get(reference)
key = str(reference)
found = self.templates.find_one(Selector.from_identifier(key))
if found is not None:
return found
return self.templates.find_one(Selector(label=key))
def find_templates(self, selector: Selector | None = None) -> list[Any]:
"""Find all templates matching ``selector``."""
if selector is None:
return list(self.templates.values())
return list(selector.filter(self.templates.values()))
def story_materialize_template(
self,
template,
_ctx: VmPhaseCtx | None = None,
):
"""Compatibility hook delegating story materialization to the helper."""
return StoryMaterializer().story_materialize_template(template, _ctx=_ctx)
def story_post_materialize(
self,
*,
template,
entity: Any,
role,
_ctx: VmPhaseCtx | None = None,
) -> None:
"""Compatibility hook delegating post-materialization policy to the helper."""
StoryMaterializer().story_post_materialize(
template=template,
entity=entity,
role=role,
_ctx=_ctx,
)
def preview_requirement_contract(
self,
*,
requirement,
offer,
graph,
_ctx: VmPhaseCtx | None = None,
):
"""Compatibility hook delegating preview checks to the helper."""
return StoryMaterializer().preview_requirement_contract(
requirement=requirement,
offer=offer,
graph=graph,
_ctx=_ctx,
)
@staticmethod
def _story_entry_selector(default_entry_ref: str) -> Selector:
return Selector.chain_or(
Selector(has_identifier=default_entry_ref),
Selector(has_tags={default_entry_ref}),
)
def _resolve_story_entry_templates(self) -> list[EntityTemplate]:
materializer = StoryMaterializer()
return [
templ
for templ in materializer._resolve_entry_templates(
template_registry=self.templates,
entry_template_ids=self.entry_template_ids,
)
if isinstance(templ, EntityTemplate)
]
def _resolve_seed_entry_templates(self) -> list[EntityTemplate]:
explicit = self._resolve_story_entry_templates()
if explicit:
return explicit
return [
templ
for templ in self.templates.find_all(
self._story_entry_selector(self.default_entry_ref),
sort_key=_template_depth,
)
if isinstance(templ, EntityTemplate)
]
def _seed_template_groups(
self,
entry_templates: list[EntityTemplate],
) -> list[_TemplateSubset]:
selected_ids: set[UUID] = set()
for template in entry_templates:
current: Any | None = template
while current is not None:
uid = getattr(current, "uid", None)
if isinstance(uid, UUID):
selected_ids.add(uid)
current = getattr(current, "parent", None)
return [_TemplateSubset(self.templates, selected_ids)]
def _build_story_graph(
self,
*,
story_label: str,
init_mode: InitMode,
freeze_shape: bool,
) -> StoryGraph:
graph = StoryGraph(
label=story_label,
frozen_shape=(init_mode is InitMode.EAGER and freeze_shape),
locals=dict(self.locals),
factory=self,
)
graph.story_id = graph.uid
graph.story_resources = get_story_resource_manager(graph.story_id, create=False)
return graph
@staticmethod
def _entry_nodes_for_templates(
*,
graph: StoryGraph,
entry_templates: list[EntityTemplate],
) -> list[TraversableNode]:
nodes: list[TraversableNode] = []
seen_ids: set[UUID] = set()
for template in entry_templates:
template_hash = template.content_hash()
node = graph.find_one(
Selector(has_kind=TraversableNode, templ_hash=template_hash),
)
if isinstance(node, TraversableNode) and node.uid not in seen_ids:
seen_ids.add(node.uid)
nodes.append(node)
return nodes
@staticmethod
def _apply_story_entry_ids(
*,
graph: StoryGraph,
entry_templates: list[EntityTemplate],
) -> None:
entry_nodes = World._entry_nodes_for_templates(
graph=graph,
entry_templates=entry_templates,
)
graph.initial_cursor_ids = [node.uid for node in entry_nodes]
if graph.initial_cursor_ids:
graph.initial_cursor_id = graph.initial_cursor_ids[0]
def create_story(
self,
story_label: str,
*,
init_mode: InitMode = InitMode.EAGER,
freeze_shape: bool = False,
namespace: dict[str, Any] | None = None,
) -> StoryInitResult:
if freeze_shape and init_mode is not InitMode.EAGER:
raise ValueError("freeze_shape requires InitMode.EAGER")
materializer = StoryMaterializer()
explicit_entry_templates = self._resolve_story_entry_templates()
seed_entry_templates = self._resolve_seed_entry_templates()
if init_mode is InitMode.LAZY and not seed_entry_templates:
raise ValueError("No entry templates resolved for story initialization")
graph = self._build_story_graph(
story_label=story_label,
init_mode=init_mode,
freeze_shape=freeze_shape,
)
if init_mode is InitMode.EAGER:
graph = super().materialize_graph(graph=graph)
else:
graph = super().materialize_seed_graph(
graph=graph,
template_groups=self._seed_template_groups(seed_entry_templates),
)
graph.rebuild_template_lineage(self.templates)
state = materializer.make_state(
graph=graph,
mode=init_mode,
template_registry=self.templates,
entry_template_ids=list(self.entry_template_ids),
source_map=dict(self.source_map),
codec_state=dict(self.codec_state),
codec_id=self.codec_id,
bundle_id=getattr(self.templates, "label", None),
)
materializer._run_topology_passes(state=state)
if init_mode is InitMode.EAGER:
materializer._run_prelink_passes(state=state)
materializer._recount_materialized(state=state)
self._apply_story_entry_ids(
graph=graph,
entry_templates=explicit_entry_templates or seed_entry_templates,
)
result = materializer._build_story_init_result(state=state)
if namespace is not None:
override_uid = self._resolve_entry_override(result.graph, namespace)
if override_uid is not None:
result.graph.initial_cursor_id = override_uid
result.graph.initial_cursor_ids[:] = [override_uid]
return result
def _resolve_entry_override(
self,
graph: Any,
namespace: dict[str, Any],
) -> Any | None:
"""Return an optional init-time entry override for this world."""
_ = graph, namespace
return None
@classmethod
def from_script_data(
cls,
*,
script_data: dict[str, Any],
label: str | None = None,
compiler: StoryCompiler | None = None,
domain: Any | None = None,
templates: Any | None = None,
assets: Any | None = None,
resources: Any | None = None,
story_info_projector: Any | None = None,
) -> "World":
from .builder import WorldBuilder
compiler = compiler or StoryCompiler()
bundle = compiler.compile(script_data)
return WorldBuilder().build(
label=label or script_data.get("label") or "story_world",
bundle=bundle,
world_type=cls,
assets=assets,
resources=resources,
extra_template_registries=_coerce_template_registries(templates),
domain=domain,
story_info_projector=story_info_projector,
)