"""
Builder NIL DB client implementation.
"""
import aiohttp
from ..common.paths import NilDbEndpoint
from ..dto.builders import (
ReadBuilderProfileResponse,
RegisterBuilderRequest,
UpdateBuilderProfileRequest,
)
from ..dto.collections import (
CreateCollectionIndexRequest,
CreateCollectionRequest,
ListCollectionsResponse,
ReadCollectionMetadataResponse,
)
from ..dto.data import (
CreateDataResponse,
CreateStandardDataRequest,
DeleteDataRequest,
DeleteDataResponse,
FindDataRequest,
FindDataResponse,
TailDataResponse,
UpdateDataRequest,
UpdateDataResponse,
)
from ..dto.queries import (
CreateQueryRequest,
ReadQueriesResponse,
ReadQueryResponse,
ReadQueryRunByIdResponse,
RunQueryRequest,
RunQueryResponse,
)
from ..dto.system import ReadAboutNodeResponse
from .base_client import AuthenticatedRequestOptions, NilDbBaseClient, NilDbBaseClientOptions
[docs]
class NilDbBuilderClientOptions(NilDbBaseClientOptions):
"""Options for NIL DB builder client."""
[docs]
class NilDbBuilderClient(NilDbBaseClient): # pylint: disable=too-many-public-methods
"""Builder NIL DB client implementation."""
def __init__(self, options: NilDbBuilderClientOptions):
super().__init__(options)
self._options = options
[docs]
async def register(self, body: RegisterBuilderRequest) -> None:
"""Register a new builder."""
return await self.request(
AuthenticatedRequestOptions(
path=NilDbEndpoint.v1.builders.register, method="POST", body=body.model_dump(by_alias=True)
),
)
[docs]
async def read_profile(self, token: str) -> ReadBuilderProfileResponse:
"""Retrieve the authenticated builder's profile information."""
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.builders.me, token=token), ReadBuilderProfileResponse
)
[docs]
async def update_profile(self, token: str, body: UpdateBuilderProfileRequest) -> None:
"""Update the authenticated builder's profile information."""
return await self.request(
AuthenticatedRequestOptions(
path=NilDbEndpoint.v1.builders.me, method="POST", body=body.model_dump(by_alias=True), token=token
),
)
[docs]
async def delete_builder(self, token: str) -> None:
"""Delete the authenticated builder and all associated resources."""
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.builders.me, method="DELETE", token=token),
)
[docs]
async def create_collection(self, token: str, body: CreateCollectionRequest) -> None:
"""Create a new collection for data validation."""
return await self.request(
AuthenticatedRequestOptions(
path=NilDbEndpoint.v1.collections.root, method="POST", body=body.model_dump(by_alias=True), token=token
),
)
[docs]
async def read_collections(self, token: str) -> ListCollectionsResponse:
"""List all collections owned by the authenticated builder."""
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.collections.root, method="GET", token=token),
ListCollectionsResponse,
)
[docs]
async def delete_collection(self, token: str, collection: str) -> None:
"""Delete a collection by id and all associated data."""
path = NilDbEndpoint.v1.collections.byId.replace(":id", collection)
return await self.request(
AuthenticatedRequestOptions(path=path, method="DELETE", token=token),
)
[docs]
async def read_collection(self, token: str, collection: str) -> ReadCollectionMetadataResponse:
"""Retrieve a collection by id including metadata."""
path = NilDbEndpoint.v1.collections.byId.replace(":id", collection)
return await self.request(
AuthenticatedRequestOptions(path=path, method="GET", token=token), ReadCollectionMetadataResponse
)
[docs]
async def create_collection_index(self, token: str, collection: str, body: CreateCollectionIndexRequest) -> None:
"""Create an index on a collection."""
path = NilDbEndpoint.v1.collections.indexesById.replace(":id", collection)
return await self.request(
AuthenticatedRequestOptions(path=path, method="POST", body=body.model_dump(by_alias=True), token=token),
)
[docs]
async def drop_collection_index(self, token: str, collection: str, index: str) -> None:
"""Drop an index from a collection."""
path = NilDbEndpoint.v1.collections.indexesByNameById.replace(":id", collection).replace(":name", index)
return await self.request(
AuthenticatedRequestOptions(path=path, method="DELETE", token=token),
)
[docs]
async def get_queries(self, token: str) -> ReadQueriesResponse:
"""List all queries owned by the authenticated builder."""
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.queries.root, token=token), ReadQueriesResponse
)
[docs]
async def get_query(self, token: str, query: str) -> ReadQueryResponse:
"""Retrieve a query by id."""
path = NilDbEndpoint.v1.queries.byId.replace(":id", query)
return await self.request(AuthenticatedRequestOptions(path=path, token=token), ReadQueryResponse)
[docs]
async def create_query(self, token: str, body: CreateQueryRequest) -> None:
"""Create a new MongoDB aggregation query with variable substitution."""
return await self.request(
AuthenticatedRequestOptions(
path=NilDbEndpoint.v1.queries.root, method="POST", body=body.model_dump(by_alias=True), token=token
)
)
[docs]
async def delete_query(self, token: str, query: str) -> None:
"""Delete a query by id."""
path = NilDbEndpoint.v1.queries.byId.replace(":id", query)
return await self.request(
AuthenticatedRequestOptions(path=path, method="DELETE", token=token),
)
[docs]
async def run_query(self, token: str, body: RunQueryRequest) -> RunQueryResponse:
"""Execute a query with variable substitution."""
return await self.request(
AuthenticatedRequestOptions(
path=NilDbEndpoint.v1.queries.run, method="POST", body=body.model_dump(by_alias=True), token=token
),
RunQueryResponse,
)
[docs]
async def read_query_run_results(self, token: str, run: str) -> ReadQueryRunByIdResponse:
"""Retrieve the status and results of a background query job."""
path = NilDbEndpoint.v1.queries.runById.replace(":id", run)
return await self.request(AuthenticatedRequestOptions(path=path, token=token), ReadQueryRunByIdResponse)
[docs]
async def create_standard_data(self, token: str, body: CreateStandardDataRequest) -> CreateDataResponse:
"""Upload standard data records to a schema-validated collection."""
body_data = self._prepare_request_body(body)
return await self.request(
AuthenticatedRequestOptions(
path=NilDbEndpoint.v1.data.createStandard, method="POST", body=body_data, token=token
),
CreateDataResponse,
)
[docs]
async def find_data(self, token: str, body: FindDataRequest) -> FindDataResponse:
"""Search for data matching the provided filter."""
body_data = self._prepare_request_body(body)
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.data.find, method="POST", body=body_data, token=token),
FindDataResponse,
)
[docs]
async def update_data(self, token: str, body: UpdateDataRequest) -> UpdateDataResponse:
"""Update data records matching the provided filter."""
body_data = self._prepare_request_body(body)
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.data.update, method="POST", body=body_data, token=token),
UpdateDataResponse,
)
[docs]
async def delete_data(self, token: str, body: DeleteDataRequest) -> DeleteDataResponse:
"""Delete data records matching the provided filter."""
body_data = self._prepare_request_body(body)
return await self.request(
AuthenticatedRequestOptions(path=NilDbEndpoint.v1.data.delete, method="POST", body=body_data, token=token),
DeleteDataResponse,
)
[docs]
async def flush_data(self, token: str, collection: str) -> None:
"""Remove all data from a collection."""
path = NilDbEndpoint.v1.data.flushById.replace(":id", collection)
return await self.request(
AuthenticatedRequestOptions(path=path, method="DELETE", token=token),
)
[docs]
async def tail_data(self, token: str, collection: str, limit: int = 10) -> TailDataResponse:
"""Retrieve the most recent data records from a collection."""
path = f"{NilDbEndpoint.v1.data.tailById.replace(':id', collection)}?limit={limit}"
return await self.request(AuthenticatedRequestOptions(path=path, method="GET", token=token), TailDataResponse)
[docs]
async def create_nil_db_builder_client(base_url: str) -> NilDbBuilderClient:
"""
Create a NIL DB builder client.
Args:
base_url: Base URL for the NIL DB service
Returns:
NIL DB builder client
"""
async with aiohttp.ClientSession() as session:
async with session.get(f"{base_url}/about") as response:
body = await response.json()
about = ReadAboutNodeResponse.model_validate(body)
options = NilDbBuilderClientOptions(about=about, base_url=base_url)
return NilDbBuilderClient(options)