"""Canonical explicit service-manager API for StoryTangl."""
from __future__ import annotations
from collections import OrderedDict
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterator, TYPE_CHECKING
from uuid import UUID
import yaml
from tangl.core import BaseFragment
from tangl.persistence import PersistenceManager
from tangl.story import InitMode, World
from tangl.type_hints import Identifier, UnstructuredData
from tangl.utils.hash_secret import key_for_secret
from tangl.vm.runtime.ledger import Ledger
from .exceptions import AuthMismatchError
from ._user_support import parse_bool_flag, parse_datetime_field
from .media import resolve_world_media
from .response import (
ProjectedState,
RuntimeEnvelope,
RuntimeInfo,
SystemInfo,
UserInfo,
UserSecret,
WorldInfo,
)
from .service_method import (
BlockingMode,
ServiceAccess,
ServiceContext,
ServiceMethodSpec,
ServiceWriteback,
get_service_method_spec,
service_method,
)
from .system_info import get_system_info, reset_system
from .story_info import resolve_story_info_projector
from .user import User
from .world_registry import (
WorldRegistry,
iter_manual_worlds,
legacy_world_label,
pop_manual_world,
register_manual_world,
resolve_world,
)
if TYPE_CHECKING: # pragma: no cover
from tangl.media import MediaDataType
from tangl.media.media_resource import MediaResourceInventoryTag as MediaRIT
from .auth import UserAuthInfo
[docs]
@dataclass
class ServiceSession:
    """Live user/ledger/frame bundle opened by :class:`ServiceManager`.

    Instances are produced by ``ServiceManager.open_session`` and group the
    resources that were opened together for one request.
    """

    # Owning user, or ``None`` when the session was opened from a ledger
    # that carries no ``user_id``.
    user: User | None
    # Ledger opened for this session.
    ledger: Ledger
    # Value returned by ``ledger.get_frame()`` when the session was opened.
    frame: Any
[docs]
class ServiceManager:
"""Explicit public service API over persistence-backed story resources."""
def __init__(self, persistence_manager: PersistenceManager | None = None) -> None:
    """Remember the persistence backend used by the resource accessors.

    Args:
        persistence_manager: Backend used to load, save, open and remove
            users and ledgers. May be ``None``; accessors that go through
            ``_require_persistence`` then raise ``RuntimeError``.
    """
    self.persistence = persistence_manager
[docs]
@classmethod
def get_service_methods(cls) -> "OrderedDict[str, ServiceMethodSpec]":
    """Return canonical public service-method metadata in declaration order.

    Classes are visited from the far end of the MRO towards ``cls`` and each
    class's own ``__dict__`` is scanned in order. An attribute is included
    when ``get_service_method_spec`` returns a spec for it. A name that shows
    up again on a later class replaces the stored spec but keeps the position
    of its first appearance.

    Returns:
        Ordered mapping of attribute name to ``spec.with_name(name)``.
    """
    collected: "OrderedDict[str, ServiceMethodSpec]" = OrderedDict()
    for klass in cls.__mro__[::-1]:
        for attr_name, attr_value in klass.__dict__.items():
            found = get_service_method_spec(attr_value)
            if found is not None:
                collected[attr_name] = found.with_name(attr_name)
    return collected
def _validate_user_auth(
    self,
    *,
    user_id: UUID | None = None,
    user_auth: "UserAuthInfo | None" = None,
) -> None:
    """Reject a request whose explicit ``user_id`` contradicts its auth info.

    The check is skipped when either value is missing.

    Args:
        user_id: User id named by the request, if any.
        user_auth: Authentication info for the caller, if any.

    Raises:
        AuthMismatchError: Both values are present and
            ``user_auth.user_id`` differs from ``user_id``.
    """
    both_known = user_id is not None and user_auth is not None
    if both_known and user_auth.user_id != user_id:
        raise AuthMismatchError(
            f"user_id {user_id} does not match authenticated user {user_auth.user_id}",
        )
def _require_persistence(self) -> PersistenceManager:
    """Return the configured persistence manager.

    Raises:
        RuntimeError: No persistence manager was configured.
    """
    manager = self.persistence
    if manager is not None:
        return manager
    raise RuntimeError("Persistence manager required for resource access")
def _load_user(self, user_id: UUID) -> User:
    """Load one persisted user by id.

    Args:
        user_id: Key of the user record in the persistence manager.

    Returns:
        The loaded ``User``.

    Raises:
        RuntimeError: No persistence manager is configured.
        ValueError: ``user_id`` is not present in persistence.
        TypeError: The stored object is not a ``User``.
    """
    store = self._require_persistence()
    if user_id not in store:
        raise ValueError(f"User {user_id} not found")
    loaded = store.load(user_id)
    if isinstance(loaded, User):
        return loaded
    raise TypeError(f"Expected User for {user_id}, got {type(loaded).__name__}")
def _load_ledger(self, ledger_id: UUID) -> Ledger:
    """Load one persisted ledger by id.

    Args:
        ledger_id: Key of the ledger record in the persistence manager.

    Returns:
        The loaded ``Ledger``.

    Raises:
        RuntimeError: No persistence manager is configured.
        ValueError: ``ledger_id`` is not present in persistence.
        TypeError: The stored object is not a ``Ledger``.
    """
    store = self._require_persistence()
    if ledger_id not in store:
        raise ValueError(f"Ledger {ledger_id} not found")
    loaded = store.load(ledger_id)
    if isinstance(loaded, Ledger):
        return loaded
    raise TypeError(f"Expected Ledger for {ledger_id}, got {type(loaded).__name__}")
def _save(self, payload: Any) -> None:
    """Persist ``payload`` through the configured persistence manager.

    Raises:
        RuntimeError: No persistence manager is configured.
    """
    self._require_persistence().save(payload)
def _delete(self, uid: UUID) -> bool:
    """Remove one record from persistence on a best-effort basis.

    Unlike the loaders, a missing persistence manager is not an error here.

    Args:
        uid: Key of the record to remove.

    Returns:
        ``True`` when ``remove`` completed, ``False`` when there is no
        persistence manager or ``remove`` raised ``KeyError``.
    """
    store = self.persistence
    if store is None:
        return False
    try:
        store.remove(uid)
    except KeyError:
        return False
    return True
[docs]
@contextmanager
def open_user(
    self,
    user_id: UUID,
    *,
    write_back: bool = False,
) -> Iterator[User]:
    """Open one persisted user resource.

    Args:
        user_id: Key of the user record.
        write_back: Forwarded to ``persistence.open``.

    Yields:
        The opened ``User``, for the lifetime of the underlying
        ``persistence.open`` context.

    Raises:
        RuntimeError: No persistence manager is configured.
        ValueError: ``user_id`` is not present in persistence.
        TypeError: The opened object is not a ``User``.
    """
    store = self._require_persistence()
    if user_id not in store:
        raise ValueError(f"User {user_id} not found")
    with store.open(user_id, write_back=write_back) as resource:
        if isinstance(resource, User):
            yield resource
        else:
            raise TypeError(f"Expected User for {user_id}, got {type(resource).__name__}")
[docs]
@contextmanager
def open_ledger(
    self,
    ledger_id: UUID,
    *,
    write_back: bool = False,
) -> Iterator[Ledger]:
    """Open one persisted ledger resource.

    Args:
        ledger_id: Key of the ledger record.
        write_back: Forwarded to ``persistence.open``.

    Yields:
        The opened ``Ledger``, for the lifetime of the underlying
        ``persistence.open`` context.

    Raises:
        RuntimeError: No persistence manager is configured.
        ValueError: ``ledger_id`` is not present in persistence.
        TypeError: The opened object is not a ``Ledger``.
    """
    store = self._require_persistence()
    if ledger_id not in store:
        raise ValueError(f"Ledger {ledger_id} not found")
    with store.open(ledger_id, write_back=write_back) as resource:
        if isinstance(resource, Ledger):
            yield resource
        else:
            raise TypeError(f"Expected Ledger for {ledger_id}, got {type(resource).__name__}")
[docs]
def open_world(self, world_id: str, /) -> World:
    """Resolve one world by id.

    Args:
        world_id: Identifier handed to ``resolve_world`` (positional only).

    Returns:
        The resolved ``World``.

    Raises:
        TypeError: ``resolve_world`` returned something that is not a
            ``World``.
    """
    resolved = resolve_world(world_id)
    if isinstance(resolved, World):
        return resolved
    raise TypeError(f"Expected Story world for '{world_id}', got {type(resolved).__name__}")
[docs]
@contextmanager
def open_session(
    self,
    *,
    user_id: UUID | None = None,
    ledger_id: UUID | None = None,
    write_back: bool = False,
    user_auth: "UserAuthInfo | None" = None,
) -> Iterator[ServiceSession]:
    """Open a linked user/ledger session context.

    Two entry modes are supported:

    * ``user_id`` given: the user is opened first and the ledger is
      ``ledger_id`` when supplied, otherwise the user's
      ``current_ledger_id``.
    * only ``ledger_id`` given: the ledger is opened first and its
      ``user_id`` (if any) selects the user.

    In both modes the opened user is attached to the ledger
    (``ledger.user`` / ``ledger.user_id``) before the session is yielded.

    Args:
        user_id: User to open the session for.
        ledger_id: Explicit ledger to open.
        write_back: Forwarded to ``open_user`` and ``open_ledger``.
        user_auth: Authentication info for the caller, if any.

    Yields:
        A ``ServiceSession`` holding the user (``None`` for a ledger without
        a ``user_id``), the ledger, and ``ledger.get_frame()``.

    Raises:
        AuthMismatchError: ``user_auth`` names a different user than
            ``user_id`` or than the ledger's owner, or the ledger opened for
            ``user_id`` is already owned by a different user.
        ValueError: Neither id was supplied, the user has no active ledger,
            or a requested resource is missing.
    """
    self._validate_user_auth(user_id=user_id, user_auth=user_auth)

    if user_id is not None:
        with self.open_user(user_id, write_back=write_back) as user:
            effective_ledger_id = ledger_id or user.current_ledger_id
            if effective_ledger_id is None:
                raise ValueError("User has no active ledger")
            with self.open_ledger(effective_ledger_id, write_back=write_back) as ledger:
                # Ownership guard: without it a caller could pass another
                # user's ledger id and, with write_back, rebind that ledger
                # to themselves. Ledgers with no owner are still adopted.
                owner_id = ledger.user_id
                if owner_id is not None and owner_id != user.uid:
                    raise AuthMismatchError(
                        f"ledger {effective_ledger_id} does not belong to user {user.uid}",
                    )
                ledger.user = user
                ledger.user_id = user.uid
                yield ServiceSession(user=user, ledger=ledger, frame=ledger.get_frame())
        return

    if ledger_id is None:
        raise ValueError("user_id or ledger_id is required to open a session")

    with self.open_ledger(ledger_id, write_back=write_back) as ledger:
        effective_user_id = ledger.user_id
        if effective_user_id is None:
            # Ledger has no owner: yield a user-less session.
            yield ServiceSession(user=None, ledger=ledger, frame=ledger.get_frame())
            return
        if user_auth is not None and user_auth.user_id != effective_user_id:
            raise AuthMismatchError(
                f"user_id {effective_user_id} does not match authenticated user {user_auth.user_id}",
            )
        with self.open_user(effective_user_id, write_back=write_back) as user:
            ledger.user = user
            ledger.user_id = user.uid
            yield ServiceSession(user=user, ledger=ledger, frame=ledger.get_frame())
@staticmethod
def _resolve_world_id(ledger: Ledger) -> str | None:
    """Derive a world identifier string from ``ledger.graph.world``.

    Missing ``graph`` or ``world`` attributes are tolerated. The world's
    ``label`` is preferred, then its ``uid``; the first one that is not
    ``None`` is returned as ``str``.

    Returns:
        The identifier string, or ``None`` when no world or no usable
        attribute is available.
    """
    graph = getattr(ledger, "graph", None)
    world = getattr(graph, "world", None)
    if world is not None:
        for attr_name in ("label", "uid"):
            candidate = getattr(world, attr_name, None)
            if candidate is None:
                continue
            return str(candidate)
    return None
@staticmethod
def _build_runtime_envelope(
    ledger: Ledger,
    *,
    fragments: list[BaseFragment],
    metadata: dict[str, Any] | None = None,
) -> RuntimeEnvelope:
    """Package ledger state and fragments into a ``RuntimeEnvelope``.

    Caller-supplied metadata wins: ``world_id`` and ``ledger_id`` are only
    filled in when the caller did not provide them. The input ``metadata``
    and ``fragments`` objects are copied, not mutated.

    Args:
        ledger: Source of cursor, step and redirect information.
        fragments: Fragments to include in the envelope.
        metadata: Optional extra metadata entries.

    Returns:
        The assembled ``RuntimeEnvelope``.
    """
    envelope_meta: dict[str, Any] = dict(metadata) if metadata else {}
    resolved_world = ServiceManager._resolve_world_id(ledger)
    if resolved_world is not None:
        envelope_meta.setdefault("world_id", resolved_world)
    envelope_meta.setdefault("ledger_id", str(ledger.uid))
    return RuntimeEnvelope(
        cursor_id=ledger.cursor_id,
        step=ledger.step,
        fragments=list(fragments),
        last_redirect=ledger.last_redirect,
        redirect_trace=list(ledger.redirect_trace),
        metadata=envelope_meta,
    )
@staticmethod
def _prime_initial_update(ledger: Ledger) -> None:
    """Seed entry JOURNAL output for a freshly created ledger.

    Does nothing when ``ledger.get_journal()`` is already truthy. Otherwise
    the ledger's frame is sent to ``ledger.cursor`` and the frame's results
    (cursor, step counts, trace, return stack, redirects) are copied back
    onto the ledger, followed by a snapshot.
    """
    if ledger.get_journal():
        return

    frame = ledger.get_frame()
    frame.goto_node(ledger.cursor)

    # Count consecutive repeats, treating the last history entry (if any)
    # as the predecessor of the first traced node.
    history = ledger.cursor_history
    last_seen = history[-1] if history else None
    for visited in frame.cursor_trace:
        repeated = last_seen is not None and visited == last_seen
        if repeated:
            ledger.reentrant_steps += 1
        last_seen = visited

    # Mirror the frame's state onto the ledger.
    ledger.cursor_steps += frame.cursor_steps
    ledger.cursor_id = frame.cursor.uid
    ledger.cursor_history.extend(frame.cursor_trace)
    ledger.call_stack_ids = [edge.uid for edge in frame.return_stack]
    ledger.last_redirect = frame.last_redirect
    ledger.redirect_trace = [*frame.redirect_trace]
    ledger.save_snapshot(cadence=ledger.checkpoint_cadence)
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.USER,
    writeback=ServiceWriteback.SESSION,
    operation_id="story.create",
)
def create_story(
    self,
    *,
    user_id: UUID,
    world_id: str,
    user_auth: "UserAuthInfo | None" = None,
    **kwargs: Any,
) -> RuntimeEnvelope:
    """Create a story session and return the initial runtime envelope.

    Args:
        user_id: User who will own the new story.
        world_id: World to resolve when no ``world`` is injected.
        user_auth: Authentication info for the caller, if any.
        **kwargs: Optional settings read by this method: ``world`` (a
            pre-resolved world), ``story_label``, ``init_mode`` or ``mode``,
            ``freeze_shape``, ``worker_dispatcher`` and ``namespace``.

    Returns:
        Envelope carrying the new ledger's journal, tagged with ``world_id``.

    Raises:
        AuthMismatchError: ``user_auth`` names a different user.
        RuntimeError: The created story graph has no initial cursor.
    """
    self._validate_user_auth(user_id=user_id, user_auth=user_auth)

    # Imported for its side effect only: ensure story-level hooks are registered.
    import tangl.story  # noqa: F401

    with self.open_user(user_id, write_back=True) as user:
        injected_world = kwargs.pop("world", None)
        world = injected_world or self.open_world(world_id)

        story_label = kwargs.get("story_label") or f"story_{user.uid}"

        # ``init_mode`` takes precedence over ``mode``; default is eager.
        requested_mode = (
            kwargs.get("init_mode") or kwargs.get("mode") or InitMode.EAGER.value
        )
        if isinstance(requested_mode, str):
            requested_mode = requested_mode.lower()
        init_mode = InitMode(requested_mode)

        freeze_shape = bool(kwargs.get("freeze_shape", False))
        worker_dispatcher = kwargs.get("worker_dispatcher")

        # Copy the caller's namespace and expose the user unless overridden.
        namespace = dict(kwargs.pop("namespace", None) or {})
        namespace.setdefault("user", user)

        init_result = world.create_story(
            story_label,
            init_mode=init_mode,
            freeze_shape=freeze_shape,
            namespace=namespace,
        )
        graph = init_result.graph
        entry_id = graph.initial_cursor_id
        if entry_id is None:
            raise RuntimeError("Story graph did not define an initial cursor")

        ledger = Ledger.from_graph(
            graph=graph,
            entry_id=entry_id,
            uid=graph.story_id or graph.uid,
        )
        ledger.user = user
        ledger.user_id = user.uid
        ledger.worker_dispatcher = worker_dispatcher
        self._prime_initial_update(ledger)

        # Point the user at the new ledger, then persist the ledger itself.
        user.current_ledger_id = ledger.uid
        self._save(ledger)

        journal = list(ledger.get_journal())
        return self._build_runtime_envelope(
            ledger,
            fragments=journal,
            metadata={"world_id": world_id},
        )
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.SESSION,
    writeback=ServiceWriteback.SESSION,
    operation_id="story.do",
)
def resolve_choice(
    self,
    *,
    choice_id: UUID,
    user_id: UUID | None = None,
    ledger_id: UUID | None = None,
    user_auth: "UserAuthInfo | None" = None,
    choice_payload: Any = None,
) -> RuntimeEnvelope:
    """Resolve one choice edge and return the newest runtime envelope.

    Args:
        choice_id: Choice to resolve on the session ledger.
        user_id: User whose session to open.
        ledger_id: Explicit ledger to open.
        user_auth: Authentication info for the caller, if any.
        choice_payload: Forwarded to ``ledger.resolve_choice``.

    Returns:
        Envelope with the journal fragments from the step after the one the
        ledger was on before resolving.
    """
    session_cm = self.open_session(
        user_id=user_id,
        ledger_id=ledger_id,
        write_back=True,
        user_auth=user_auth,
    )
    with session_cm as session:
        ledger = session.ledger
        step_before = ledger.step
        ledger.resolve_choice(choice_id, choice_payload=choice_payload)
        # Only fragments produced after the pre-resolution step are returned.
        first_new_step = max(step_before + 1, 0)
        new_fragments = list(ledger.get_journal(since_step=first_new_step))
        return self._build_runtime_envelope(ledger, fragments=new_fragments)
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.SESSION,
    writeback=ServiceWriteback.NONE,
    operation_id="story.update",
)
def get_story_update(
    self,
    *,
    user_id: UUID | None = None,
    ledger_id: UUID | None = None,
    user_auth: "UserAuthInfo | None" = None,
    since_step: int | None = None,
    limit: int = 0,
) -> RuntimeEnvelope:
    """Return ordered runtime fragments for the active story session.

    Args:
        user_id: User whose session to open.
        ledger_id: Explicit ledger to open.
        user_auth: Authentication info for the caller, if any.
        since_step: First step to include; ``None`` is treated as ``0``.
        limit: Forwarded unchanged to ``ledger.get_journal``.
            NOTE(review): presumably ``0`` means "no limit" - confirm
            against ``Ledger.get_journal``.

    Returns:
        Envelope with the selected journal fragments. The session is opened
        without write-back.
    """
    session_cm = self.open_session(
        user_id=user_id,
        ledger_id=ledger_id,
        write_back=False,
        user_auth=user_auth,
    )
    with session_cm as session:
        ledger = session.ledger
        start_step = since_step if since_step is not None else 0
        selected = list(ledger.get_journal(since_step=start_step, limit=limit))
        return self._build_runtime_envelope(ledger, fragments=selected)
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.SESSION,
    writeback=ServiceWriteback.NONE,
    operation_id="story.info",
)
def get_story_info(
    self,
    *,
    user_id: UUID | None = None,
    ledger_id: UUID | None = None,
    user_auth: "UserAuthInfo | None" = None,
) -> ProjectedState:
    """Return projected current-state sections for the active story.

    The projector is chosen by ``resolve_story_info_projector`` for the
    session ledger; the session is opened without write-back.

    Args:
        user_id: User whose session to open.
        ledger_id: Explicit ledger to open.
        user_auth: Authentication info for the caller, if any.

    Returns:
        Whatever ``projector.project(ledger=...)`` produces.
    """
    session_cm = self.open_session(
        user_id=user_id,
        ledger_id=ledger_id,
        write_back=False,
        user_auth=user_auth,
    )
    with session_cm as session:
        ledger = session.ledger
        return resolve_story_info_projector(ledger).project(ledger=ledger)
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.SESSION,
    writeback=ServiceWriteback.EXPLICIT,
    operation_id="story.drop",
)
def drop_story(
    self,
    *,
    user_id: UUID | None = None,
    ledger_id: UUID | None = None,
    user_auth: "UserAuthInfo | None" = None,
    archive: bool = False,
) -> RuntimeInfo:
    """Clear the active story and optionally delete the persisted ledger.

    The user's ``current_ledger_id`` is cleared and the user is saved
    explicitly (the session itself is opened without write-back). The id
    reported and deleted is the user's ``current_ledger_id`` when set,
    otherwise the session ledger's ``uid``.

    Args:
        user_id: User whose session to open.
        ledger_id: Explicit ledger to open.
        user_auth: Authentication info for the caller, if any.
        archive: When true, story media and the persisted ledger are kept.

    Returns:
        ``RuntimeInfo.ok`` with ``dropped_ledger_id`` and ``archived``, plus
        ``story_media_deleted`` and ``persistence_deleted`` when not
        archiving.

    Raises:
        ValueError: The session has no user.
    """
    session_cm = self.open_session(
        user_id=user_id,
        ledger_id=ledger_id,
        write_back=False,
        user_auth=user_auth,
    )
    with session_cm as session:
        owner = session.user
        if owner is None:
            raise ValueError("Story drop requires a user-bound session")

        target_id = owner.current_ledger_id or session.ledger.uid
        owner.current_ledger_id = None
        self._save(owner)

        details: dict[str, Any] = {
            "dropped_ledger_id": str(target_id),
            "archived": archive,
        }
        if not archive:
            from tangl.media.story_media import remove_story_media

            media_removed = remove_story_media(target_id)
            details["story_media_deleted"] = media_removed
            record_removed = self._delete(target_id)
            details["persistence_deleted"] = record_removed

        return RuntimeInfo.ok(
            cursor_id=session.ledger.cursor_id,
            step=session.ledger.step,
            message="Story dropped",
            **details,
        )
[docs]
@service_method(
    access=ServiceAccess.PUBLIC,
    context=ServiceContext.NONE,
    writeback=ServiceWriteback.EXPLICIT,
    operation_id="user.create",
)
def create_user(self, *, secret: str | None = None, **kwargs: Any) -> RuntimeInfo:
    """Create and persist a service user.

    Args:
        secret: Optional secret; applied with ``user.set_secret`` only when
            it is a non-empty string.
        **kwargs: Forwarded to the ``User`` constructor.

    Returns:
        ``RuntimeInfo.ok`` carrying the new ``user_id`` as a string.
    """
    new_user = User(**kwargs)
    has_secret = isinstance(secret, str) and bool(secret)
    if has_secret:
        new_user.set_secret(secret)
    self._save(new_user)
    return RuntimeInfo.ok(message="User created", user_id=str(new_user.uid))
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.USER,
    writeback=ServiceWriteback.USER,
    operation_id="user.update",
)
def update_user(
    self,
    *,
    user_id: UUID,
    user_auth: "UserAuthInfo | None" = None,
    **kwargs: Any,
) -> RuntimeInfo:
    """Update one persisted user record.

    Only three fields are honoured: ``secret``, ``last_played_dt`` and
    ``privileged``. Other keyword arguments are ignored.

    Args:
        user_id: User to update.
        user_auth: Authentication info for the caller, if any.
        **kwargs: Field updates as described above.

    Returns:
        ``RuntimeInfo.ok`` with ``user_id`` and, when a new secret was set,
        the matching ``api_key``.

    Raises:
        AuthMismatchError: ``user_auth`` names a different user.
    """
    self._validate_user_auth(user_id=user_id, user_auth=user_auth)
    with self.open_user(user_id, write_back=True) as user:
        new_secret = kwargs.pop("secret", None)
        issued_key: str | None = None
        if isinstance(new_secret, str) and new_secret:
            user.set_secret(new_secret)
            issued_key = key_for_secret(new_secret)

        if "last_played_dt" in kwargs:
            user.last_played_dt = parse_datetime_field(
                kwargs["last_played_dt"],
                field_name="last_played_dt",
            )

        if "privileged" in kwargs:
            # The value is always parsed, but it is only applied when the
            # user is already privileged.
            requested_flag = parse_bool_flag(
                kwargs["privileged"],
                field_name="privileged",
            )
            already_privileged = bool(getattr(user, "privileged", False))
            if already_privileged:
                user.privileged = requested_flag

        details: dict[str, Any] = {"user_id": str(user.uid)}
        if issued_key is not None:
            details["api_key"] = issued_key
        return RuntimeInfo.ok(message="User updated", **details)
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.USER,
    writeback=ServiceWriteback.NONE,
    operation_id="user.info",
)
def get_user_info(
    self,
    *,
    user_id: UUID,
    user_auth: "UserAuthInfo | None" = None,
    **kwargs: Any,
) -> UserInfo:
    """Return persisted user profile information.

    Args:
        user_id: User to describe.
        user_auth: Authentication info for the caller, if any.
        **kwargs: Forwarded to ``UserInfo.from_user``.

    Returns:
        ``UserInfo`` built from the user, opened without write-back.

    Raises:
        AuthMismatchError: ``user_auth`` names a different user.
    """
    self._validate_user_auth(user_id=user_id, user_auth=user_auth)
    with self.open_user(user_id, write_back=False) as user:
        profile = UserInfo.from_user(user, **kwargs)
        return profile
[docs]
@service_method(
    access=ServiceAccess.CLIENT,
    context=ServiceContext.USER,
    writeback=ServiceWriteback.EXPLICIT,
    operation_id="user.drop",
)
def drop_user(
    self,
    *,
    user_id: UUID,
    user_auth: "UserAuthInfo | None" = None,
) -> RuntimeInfo:
    """Delete one persisted user record.

    Only the user record is removed here. The user's ``current_ledger_id``
    is reported as ``dropped_ledger_id`` but this method does not delete
    that ledger.

    Args:
        user_id: User to delete.
        user_auth: Authentication info for the caller, if any.

    Returns:
        ``RuntimeInfo.ok`` with ``user_id``, ``persistence_deleted`` and,
        when the user had an active ledger, ``dropped_ledger_id``.

    Raises:
        AuthMismatchError: ``user_auth`` names a different user.
    """
    self._validate_user_auth(user_id=user_id, user_auth=user_auth)
    with self.open_user(user_id, write_back=False) as user:
        active_ledger_id = user.current_ledger_id

    details: dict[str, Any] = {
        "user_id": str(user_id),
        "persistence_deleted": self._delete(user_id),
    }
    if active_ledger_id is not None:
        details["dropped_ledger_id"] = str(active_ledger_id)
    return RuntimeInfo.ok(message="User dropped", **details)
[docs]
@service_method(
    access=ServiceAccess.PUBLIC,
    context=ServiceContext.NONE,
    writeback=ServiceWriteback.NONE,
    operation_id="user.key",
)
def get_key_for_secret(self, *, secret: str) -> UserSecret:
    """Encode a user secret as an API key.

    Args:
        secret: Secret to encode with ``key_for_secret``.

    Returns:
        ``UserSecret`` holding both the derived ``api_key`` and the original
        ``user_secret``.
    """
    derived_key = key_for_secret(secret)
    return UserSecret(api_key=derived_key, user_secret=secret)
[docs]
@service_method(
    access=ServiceAccess.PUBLIC,
    context=ServiceContext.NONE,
    writeback=ServiceWriteback.NONE,
    operation_id="world.list",
)
def list_worlds(self) -> list[WorldInfo]:
    """List available worlds as typed info models.

    Registry worlds come first, followed by manually registered worlds whose
    label is not already present in the registry listing.

    Returns:
        One ``WorldInfo`` per world. ``title`` falls back to the world label
        when metadata has no title or an explicit ``None`` title; ``author``
        falls back to ``"Unknown"`` when missing or empty.
    """
    registry = WorldRegistry()
    # Copy before appending so a list the registry may still hold is never
    # mutated by this call.
    entries = list(registry.list_worlds())

    manual_worlds = dict(iter_manual_worlds())
    if manual_worlds:
        known_labels = {entry.get("label") for entry in entries}
        entries.extend(
            {
                "label": label,
                "metadata": manual.metadata or {},
                "is_anthology": False,
            }
            for label, manual in manual_worlds.items()
            if label not in known_labels
        )

    info_models: list[WorldInfo] = []
    for entry in entries:
        metadata = dict(entry.get("metadata") or {})
        label = entry["label"]
        title = metadata.get("title")
        if title is None:
            # An explicit ``title: null`` must not be rendered as "None".
            title = label
        info_models.append(
            WorldInfo(
                label=label,
                title=str(title),
                author=metadata.get("author") or "Unknown",
            ),
        )
    return info_models
[docs]
@service_method(
    access=ServiceAccess.PUBLIC,
    context=ServiceContext.WORLD,
    writeback=ServiceWriteback.NONE,
    operation_id="world.info",
)
def get_world_info(self, *, world_id: str) -> WorldInfo:
    """Return metadata for one resolved world.

    The world's metadata is copied, any ``label`` entry in it is discarded
    in favour of ``world.label``, and ``title`` / ``author`` defaults are
    filled in when those keys are absent. All remaining metadata entries are
    passed to ``WorldInfo`` as keyword arguments.

    Args:
        world_id: Identifier resolved through ``open_world``.

    Returns:
        ``WorldInfo`` for the resolved world.
    """
    world = self.open_world(world_id)
    label = world.label
    fields = dict(world.metadata or {})
    fields.pop("label", None)
    for key, fallback in (("title", label), ("author", "Unknown")):
        fields.setdefault(key, fallback)
    return WorldInfo(label=label, **fields)
[docs]
@service_method(
    access=ServiceAccess.DEV,
    context=ServiceContext.NONE,
    writeback=ServiceWriteback.EXPLICIT,
    blocking=BlockingMode.MAY_BLOCK,
    capability="world_mutation",
    operation_id="world.load",
)
def load_world(
    self,
    *,
    script_path: str | Path | None = None,
    script_data: UnstructuredData = None,
) -> RuntimeInfo:
    """Load one ad hoc world into the process-local manual registry.

    Args:
        script_path: Optional path to a YAML script. When given, its parsed
            content replaces ``script_data``.
        script_data: Script content; must end up being a ``dict``.

    Returns:
        ``RuntimeInfo.ok`` carrying the new world's label.

    Raises:
        FileNotFoundError: ``script_path`` does not exist.
        ValueError: The effective script data is not a ``dict``.
    """
    if script_path is not None:
        source = Path(script_path)
        if not source.exists():
            raise FileNotFoundError(f"Script not found: {script_path}")
        # ``safe_load`` restricts parsing to plain YAML data types.
        script_data = yaml.safe_load(source.read_text(encoding="utf-8"))

    if not isinstance(script_data, dict):
        raise ValueError("script_data is required to load a world")

    world = World.from_script_data(
        script_data=script_data,
        label=legacy_world_label(script_data),
    )
    register_manual_world(world)
    return RuntimeInfo.ok(message="World loaded", world_label=world.label)
[docs]
@service_method(
    access=ServiceAccess.DEV,
    context=ServiceContext.WORLD,
    writeback=ServiceWriteback.EXPLICIT,
    capability="world_mutation",
    operation_id="world.unload",
)
def unload_world(self, *, world_id: str) -> RuntimeInfo:
    """Unload one process-local manual world.

    Args:
        world_id: Identifier resolved through ``open_world``.

    Returns:
        ``RuntimeInfo.ok`` when a manual world with the resolved label was
        removed, otherwise ``RuntimeInfo.error`` with code
        ``WORLD_NOT_MANUAL``.
    """
    label = self.open_world(world_id).label
    if pop_manual_world(label) is not None:
        return RuntimeInfo.ok(message="World unloaded", world_label=label)
    return RuntimeInfo.error(
        code="WORLD_NOT_MANUAL",
        message="No manual world to unload",
        world_label=label,
    )
[docs]
@service_method(
    access=ServiceAccess.PUBLIC,
    context=ServiceContext.NONE,
    writeback=ServiceWriteback.NONE,
    operation_id="system.info",
)
def get_system_info(self) -> SystemInfo:
    """Return service/system metadata.

    Delegates to the ``get_system_info`` helper imported at module level.
    """
    # Inside a method body the bare name resolves to the module-level
    # helper, not to this method, so this is not a recursive call.
    info = get_system_info()
    return info
[docs]
@service_method(
    access=ServiceAccess.DEV,
    context=ServiceContext.NONE,
    writeback=ServiceWriteback.NONE,
    capability="dev_tools",
    operation_id="system.reset",
)
def reset_system(self, *, hard: bool = False) -> RuntimeInfo:
    """Implementation-specific system reset hook.

    Args:
        hard: Forwarded to the ``reset_system`` helper imported at module
            level.

    Returns:
        Whatever that helper returns.
    """
    # The bare name resolves to the module-level helper, not this method.
    outcome = reset_system(hard=hard)
    return outcome
# Names exported by a star-import of this module.
__all__ = [
    "ServiceManager",
    "ServiceSession",
]