mirror of
https://github.com/creyD/creyPY.git
synced 2026-04-12 19:30:30 +02:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7afb8e2fd8 | ||
|
|
badf2b157f | ||
|
|
c903266ec4 | ||
|
|
910638e3a6 |
@@ -1,12 +1,13 @@
|
||||
from typing import Type, TypeVar, overload, List
|
||||
import asyncio
|
||||
from typing import List, Type, TypeVar, overload
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import HTTPException
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
import asyncio
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .models.base import Base
|
||||
|
||||
T = TypeVar("T", bound=Base)
|
||||
@@ -50,16 +51,27 @@ def get_object_or_404(
|
||||
selected_columns = [
|
||||
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
|
||||
]
|
||||
query = select(*selected_columns).select_from(db_class)
|
||||
query = select(*selected_columns).where(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
row = result.first()
|
||||
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
if hasattr(row, "_mapping"):
|
||||
obj_dict = dict(row._mapping)
|
||||
else:
|
||||
obj_dict = {column.key: getattr(row, column.key) for column in selected_columns}
|
||||
else:
|
||||
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
obj = result.scalar_one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
|
||||
query = select(db_class).where(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
row = result.scalar_one_or_none()
|
||||
if row is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
|
||||
obj_dict = row
|
||||
if expunge:
|
||||
await db.expunge(obj)
|
||||
return obj
|
||||
await db.expunge(obj_dict)
|
||||
return obj_dict
|
||||
|
||||
def _get_sync_object() -> T:
|
||||
if response_fields:
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from typing import List, Optional, Type
|
||||
from pydantic import BaseModel, create_model
|
||||
|
||||
from fastapi import Query
|
||||
from pydantic import BaseModel, create_model
|
||||
|
||||
|
||||
class ResponseModelDependency:
|
||||
@@ -8,8 +9,10 @@ class ResponseModelDependency:
|
||||
self.model_class = model_class
|
||||
|
||||
def __call__(self, response_fields: Optional[List[str]] = Query(None)) -> Type[BaseModel]:
|
||||
def process_result(result, fields=None):
|
||||
def process_result(result, fields=None, async_session=False):
|
||||
if not fields:
|
||||
if async_session:
|
||||
return {k: v for k, v in result.__dict__.items() if not k.startswith("_")}
|
||||
return result
|
||||
|
||||
if hasattr(result, "_fields"):
|
||||
|
||||
Reference in New Issue
Block a user