Adjusted files for isort & autopep

This commit is contained in:
creyD
2024-11-15 11:39:59 +00:00
committed by github-actions[bot]
parent 9bba5b0a4e
commit 6f09c2ef4c

View File

@@ -11,28 +11,31 @@ from .models.base import Base
T = TypeVar("T", bound=Base) T = TypeVar("T", bound=Base)
@overload @overload
async def get_object_or_404( async def get_object_or_404(
db_class: Type[T], db_class: Type[T],
id: UUID | str, id: UUID | str,
db: AsyncSession, db: AsyncSession,
expunge: bool = False, expunge: bool = False,
lookup_column: str = "id" lookup_column: str = "id",
) -> T: ) -> T:
pass pass
@overload @overload
def get_object_or_404( def get_object_or_404(
db_class: Type[T], db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id"
id: UUID | str,
db: Session,
expunge: bool = False,
lookup_column: str = "id"
) -> T: ) -> T:
pass pass
def get_object_or_404( def get_object_or_404(
db_class: Type[T], id: UUID | str, db: Session | AsyncSession, expunge: bool = False, lookup_column: str = "id" db_class: Type[T],
id: UUID | str,
db: Session | AsyncSession,
expunge: bool = False,
lookup_column: str = "id",
) -> T: ) -> T:
async def _get_async_object() -> T: async def _get_async_object() -> T:
@@ -40,7 +43,7 @@ def get_object_or_404(
result = await db.execute(query) result = await db.execute(query)
obj = result.scalar_one_or_none() obj = result.scalar_one_or_none()
if obj is None: if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
if expunge: if expunge:
await db.expunge(obj) await db.expunge(obj)
return obj return obj
@@ -48,18 +51,17 @@ def get_object_or_404(
def _get_sync_object() -> T: def _get_sync_object() -> T:
obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none() obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none()
if obj is None: if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
if expunge: if expunge:
db.expunge(obj) db.expunge(obj)
return obj return obj
if isinstance(db, AsyncSession): if isinstance(db, AsyncSession):
return asyncio.ensure_future(_get_async_object()) # type: ignore return asyncio.ensure_future(_get_async_object()) # type: ignore
elif isinstance(db, Session): elif isinstance(db, Session):
return _get_sync_object() return _get_sync_object()
else: else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
# TODO: Add testing # TODO: Add testing
@@ -69,26 +71,20 @@ async def create_obj_from_data(
model: Type[T], model: Type[T],
db: AsyncSession, db: AsyncSession,
additional_data: dict = {}, additional_data: dict = {},
exclude: dict = {} exclude: dict = {},
) -> T: ) -> T:
pass pass
@overload @overload
def create_obj_from_data( def create_obj_from_data(
data: BaseModel, data: BaseModel, model: Type[T], db: Session, additional_data: dict = {}, exclude: dict = {}
model: Type[T],
db: Session,
additional_data: dict = {},
exclude: dict = {}
) -> T: ) -> T:
pass pass
def create_obj_from_data( def create_obj_from_data(
data: BaseModel, data: BaseModel, model: Type[T], db: Session | AsyncSession, additional_data={}, exclude={}
model: Type[T],
db: Session | AsyncSession,
additional_data={},
exclude={}
) -> T: ) -> T:
obj_data = data.model_dump(exclude=exclude) | additional_data obj_data = data.model_dump(exclude=exclude) | additional_data
obj = model(**obj_data) obj = model(**obj_data)
@@ -106,11 +102,11 @@ def create_obj_from_data(
return obj return obj
if isinstance(db, AsyncSession): if isinstance(db, AsyncSession):
return asyncio.ensure_future(_create_async_obj()) # type: ignore return asyncio.ensure_future(_create_async_obj()) # type: ignore
elif isinstance(db, Session): elif isinstance(db, Session):
return _create_sync_obj() return _create_sync_obj()
else: else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
# TODO: Add testing # TODO: Add testing
@@ -123,10 +119,11 @@ async def update_obj_from_data(
partial: bool = True, partial: bool = True,
ignore_fields: list = [], ignore_fields: list = [],
additional_data: dict = {}, additional_data: dict = {},
exclude: dict = {} exclude: dict = {},
) -> T: ) -> T:
pass pass
@overload @overload
def update_obj_from_data( def update_obj_from_data(
data: BaseModel, data: BaseModel,
@@ -136,10 +133,11 @@ def update_obj_from_data(
partial: bool = True, partial: bool = True,
ignore_fields: list = [], ignore_fields: list = [],
additional_data: dict = {}, additional_data: dict = {},
exclude: dict = {} exclude: dict = {},
) -> T: ) -> T:
pass pass
def update_obj_from_data( def update_obj_from_data(
data: BaseModel, data: BaseModel,
model: Type[T], model: Type[T],
@@ -153,11 +151,11 @@ def update_obj_from_data(
def _update_fields(obj: T): def _update_fields(obj: T):
data_dict = data.model_dump(exclude_unset=partial, exclude=exclude) data_dict = data.model_dump(exclude_unset=partial, exclude=exclude)
data_dict.update(additional_data) data_dict.update(additional_data)
for field in data_dict: for field in data_dict:
if field not in ignore_fields: if field not in ignore_fields:
setattr(obj, field, data_dict[field]) setattr(obj, field, data_dict[field])
async def _update_async_obj() -> T: async def _update_async_obj() -> T:
obj = await get_object_or_404(model, id, db) obj = await get_object_or_404(model, id, db)
_update_fields(obj) _update_fields(obj)
@@ -173,29 +171,25 @@ def update_obj_from_data(
return obj return obj
if isinstance(db, AsyncSession): if isinstance(db, AsyncSession):
return asyncio.ensure_future(_update_async_obj()) # type: ignore return asyncio.ensure_future(_update_async_obj()) # type: ignore
elif isinstance(db, Session): elif isinstance(db, Session):
return _update_sync_obj() return _update_sync_obj()
else: else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
# TODO: Add testing # TODO: Add testing
@overload @overload
async def delete_object( async def delete_object(db_class: Type[T], id: UUID | str, db: AsyncSession) -> None:
db_class: Type[T], id: UUID | str, db: AsyncSession
) -> None:
pass pass
@overload @overload
def delete_object( def delete_object(db_class: Type[T], id: UUID | str, db: Session) -> None:
db_class: Type[T], id: UUID | str, db: Session
) -> None:
pass pass
def delete_object( def delete_object(db_class: Type[T], id: UUID | str, db: Session | AsyncSession) -> None:
db_class: Type[T], id: UUID | str, db: Session | AsyncSession
) -> None:
async def _delete_async_obj() -> None: async def _delete_async_obj() -> None:
query = select(db_class).filter(db_class.id == id) query = select(db_class).filter(db_class.id == id)
result = await db.execute(query) result = await db.execute(query)
@@ -213,8 +207,8 @@ def delete_object(
db.commit() db.commit()
if isinstance(db, AsyncSession): if isinstance(db, AsyncSession):
return asyncio.ensure_future(_delete_async_obj()) # type: ignore return asyncio.ensure_future(_delete_async_obj()) # type: ignore
elif isinstance(db, Session): elif isinstance(db, Session):
return _delete_sync_obj() return _delete_sync_obj()
else: else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore