# tangl/core/record.py
"""Immutable ordered records and append-only ordered registries.

This module defines :class:`Record` as a frozen, content-addressed artifact and
:class:`OrderedRegistry` as an append-only :class:`~tangl.core.registry.Registry`
specialization with range slicing over a comparable sort-key space.

See Also
--------
:mod:`tangl.core.bases`
    Record composition traits (:class:`HasContent`, :class:`HasOrder`).
:mod:`tangl.core.registry`
    Base registry behavior, selector filtering, and mapping semantics.
"""
from __future__ import annotations
from typing import Any, Callable, ClassVar, Iterable, Iterator, TypeVar, Union
from pydantic import ConfigDict, Field, ValidationError
from tangl.type_hints import Identifier
from .bases import HasContent, HasOrder
from .entity import Entity
from .registry import Registry
from .selector import Selector
# Entity type produced by a registry lookup; used to tie the element type of the
# registry passed to ``Record.origin`` to that method's return type.
ET = TypeVar("ET", bound="Entity")
[docs]
class Record(HasContent, HasOrder, Entity):
    """Frozen ordered artifact with content identity and optional origin reference.

    Why
    ---
    Records capture immutable runtime facts that should compare by content and remain
    orderable for stream-like processing.

    Key Features
    ------------
    - **Three identity layers**: stable ``uid`` from :class:`Entity`, content equality
      from :class:`HasContent`, and sequence ordering from :class:`HasOrder`.
    - **Frozen + flexible schema**: ``frozen=True`` with ``extra='allow'`` supports
      arbitrary payload fields in derived record families.
    - **External origin dereference**: ``origin_id`` stores a producer reference, and
      :meth:`origin` resolves it through an explicitly supplied registry.

    Notes
    -----
    ``origin_id`` is not a registry-aware pointer. Dereference requires passing the
    correct lookup registry at call time.

    Example:
        >>> r = Record(content="foo")
        >>> r.get_hashable_content()
        'foo'
        >>> try:
        ...     r.content = "bar"
        ... except ValidationError as e:
        ...     print(e)  # doctest: +ELLIPSIS
        1 validation error ...
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow", frozen=True)

    # Reference to the producing entity; ``None`` means no origin was recorded.
    # The annotation admits ``None`` explicitly so that the default value is also
    # a valid *explicit* value (e.g. when a dumped record is validated again).
    origin_id: Union[Identifier, None] = None

    def get_hashable_content(self) -> Any:
        """Return the primary content field used for content hashing.

        Resolution order is ``content`` then ``payload`` then ``data``; the first
        attribute that exists on the instance wins, even if its value is falsy.

        Raises
        ------
        AttributeError
            If the record has none of the three attributes.
        """
        for field_name in ("content", "payload", "data"):
            if hasattr(self, field_name):
                return getattr(self, field_name)
        raise AttributeError("No content available.")

    # since a record is frozen, we _can_ cache the value hash minus non-generics
    # (uid, label) and use it as the content hash in a pinch.

    def origin(self, registry: Registry[ET]) -> ET:
        """Resolve ``origin_id`` through an explicit registry.

        Parameters
        ----------
        registry
            The registry to look the origin up in; the record itself holds no
            registry reference.

        Returns
        -------
        Whatever ``registry.get(self.origin_id)`` returns.
        NOTE(review): the result for a missing or ``None`` ``origin_id`` depends
        on ``Registry.get`` -- presumably ``None``; confirm against the registry.
        """
        return registry.get(self.origin_id)
# Member type stored in an ``OrderedRegistry``.
# NOTE(review): the bound is a Union, so a type checker accepts anything that is an
# ``Entity`` *or* a ``HasOrder``; the registry methods call ``member.sort_key()`` and
# read ``seq``, so members are presumably expected to satisfy both -- confirm.
OrderedEntity = TypeVar("OrderedEntity", bound=Union[Entity, HasOrder])
[docs]
class OrderedRegistry(Registry[OrderedEntity]):
    """Append-only ordered registry with sort-key range slicing.

    Why
    ---
    Ordered registries provide deterministic range queries over members with
    :meth:`sort_key` support while keeping the core primitive independent from
    higher-level stream marker/bookmark policies.

    Key Features
    ------------
    - append-only mutation model via :meth:`append`/:meth:`extend`
      (:meth:`remove` always raises);
    - generic key accessors :meth:`min_key` / :meth:`max_key`;
    - half-open range queries through :meth:`get_slice` with optional selector
      composition.

    Notes
    -----
    Named bookmarks/sections are intentionally out of scope for this core type and
    should be layered above it (for example in VM/story stream services). Core
    keeps append/slice only; bookmark channels and destructive undo truncation are
    runtime-policy concerns.

    The ``markers`` field together with :meth:`set_marker`, :meth:`get_section`
    and :meth:`add_record` is retained for legacy compatibility only.
    """

    # Legacy marker table: ``marker_type -> {marker_name -> seq}``.
    markers: dict[str, dict[str, int]] = Field(default_factory=dict)

    def append(self, record: OrderedEntity) -> None:
        """Append one ordered entity to the registry."""
        self.add(record)

    def extend(self, records: Iterable[OrderedEntity]) -> None:
        """Append many ordered entities in input order."""
        for record in records:
            self.append(record)

    def min_key(self, sort_key: Callable[[OrderedEntity], Any] | None = None) -> Any:
        """Return minimum member sort key, or ``None`` for an empty registry.

        ``sort_key`` overrides the default key, which is ``member.sort_key()``.
        """
        if not self.members:
            return None
        key_fn = sort_key or (lambda member: member.sort_key())
        return min(key_fn(member) for member in self.members.values())

    def max_key(self, sort_key: Callable[[OrderedEntity], Any] | None = None) -> Any:
        """Return maximum member sort key, or ``None`` for an empty registry.

        ``sort_key`` overrides the default key, which is ``member.sort_key()``.
        """
        if not self.members:
            return None
        key_fn = sort_key or (lambda member: member.sort_key())
        return max(key_fn(member) for member in self.members.values())

    def add_record(self, record: OrderedEntity) -> None:
        """Legacy compatibility helper used by stream-oriented call sites."""
        self.append(record)

    def _last_seq(self) -> int | None:
        """Return the highest record sequence, or ``None`` when empty."""
        last_item = self.last()
        return int(last_item.seq) if last_item is not None else None

    def last(self, selector: Selector | None = None) -> OrderedEntity | None:
        """Return the last matching item by ``seq``.

        Items without a ``seq`` attribute rank as ``-1``. Returns ``None`` when
        nothing matches ``selector``.
        """
        selector = self._ensure_selector(selector)
        # ``default=None`` covers the no-match case without building a list first.
        return max(
            self.find_all(selector=selector),
            key=lambda item: int(getattr(item, "seq", -1)),
            default=None,
        )

    def get_slice(
        self,
        start_key: Any = None,
        stop_key: Any = None,
        selector: Selector | None = None,
        sort_key: Callable[[OrderedEntity], Any] | None = None,
        *,
        start_seq: Any = None,
        end_seq: Any = None,
        predicate: Callable[[OrderedEntity], bool] | None = None,
    ) -> Iterator[OrderedEntity]:
        """Yield members where ``start_key <= sort_key(member) < stop_key``.

        Bounds are half-open and optional. Passing ``None`` for either bound means
        unbounded in that direction.

        Parameters
        ----------
        start_key, stop_key
            Inclusive lower / exclusive upper bound, compared against the member key.
        selector
            Optional selector; its own predicate (if any) must also accept the member.
        sort_key
            Key function used for both the range test and the ``sort_key`` argument
            handed to ``find_all``. Defaults to ``member.sort_key()``.
        start_seq, end_seq
            Keyword-only aliases; when not ``None`` they take precedence over
            ``start_key`` / ``stop_key`` respectively.
        predicate
            Optional extra filter applied after the range and selector checks.
        """
        if start_seq is not None:
            start_key = start_seq
        if end_seq is not None:
            stop_key = end_seq

        key_fn = sort_key or (lambda member: member.sort_key())

        def in_range(member: OrderedEntity) -> bool:
            key = key_fn(member)
            if start_key is not None and key < start_key:
                return False
            if stop_key is not None and key >= stop_key:
                return False
            return True

        selector = self._ensure_selector(selector) or Selector()
        # Capture the selector's own predicate before it is replaced below, so the
        # combined predicate can still honour it.
        base_predicate = selector.predicate

        def combined_predicate(member: OrderedEntity) -> bool:
            if not in_range(member):
                return False
            if base_predicate is not None and not base_predicate(member):
                return False
            if predicate is not None and not predicate(member):
                return False
            return True

        effective_selector = selector.with_criteria(predicate=combined_predicate)
        return self.find_all(effective_selector, sort_key=key_fn)

    def set_marker(
        self,
        marker_name: str,
        marker_type: str = "_",
        marker_seq: int | None = None,
        *,
        overwrite: bool = False,
    ) -> None:
        """Set/update a named stream marker (legacy compatibility).

        Parameters
        ----------
        marker_name
            Name of the marker inside its ``marker_type`` namespace.
        marker_type
            Marker namespace; ``"_"`` by default.
        marker_seq
            Sequence to record. When ``None`` the marker points one past the
            current highest ``seq`` (``0`` for an empty registry).
        overwrite
            Allow replacing an existing marker of the same name.

        Raises
        ------
        KeyError
            If the marker already exists and ``overwrite`` is false.
        """
        marker_bucket = self.markers.setdefault(marker_type, {})
        if not overwrite and marker_name in marker_bucket:
            raise KeyError(f"Marker {marker_name} already exists")
        if marker_seq is None:
            # Only scan the registry when the caller did not supply a sequence.
            last_seq = self._last_seq()
            marker_seq = 0 if last_seq is None else last_seq + 1
        marker_bucket[marker_name] = marker_seq

    def _next_marker_seq(self, start_seq: int, marker_type: str = "_") -> int:
        """Return next marker seq of same type, or stream end.

        "Stream end" is one past the highest ``seq`` in the registry, or ``0``
        when the registry is empty.
        """
        marker_bucket = self.markers.get(marker_type, {})
        following = min(
            (seq for seq in marker_bucket.values() if seq > start_seq),
            default=None,
        )
        if following is not None:
            return following
        # No later marker in this namespace: fall back to the end of the stream.
        last_seq = self._last_seq()
        return 0 if last_seq is None else last_seq + 1

    def get_section(
        self,
        marker_name: str,
        marker_type: str = "_",
        selector: Selector | None = None,
    ) -> Iterator[OrderedEntity]:
        """Yield records in [marker, next-marker) for a marker namespace.

        The special name ``"latest"`` selects the marker with the highest sequence
        in the namespace.

        Raises
        ------
        KeyError
            If the namespace has no markers or ``marker_name`` is not in it.
        """
        selector = self._ensure_selector(selector)
        marker_bucket = self.markers.get(marker_type)
        if not marker_bucket:
            raise KeyError(f"{marker_name}@{marker_type} not found")

        if marker_name == "latest":
            start_seq = max(marker_bucket.values())
        elif marker_name in marker_bucket:
            start_seq = marker_bucket[marker_name]
        else:
            raise KeyError(f"{marker_name}@{marker_type} not found")

        end_seq = self._next_marker_seq(start_seq, marker_type)
        return self.get_slice(start_seq=start_seq, end_seq=end_seq, selector=selector)

    def remove(self, *_args: Any, **_kwargs: Any) -> None:
        """Disallow removal to preserve append-only history semantics."""
        raise NotImplementedError("Cannot remove records from an OrderedRegistry.")