"""Authentication helpers for service transports."""from__future__importannotationsimportbase64fromdataclassesimportdataclassfromtypingimportAny,Iterable,Mapping,MutableMappingfromuuidimportUUID
[docs]@dataclass(frozen=True)classUserAuthInfo:"""Resolved authentication context for a user-bound request."""user_id:UUIDis_privileged:bool=False
defuser_id_by_key(api_key:str,persistence:Any,*,reverse_index:MutableMapping[str,UUID]|None=None,)->UserAuthInfo|None:"""Resolve ``api_key`` to :class:`UserAuthInfo` when possible. Returns ------- UserAuthInfo | None ``UserAuthInfo`` when the key resolves; otherwise ``None``. Lookup order: 1) reverse index cache (if provided), 2) persistence scan for matching ``content_hash``. """ifnotapi_keyorpersistenceisNone:returnNoneexpected_hash=_decode_key_hash(api_key)ifexpected_hashisNone:returnNoneifreverse_indexisnotNone:cached_user=_get_from_persistence(persistence,reverse_index.get(api_key))if_user_matches_hash(cached_user,expected_hash):return_auth_info_from_user(cached_user)reverse_index.pop(api_key,None)forcandidatein_iter_persistence_values(persistence):ifnot_user_matches_hash(candidate,expected_hash):continueifreverse_indexisnotNone:reverse_index[api_key]=candidate.uidreturn_auth_info_from_user(candidate)returnNonedef_auth_info_from_user(user:Any)->UserAuthInfo:returnUserAuthInfo(user_id=user.uid,is_privileged=bool(getattr(user,"privileged",False)),)def_decode_key_hash(api_key:str)->bytes|None:missing_padding=(-len(api_key))%4padded_key=api_key+("="*missing_padding)try:returnbase64.urlsafe_b64decode(padded_key)except(ValueError,TypeError):returnNonedef_iter_persistence_values(persistence:Any)->Iterable[Any]:values=getattr(persistence,"values",None)ifcallable(values):yield fromvalues()returnifisinstance(persistence,Mapping):yield frompersistence.values()returnforkeyinpersistence:item=_get_from_persistence(persistence,key)ifitemisnotNone:yielditemdef_get_from_persistence(persistence:Any,identifier:Any)->Any:ifidentifierisNone:returnNonegetter=getattr(persistence,"get",None)ifcallable(getter):returngetter(identifier)try:returnpersistence[identifier]exceptKeyError:returnNonedef_normalize_hash(value:Any)->bytes|None:ifvalueisNone:returnNoneifisinstance(value,bytes):returnvalueifisinstance(value,bytearray):returnbytes(value)ifisinstance(value,str):try:returnbytes.fromhex(value)exceptValueError:returnNonereturnNonedef_looks_like_user(value:Any)->bool:return(valueisnotNoneandhasattr(value,"uid")andhasattr(value,"current_ledger_id"))def_user_matches_hash(candidate:Any,expected_hash:bytes)->bool:ifnot_looks_like_user(candidate):returnFalseactual_hash=_normalize_hash(getattr(candidate,"content_hash",None))returnactual_hash==expected_hash__all__=["UserAuthInfo","user_id_by_key"]