Source code for secretvaults.user

"""
SecretVault user client for managing owned documents.
"""

import asyncio
from datetime import datetime
from typing import Dict, List, Optional

from nuc.builder import NucTokenBuilder
from nuc.token import Command, Did, InvocationBody
from nuc.envelope import NucTokenEnvelope
from .common.keypair import Keypair
from .common.utils import into_seconds_from_now, inject_ids_into_records

from .base import SecretVaultBaseClient, SecretVaultBaseOptions
from .common.blindfold import BlindfoldFactoryConfig, to_blindfold_key
from .common.cluster import (
    execute_on_cluster,
    process_concealed_object_response,
    process_plaintext_response,
)
from .common.nuc_cmd import NucCmd
from .dto.data import CreateDataResponse, CreateOwnedDataRequest
from .dto.users import (
    DeleteDocumentRequestParams,
    GrantAccessToDataRequest,
    ListDataReferencesResponse,
    ReadDataRequestParams,
    ReadDataResponse,
    ReadUserProfileResponse,
    RevokeAccessToDataRequest,
    UpdateUserDataRequest,
)
from .logger import Log
from .nildb import NilDbUserClient, create_nil_db_user_client


[docs] class SecretVaultUserOptions(SecretVaultBaseOptions[NilDbUserClient]): # pylint: disable=too-few-public-methods """Options for SecretVault user client."""
[docs] class SecretVaultUserClient(SecretVaultBaseClient[NilDbUserClient]): """Client for users to manage owned-documents in SecretVaults."""
[docs] @classmethod async def from_options( cls, keypair: Keypair, base_urls: List[str], blindfold: Optional[BlindfoldFactoryConfig] = None, ) -> "SecretVaultUserClient": """ Creates and initializes a new SecretVaultUserClient instance. Args: keypair: The keypair for authentication base_urls: List of base URLs for the NIL DB services blindfold: Optional blindfold configuration for encryption Returns: SecretVaultUserClient instance """ # Create clients client_promises = [create_nil_db_user_client(url) for url in base_urls] clients = await asyncio.gather(*client_promises) # Create client with or without encryption if blindfold: if hasattr(blindfold, "key") and blindfold.key: # User provided a key client = cls( SecretVaultUserOptions( clients=clients, keypair=keypair, key=blindfold.key, ) ) else: # Create a new key key = await to_blindfold_key(blindfold, cluster_size=len(clients)) client = cls( SecretVaultUserOptions( clients=clients, keypair=keypair, key=key, ) ) else: # No encryption client = cls( SecretVaultUserOptions( clients=clients, keypair=keypair, ) ) Log.info( { "did": keypair.to_did_string()[-8:], "nodes": len(clients), "encryption": client._options.key.__class__.__name__ if client._options.key else "none", }, "SecretVaultUserClient created", ) return client
[docs] async def read_profile(self) -> ReadUserProfileResponse: """Reads the user's profile information from the cluster.""" results_by_node = await execute_on_cluster( self.nodes, lambda client: client.read_profile( self._mint_invocation( command=NucCmd.NIL_DB_USERS_READ, audience=client.id, ) ), ) result = process_plaintext_response(results_by_node) Log.info({"user": self.id}, "User profile read") return result
[docs] async def create_data(self, delegation: str, body: CreateOwnedDataRequest) -> Dict[Did, CreateDataResponse]: """Creates one or more data documents owned by the user.""" create_body = inject_ids_into_records(body) # Prepare map of node-id to node-specific payload node_payloads = await self._prepare_node_payloads(create_body) # Execute on all nodes def create_invocation_token(client): # Parse the delegation token envelope envelope = NucTokenEnvelope.parse(delegation) # Create invocation token builder that extends the delegation builder = NucTokenBuilder.extending(envelope) # Build the token with all required parameters token = ( builder.command(Command(NucCmd.NIL_DB_DATA_CREATE.value.split("."))) .audience(client.id) # Target node's DID .expires_at(datetime.fromtimestamp(into_seconds_from_now(60))) .body(InvocationBody({})) .build(self.keypair.private_key()) ) return token result = await execute_on_cluster( self.nodes, lambda client: client.create_owned_data( create_invocation_token(client), node_payloads[client.id], ), ) Log.info( { "user": self.id, "collection": body.collection, "documents": len(body.data), "concealed": self._options.key is not None, }, "User data created", ) return result
[docs] async def list_data_references(self) -> ListDataReferencesResponse: """Lists references to all data documents owned by the user.""" results_by_node = await execute_on_cluster( self.nodes, lambda client: client.list_data_references( self._mint_invocation( command=NucCmd.NIL_DB_USERS_READ, audience=client.id, ) ), ) result = process_plaintext_response(results_by_node) Log.info( {"user": self.id, "count": len(result.data) if result.data else 0}, "User data references listed", ) return result
[docs] async def read_data( # pylint: disable=too-many-locals,too-many-branches self, params: ReadDataRequestParams ) -> ReadDataResponse: """Reads a single data document, automatically revealing concealed values if a key is configured.""" # Fetch the raw data from all nodes results_by_node = await execute_on_cluster( self.nodes, lambda client: client.read_data( self._mint_invocation( command=NucCmd.NIL_DB_USERS_READ, audience=client.id, subject=Did.parse(params.subject) if params.subject else None, ), params, ), ) key = self._options.key # Extract and process only the 5 DTO fields as plaintext # Use the actual field names (without underscores) as they appear in the data dto_fields = ["id", "created", "updated", "owner", "acl"] # Create a modified results_by_node with only DTO fields dto_results = {} # Create a new dict for non-DTO fields data_results = {} for node_id, response in results_by_node.items(): if hasattr(response, "data") and response.data: # Extract only DTO fields dto_data = {} field_mapping = { "id": "_id", "created": "_created", "updated": "_updated", "owner": "_owner", "acl": "_acl", } for field in dto_fields: if hasattr(response.data, field): aliased_field = field_mapping[field] dto_data[aliased_field] = getattr(response.data, field) dto_results[node_id] = type(response)(data=dto_data) # Extract non-DTO fields data_fields = {} # Convert Pydantic model to dict if hasattr(response.data, "model_dump"): response_dict = response.data.model_dump() else: response_dict = dict(response.data) for field_name, field_value in response_dict.items(): if field_name not in dto_fields: data_fields[field_name] = field_value data_results[node_id] = data_fields else: dto_results[node_id] = response data_results[node_id] = response # Process DTO fields as plaintext dto_result = process_plaintext_response(dto_results) # Process the data fields with key/no key logic if key: # Process with concealed response try: data_result = await process_concealed_object_response({"key": key, "resultsByNode": data_results}) except Exception as e: # pylint: disable=broad-exception-caught Log.warning("Concealed processing failed, using plaintext", error=str(e)) data_result = process_plaintext_response(data_results) else: # Process as plaintext data_result = process_plaintext_response(data_results) # Merge DTO results and data results if hasattr(dto_result, "data") and dto_result.data and data_result: # Convert DTO result to dictionary if hasattr(dto_result.data, "model_dump"): dto_dict = dto_result.data.model_dump() else: dto_dict = dict(dto_result.data) merged_data = {**dto_dict, **data_result} result = merged_data else: result = dto_result Log.info( { "user": self.id, "collection": params.collection, "document": params.document, }, "User data read", ) return result
[docs] async def delete_data(self, params: DeleteDocumentRequestParams) -> Dict[Did, None]: """Deletes a user-owned document from all nodes.""" result = await execute_on_cluster( self.nodes, lambda client: client.delete_data( self._mint_invocation( command=NucCmd.NIL_DB_USERS_DELETE, audience=client.id, ), params, ), ) Log.info( { "user": self.id, "collection": params.collection, "document": params.document, }, "User data deleted", ) return result
[docs] async def grant_access(self, body: GrantAccessToDataRequest) -> Dict[Did, None]: """Grants access to data for a specific user.""" result = await execute_on_cluster( self.nodes, lambda client: client.grant_access( self._mint_invocation( command=NucCmd.NIL_DB_USERS_UPDATE, audience=client.id, ), body, ), ) Log.info( { "user": self.id, "collection": body.collection, "document": body.document, "grantee": body.acl.grantee, }, "Access granted", ) return result
[docs] async def revoke_access(self, body: RevokeAccessToDataRequest) -> Dict[Did, None]: """Revokes access to data for a specific user.""" result = await execute_on_cluster( self.nodes, lambda client: client.revoke_access( self._mint_invocation( command=NucCmd.NIL_DB_USERS_UPDATE, audience=client.id, ), body, ), ) Log.info( { "user": self.id, "collection": body.collection, "document": body.document, "grantee": body.grantee, }, "Access revoked", ) return result
[docs] async def update_data(self, body: UpdateUserDataRequest) -> Dict[Did, None]: """Updates a user-owned document on all nodes.""" # Prepare request payloads node_payload_update = await self._prepare_node_payloads(body) result = await execute_on_cluster( self.nodes, lambda client: client.update_data( self._mint_invocation( command=NucCmd.NIL_DB_USERS_UPDATE, audience=client.id, ), node_payload_update[client.id], ), ) Log.info( { "user": self.id, "collection": body.collection, "document": body.document, }, "User data updated", ) return result
[docs] async def close(self): """Close all node connections.""" for node in self.nodes: if hasattr(node, "close") and callable(getattr(node, "close")): await node.close()
def _mint_invocation(self, command: NucCmd, audience: Did, subject: Optional[Did] = None) -> str: """Mints an invocation token for user operations. Args: command: The NUC command to execute audience: The DID of the target node Returns: A signed invocation token """ # Create invocation token builder builder = NucTokenBuilder.invocation({}) # Build the token with all required parameters token = ( builder.command(Command(command.value.split("."))) .subject(subject if subject else self.id) # User's DID as subject .audience(audience) # Target node's DID .expires_at(datetime.fromtimestamp(into_seconds_from_now(60))) .build(self.keypair.private_key()) ) return token