mirror of
https://github.com/creyD/creyPY.git
synced 2026-04-12 19:30:30 +02:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b86b58f3e4 | |||
|
|
17f96c920d | ||
|
|
523241ac4b | ||
|
|
6f09c2ef4c | ||
|
|
9bba5b0a4e | ||
|
|
50031556f9 | ||
|
|
2940ddbdcd | ||
| 807af12fa1 | |||
|
|
dce897c247 | ||
|
|
89997372ef |
@@ -1,63 +1,214 @@
|
||||
from typing import Type, TypeVar
|
||||
from typing import Type, TypeVar, overload
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import HTTPException
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
import asyncio
|
||||
from .models.base import Base
|
||||
|
||||
T = TypeVar("T", bound=Base)
|
||||
|
||||
|
||||
@overload
|
||||
async def get_object_or_404(
|
||||
db_class: Type[T],
|
||||
id: UUID | str,
|
||||
db: AsyncSession,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
def get_object_or_404(
|
||||
db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id"
|
||||
) -> T:
|
||||
obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
if expunge:
|
||||
db.expunge(obj)
|
||||
return obj
|
||||
pass
|
||||
|
||||
|
||||
# TODO: Add testing
|
||||
def create_obj_from_data(
|
||||
data: BaseModel, model: Type[T], db: Session, additional_data={}, exclude={}
|
||||
def get_object_or_404(
|
||||
db_class: Type[T],
|
||||
id: UUID | str,
|
||||
db: Session | AsyncSession,
|
||||
expunge: bool = False,
|
||||
lookup_column: str = "id",
|
||||
) -> T:
|
||||
obj = model(**data.model_dump(exclude=exclude) | additional_data)
|
||||
db.add(obj)
|
||||
db.commit()
|
||||
db.refresh(obj)
|
||||
return obj
|
||||
|
||||
async def _get_async_object() -> T:
|
||||
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
|
||||
result = await db.execute(query)
|
||||
obj = result.scalar_one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
|
||||
if expunge:
|
||||
await db.expunge(obj)
|
||||
return obj
|
||||
|
||||
def _get_sync_object() -> T:
|
||||
obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
|
||||
if expunge:
|
||||
db.expunge(obj)
|
||||
return obj
|
||||
|
||||
if isinstance(db, AsyncSession):
|
||||
return asyncio.ensure_future(_get_async_object()) # type: ignore
|
||||
elif isinstance(db, Session):
|
||||
return _get_sync_object()
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
|
||||
|
||||
|
||||
# TODO: Add testing
|
||||
@overload
|
||||
async def create_obj_from_data(
|
||||
data: BaseModel,
|
||||
model: Type[T],
|
||||
db: AsyncSession,
|
||||
additional_data: dict = {},
|
||||
exclude: dict = {},
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
def create_obj_from_data(
|
||||
data: BaseModel, model: Type[T], db: Session, additional_data: dict = {}, exclude: dict = {}
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
def create_obj_from_data(
|
||||
data: BaseModel, model: Type[T], db: Session | AsyncSession, additional_data={}, exclude={}
|
||||
) -> T:
|
||||
obj_data = data.model_dump(exclude=exclude) | additional_data
|
||||
obj = model(**obj_data)
|
||||
|
||||
async def _create_async_obj():
|
||||
db.add(obj)
|
||||
await db.commit()
|
||||
await db.refresh(obj)
|
||||
return obj
|
||||
|
||||
def _create_sync_obj():
|
||||
db.add(obj)
|
||||
db.commit()
|
||||
db.refresh(obj)
|
||||
return obj
|
||||
|
||||
if isinstance(db, AsyncSession):
|
||||
return asyncio.ensure_future(_create_async_obj()) # type: ignore
|
||||
elif isinstance(db, Session):
|
||||
return _create_sync_obj()
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
|
||||
|
||||
|
||||
# TODO: Add testing
|
||||
@overload
|
||||
async def update_obj_from_data(
|
||||
data: BaseModel,
|
||||
model: Type[T],
|
||||
id: UUID | str,
|
||||
db: AsyncSession,
|
||||
partial: bool = True,
|
||||
ignore_fields: list = [],
|
||||
additional_data: dict = {},
|
||||
exclude: dict = {},
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
def update_obj_from_data(
|
||||
data: BaseModel,
|
||||
model: Type[T],
|
||||
id: UUID | str,
|
||||
db: Session,
|
||||
partial: bool = True,
|
||||
ignore_fields: list = [],
|
||||
additional_data: dict = {},
|
||||
exclude: dict = {},
|
||||
) -> T:
|
||||
pass
|
||||
|
||||
|
||||
def update_obj_from_data(
|
||||
data: BaseModel,
|
||||
model: Type[T],
|
||||
id: UUID | str,
|
||||
db: Session | AsyncSession,
|
||||
partial: bool = True,
|
||||
ignore_fields=[],
|
||||
additional_data={},
|
||||
exclude={},
|
||||
) -> T:
|
||||
obj = get_object_or_404(model, id, db)
|
||||
data_dict = data.model_dump(exclude_unset=partial, exclude=exclude)
|
||||
data_dict.update(additional_data) # merge additional_data into data_dict
|
||||
for field in data_dict:
|
||||
if field not in ignore_fields:
|
||||
setattr(obj, field, data_dict[field])
|
||||
db.commit()
|
||||
db.refresh(obj)
|
||||
return obj
|
||||
def _update_fields(obj: T):
|
||||
data_dict = data.model_dump(exclude_unset=partial, exclude=exclude)
|
||||
data_dict.update(additional_data)
|
||||
|
||||
for field in data_dict:
|
||||
if field not in ignore_fields:
|
||||
setattr(obj, field, data_dict[field])
|
||||
|
||||
async def _update_async_obj() -> T:
|
||||
obj = await get_object_or_404(model, id, db)
|
||||
_update_fields(obj)
|
||||
await db.commit()
|
||||
await db.refresh(obj)
|
||||
return obj
|
||||
|
||||
def _update_sync_obj() -> T:
|
||||
obj = get_object_or_404(model, id, db)
|
||||
_update_fields(obj)
|
||||
db.commit()
|
||||
db.refresh(obj)
|
||||
return obj
|
||||
|
||||
if isinstance(db, AsyncSession):
|
||||
return asyncio.ensure_future(_update_async_obj()) # type: ignore
|
||||
elif isinstance(db, Session):
|
||||
return _update_sync_obj()
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
|
||||
|
||||
|
||||
# TODO: Add testing
|
||||
@overload
|
||||
async def delete_object(db_class: Type[T], id: UUID | str, db: AsyncSession) -> None:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
def delete_object(db_class: Type[T], id: UUID | str, db: Session) -> None:
|
||||
obj = db.query(db_class).filter(db_class.id == id).one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
db.delete(obj)
|
||||
db.commit()
|
||||
pass
|
||||
|
||||
|
||||
def delete_object(db_class: Type[T], id: UUID | str, db: Session | AsyncSession) -> None:
|
||||
async def _delete_async_obj() -> None:
|
||||
query = select(db_class).filter(db_class.id == id)
|
||||
result = await db.execute(query)
|
||||
obj = result.scalar_one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
await db.delete(obj)
|
||||
await db.commit()
|
||||
|
||||
def _delete_sync_obj() -> None:
|
||||
obj = db.query(db_class).filter(db_class.id == id).one_or_none()
|
||||
if obj is None:
|
||||
raise HTTPException(status_code=404, detail="The object does not exist.")
|
||||
db.delete(obj)
|
||||
db.commit()
|
||||
|
||||
if isinstance(db, AsyncSession):
|
||||
return asyncio.ensure_future(_delete_async_obj()) # type: ignore
|
||||
elif isinstance(db, Session):
|
||||
return _delete_sync_obj()
|
||||
else:
|
||||
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
from .session import * # noqa
|
||||
from .async_session import * # noqa
|
||||
|
||||
31
creyPY/fastapi/db/async_session.py
Normal file
31
creyPY/fastapi/db/async_session.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import os
|
||||
from typing import AsyncGenerator
|
||||
from dotenv import load_dotenv
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
host = os.getenv("POSTGRES_HOST", "localhost")
|
||||
user = os.getenv("POSTGRES_USER", "postgres")
|
||||
password = os.getenv("POSTGRES_PASSWORD", "root")
|
||||
port = os.getenv("POSTGRES_PORT", "5432")
|
||||
name = os.getenv("POSTGRES_DB", "fastapi")
|
||||
|
||||
SQLALCHEMY_DATABASE_URL = f"postgresql+psycopg://{user}:{password}@{host}:{port}/"
|
||||
|
||||
|
||||
async_engine = create_async_engine(SQLALCHEMY_DATABASE_URL + name, pool_pre_ping=True)
|
||||
AsyncSessionLocal = sessionmaker(
|
||||
bind=async_engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
autoflush=False,
|
||||
autocommit=False,
|
||||
)
|
||||
|
||||
|
||||
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
|
||||
async with AsyncSessionLocal() as db:
|
||||
yield db
|
||||
@@ -1,6 +1,7 @@
|
||||
from math import ceil
|
||||
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union
|
||||
|
||||
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union, overload
|
||||
from contextlib import suppress
|
||||
from pydantic import BaseModel
|
||||
from fastapi_pagination import Params
|
||||
from fastapi_pagination.bases import AbstractPage, AbstractParams
|
||||
from fastapi_pagination.types import (
|
||||
@@ -8,18 +9,36 @@ from fastapi_pagination.types import (
|
||||
GreaterEqualZero,
|
||||
AdditionalData,
|
||||
SyncItemsTransformer,
|
||||
AsyncItemsTransformer,
|
||||
ItemsTransformer,
|
||||
)
|
||||
from fastapi_pagination.api import create_page, apply_items_transformer
|
||||
from fastapi_pagination.utils import verify_params
|
||||
from fastapi_pagination.ext.sqlalchemy import create_paginate_query
|
||||
from fastapi_pagination.bases import AbstractParams, RawParams
|
||||
from pydantic.json_schema import SkipJsonSchema
|
||||
from sqlalchemy.sql.selectable import Select
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy import select, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session
|
||||
from fastapi import Query
|
||||
from sqlalchemy.util import await_only, greenlet_spawn
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class PaginationParams(BaseModel, AbstractParams):
|
||||
page: int = Query(1, ge=1, description="Page number")
|
||||
size: int = Query(50, ge=1, le=100, description="Page size")
|
||||
pagination: bool = Query(True, description="Toggle pagination")
|
||||
|
||||
def to_raw_params(self) -> RawParams:
|
||||
if not self.pagination:
|
||||
return RawParams(limit=None, offset=None)
|
||||
|
||||
return RawParams(limit=self.size, offset=(self.page - 1) * self.size)
|
||||
|
||||
|
||||
# TODO: Add complete fastapi-pagination proxy here
|
||||
# TODO: Add pagination off functionality
|
||||
# SkipJsonSchema is used to avoid generating invalid JSON schema in FastAPI
|
||||
@@ -32,7 +51,7 @@ class Page(AbstractPage[T], Generic[T]):
|
||||
has_next: bool | SkipJsonSchema[None] = None
|
||||
has_prev: bool | SkipJsonSchema[None] = None
|
||||
|
||||
__params_type__ = Params
|
||||
__params_type__ = PaginationParams
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
@@ -94,27 +113,72 @@ def unwrap_scalars(
|
||||
return [item[0] if force_unwrap else item for item in items]
|
||||
|
||||
|
||||
def _get_sync_conn_from_async(conn: Any) -> Session: # pragma: no cover
|
||||
if isinstance(conn, async_scoped_session):
|
||||
conn = conn()
|
||||
|
||||
with suppress(AttributeError):
|
||||
return conn.sync_session # type: ignore
|
||||
|
||||
with suppress(AttributeError):
|
||||
return conn.sync_connection # type: ignore
|
||||
|
||||
raise TypeError("conn must be an AsyncConnection or AsyncSession")
|
||||
|
||||
|
||||
@overload
|
||||
def paginate(
|
||||
connection: Session,
|
||||
query: Select,
|
||||
paginationFlag: bool = True,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[SyncItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
):
|
||||
params, _ = verify_params(params, "limit-offset", "cursor")
|
||||
) -> Any:
|
||||
pass
|
||||
|
||||
|
||||
@overload
|
||||
async def paginate(
|
||||
connection: AsyncSession,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[AsyncItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
) -> Any:
|
||||
pass
|
||||
|
||||
|
||||
def _paginate(
|
||||
connection: Session,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[ItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
async_: bool = False,
|
||||
):
|
||||
|
||||
if async_:
|
||||
|
||||
def _apply_items_transformer(*args: Any, **kwargs: Any) -> Any:
|
||||
return await_only(apply_items_transformer(*args, **kwargs, async_=True))
|
||||
|
||||
else:
|
||||
_apply_items_transformer = apply_items_transformer
|
||||
|
||||
params, raw_params = verify_params(params, "limit-offset", "cursor")
|
||||
count_query = create_count_query(query)
|
||||
total = connection.scalar(count_query)
|
||||
|
||||
if paginationFlag is False and total > 0:
|
||||
if params.pagination is False and total > 0:
|
||||
params = Params(page=1, size=total)
|
||||
else:
|
||||
params = Params(page=params.page, size=params.size)
|
||||
|
||||
query = create_paginate_query(query, params)
|
||||
items = connection.execute(query).all()
|
||||
|
||||
items = unwrap_scalars(items)
|
||||
t_items = apply_items_transformer(items, transformer)
|
||||
t_items = _apply_items_transformer(items, transformer)
|
||||
|
||||
return create_page(
|
||||
t_items,
|
||||
@@ -122,3 +186,19 @@ def paginate(
|
||||
total=total,
|
||||
**(additional_data or {}),
|
||||
)
|
||||
|
||||
|
||||
def paginate(
|
||||
connection: Session,
|
||||
query: Select,
|
||||
params: Optional[AbstractParams] = None,
|
||||
transformer: Optional[ItemsTransformer] = None,
|
||||
additional_data: Optional[AdditionalData] = None,
|
||||
):
|
||||
if isinstance(connection, AsyncSession):
|
||||
connection = _get_sync_conn_from_async(connection)
|
||||
return greenlet_spawn(
|
||||
_paginate, connection, query, params, transformer, additional_data, async_=True
|
||||
)
|
||||
|
||||
return _paginate(connection, query, params, transformer, additional_data, async_=False)
|
||||
|
||||
139
creyPY/fastapi/testing_async.py
Normal file
139
creyPY/fastapi/testing_async.py
Normal file
@@ -0,0 +1,139 @@
|
||||
import json
|
||||
from httpx import AsyncClient
|
||||
|
||||
|
||||
class AsyncGenericClient:
|
||||
def __init__(self, app):
|
||||
self.c = AsyncClient(app=app, base_url="http://testserver", follow_redirects=True)
|
||||
self.default_headers = {}
|
||||
|
||||
async def get(self, url: str, r_code: int = 200, parse_json=True):
|
||||
re = await self.c.get(url, headers=self.default_headers)
|
||||
if re.status_code != r_code:
|
||||
print(re.content)
|
||||
assert r_code == re.status_code
|
||||
return re.json() if parse_json else re.content
|
||||
|
||||
async def delete(self, url: str, r_code: int = 204):
|
||||
re = await self.c.delete(url, headers=self.default_headers)
|
||||
if re.status_code != r_code:
|
||||
print(re.content)
|
||||
assert r_code == re.status_code
|
||||
return re.json() if r_code != 204 else None
|
||||
|
||||
async def post(
|
||||
self, url: str, obj: dict | str = {}, r_code: int = 201, raw_response=False, *args, **kwargs
|
||||
):
|
||||
re = await self.c.post(
|
||||
url,
|
||||
data=json.dumps(obj) if isinstance(obj, dict) else obj,
|
||||
headers=self.default_headers | {"Content-Type": "application/json"},
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
if re.status_code != r_code:
|
||||
print(re.content)
|
||||
assert r_code == re.status_code
|
||||
return re.json() if not raw_response else re
|
||||
|
||||
async def post_file(
|
||||
self, url: str, file, r_code: int = 201, raw_response=False, *args, **kwargs
|
||||
):
|
||||
re = await self.c.post(
|
||||
url,
|
||||
files={"file": file},
|
||||
headers=self.default_headers | {"Content-Type": "application/json"},
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
if re.status_code != r_code:
|
||||
print(re.content)
|
||||
assert r_code == re.status_code
|
||||
return re.json() if not raw_response else re
|
||||
|
||||
async def patch(
|
||||
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
|
||||
):
|
||||
re = await self.c.patch(
|
||||
url,
|
||||
data=json.dumps(obj) if isinstance(obj, dict) else obj,
|
||||
headers=self.default_headers | {"Content-Type": "application/json"},
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
if re.status_code != r_code:
|
||||
print(re.content)
|
||||
assert r_code == re.status_code
|
||||
return re.json() if not raw_response else re
|
||||
|
||||
async def put(
|
||||
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
|
||||
):
|
||||
re = await self.c.put(
|
||||
url,
|
||||
data=json.dumps(obj) if isinstance(obj, dict) else obj,
|
||||
headers=self.default_headers
|
||||
| {
|
||||
"Content-Type": "application/json",
|
||||
"accept": "application/json",
|
||||
},
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
if re.status_code != r_code:
|
||||
print(re.content)
|
||||
assert r_code == re.status_code
|
||||
return re.json() if not raw_response else re
|
||||
|
||||
async def obj_lifecycle(
|
||||
self,
|
||||
input_obj: dict,
|
||||
url: str,
|
||||
pagination: bool = True,
|
||||
id_field: str = "id",
|
||||
created_at_check: bool = True,
|
||||
):
|
||||
# GET LIST
|
||||
re = await self.get(url)
|
||||
if pagination:
|
||||
assert re["total"] == 0
|
||||
assert len(re["results"]) == 0
|
||||
else:
|
||||
assert len(re) == 0
|
||||
|
||||
# CREATE
|
||||
re = await self.post(url, obj=input_obj)
|
||||
assert id_field in re
|
||||
assert re[id_field] is not None
|
||||
|
||||
if created_at_check:
|
||||
assert "created_at" in re
|
||||
assert re["created_at"] is not None
|
||||
|
||||
obj_id = str(re[id_field])
|
||||
|
||||
# GET
|
||||
re = await self.get(f"{url}{obj_id}/")
|
||||
assert re[id_field] == obj_id
|
||||
|
||||
# GET LIST
|
||||
re = await self.get(url)
|
||||
if pagination:
|
||||
assert re["total"] == 1
|
||||
assert len(re["results"]) == 1
|
||||
else:
|
||||
assert len(re) == 1
|
||||
|
||||
# DELETE
|
||||
await self.delete(f"{url}{obj_id}")
|
||||
|
||||
# GET LIST
|
||||
re = await self.get(url)
|
||||
if pagination:
|
||||
assert re["total"] == 0
|
||||
assert len(re["results"]) == 0
|
||||
else:
|
||||
assert len(re) == 0
|
||||
|
||||
# GET
|
||||
await self.get(f"{url}{obj_id}", parse_json=False, r_code=404)
|
||||
@@ -21,3 +21,6 @@ psycopg-pool>=3.2.2 # PostgreSQL
|
||||
h11>=0.14.0 # Testing
|
||||
httpcore>=1.0.5 # Testing
|
||||
httpx>=0.27.0 # Testing
|
||||
|
||||
asyncpg>=0.30.0 #SQLAlchemy
|
||||
greenlet>=3.1.1 #Async
|
||||
|
||||
Reference in New Issue
Block a user