feat: Filter response fields (#40)

* feat: Filter response fields

* changes

* changes
This commit is contained in:
vikynoah
2025-03-24 09:25:52 +01:00
committed by GitHub
parent 0af8f05edf
commit f586ce5c03
2 changed files with 54 additions and 4 deletions

View File

@@ -1,4 +1,4 @@
from typing import Type, TypeVar, overload
from typing import Type, TypeVar, overload, List
from uuid import UUID
from fastapi import HTTPException
@@ -19,13 +19,14 @@ async def get_object_or_404(
db: AsyncSession,
expunge: bool = False,
lookup_column: str = "id",
response_fields: List[str] = []
) -> T:
pass
@overload
def get_object_or_404(
db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id"
db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id", response_fields: List[str] = []
) -> T:
pass
@@ -36,10 +37,15 @@ def get_object_or_404(
db: Session | AsyncSession,
expunge: bool = False,
lookup_column: str = "id",
response_fields: List[str] = []
) -> T:
async def _get_async_object() -> T:
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
if response_fields:
selected_columns = [getattr(db_class, field) for field in response_fields if hasattr(db_class, field)]
query = select(*selected_columns).select_from(db_class)
else:
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
result = await db.execute(query)
obj = result.scalar_one_or_none()
if obj is None:
@@ -49,7 +55,12 @@ def get_object_or_404(
return obj
def _get_sync_object() -> T:
obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none()
if response_fields:
selected_columns = [getattr(db_class, field) for field in response_fields if hasattr(db_class, field)]
query = db.query(*selected_columns).filter(getattr(db_class, lookup_column) == id)
else:
query = db.query(db_class).filter(getattr(db_class, lookup_column) == id)
obj = query.one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
if expunge:

View File

@@ -0,0 +1,39 @@
from typing import List, Optional, Type
from pydantic import BaseModel, create_model
from fastapi import Query
class ResponseModelDependency:
def __init__(self, model_class: Type[BaseModel]):
self.model_class = model_class
def __call__(self, response_fields: Optional[List[str]] = Query(None)) -> Type[BaseModel]:
def process_result(result, fields=None):
if not fields:
return result
if hasattr(result, '_fields'):
row_fields = result._fields
return dict(zip(row_fields, result))
elif isinstance(result, tuple):
return dict(zip(fields, result))
elif isinstance(result, dict):
return result
else:
return {field: getattr(result, field) for field in fields if hasattr(result, field)}
if not response_fields:
return self.model_class, None, process_result
all_annotations = {}
for cls in self.model_class.__mro__:
if hasattr(cls, '__annotations__'):
all_annotations.update(cls.__annotations__)
fields = {}
for field in response_fields:
if field in all_annotations:
fields[field] = (all_annotations[field], None)
dynamic_model = create_model(f'Dynamic{self.model_class.__name__}', **fields)
return dynamic_model, response_fields, process_result