Compare commits

...

8 Commits

Author SHA1 Message Date
4f6c066242 feat: added unittest basecase 2025-01-23 11:12:25 +01:00
creyD
da66e116c3 Adjusted files for isort & autopep 2025-01-21 21:50:45 +00:00
c09df1341f fix: fixed migration issue 2025-01-21 22:50:15 +01:00
88000f9cf4 fix: updated mail 2025-01-21 22:20:16 +01:00
92a33489ac fix: Updated author_email 2025-01-21 22:19:51 +01:00
9da4cbcb8e feat: added auth0 testing helpers 2025-01-21 22:17:29 +01:00
52307f6028 fix: fixed deprecation warning 2025-01-21 22:17:08 +01:00
8019b566f2 fix: added string method for base model 2025-01-21 22:16:07 +01:00
4 changed files with 288 additions and 4 deletions

View File

@@ -1,5 +1,4 @@
import uuid
from datetime import datetime
from sqlalchemy import Column, DateTime, String
from sqlalchemy.dialects.postgresql import UUID
@@ -14,15 +13,30 @@ class Base:
# Primary key as uuid
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
updated_at = Column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
created_by_id = Column(String)
__name__: str
# TODO: Add default representation string
# TODO: Add automated foreign key resolution
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
def __str__(self) -> str:
# if the object has a name, title or similar attribute, return it
if hasattr(self, "name"):
return str(self.name) # type: ignore
# if the object has a title attribute, return it
if hasattr(self, "title"):
return str(self.title) # type: ignore
# otherwise return the object's id
return str(self.id)

View File

@@ -0,0 +1,177 @@
import json
import unittest
from typing import Type
from httpx import ASGITransport, AsyncClient, Response
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy_utils import create_database, database_exists, drop_database
from creyPY.fastapi.models.base import Base
class AbstractTestAPI(unittest.IsolatedAsyncioTestCase):
client: AsyncClient
default_headers: dict = {}
@classmethod
def setUpClass(cls, app, headers={}) -> None:
cls.client = AsyncClient(
transport=ASGITransport(app=app), base_url="http://testserver", follow_redirects=True
)
cls.default_headers = headers
print("setting up abstract")
@classmethod
def setup_database(cls, sync_db_url: str, async_db_url: str, base: Type[Base]):
cls.engine_s = create_engine(
sync_db_url,
echo=False,
pool_pre_ping=True,
connect_args={"sslmode": "require"},
)
if database_exists(cls.engine_s.url):
drop_database(cls.engine_s.url)
create_database(cls.engine_s.url)
# Migrate
base.metadata.create_all(cls.engine_s)
cls.engine = create_async_engine(
async_db_url,
echo=False,
pool_pre_ping=True,
connect_args={"sslmode": "require"},
)
async def get(self, url: str, r_code: int = 200, parse_json=True) -> dict | bytes:
re = await self.client.get(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if parse_json else re.content
async def delete(self, url: str, r_code: int = 204) -> dict | None:
re = await self.client.delete(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if r_code != 204 else None
async def post(
self, url: str, obj: dict | str = {}, r_code: int = 201, raw_response=False, *args, **kwargs
):
re = await self.client.post(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
if not raw_response:
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def post_file(
self, url: str, file, r_code: int = 201, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.post(
url,
files={"file": file},
headers=self.default_headers,
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def patch(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.patch(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def put(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.put(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers
| {
"Content-Type": "application/json",
"accept": "application/json",
},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def obj_lifecycle(
self,
input_obj: dict,
url: str,
pagination: bool = True,
id_field: str = "id",
created_at_check: bool = True,
):
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 0)
self.assertEqual(len(re["results"]), 0)
else:
self.assertEqual(len(re), 0)
# CREATE
re = await self.post(url, obj=input_obj)
self.assertIn(id_field, re)
self.assertIsNotNone(re[id_field])
if created_at_check:
self.assertIn("created_at", re)
self.assertIsNotNone(re["created_at"])
obj_id = str(re[id_field])
# GET
re = await self.get(f"{url}{obj_id}/")
self.assertEqual(re[id_field], obj_id)
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 1)
self.assertEqual(len(re["results"]), 1)
else:
self.assertEqual(len(re), 1)
# DELETE
await self.delete(f"{url}{obj_id}")
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 0)
self.assertEqual(len(re["results"]), 0)
else:
self.assertEqual(len(re), 0)
# GET
await self.get(f"{url}{obj_id}", parse_json=False, r_code=404)

View File

@@ -0,0 +1,93 @@
USER_OBJ = {
"auth0|testing": {
"created_at": "2023-08-15T13:25:31.507Z",
"email": "test@test.org",
"email_verified": True,
"identities": [
{
"connection": "Username-Password-Authentication",
"provider": "auth0",
"user_id": "testing",
"isSocial": False,
}
],
"name": "Test Tester",
"nickname": "testing",
"picture": "https://avatars.githubusercontent.com/u/15138480?v=4",
"updated_at": "2024-01-17T12:36:37.300Z",
"user_id": "auth0|testing",
"user_metadata": {},
"last_password_reset": "2024-01-17T11:42:08.761Z",
"last_ip": "127.0.0.1",
"last_login": "2024-01-17T11:43:09.620Z",
"logins_count": 1,
},
"auth0|new_user": {
"created_at": "2023-08-15T13:25:31.507Z",
"email": "test2@test.org",
"email_verified": True,
"identities": [
{
"connection": "Username-Password-Authentication",
"provider": "auth0",
"user_id": "testing",
"isSocial": False,
}
],
"name": "Test Tester 2",
"nickname": "testing 2",
"picture": "https://avatars.githubusercontent.com/u/15138481?v=4",
"updated_at": "2024-01-17T12:36:37.303Z",
"user_id": "auth0|new_user",
"user_metadata": {},
"last_password_reset": "2024-01-17T11:42:08.759Z",
"last_ip": "127.0.0.1",
"last_login": "2024-01-17T11:43:09.618Z",
"logins_count": 1,
},
}
def get_user_auth0(sub, *args, **kwargs) -> dict:
return USER_OBJ[sub]
def patch_user_auth0(input_obj: dict, sub, *args, **kwargs) -> dict:
USER_OBJ[sub].update(input_obj)
return get_user_auth0(sub)
def get_user_auth0_metadata(sub, *args, **kwargs) -> dict:
return USER_OBJ[sub]["user_metadata"]
def check_company_auth0(*args, **kwargs) -> bool:
return True
def auth0_sub_to_profile(sub: str) -> dict:
return {
"email": USER_OBJ[sub]["email"],
"name": USER_OBJ[sub]["name"],
"picture": USER_OBJ[sub]["picture"],
"company_ids": USER_OBJ[sub]["user_metadata"]["company_ids"],
}
def auth0_sub_to_public(sub: str) -> dict:
return {
"email": USER_OBJ[sub]["email"],
"name": USER_OBJ[sub]["name"],
"picture": USER_OBJ[sub]["picture"],
}
def patch_user_auth0_metadata(input_obj: dict, sub, *args, **kwargs) -> dict:
USER_OBJ[sub]["user_metadata"].update(input_obj)
return get_user_auth0_metadata(sub)
def set_company_id(sub: str, company_id: str):
if sub not in USER_OBJ:
USER_OBJ[sub] = {}
USER_OBJ[sub]["user_metadata"] = {"company_ids": [company_id]}

View File

@@ -39,7 +39,7 @@ setup(
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
author="Conrad Großer",
author_email="conrad@noah.tech",
author_email="code@grosser.group",
packages=find_packages(),
url="https://github.com/creyD/creyPY",
license="MIT",