Compare commits

...

11 Commits

Author SHA1 Message Date
1d7b767623 Merge pull request #31 from vikynoah/pagination_fix 2025-01-24 15:29:52 +01:00
vikynoah
f1f29e84c2 fix: Pagination Fix 2025-01-24 15:23:08 +01:00
dcb9afb8f2 fix: added btree_gist extension option 2025-01-24 11:01:48 +01:00
creyD
8c98e001f9 Adjusted files for isort & autopep 2025-01-24 07:49:05 +00:00
959a746e4f fix: fixed issue with new date format 2025-01-24 08:48:35 +01:00
4f6c066242 feat: added unittest basecase 2025-01-23 11:12:25 +01:00
creyD
da66e116c3 Adjusted files for isort & autopep 2025-01-21 21:50:45 +00:00
c09df1341f fix: fixed migration issue 2025-01-21 22:50:15 +01:00
88000f9cf4 fix: updated mail 2025-01-21 22:20:16 +01:00
92a33489ac fix: Updated author_email 2025-01-21 22:19:51 +01:00
9da4cbcb8e feat: added auth0 testing helpers 2025-01-21 22:17:29 +01:00
5 changed files with 296 additions and 25 deletions

View File

@@ -1,5 +1,5 @@
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime
from sqlalchemy import Column, DateTime, String from sqlalchemy import Column, DateTime, String
from sqlalchemy.dialects.postgresql import UUID from sqlalchemy.dialects.postgresql import UUID
@@ -14,11 +14,7 @@ class Base:
# Primary key as uuid # Primary key as uuid
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
created_at = Column(DateTime(timezone=True), server_default=func.now()) created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column( updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
created_by_id = Column(String) created_by_id = Column(String)
__name__: str __name__: str

View File

@@ -1,27 +1,26 @@
from contextlib import suppress
from math import ceil from math import ceil
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union, overload from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union, overload
from contextlib import suppress
from pydantic import BaseModel from fastapi import Query
from fastapi_pagination import Params from fastapi_pagination.api import apply_items_transformer, create_page
from fastapi_pagination.bases import AbstractPage, AbstractParams from fastapi_pagination.bases import AbstractPage, AbstractParams, RawParams
from fastapi_pagination.ext.sqlalchemy import create_paginate_query
from fastapi_pagination.types import ( from fastapi_pagination.types import (
AdditionalData,
AsyncItemsTransformer,
GreaterEqualOne, GreaterEqualOne,
GreaterEqualZero, GreaterEqualZero,
AdditionalData,
SyncItemsTransformer,
AsyncItemsTransformer,
ItemsTransformer, ItemsTransformer,
SyncItemsTransformer,
) )
from fastapi_pagination.api import create_page, apply_items_transformer
from fastapi_pagination.utils import verify_params from fastapi_pagination.utils import verify_params
from fastapi_pagination.ext.sqlalchemy import create_paginate_query from pydantic import BaseModel
from fastapi_pagination.bases import AbstractParams, RawParams
from pydantic.json_schema import SkipJsonSchema from pydantic.json_schema import SkipJsonSchema
from sqlalchemy.sql.selectable import Select from sqlalchemy import func, select
from sqlalchemy.orm.session import Session
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session
from fastapi import Query from sqlalchemy.orm.session import Session
from sqlalchemy.sql.selectable import Select
from sqlalchemy.util import await_only, greenlet_spawn from sqlalchemy.util import await_only, greenlet_spawn
T = TypeVar("T") T = TypeVar("T")
@@ -29,7 +28,7 @@ T = TypeVar("T")
class PaginationParams(BaseModel, AbstractParams): class PaginationParams(BaseModel, AbstractParams):
page: int = Query(1, ge=1, description="Page number") page: int = Query(1, ge=1, description="Page number")
size: int = Query(50, ge=1, le=100, description="Page size") size: int = Query(50, ge=1, description="Page size")
pagination: bool = Query(True, description="Toggle pagination") pagination: bool = Query(True, description="Toggle pagination")
def to_raw_params(self) -> RawParams: def to_raw_params(self) -> RawParams:
@@ -62,7 +61,7 @@ class Page(AbstractPage[T], Generic[T]):
total: Optional[int] = None, total: Optional[int] = None,
**kwargs: Any, **kwargs: Any,
) -> Self: ) -> Self:
if not isinstance(params, Params): if not isinstance(params, PaginationParams):
raise TypeError("Page should be used with Params") raise TypeError("Page should be used with Params")
size = params.size or total or len(items) size = params.size or total or len(items)
@@ -170,9 +169,9 @@ def _paginate(
total = connection.scalar(count_query) total = connection.scalar(count_query)
if params.pagination is False and total > 0: if params.pagination is False and total > 0:
params = Params(page=1, size=total) params = PaginationParams(page=1, size=total)
else: else:
params = Params(page=params.page, size=params.size) params = PaginationParams(page=params.page, size=params.size)
query = create_paginate_query(query, params) query = create_paginate_query(query, params)
items = connection.execute(query).all() items = connection.execute(query).all()

View File

@@ -0,0 +1,183 @@
import json
import unittest
from typing import Type
from httpx import ASGITransport, AsyncClient, Response
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy_utils import create_database, database_exists, drop_database
from creyPY.fastapi.models.base import Base
class AbstractTestAPI(unittest.IsolatedAsyncioTestCase):
client: AsyncClient
default_headers: dict = {}
@classmethod
def setUpClass(cls, app, headers={}) -> None:
cls.client = AsyncClient(
transport=ASGITransport(app=app), base_url="http://testserver", follow_redirects=True
)
cls.default_headers = headers
print("setting up abstract")
@classmethod
def setup_database(
cls, sync_db_url: str, async_db_url: str, base: Type[Base], btree_gist: bool = False
):
cls.engine_s = create_engine(
sync_db_url,
echo=False,
pool_pre_ping=True,
connect_args={"sslmode": "require"},
)
if database_exists(cls.engine_s.url):
drop_database(cls.engine_s.url)
create_database(cls.engine_s.url)
if btree_gist:
with cls.engine_s.begin() as conn:
conn.execute(text("CREATE EXTENSION IF NOT EXISTS btree_gist"))
# Migrate
base.metadata.create_all(cls.engine_s)
cls.engine = create_async_engine(
async_db_url,
echo=False,
pool_pre_ping=True,
connect_args={"sslmode": "require"},
)
async def get(self, url: str, r_code: int = 200, parse_json=True) -> dict | bytes:
re = await self.client.get(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if parse_json else re.content
async def delete(self, url: str, r_code: int = 204) -> dict | None:
re = await self.client.delete(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if r_code != 204 else None
async def post(
self, url: str, obj: dict | str = {}, r_code: int = 201, raw_response=False, *args, **kwargs
):
re = await self.client.post(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
if not raw_response:
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def post_file(
self, url: str, file, r_code: int = 201, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.post(
url,
files={"file": file},
headers=self.default_headers,
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def patch(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.patch(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def put(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.put(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers
| {
"Content-Type": "application/json",
"accept": "application/json",
},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def obj_lifecycle(
self,
input_obj: dict,
url: str,
pagination: bool = True,
id_field: str = "id",
created_at_check: bool = True,
):
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 0)
self.assertEqual(len(re["results"]), 0)
else:
self.assertEqual(len(re), 0)
# CREATE
re = await self.post(url, obj=input_obj)
self.assertIn(id_field, re)
self.assertIsNotNone(re[id_field])
if created_at_check:
self.assertIn("created_at", re)
self.assertIsNotNone(re["created_at"])
obj_id = str(re[id_field])
# GET
re = await self.get(f"{url}{obj_id}/")
self.assertEqual(re[id_field], obj_id)
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 1)
self.assertEqual(len(re["results"]), 1)
else:
self.assertEqual(len(re), 1)
# DELETE
await self.delete(f"{url}{obj_id}")
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 0)
self.assertEqual(len(re["results"]), 0)
else:
self.assertEqual(len(re), 0)
# GET
await self.get(f"{url}{obj_id}", parse_json=False, r_code=404)

View File

@@ -0,0 +1,93 @@
USER_OBJ = {
"auth0|testing": {
"created_at": "2023-08-15T13:25:31.507Z",
"email": "test@test.org",
"email_verified": True,
"identities": [
{
"connection": "Username-Password-Authentication",
"provider": "auth0",
"user_id": "testing",
"isSocial": False,
}
],
"name": "Test Tester",
"nickname": "testing",
"picture": "https://avatars.githubusercontent.com/u/15138480?v=4",
"updated_at": "2024-01-17T12:36:37.300Z",
"user_id": "auth0|testing",
"user_metadata": {},
"last_password_reset": "2024-01-17T11:42:08.761Z",
"last_ip": "127.0.0.1",
"last_login": "2024-01-17T11:43:09.620Z",
"logins_count": 1,
},
"auth0|new_user": {
"created_at": "2023-08-15T13:25:31.507Z",
"email": "test2@test.org",
"email_verified": True,
"identities": [
{
"connection": "Username-Password-Authentication",
"provider": "auth0",
"user_id": "testing",
"isSocial": False,
}
],
"name": "Test Tester 2",
"nickname": "testing 2",
"picture": "https://avatars.githubusercontent.com/u/15138481?v=4",
"updated_at": "2024-01-17T12:36:37.303Z",
"user_id": "auth0|new_user",
"user_metadata": {},
"last_password_reset": "2024-01-17T11:42:08.759Z",
"last_ip": "127.0.0.1",
"last_login": "2024-01-17T11:43:09.618Z",
"logins_count": 1,
},
}
def get_user_auth0(sub, *args, **kwargs) -> dict:
return USER_OBJ[sub]
def patch_user_auth0(input_obj: dict, sub, *args, **kwargs) -> dict:
USER_OBJ[sub].update(input_obj)
return get_user_auth0(sub)
def get_user_auth0_metadata(sub, *args, **kwargs) -> dict:
return USER_OBJ[sub]["user_metadata"]
def check_company_auth0(*args, **kwargs) -> bool:
return True
def auth0_sub_to_profile(sub: str) -> dict:
return {
"email": USER_OBJ[sub]["email"],
"name": USER_OBJ[sub]["name"],
"picture": USER_OBJ[sub]["picture"],
"company_ids": USER_OBJ[sub]["user_metadata"]["company_ids"],
}
def auth0_sub_to_public(sub: str) -> dict:
return {
"email": USER_OBJ[sub]["email"],
"name": USER_OBJ[sub]["name"],
"picture": USER_OBJ[sub]["picture"],
}
def patch_user_auth0_metadata(input_obj: dict, sub, *args, **kwargs) -> dict:
USER_OBJ[sub]["user_metadata"].update(input_obj)
return get_user_auth0_metadata(sub)
def set_company_id(sub: str, company_id: str):
if sub not in USER_OBJ:
USER_OBJ[sub] = {}
USER_OBJ[sub]["user_metadata"] = {"company_ids": [company_id]}

View File

@@ -39,7 +39,7 @@ setup(
long_description=open("README.md").read(), long_description=open("README.md").read(),
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
author="Conrad Großer", author="Conrad Großer",
author_email="conrad@noah.tech", author_email="code@grosser.group",
packages=find_packages(), packages=find_packages(),
url="https://github.com/creyD/creyPY", url="https://github.com/creyD/creyPY",
license="MIT", license="MIT",