"""Replay engine implementations for vm."""from__future__importannotationsfromtypingimportIterablefromuuidimportUUIDfromtangl.coreimportGraphfrom.contractsimportReplayDelta,ReplayEnginefrom.patchimportEvent,OpEnum,Patchfrom.recordsimportCheckpointRecordDEFAULT_ALGORITHM_ID="diff_v1"
class DiffReplayEngine:
    """Replay engine whose deltas are graph diffs.

    Each delta is a :class:`Patch` record holding the create/update/delete
    events for the graph members that differ between two graph states.
    """

    def algorithm_id(self) -> str:
        """Return the identifier of the replay algorithm this engine uses."""
        return DEFAULT_ALGORITHM_ID

    def build_delta(
        self, *, before_graph: Graph, after_graph: Graph
    ) -> Patch | None:
        """Diff two graphs into a :class:`Patch`.

        Events are emitted as deletions first, then creations, then updates.
        Within each group the member ids are ordered by their string form.

        Args:
            before_graph: Graph state the delta starts from.
            after_graph: Graph state the delta leads to.

        Returns:
            A patch carrying the collected events, or ``None`` when no events
            were produced.
        """
        old_ids = set(before_graph.members.keys())
        new_ids = set(after_graph.members.keys())

        # Ids found only in the old graph become DELETE events.
        changes: list[Event] = [
            Event(operation=OpEnum.DELETE, item_id=gone_id)
            for gone_id in sorted(old_ids - new_ids, key=str)
        ]

        # Ids found only in the new graph become CREATE events; an id that
        # does not resolve to a member is skipped.
        changes.extend(
            Event(
                operation=OpEnum.CREATE,
                item_id=new_id,
                value=added.unstructure(),
            )
            for new_id in sorted(new_ids - old_ids, key=str)
            if (added := after_graph.get(new_id)) is not None
        )

        # Ids found in both graphs become UPDATE events only when the
        # member's value hash differs between the two graphs.
        for shared_id in sorted(old_ids & new_ids, key=str):
            previous = before_graph.get(shared_id)
            current = after_graph.get(shared_id)
            if previous is None or current is None:
                continue
            unchanged = previous.value_hash() == current.value_hash()
            if not unchanged:
                changes.append(
                    Event(
                        operation=OpEnum.UPDATE,
                        item_id=shared_id,
                        value=current.unstructure(),
                    )
                )

        if not changes:
            return None

        return Patch(
            registry_id=after_graph.uid,
            initial_registry_value_hash=before_graph.value_hash(),
            final_registry_value_hash=after_graph.value_hash(),
            events=changes,
        )

    def apply_delta(self, *, graph: Graph, delta: ReplayDelta) -> Graph:
        """Return the result of ``delta.apply_to(graph)``."""
        result = delta.apply_to(graph)
        return result

    def make_checkpoint(
        self,
        *,
        graph: Graph,
        step: int,
        cursor_id: UUID,
        call_stack_ids: Iterable[UUID],
    ) -> CheckpointRecord:
        """Build a :class:`CheckpointRecord` for ``graph`` at ``step``.

        The record stores the unstructured graph, the graph's value hash,
        this engine's algorithm id, the cursor id, and a list copy of the
        call stack ids.
        """
        engine_id = self.algorithm_id()
        payload = graph.unstructure()
        digest = graph.value_hash()
        stack = list(call_stack_ids)
        return CheckpointRecord(
            step=step,
            algorithm_id=engine_id,
            graph_payload=payload,
            state_hash=digest,
            cursor_id=cursor_id,
            call_stack_ids=stack,
        )

    def restore_checkpoint(self, checkpoint: CheckpointRecord) -> Graph:
        """Rebuild the graph stored in ``checkpoint`` and verify its hash.

        Raises:
            ValueError: If the restored graph's value hash differs from the
                hash recorded on the checkpoint.
        """
        restored = checkpoint.restore_graph()
        if restored.value_hash() != checkpoint.state_hash:
            raise ValueError("Checkpoint hash mismatch")
        return restored
def get_replay_engine(algorithm_id: str) -> ReplayEngine:
    """Look up the replay engine registered under ``algorithm_id``.

    Args:
        algorithm_id: Identifier of the requested replay algorithm.

    Returns:
        A new :class:`DiffReplayEngine` when ``algorithm_id`` equals
        ``DEFAULT_ALGORITHM_ID``.

    Raises:
        ValueError: If ``algorithm_id`` is any other value.
    """
    recognised = algorithm_id == DEFAULT_ALGORITHM_ID
    if not recognised:
        raise ValueError(f"Unknown replay algorithm: {algorithm_id}")
    return DiffReplayEngine()