# Source code for tangl.vm.provision.requirement

"""Frontier constraint edges for the vm provisioning pipeline.

Defines the requirement data model and the topology carriers that embed
requirements into the runtime graph.
"""

from __future__ import annotations
from typing import Any, Optional, Generic, Iterable, TypeVar
from uuid import UUID

from pydantic import Field

from tangl.core import Entity, Registry, RegistryAware, Selector, Edge, Node, EntityTemplate
from ..ctx import VmPhaseCtx
from .provisioner import ProvisionPolicy

# Generic parameter naming the provider type that a requirement resolves to;
# bounded by RegistryAware.
PT = TypeVar('PT', bound=RegistryAware)  # 'ProviderType'

class Requirement(Selector, Generic[PT]):
    """Requirement(has_kind: type | None, has_identifier: str | None)

    Unsatisfied resource contract carried by a frontier edge.

    Why
    ----
    A requirement describes *what* is needed independently of the graph
    structure that carries the need, so candidates, policy, and diagnostics
    can be reasoned about before runtime topology is mutated.

    Key Features
    ------------
    * Subclasses :class:`~tangl.core.Selector`; matching is delegated to a
      ``Selector`` built from this requirement's predicate and extra criteria.
    * Records satisfaction, the selected policy, and resolution metadata
      (step, cursor, reason, free-form meta).
    * A soft requirement (``hard_requirement=False``) reports itself as
      satisfied even without a provider.
    * Carries optional selectors for deferred UPDATE or CLONE formulas.

    API
    ---
    - :attr:`satisfied` is true when a provider id is stored or the
      requirement is soft.
    - :meth:`satisfied_by` evaluates a candidate provider.
    - ``from_identifier`` (used in the example; not defined in this class,
      presumably inherited) builds a simple identifier-driven requirement.

    Example:
        >>> e = Entity(label='foo')
        >>> req = Requirement.from_identifier('foo')
        >>> req.satisfied
        False
        >>> req.satisfied_by(e)
        True
        >>> req.provider_id = e.uid
        >>> req.satisfied
        True
        >>> req = Requirement(hard_requirement=False)
        >>> req.satisfied
        True
    """
    # Not an entity, a serializable collection of traits that can be
    # used as a component on an entity with a requirement.

    # --- provider binding -------------------------------------------------
    provision_policy: ProvisionPolicy = ProvisionPolicy.ANY
    provider_id: Optional[UUID] = None
    hard_requirement: bool = True
    # todo: validate not none selector

    # --- resolution bookkeeping -------------------------------------------
    unsatisfiable: Optional[bool] = None  # unknown
    unambiguously_resolved: Optional[bool] = None  # unknown
    selected_offer_policy: Optional[ProvisionPolicy] = None
    resolved_step: Optional[int] = None
    resolved_cursor_id: Optional[UUID] = None
    resolution_reason: Optional[str] = None
    resolution_meta: Optional[dict[str, Any]] = None

    # a requirement can satisfy itself if it carries an inline template
    # a cloner could take such an offer and combine it with an existing source
    fallback_templ: Optional[EntityTemplate] = None

    # Authoring-path metadata used by qualified/unqualified policy forks.
    authored_path: str | None = None
    is_qualified: bool = False
    is_absolute: bool = False

    # Optional two-part formula for late synthesized UPDATE/CLONE offers.
    # These are typed fields (not selector extras), so they do not participate
    # in provider matching criteria.
    reference_selector: Optional[Selector] = None
    update_template_selector: Optional[Selector] = None
    media_spec: Any = None
    media_ref_id: Optional[UUID] = None
    media_basename: Optional[str] = None

    @property
    def satisfied(self):
        """True for a soft requirement, or once a provider id is stored."""
        return not self.hard_requirement or self.provider_id is not None

    def _matches_identifier(self, entity: PT) -> bool:
        """Check the ``has_identifier`` extra criterion against ``entity``.

        Returns True when no usable (non-empty string) identifier is set, when
        the entity's own ``has_identifier`` callable accepts it, or when a
        dotted identifier equals the entity's string ``path``.
        """
        wanted = (self.__pydantic_extra__ or {}).get("has_identifier")
        if not (isinstance(wanted, str) and wanted):
            # Nothing to match on: the identifier criterion is vacuously met.
            return True

        probe = getattr(entity, "has_identifier", None)
        if callable(probe):
            try:
                accepted = bool(probe(wanted))
            except (TypeError, ValueError, AttributeError):
                # Best effort: a failing probe counts as "no match" and falls
                # through to the path comparison.
                accepted = False
            if accepted:
                return True

        # NOTE(review): the path fallback is applied whether or not the entity
        # exposes a callable ``has_identifier`` — confirm against upstream.
        if "." not in wanted:
            return False
        entity_path = getattr(entity, "path", None)
        return isinstance(entity_path, str) and entity_path == wanted

    def satisfied_by(self, entity: PT) -> bool:
        """Return whether ``entity`` meets this requirement's criteria.

        The identifier check runs first; the remaining extra criteria (minus
        identifier and authoring-path keys) are evaluated by a fresh
        ``Selector`` that reuses this requirement's predicate.
        """
        if not self._matches_identifier(entity):
            return False
        skipped = ("has_identifier", "authored_path", "is_qualified", "is_absolute")
        criteria = {
            name: value
            for name, value in (self.__pydantic_extra__ or {}).items()
            if name not in skipped
        }
        return Selector(predicate=self.predicate, **criteria).matches(entity)

    def _validate_satisfied_by(self, entity: PT) -> bool:
        """Return True if ``entity`` satisfies this requirement, else raise.

        Raises:
            ValueError: when :meth:`satisfied_by` rejects ``entity``.
        """
        if self.satisfied_by(entity):
            return True
        raise ValueError(f'Requirement {self} not satisfied by {entity!r}')
def stamp_requirement_resolution(
    requirement: Requirement[Any],
    *,
    _ctx: VmPhaseCtx | None = None,
) -> None:
    """Copy step/cursor resolution metadata from a runtime context.

    With no context, both ``resolved_step`` and ``resolved_cursor_id`` on
    ``requirement`` are reset to ``None``. Otherwise ``_ctx.step`` is stored
    only if it is an ``int`` and ``_ctx.cursor_id`` only if it is a ``UUID``;
    any other value is recorded as ``None``.
    """
    if _ctx is not None:
        step_value = _ctx.step
        if isinstance(step_value, int):
            requirement.resolved_step = step_value
        else:
            requirement.resolved_step = None

        cursor_value = _ctx.cursor_id
        if isinstance(cursor_value, UUID):
            requirement.resolved_cursor_id = cursor_value
        else:
            requirement.resolved_cursor_id = None
        return

    # No context supplied: clear any previously stamped metadata.
    requirement.resolved_step = None
    requirement.resolved_cursor_id = None
class HasRequirement(RegistryAware, Generic[PT]):
    """HasRequirement(requirement: Requirement[PT])

    Mixin giving a registry-aware carrier exactly one embedded requirement.

    Why
    ----
    Callers get one uniform surface for reading satisfaction state, looking up
    the resolved provider, and recording resolution metadata on the carrier.

    Key Features
    ------------
    * Provider bookkeeping lives on the embedded :class:`Requirement`; this
      mixin only stores and dereferences ``requirement.provider_id``.
    * Satisfaction and resolution metadata are exposed as read-only
      pass-through properties.
    * A provider is validated against the requirement and the registry before
      its id is stored.

    API
    ---
    - :attr:`provider` dereferences the stored provider id through the
      registry; assigning to it calls :meth:`set_provider`.
    - :attr:`satisfied` and :meth:`satisfied_by` delegate to the embedded
      requirement.

    Example:
        >>> reg = Registry()
        >>> r = HasRequirement(requirement={'has_identifier': 'foo'}); reg.add(r)
        >>> r.satisfied
        False
        >>> e = RegistryAware(label='foo'); reg.add(e)
        >>> r.satisfied_by(e)
        True
        >>> r.provider = e
        >>> r.satisfied
        True
        >>> r.provider
        <RegistryAware:foo>
        >>> f = RegistryAware(label='bar'); reg.add(f)
        >>> r.satisfied_by(f)
        False
        >>> try:
        ...     r.provider = f
        ... except ValueError as e:
        ...     print(e)
        Requirement ... not satisfied by <RegistryAware:bar>
    """
    # typically an entity will have only one requirement to avoid bookkeeping confusion.
    # nodes may have multiple dependency edges, for example, but each dependency has
    # a single requirement.

    requirement: Requirement[PT]

    # delegators

    @property
    def provider(self) -> Optional[PT]:
        """The registry entry for the stored provider id, or None if unset."""
        bound_id = self.requirement.provider_id
        if bound_id is None:
            return None
        return self.registry.get(bound_id)

    @provider.setter
    def provider(self, value: PT) -> None:
        self.set_provider(value)

    def set_provider(self, provider: PT, _ctx=None) -> None:
        """Validate ``provider`` and record it on the embedded requirement.

        Raises:
            ValueError: from the requirement's validation when ``provider``
                does not satisfy it.
        """
        req = self.requirement
        if not req._validate_satisfied_by(provider):
            return
        if not self.registry._validate_linkable(provider):
            return
        req.provider_id = provider.uid
        # NOTE(review): stamping happens only when the provider id was stored;
        # confirm this matches the upstream indentation.
        stamp_requirement_resolution(req, _ctx=_ctx)

    @property
    def satisfied(self):
        """Pass-through of ``requirement.satisfied``."""
        return self.requirement.satisfied

    @property
    def resolution_reason(self) -> Optional[str]:
        """Pass-through of ``requirement.resolution_reason``."""
        return self.requirement.resolution_reason

    @property
    def resolution_meta(self) -> Optional[dict[str, Any]]:
        """Pass-through of ``requirement.resolution_meta``."""
        return self.requirement.resolution_meta

    def satisfied_by(self, entity: PT) -> bool:
        """Ask the embedded requirement whether ``entity`` qualifies."""
        return self.requirement.satisfied_by(entity)
# The edge classes below provide the carrier mechanism that maps requirements
# into the graph topology. Alternatively, requirements could be represented as
# control nodes that weld together incoming and outgoing edges under a given name.
#
# These are dynamic edges: an 'update' event on their requirement component can
# provoke a topological update, so that interaction needs to be watched carefully.
class Dependency(Edge, HasRequirement[PT], Generic[PT]):
    """Dependency(predecessor_id: UUID | None = None, requirement: Requirement[PT])

    Pull-resource edge whose predecessor declares a needed provider.

    Why
    ----
    A dependency makes missing topology explicit: the edge can exist with a
    requirement and no successor, and a concrete provider can be bound to it
    later.

    Key Features
    ------------
    * Couples one embedded :class:`Requirement` to graph topology.
    * Binding or clearing through either ``provider`` or ``successor`` updates
      both, so the two stay in step.

    API
    ---
    - :meth:`set_provider` validates and stores the provider, then sets the
      edge successor; ``None`` clears both.
    - :meth:`set_successor` is a topology-first alias with the same effect.

    Example:
        >>> reg = Registry()
        >>> r = Dependency(requirement={'has_identifier': 'foo'}); reg.add(r)
        >>> r.satisfied
        False
        >>> e = RegistryAware(label='foo'); reg.add(e)
        >>> r.successor = e
        >>> r.satisfied
        True
        >>> r.provider
        <RegistryAware:foo>
        >>> r.successor
        <RegistryAware:foo>
    """
    # another layer of delegators, the important thing here is that the 'on_link'
    # hook gets triggered when the provider is set. Could do that by syncing the
    # provider and successor, or by ignoring successor_id and rewiring set_provider to
    # call the on_link hook.

    def set_provider(self, value: Optional[PT], _ctx=None) -> None:
        """Bind ``value`` as provider and successor, or clear both on ``None``."""
        # could just leave successor_id None and defer to provider-id in all cases to
        # entirely avoid syncing concern. might also make the end-type more clear as
        # the expected provider type.
        if value is not None:
            # Requirement-side validation/bookkeeping first, then the topology link.
            super().set_provider(value, _ctx=_ctx)
            super().set_successor(value, _ctx=_ctx)
            return

        req = self.requirement
        req.provider_id = req.resolved_step = req.resolved_cursor_id = None
        super().set_successor(None, _ctx=_ctx)

    def set_successor(self, value: Optional[PT], _ctx=None) -> None:
        """Topology-first entry point; non-``None`` values go through :meth:`set_provider`."""
        if value is not None:
            self.set_provider(value, _ctx=_ctx)
            return

        req = self.requirement
        req.provider_id = req.resolved_step = req.resolved_cursor_id = None
        super().set_successor(None, _ctx=_ctx)
class Affordance(Edge, HasRequirement[PT], Generic[PT]):
    """Affordance(predecessor_id: UUID | None = None, requirement: Requirement[PT])

    Push-resource edge whose predecessor offers a provider to nearby consumers.

    Why
    ----
    Affordances model providers that are already available locally and should
    be treated as preferred EXISTING offers before a wider search proceeds.

    Key Features
    ------------
    * Couples one embedded :class:`Requirement` to a provider edge that pushes
      availability outward from its predecessor.
    * Binding or clearing through either ``provider`` or ``successor`` updates
      both, so the two stay in step.

    API
    ---
    - :meth:`set_provider` validates and stores the provider, then sets the
      edge successor; ``None`` clears both.
    - :meth:`set_successor` is a topology-first alias with the same effect.

    Example:
        >>> reg = Registry()
        >>> r = Affordance(requirement={'has_identifier': 'foo'}); reg.add(r)
        >>> r.satisfied
        False
        >>> e = RegistryAware(label='foo'); reg.add(e)
        >>> r.successor = e
        >>> r.satisfied
        True
        >>> r.provider
        <RegistryAware:foo>
        >>> r.successor
        <RegistryAware:foo>
    """
    # another layer of delegators

    def set_provider(self, value: Optional[PT], _ctx=None) -> None:
        """Bind provider and synchronize to ``successor`` (push resource)."""
        if value is not None:
            # Requirement-side validation/bookkeeping first, then the topology link.
            super().set_provider(value, _ctx=_ctx)
            super().set_successor(value, _ctx=_ctx)
            return

        req = self.requirement
        req.provider_id = req.resolved_step = req.resolved_cursor_id = None
        super().set_successor(None, _ctx=_ctx)

    def set_successor(self, value: Optional[PT], _ctx=None) -> None:
        """Provider alias for ``successor`` on affordance edges."""
        if value is not None:
            self.set_provider(value, _ctx=_ctx)
            return

        req = self.requirement
        req.provider_id = req.resolved_step = req.resolved_cursor_id = None
        super().set_successor(None, _ctx=_ctx)
class Fanout(Edge, Generic[PT]):
    """Fanout(predecessor_id: UUID | None = None, requirement: Requirement[PT])

    Non-blocking gather edge that resolves to every eligible provider.

    Why
    ----
    Some runtime nodes, such as activity hubs or sandbox menus, want the full
    set of currently eligible providers rather than a single best one, so that
    later handlers can surface them as affordances or dynamic actions.

    Key Features
    ------------
    * Holds one selector-shaped :class:`Requirement` alongside a list of
      resolved provider ids.
    * Offers dereferenced ``providers`` plus explicit ``set_providers`` and
      ``clear_providers`` helpers.
    * An empty provider set is a valid state: ``set_providers`` accepts an
      empty iterable and ``providers`` simply returns an empty list.

    API
    ---
    - :attr:`providers` dereferences all currently linked provider ids.
    - :meth:`set_providers` validates and stores a whole provider set.
    - :meth:`clear_providers` removes all current links.
    """

    requirement: Requirement[PT]
    provider_ids: list[UUID] = Field(default_factory=list)

    @property
    def providers(self) -> list[PT]:
        """Providers resolved from ``provider_ids``; ids the registry cannot find are skipped.

        Returns an empty list when no registry is attached.
        """
        registry = getattr(self, "registry", None)
        if registry is None:
            return []
        looked_up = (registry.get(pid) for pid in self.provider_ids)
        return [found for found in looked_up if found is not None]

    def set_providers(self, providers: Iterable[PT], _ctx=None) -> None:
        """Replace the stored provider ids with the validated, de-duplicated set.

        Each candidate is checked against the requirement and the registry;
        first-seen order is kept. Resolution metadata is stamped afterwards.

        Raises:
            ValueError: if no registry is attached, or (from the requirement's
                validation) if a candidate does not satisfy the requirement.
        """
        registry = getattr(self, "registry", None)
        if registry is None:
            raise ValueError("Fanout must be bound to a registry before setting providers")

        # A dict used as an insertion-ordered set: keeps first-seen order and
        # drops repeated uids.
        accepted: dict[UUID, None] = {}
        for candidate in providers:
            if not self.requirement._validate_satisfied_by(candidate):
                continue
            if not registry._validate_linkable(candidate):
                continue
            accepted.setdefault(candidate.uid, None)

        self.provider_ids = list(accepted)
        stamp_requirement_resolution(self.requirement, _ctx=_ctx)

    def clear_providers(self) -> None:
        """Empty the provider id list in place and reset the step/cursor stamps."""
        self.provider_ids.clear()
        req = self.requirement
        req.resolved_step = None
        req.resolved_cursor_id = None