mirror of
https://github.com/creyD/creyPY.git
synced 2026-04-14 12:20:31 +02:00
feat: Introduce ASYNC DB as Plug and Play (#16)
Co-authored-by: vikbhas <waraa.vignesh@gmail.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from math import ceil
|
||||
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union
|
||||
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union, overload
|
||||
from contextlib import suppress
|
||||
from pydantic import BaseModel
|
||||
from fastapi_pagination import Params
|
||||
from fastapi_pagination.bases import AbstractPage, AbstractParams
|
||||
@@ -8,6 +9,8 @@ from fastapi_pagination.types import (
|
||||
GreaterEqualZero,
|
||||
AdditionalData,
|
||||
SyncItemsTransformer,
|
||||
AsyncItemsTransformer,
|
||||
ItemsTransformer
|
||||
)
|
||||
from fastapi_pagination.api import create_page, apply_items_transformer
|
||||
from fastapi_pagination.utils import verify_params
|
||||
@@ -17,7 +20,9 @@ from pydantic.json_schema import SkipJsonSchema
|
||||
from sqlalchemy.sql.selectable import Select
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy import select, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session
|
||||
from fastapi import Query
|
||||
from sqlalchemy.util import await_only, greenlet_spawn
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
@@ -107,19 +112,59 @@ def unwrap_scalars(
|
||||
) -> Union[Sequence[T], Sequence[Sequence[T]]]:
|
||||
return [item[0] if force_unwrap else item for item in items]
|
||||
|
||||
def _get_sync_conn_from_async(conn: Any) -> Session: # pragma: no cover
|
||||
if isinstance(conn, async_scoped_session):
|
||||
conn = conn()
|
||||
|
||||
with suppress(AttributeError):
|
||||
return conn.sync_session # type: ignore
|
||||
|
||||
with suppress(AttributeError):
|
||||
return conn.sync_connection # type: ignore
|
||||
|
||||
raise TypeError("conn must be an AsyncConnection or AsyncSession")
|
||||
|
||||
@overload
|
||||
def paginate(
|
||||
connection: Session,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[SyncItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
) -> Any:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
async def paginate(
|
||||
connection: AsyncSession,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[AsyncItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
) -> Any:
|
||||
pass
|
||||
|
||||
|
||||
def _paginate(
|
||||
connection: Session,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[ItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
async_:bool = False
|
||||
):
|
||||
|
||||
if async_:
|
||||
def _apply_items_transformer(*args: Any, **kwargs: Any) -> Any:
|
||||
return await_only(apply_items_transformer(*args, **kwargs, async_=True))
|
||||
else:
|
||||
_apply_items_transformer = apply_items_transformer
|
||||
|
||||
params, raw_params = verify_params(params, "limit-offset", "cursor")
|
||||
count_query = create_count_query(query)
|
||||
total = connection.scalar(count_query)
|
||||
|
||||
|
||||
if params.pagination is False and total > 0:
|
||||
params = Params(page=1, size=total)
|
||||
else:
|
||||
@@ -129,7 +174,7 @@ def paginate(
|
||||
items = connection.execute(query).all()
|
||||
|
||||
items = unwrap_scalars(items)
|
||||
t_items = apply_items_transformer(items, transformer)
|
||||
t_items = _apply_items_transformer(items, transformer)
|
||||
|
||||
return create_page(
|
||||
t_items,
|
||||
@@ -137,3 +182,29 @@ def paginate(
|
||||
total=total,
|
||||
**(additional_data or {}),
|
||||
)
|
||||
|
||||
def paginate(
|
||||
connection: Session,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[ItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
):
|
||||
if isinstance(connection,AsyncSession):
|
||||
connection = _get_sync_conn_from_async(connection)
|
||||
return greenlet_spawn(_paginate,
|
||||
connection,
|
||||
query,
|
||||
params,
|
||||
transformer,
|
||||
additional_data,
|
||||
async_=True)
|
||||
|
||||
return _paginate(
|
||||
connection,
|
||||
query,
|
||||
params,
|
||||
transformer,
|
||||
additional_data,
|
||||
async_=False
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user