mirror of
https://github.com/creyD/creyPY.git
synced 2026-04-12 19:30:30 +02:00
Adjusted files for isort & autopep
This commit is contained in:
committed by
github-actions[bot]
parent
8363055984
commit
c2e2469027
@@ -19,14 +19,19 @@ async def get_object_or_404(
|
||||
db: AsyncSession,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
response_fields: List[str] = []
|
||||
response_fields: List[str] = [],
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
def get_object_or_404(
|
||||
db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id", response_fields: List[str] = []
|
||||
db_class: Type[T],
|
||||
id: UUID | str,
|
||||
db: Session,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
response_fields: List[str] = [],
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
@@ -37,12 +42,14 @@ def get_object_or_404(
|
||||
db: Session | AsyncSession,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
response_fields: List[str] = []
|
||||
response_fields: List[str] = [],
|
||||
) -> T:
|
||||
|
||||
async def _get_async_object() -> T:
|
||||
if response_fields:
|
||||
selected_columns = [getattr(db_class, field) for field in response_fields if hasattr(db_class, field)]
|
||||
selected_columns = [
|
||||
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
|
||||
]
|
||||
query = select(*selected_columns).select_from(db_class)
|
||||
else:
|
||||
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
|
||||
@@ -56,7 +63,9 @@ def get_object_or_404(
|
||||
|
||||
def _get_sync_object() -> T:
|
||||
if response_fields:
|
||||
selected_columns = [getattr(db_class, field) for field in response_fields if hasattr(db_class, field)]
|
||||
selected_columns = [
|
||||
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
|
||||
]
|
||||
query = db.query(*selected_columns).filter(getattr(db_class, lookup_column) == id)
|
||||
else:
|
||||
query = db.query(db_class).filter(getattr(db_class, lookup_column) == id)
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
from typing import List, Optional, Type
|
||||
from pydantic import BaseModel, create_model
|
||||
from fastapi import Query
|
||||
|
||||
|
||||
class ResponseModelDependency:
|
||||
def __init__(self, model_class: Type[BaseModel]):
|
||||
self.model_class = model_class
|
||||
|
||||
|
||||
def __call__(self, response_fields: Optional[List[str]] = Query(None)) -> Type[BaseModel]:
|
||||
def process_result(result, fields=None):
|
||||
if not fields:
|
||||
return result
|
||||
|
||||
if hasattr(result, '_fields'):
|
||||
|
||||
if hasattr(result, "_fields"):
|
||||
row_fields = result._fields
|
||||
return dict(zip(row_fields, result))
|
||||
elif isinstance(result, tuple):
|
||||
@@ -19,21 +21,20 @@ class ResponseModelDependency:
|
||||
return result
|
||||
else:
|
||||
return {field: getattr(result, field) for field in fields if hasattr(result, field)}
|
||||
|
||||
|
||||
if not response_fields:
|
||||
return self.model_class, None, process_result
|
||||
|
||||
|
||||
all_annotations = {}
|
||||
for cls in self.model_class.__mro__:
|
||||
if hasattr(cls, '__annotations__'):
|
||||
if hasattr(cls, "__annotations__"):
|
||||
all_annotations.update(cls.__annotations__)
|
||||
|
||||
|
||||
fields = {}
|
||||
for field in response_fields:
|
||||
if field in all_annotations:
|
||||
fields[field] = (all_annotations[field], None)
|
||||
|
||||
dynamic_model = create_model(f'Dynamic{self.model_class.__name__}', **fields)
|
||||
|
||||
|
||||
dynamic_model = create_model(f"Dynamic{self.model_class.__name__}", **fields)
|
||||
|
||||
return dynamic_model, response_fields, process_result
|
||||
|
||||
Reference in New Issue
Block a user