# tangl/core/graph.py
"""Graph topology primitives.
This module consolidates the v38 graph surface (nodes, edges, subgraphs, and
hierarchical nodes) into one file. It focuses on topology shape and typed queries;
VM/service layers provide traversal policy and execution behavior.
See Also
--------
:mod:`tangl.core.registry`
Graph extends :class:`Registry` and reuses registry-aware/group primitives.
:mod:`tangl.vm.frame`
Runtime traversal and cursor behavior is layered above core topology.
"""
from __future__ import annotations
import itertools
from functools import cached_property
from fnmatch import fnmatch
from typing import Any, Iterable, Iterator, Optional
from uuid import UUID
from pydantic import Field
from .entity import Entity
from .registry import EntityGroup, HierarchicalGroup, Registry, RegistryAware
from .selector import Selector
from .singleton import Singleton
def _ancestor_list(item: GraphItem | RegistryAware) -> list[GraphItem | RegistryAware]:
    """Materialize the containment ancestry of ``item`` as a list.

    ``GraphItem`` exposes ``ancestors()`` as a method, while
    ``HierarchicalGroup`` exposes ``ancestors`` as a property, so the attribute
    is called only when it is callable. Items without an ``ancestors``
    attribute (or with it set to ``None``) yield an empty list.
    """
    ancestry = getattr(item, "ancestors", None)
    if ancestry is None:
        return []
    # Method shape -> call it; property shape -> already an iterable.
    source = ancestry() if callable(ancestry) else ancestry
    return list(source)
[docs]
class GraphItem(RegistryAware):
    """Base class for items managed by :class:`Graph`.

    Graph items are registry-aware entities. ``graph`` is the preferred alias for the
    bound registry pointer when the owner is a graph.
    """

    @property
    def graph(self) -> Graph:
        """Return the bound registry pointer (``self._registry``)."""
        return self._registry

    @graph.setter
    def graph(self, value: Graph | None) -> None:
        # Compatibility for legacy graph containers that assign item.graph directly.
        self.bind_registry(value)

    @cached_property
    def parent(self) -> GraphItem | None:
        """Return first owning hierarchical graph node, if present.

        Plain :class:`Subgraph` membership is intentionally ignored here. Subgraphs
        are bag-like graph views; only :class:`HierarchicalNode` membership defines
        a single parent path for graph items.

        The result is cached per instance (``cached_property``).
        """
        registry = getattr(self, "registry", None)
        if registry is not None:
            owner_query = Selector(has_kind=HierarchicalNode, has_member=self)
            return registry.find_one(owner_query)
        return None

    def ancestors(self):
        """Yield containment ancestry from membership-derived parent to root.

        Iteration stops at the first item without a ``parent`` or as soon as a
        ``uid`` repeats, so a membership cycle cannot loop forever.
        """
        visited: set[UUID | None] = {getattr(self, "uid", None)}
        node = getattr(self, "parent", None)
        while node is not None:
            node_uid = getattr(node, "uid", None)
            if node_uid in visited:
                break
            visited.add(node_uid)
            yield node
            node = getattr(node, "parent", None)

    def has_path(self, pattern: str) -> bool:
        """Legacy compatibility predicate for selector ``has_path`` criteria.

        Matches ``pattern`` (``fnmatch`` syntax) against the item's string
        ``path`` attribute when it has one; otherwise against a dotted path
        built from labels, outermost ancestor first and this item last.
        """
        path = getattr(self, "path", None)
        if isinstance(path, str):
            return fnmatch(path, pattern)
        # No usable ``path`` attribute: derive one from the label chain.
        labels: list[str] = [self.get_label()]
        labels.extend(ancestor.get_label() for ancestor in _ancestor_list(self))
        return fnmatch(".".join(labels[::-1]), pattern)

    def has_ancestor_tags(self, tags: set[str]) -> bool:
        """Legacy compatibility predicate for selector ``has_ancestor_tags``.

        True when every requested tag appears on this item or on any of its
        ancestors. An empty request always matches.
        """
        if not tags:
            return True
        required = set(tags)
        in_scope: set[str] = set(getattr(self, "tags", set()) or set())
        for ancestor in _ancestor_list(self):
            in_scope.update(getattr(ancestor, "tags", set()) or set())
        return required <= in_scope

    def has_ancestor_tags__not(self, tags: set[str]) -> bool:
        """Legacy compatibility negative scope predicate.

        True when none of the given tags appears on this item or on any of its
        ancestors. An empty request always matches.
        """
        if not tags:
            return True
        banned = set(tags)
        in_scope: set[str] = set(getattr(self, "tags", set()) or set())
        for ancestor in _ancestor_list(self):
            in_scope.update(getattr(ancestor, "tags", set()) or set())
        return not (banned & in_scope)
[docs]
class Graph(Registry[GraphItem]):
    """Specialized registry for graph topology.

    Why
    ---
    Graph centralizes topology shape concerns:

    - typed creation helpers for nodes/edges/subgraphs,
    - typed selector-based queries,
    - and graph-level hook bridges for link/unlink operations.

    Notes
    -----
    Creation helpers instantiate ``kind(**attrs)`` directly, then rely on registry
    ownership for binding and lifecycle hooks.

    ``Graph`` may also carry an optional singleton ``factory`` authority. This is
    explicitly structured and unstructured by hand so graphs can recover behavior
    registries after persistence. This mirrors the story-layer ``StoryGraph.world``
    round-trip shim and is an interim pattern until core gains generic recursive
    handling for entity-shaped fields.

    Typed find helpers apply narrowing via ``selector.with_criteria(has_kind=...)``.
    Because ``with_criteria`` avoids widening ``has_kind``, callers may narrow kinds
    but cannot widen helper defaults.

    API
    ---
    - :meth:`bind_factory` attaches or clears the singleton graph authority.
    - :meth:`path_dist` ranks candidates by shared containment ancestry.
    - :meth:`unstructure` and :meth:`structure` preserve singleton factory identity
      across graph round-trips.

    Example:
        >>> g = Graph()
        >>> a = Node(label="a", registry=g)
        >>> b = Node(label="b", registry=g)
        >>> e = g.add_edge(a, b, label="ab")
        >>> g.nodes
        [<Node:a>, <Node:b>]
        >>> e.predecessor is a and e.successor is b
        True
    """

    # Excluded from the model dump; persisted by hand in unstructure()/structure().
    factory: Any | None = Field(default=None, exclude=True)

    def get_authorities(self) -> list[object]:
        """Return application-level behavior registries for dispatch bootstrapping.

        Extension hook — override in application-layer graph subclasses to
        expose domain-specific registries to the VM dispatch chain.

        A bare ``Graph`` delegates to its bound singleton ``factory`` when present.
        Application graphs may override this to add richer authority chains:

        - :class:`~tangl.story.StoryGraph` returns ``[story_dispatch]``
          and cascades to ``world.get_authorities()`` when a world is attached.
        - Future ``WorldGraph``, ``MechanicsGraph``, etc. follow the same pattern.

        Why a hook on Graph, not a Protocol
        ------------------------------------
        Dispatch bootstrapping cannot use dispatch to assemble itself — the
        authority chain must be discoverable before any hook fires. A duck-typed
        method on the graph is the right primitive: the VM layer checks
        ``getattr(graph, "get_authorities", None)`` and calls it if present,
        without type-coupling to any application graph class.

        See Also
        --------
        :meth:`tangl.vm.runtime.frame.PhaseCtx.get_authorities`
            Supplies graph/world authority registries to VM dispatch expansion.
        :class:`tangl.story.story_graph.StoryGraph`
            Reference override — returns story and world authority registries.

        Notes
        -----
        The assembly order matters: ``vm_dispatch`` is always first (SYSTEM
        layer), then authorities returned here (APPLICATION and AUTHOR layers)
        in declaration order. Lower dispatch layers (LOCAL, INLINE) are added
        by ``PhaseCtx.get_authorities`` and inline behavior expansion separately.
        """
        collected: list[object] = []
        factory_hook = getattr(self.factory, "get_authorities", None)
        if not callable(factory_hook):
            return collected
        # De-duplicate while keeping the factory's declaration order.
        for candidate in factory_hook() or ():
            if candidate in collected:
                continue
            collected.append(candidate)
        return collected

    def bind_factory(self, value: Singleton | None) -> None:
        """Bind a singleton graph authority used for persistence-safe behavior lookup.

        Passing ``None`` clears the binding. Any other non-``Singleton`` value
        raises ``TypeError``.
        """
        if not (value is None or isinstance(value, Singleton)):
            raise TypeError("Graph factory must be a Singleton-compatible authority")
        self.factory = value

    def unstructure(self):
        """Return constructor-form graph data including a singleton factory reference.

        The ``"factory"`` key is only added when a ``Singleton`` factory is bound.
        """
        data = super().unstructure()
        bound_factory = self.factory
        if isinstance(bound_factory, Singleton):
            data["factory"] = bound_factory.unstructure()
        return data

    @classmethod
    def structure(cls, data, _ctx=None):
        """Structure a graph and restore any persisted singleton factory reference.

        String values stored under a ``"kind"`` key anywhere in the payload are
        resolved through ``Entity.dereference_cls_name`` (the string is kept
        when that lookup returns a falsy result). A ``"factory"`` entry is
        removed from the payload before delegating to the parent ``structure``
        and re-bound afterwards.

        Raises
        ------
        TypeError
            If the persisted factory is neither a ``Singleton`` nor a dict.
        LookupError
            If ``Singleton.structure`` returns ``None`` for the factory data.
        """

        def _coerce_kind_refs(value: Any) -> Any:
            # Walk nested dicts/lists, replacing string "kind" refs with classes.
            if isinstance(value, list):
                return [_coerce_kind_refs(entry) for entry in value]
            if not isinstance(value, dict):
                return value
            return {
                key: (Entity.dereference_cls_name(entry) or entry)
                if key == "kind" and isinstance(entry, str)
                else _coerce_kind_refs(entry)
                for key, entry in value.items()
            }

        payload = _coerce_kind_refs(dict(data))
        factory_data = payload.pop("factory", None)
        graph = super().structure(payload, _ctx=_ctx)
        if factory_data is None:
            return graph

        if isinstance(factory_data, Singleton):
            factory = factory_data
        elif isinstance(factory_data, dict):
            factory = Singleton.structure(dict(factory_data))
            if factory is None:
                raise LookupError(
                    "Persisted graph factory reference could not be resolved to a registered singleton: "
                    f"{factory_data!r}"
                )
        else:
            raise TypeError("Persisted graph factory references must be singleton constructor data")

        graph.bind_factory(factory)
        return graph

    def path_dist(self, a: GraphItem, b: GraphItem) -> int:
        """Return containment distance between two graph items.

        The distance is measured over containment ancestry rather than edge
        traversal. When two items do not share a common ancestor, they receive a
        graph-wide disconnection penalty so connected candidates always sort ahead
        of disconnected ones.
        """
        self._validate_linkable(a)
        self._validate_linkable(b)

        # Each chain is the item itself followed by its ancestors.
        a_chain = [a, *_ancestor_list(a)]
        b_chain = [b, *_ancestor_list(b)]

        b_depth_by_uid = {}
        for depth, member in enumerate(b_chain):
            b_depth_by_uid[member.uid] = depth

        # First entry of a's chain that also appears in b's chain: sum both depths.
        for a_depth, member in enumerate(a_chain):
            b_depth = b_depth_by_uid.get(member.uid)
            if b_depth is not None:
                return a_depth + b_depth

        penalty = max(1, 2 * len(self.members))
        return penalty + len(a_chain) + len(b_chain)

    def add(self, value: GraphItem, _ctx=None) -> None:
        """Add and bind legacy ``graph`` pointers when present."""
        if hasattr(value, "graph"):
            # NOTE(review): deliberate best-effort for legacy containers; every
            # failure of the assignment is ignored. Consider narrowing the
            # exception types once the possible failures are known.
            try:
                value.graph = self
            except Exception:
                pass
        super().add(value, _ctx=_ctx)

    def add_node(self, *, kind=None, **attrs) -> Node:
        """Create and add a node-like graph item (``Node`` unless ``kind`` is given)."""
        node = (kind or Node)(**attrs)
        self.add(node)
        return node

    def add_edge(
        self,
        predecessor: GraphItem | None = None,
        successor: GraphItem | None = None,
        *,
        kind=None,
        **attrs,
    ) -> Edge:
        """Create and add an edge between predecessor and successor items.

        Endpoint ids may also be supplied as ``predecessor_id`` / ``successor_id``
        keyword attributes; an explicit endpoint object takes precedence and is
        validated with ``_validate_linkable`` first.
        """
        endpoint_ids = {
            "predecessor_id": attrs.pop("predecessor_id", None),
            "successor_id": attrs.pop("successor_id", None),
        }
        for key, endpoint in (("predecessor_id", predecessor), ("successor_id", successor)):
            if endpoint is None:
                continue
            self._validate_linkable(endpoint)
            endpoint_ids[key] = endpoint.uid

        edge = (kind or Edge)(**endpoint_ids, **attrs)
        self.add(edge)
        return edge

    def add_subgraph(self, *, kind=None, members: Iterable[GraphItem] = None, **attrs) -> Subgraph:
        """Create and add a subgraph, then optionally add members."""
        subgraph = (kind or Subgraph)(**attrs)
        # Register first: Subgraph.add_member dereferences its graph pointer.
        self.add(subgraph)
        for item in members or ():
            subgraph.add_member(item)
        return subgraph

    def _typed_selector(
        self,
        kind: type[GraphItem],
        selector: Selector | None = None,
    ) -> Selector:
        """Return ``selector`` (or a fresh one) with ``has_kind=kind`` applied."""
        resolved = self._ensure_selector(selector)
        if not resolved:
            resolved = Selector()
        return resolved.with_criteria(has_kind=kind)

    def _find_typed_all(
        self,
        kind: type[GraphItem],
        selector: Selector | None = None,
    ) -> Iterator[GraphItem]:
        """Run ``find_all`` with the kind-narrowed selector."""
        typed = self._typed_selector(kind, selector)
        return self.find_all(selector=typed)

    def _find_typed_one(
        self,
        kind: type[GraphItem],
        selector: Selector | None = None,
    ) -> GraphItem | None:
        """Run ``find_one`` with the kind-narrowed selector."""
        typed = self._typed_selector(kind, selector)
        return self.find_one(selector=typed)

    def find_subgraphs(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Subgraph]:
        """Find all :class:`Subgraph` items matching ``selector``."""
        return self._find_typed_all(Subgraph, selector)

    def find_subgraph(
        self,
        selector: Selector | None = None,
    ) -> Optional[Subgraph]:
        """Find one :class:`Subgraph` item matching ``selector``, if any."""
        return self._find_typed_one(Subgraph, selector)

    @property
    def subgraphs(self) -> list[Subgraph]:
        """All subgraphs in this graph, as a list."""
        return [*self.find_subgraphs()]

    def find_edges(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Edge]:
        """Find all :class:`Edge` items matching ``selector``."""
        return self._find_typed_all(Edge, selector)

    @property
    def edges(self) -> list[Edge]:
        """All edges in this graph, as a list."""
        return [*self.find_edges()]

    def find_edge(
        self,
        selector: Selector | None = None,
    ) -> Optional[Edge]:
        """Find one :class:`Edge` item matching ``selector``, if any."""
        return self._find_typed_one(Edge, selector)

    def find_nodes(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Node]:
        """Find all :class:`Node` items matching ``selector``."""
        return self._find_typed_all(Node, selector)

    @property
    def nodes(self) -> list[Node]:
        """All nodes in this graph, as a list."""
        return [*self.find_nodes()]

    def find_node(
        self,
        selector: Selector | None = None,
    ) -> Optional[Node]:
        """Find one :class:`Node` item matching ``selector``, if any."""
        return self._find_typed_one(Node, selector)

    def _do_link(self, caller: GraphItem, node: Node, _ctx):
        """Bridge to dispatch ``do_link`` hook."""
        # Imported at call time rather than at module import.
        from .dispatch import do_link

        return do_link(caller=caller, node=node, ctx=_ctx)

    def _do_unlink(self, caller: GraphItem, node: Node, _ctx):
        """Bridge to dispatch ``do_unlink`` hook."""
        # Imported at call time rather than at module import.
        from .dispatch import do_unlink

        return do_unlink(caller=caller, node=node, ctx=_ctx)
[docs]
class Subgraph(EntityGroup, GraphItem):
    """Non-hierarchical grouping of graph items.

    Subgraph reuses :class:`EntityGroup` membership semantics and adds graph-level
    link/unlink hook bridge calls when context is provided.
    """

    def add_member(self, item: GraphItem, _ctx=None) -> None:
        """Validate ``item``, fire the link bridge if a context resolves, then add it."""
        self.graph._validate_linkable(item)

        from .ctx import resolve_ctx

        active_ctx = resolve_ctx(_ctx)
        if active_ctx is not None:
            self.graph._do_link(self, item, active_ctx)
        # Drop the item's cached ``parent`` before membership changes.
        item._invalidate_parent_attr()
        super().add_member(item)

    def remove_member(self, item: GraphItem, _ctx=None) -> None:
        """Fire the unlink bridge if a context resolves, then remove ``item``."""
        from .ctx import resolve_ctx

        active_ctx = resolve_ctx(_ctx)
        if active_ctx is not None:
            self.graph._do_unlink(self, item, active_ctx)
        # Drop the item's cached ``parent`` before membership changes.
        item._invalidate_parent_attr()
        super().remove_member(item)

    def members(self, selector: Selector | None = None, sort_key=None) -> Iterator[GraphItem]:
        """Yield typed graph-item members.

        Delegates unchanged to the parent implementation; this override only
        narrows the declared return type.
        """
        return super().members(selector=selector, sort_key=sort_key)
[docs]
class Edge(GraphItem):
    """Directed connection between predecessor and successor graph items.

    Notes
    -----
    - Endpoints may be dangling (`None`) by design.

    Access patterns:

    - ``edge.predecessor`` / ``edge.successor`` for graph-dereferenced properties.
    - ``edge.set_predecessor(node, _ctx=...)`` for explicit context-driven mutation.
    - ``edge.predecessor = node`` for ambient-context mutation only.

    Example:
        >>> g = Graph()
        >>> n = Node(label="n", registry=g)
        >>> e = Edge(registry=g, successor_id=n.uid)
        >>> g.find_one(Selector(successor=n))
        <Edge:anon->n>
    """

    predecessor_id: Optional[UUID] = None
    successor_id: Optional[UUID] = None

    @property
    def predecessor(self) -> Optional[Node]:
        """Dereference ``predecessor_id`` through the owning graph."""
        return self.graph.get(self.predecessor_id)

    def set_predecessor(self, value: Optional[Node], _ctx=None) -> None:
        """Set or clear the predecessor endpoint.

        With a node: validate it via ``graph._validate_linkable``, fire the
        link bridge when a context resolves, then store its ``uid``.
        With ``None``: fire the unlink bridge for the current predecessor (when
        a context resolves and one exists), then clear ``predecessor_id``.
        """
        from .ctx import resolve_ctx

        _ctx = resolve_ctx(_ctx)
        if value is not None:
            self.graph._validate_linkable(value)
            if _ctx is not None:
                self.graph._do_link(self, value, _ctx)
            self.predecessor_id = value.uid
        else:
            # The current endpoint is only looked up when a hook will fire.
            if _ctx is not None and self.predecessor is not None:
                self.graph._do_unlink(self, self.predecessor, _ctx)
            self.predecessor_id = None

    @predecessor.setter
    def predecessor(self, value: Optional[Node]) -> None:
        # Ambient-context mutation: no explicit ctx is passed along.
        self.set_predecessor(value)

    @property
    def successor(self) -> Optional[Node]:
        """Dereference ``successor_id`` through the owning graph."""
        return self.graph.get(self.successor_id)

    def set_successor(self, value: Optional[Node], _ctx=None) -> None:
        """Set or clear the successor endpoint.

        With a node: validate it via ``graph._validate_linkable``, fire the
        link bridge when a context resolves, then store its ``uid``.
        With ``None``: fire the unlink bridge for the current successor (when
        a context resolves and one exists), then clear ``successor_id``.
        """
        from .ctx import resolve_ctx

        _ctx = resolve_ctx(_ctx)
        if value is not None:
            self.graph._validate_linkable(value)
            if _ctx is not None:
                self.graph._do_link(self, value, _ctx)
            self.successor_id = value.uid
        else:
            # The current endpoint is only looked up when a hook will fire.
            if _ctx is not None and self.successor is not None:
                self.graph._do_unlink(self, self.successor, _ctx)
            self.successor_id = None

    @successor.setter
    def successor(self, value: Optional[Node]) -> None:
        # Ambient-context mutation: no explicit ctx is passed along.
        self.set_successor(value)

    def __repr__(self) -> str:
        """Return ``<Kind:src->dst>`` with labels truncated to six characters.

        Missing endpoints render as ``anon``. An edge whose graph pointer is
        unset (e.g. constructed but not yet added to a graph) renders both
        endpoints as ``anon`` instead of raising, so repr stays usable while
        debugging and logging.
        """
        if getattr(self, "graph", None) is None:
            # Unbound edge: endpoints cannot be dereferenced.
            src_node = dst_node = None
        else:
            # Resolve each endpoint once; every access is a graph lookup.
            src_node = self.predecessor
            dst_node = self.successor
        src = src_node.get_label() if src_node is not None else "anon"
        dst = dst_node.get_label() if dst_node is not None else "anon"
        return f"<{self.__class__.__name__}:{src[:6]}->{dst[:6]}>"
[docs]
class Node(GraphItem):
    """Graph vertex with directional edge navigation and wiring helpers."""

    def edges_in(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Edge]:
        """Find edges whose successor is this node, narrowed by ``selector``."""
        selector = (self.graph._ensure_selector(selector) or Selector()).with_criteria(successor=self)
        return self.graph.find_edges(selector=selector)

    def predecessors(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Node]:
        """Yield immediate predecessor nodes via incoming edges.

        Edges whose predecessor does not resolve (dangling endpoint) are
        skipped. Presence is tested with ``is not None`` so that a resolved
        node is never dropped merely because it evaluates as falsy.
        """
        # Generator expression (not a generator function) so the edge query
        # is still issued at call time; each endpoint is resolved once.
        return (
            source
            for edge in self.edges_in(selector=selector)
            if (source := edge.predecessor) is not None
        )

    def edges_out(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Edge]:
        """Find edges whose predecessor is this node, narrowed by ``selector``."""
        selector = (self.graph._ensure_selector(selector) or Selector()).with_criteria(predecessor=self)
        return self.graph.find_edges(selector=selector)

    def successors(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Node]:
        """Yield immediate successor nodes via outgoing edges.

        Edges whose successor does not resolve (dangling endpoint) are
        skipped. Presence is tested with ``is not None`` so that a resolved
        node is never dropped merely because it evaluates as falsy.
        """
        # Generator expression (not a generator function) so the edge query
        # is still issued at call time; each endpoint is resolved once.
        return (
            target
            for edge in self.edges_out(selector=selector)
            if (target := edge.successor) is not None
        )

    def edges(
        self,
        selector: Selector | None = None,
    ) -> Iterator[Edge]:
        """Yield all incident edges (incoming then outgoing)."""
        return itertools.chain(
            self.edges_in(selector=selector),
            self.edges_out(selector=selector),
        )

    def add_edge_to(self, node: Node, kind=None, **attrs) -> Edge:
        """Create an edge from this node to ``node``."""
        return self.graph.add_edge(self, node, kind=kind, **attrs)

    def add_edge_from(self, node: Node, kind=None, **attrs) -> Edge:
        """Create an edge from ``node`` to this node."""
        return self.graph.add_edge(node, self, kind=kind, **attrs)

    def remove_edge_to(self, node: Node) -> None:
        """Remove the edge ``find_edge`` returns for this node -> ``node``, if any."""
        edge = self.graph.find_edge(Selector(predecessor=self, successor=node))
        if edge is not None:
            self.graph.remove(edge.uid)

    def remove_edge_from(self, node: Node) -> None:
        """Remove the edge ``find_edge`` returns for ``node`` -> this node, if any."""
        edge = self.graph.find_edge(Selector(predecessor=node, successor=self))
        if edge is not None:
            self.graph.remove(edge.uid)
[docs]
class HierarchicalNode(HierarchicalGroup, Node):
    """Hierarchy-capable graph vertex.

    Composes :class:`HierarchicalGroup` (parent/path/ancestor semantics) with
    :class:`Node` (edge navigation and wiring helpers). The class adds no
    members of its own; ``HierarchicalGroup`` is listed first, so it precedes
    ``Node`` in the method resolution order.
    """