Compare commits

...

89 Commits

Author SHA1 Message Date
vikynoah
83dca59817 fix: BaseSchemaModelOUT alter (#43) 2025-04-02 09:58:36 +02:00
vikynoah
b80d26586d fix: Response model (#41)
* feat: Filter response fields

* changes

* changes

* import fix

* changes
2025-03-27 09:33:04 +01:00
creyD
c2e2469027 Adjusted files for isort & autopep 2025-03-24 08:26:44 +00:00
renovate[bot]
8363055984 feat(deps): update dependency python to 3.13 (#39)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-03-24 09:26:14 +01:00
vikynoah
f586ce5c03 feat: Filter response fields (#40)
* feat: Filter response fields

* changes

* changes
2025-03-24 09:25:52 +01:00
creyD
0af8f05edf Adjusted files for isort & autopep 2025-03-06 14:36:12 +00:00
vikynoah
b73c524e8d feat: Delete User from Auth0 on invite delete (#38) 2025-03-06 15:35:42 +01:00
creyD
be260b0ee6 Adjusted files for isort & autopep 2025-02-26 16:11:54 +00:00
renovate[bot]
b0f2815568 feat(deps): update dependency stripe to v11.6.0 (#36)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-02-26 17:11:23 +01:00
vikynoah
6ac609f3f4 feat: Add CSV file Helper (#37) 2025-02-26 17:11:07 +01:00
creyD
53ed939451 Adjusted files for isort & autopep 2025-02-24 17:27:48 +00:00
c56d14c2fd Merge pull request #35 from vikynoah/invite_bug
fix: add company id to invite user
2025-02-24 18:27:17 +01:00
vikynoah
1e9bcb92b6 fix: add company id to invite user 2025-02-24 11:49:17 +01:00
5e16bd5cbc fix: fixed issue that creyPY couldn't be used without PSQL 2025-02-19 10:27:51 +01:00
creyD
50b444be89 Adjusted files for isort & autopep 2025-02-14 09:10:55 +00:00
e12c86e352 Merge pull request #34 from vikynoah/obj_lifecycle_patch
feat : Add Patch to obj lifecycle
2025-02-14 10:10:21 +01:00
vikynoah
0708a48301 feat : Add Patch to obj lifecycle 2025-02-13 02:05:15 +01:00
34595d52f2 Merge pull request #33 from creyD/renovate/stripe-11.x
feat(deps): update dependency stripe to v11.5.0
2025-02-05 09:38:28 +01:00
renovate[bot]
421725ad10 feat(deps): update dependency stripe to v11.5.0 2025-01-27 22:02:52 +00:00
31c4cbb055 fix: fixed multiple bugs in database handling 2025-01-27 16:26:26 +01:00
410ae12f8e feat: added ssl option to test database 2025-01-27 13:16:55 +01:00
1f224c44bc feat: added sslmode flag 2025-01-27 13:09:16 +01:00
5b0cc0d87d fix: fixed tests 2025-01-24 19:10:36 +01:00
ecfc0fc167 fix: fixed issue with new mixin 2025-01-24 19:04:07 +01:00
eb62c87679 feat: added experimental init and annotation mixins 2025-01-24 18:58:39 +01:00
2ad7700f72 Merge pull request #30 from creyD/renovate/stripe-11.x
feat(deps): update dependency stripe to v11
2025-01-24 18:39:30 +01:00
1d7b767623 Merge pull request #31 from vikynoah/pagination_fix 2025-01-24 15:29:52 +01:00
vikynoah
f1f29e84c2 fix: Pagination Fix 2025-01-24 15:23:08 +01:00
dcb9afb8f2 fix: added btree_gist extension option 2025-01-24 11:01:48 +01:00
creyD
8c98e001f9 Adjusted files for isort & autopep 2025-01-24 07:49:05 +00:00
959a746e4f fix: fixed issue with new date format 2025-01-24 08:48:35 +01:00
4f6c066242 feat: added unittest basecase 2025-01-23 11:12:25 +01:00
creyD
da66e116c3 Adjusted files for isort & autopep 2025-01-21 21:50:45 +00:00
c09df1341f fix: fixed migration issue 2025-01-21 22:50:15 +01:00
88000f9cf4 fix: updated mail 2025-01-21 22:20:16 +01:00
92a33489ac fix: Updated author_email 2025-01-21 22:19:51 +01:00
9da4cbcb8e feat: added auth0 testing helpers 2025-01-21 22:17:29 +01:00
52307f6028 fix: fixed deprecation warning 2025-01-21 22:17:08 +01:00
8019b566f2 fix: added string method for base model 2025-01-21 22:16:07 +01:00
renovate[bot]
10c1ea5411 feat(deps): update dependency stripe to v11 2025-01-21 21:13:13 +00:00
83726f517c feat: added stripe service 2025-01-21 22:12:03 +01:00
abe84bcfcb Merge pull request #22 from creyD/dev
Major Version 3.0.0
2025-01-21 12:15:43 +01:00
vikynoah
2d6de99585 fix: post_file method change for testing (#29)
* fix: post_file method change for testing

* changes
2025-01-16 09:35:23 +01:00
vikynoah
573f59349f fix: changes to post method in testing_async (#28) 2025-01-08 19:37:10 +01:00
creyD
32bf089456 Adjusted files for isort & autopep 2025-01-02 22:20:49 +00:00
vikynoah
d75fede3d1 fix: Force postgresql SSL mode (#27)
* fix: Force postgresql SSL mode

* changes
2025-01-02 23:20:17 +01:00
creyD
f8b781b3e7 Adjusted files for isort & autopep 2024-12-11 16:15:33 +00:00
vikynoah
93c7f6f6cb fix: Async Testing (#26)
* fix: httpx fix as per latest version

* fix: Fix Async Testing client
2024-12-11 17:14:59 +01:00
creyD
2e44453915 Adjusted files for isort & autopep 2024-12-09 15:29:15 +00:00
vikynoah
2a22471de9 fix: httpx fix as per latest version (#25) 2024-12-09 16:28:44 +01:00
2176b1a37d fix: bumped security risks and enabled newer packages installed 2024-12-04 20:05:19 +01:00
5daddf260e fix: added timeouts to the requests to fix Bandit issue 2024-11-25 13:20:17 +01:00
364e07daa1 fix: fixed random issue (codacy) 2024-11-25 13:14:07 +01:00
5daf6eb8c5 fix: fixed missing import 2024-11-25 12:55:35 +01:00
dfb0588d1c fix: fixed pipeline 2024-11-24 18:27:45 +01:00
3251afdb90 fix: fixed pipeline 2024-11-24 18:25:59 +01:00
85fe263da4 fix: pipeline fix 2024-11-24 18:21:43 +01:00
0be70deb00 fix: fixed pipeline 2024-11-24 18:18:13 +01:00
0418c75e19 feat: added all install option for dependencies 2024-11-24 18:16:03 +01:00
2444269486 feat: added auth0 common module 2024-11-24 18:13:58 +01:00
creyD
33bdeb12a0 Adjusted files for isort & autopep 2024-11-24 17:03:03 +00:00
5efed5399b Update README.md 2024-11-24 18:02:00 +01:00
7dbce117c8 feat: added common database helper 2024-11-24 18:01:45 +01:00
481bfcfffd feat: unified configs for pg sessions 2024-11-24 17:57:53 +01:00
90c9d2dc09 breaking: default version no longer uses postgres 2024-11-24 17:57:49 +01:00
renovate[bot]
8b037fbeb5 chore: Configure Renovate (#20)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
Co-authored-by: Conrad <grosserconrad@gmail.com>
2024-11-24 16:23:52 +01:00
b86b58f3e4 Merge pull request #19 from creyD/dev 2024-11-22 13:20:27 +01:00
creyD
17f96c920d Adjusted files for isort & autopep 2024-11-22 11:58:05 +00:00
vikynoah
523241ac4b feat: N-271 async db (#18) 2024-11-22 12:56:45 +01:00
creyD
6f09c2ef4c Adjusted files for isort & autopep 2024-11-15 11:39:59 +00:00
vikynoah
9bba5b0a4e fix: N 271 async db (#17) 2024-11-15 12:39:30 +01:00
creyD
50031556f9 Adjusted files for isort & autopep 2024-11-12 08:54:34 +00:00
vikynoah
2940ddbdcd feat: Introduce ASYNC DB as Plug and Play (#16)
Co-authored-by: vikbhas <waraa.vignesh@gmail.com>
2024-11-12 09:54:04 +01:00
807af12fa1 Merge pull request #13 from creyD/dev 2024-11-05 11:54:46 +01:00
creyD
dce897c247 Adjusted files for isort & autopep 2024-11-05 10:29:40 +00:00
vikynoah
89997372ef fix: Changes to accomodate pagination flag in Params (#14)
Co-authored-by: vikbhas <waraa.vignesh@gmail.com>
2024-11-05 11:29:06 +01:00
c8c5977978 fix: removed non-working backsync 2024-10-29 16:20:01 +01:00
974bc591d6 Merge pull request #11 from creyD/dev 2024-10-29 15:49:00 +01:00
eb895398ab fix: trying again to fix the pipeline 2024-10-29 15:46:52 +01:00
867abd7054 fix: fixed workflow again 2024-10-29 15:33:54 +01:00
26e18f6b31 Merge pull request #10 from creyD/dev 2024-10-29 15:32:28 +01:00
8a3a60dbb0 fix: fixed workflow again 2024-10-29 15:30:42 +01:00
e52a5f421b Merge pull request #9 from creyD/dev 2024-10-29 15:23:41 +01:00
a6ded91185 fix: syncing back tags to dev 2024-10-29 15:18:57 +01:00
eb64874c47 fix: minor adjustment to the pipeline 2024-10-29 15:13:36 +01:00
b7200852a4 Merge branch 'dev' of https://github.com/creyD/creyPY into dev 2024-10-29 15:13:30 +01:00
3d18205205 fix: fixed github pipeline 2024-10-29 15:12:41 +01:00
99c84b676c Merge pull request #8 from creyD/dev 2024-10-29 15:09:34 +01:00
6a93ab05a3 Merge pull request #6 from creyD/dev 2024-10-29 14:56:14 +01:00
37 changed files with 1189 additions and 86 deletions

View File

@@ -12,7 +12,6 @@ on:
- "**/CHANGELOG.md"
pull_request:
branches:
- master
- dev
workflow_dispatch:
@@ -39,14 +38,17 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
python-version: '3.13'
- run: python -m pip install --upgrade pip
- run: python -m pip install -r requirements.txt
- run: |
python -m pip install -r requirements.txt
python -m pip install -r requirements.pg.txt
python -m pip install -r requirements.auth0.txt
- run: python test.py
tag_and_publish:
runs-on: ubuntu-latest
if: github.ref_name == 'dev' || github.ref_name == 'master' && github.event_name == 'push'
if: (github.ref_name == 'master' || github.ref_name == 'dev') && github.event_name == 'push'
needs: test
permissions:
id-token: write # IMPORTANT: this permission is mandatory for trusted publishing
@@ -84,7 +86,6 @@ jobs:
version_format: ${{ steps.version_format.outputs.version_format }}
- name: Create & Push Tag
if: github.ref_name == 'master' || github.ref_name == 'dev'
run: |
git tag ${{ steps.git_version.outputs.version }}
git push origin ${{ steps.git_version.outputs.version }}
@@ -92,7 +93,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
python-version: '3.13'
- name: Install dependencies
run: |

View File

@@ -55,8 +55,3 @@ from creyPY.const import LanguageEnum
print(LanguageEnum.EN) # Output: LanguageEnum.EN
print(LanguageEnum.EN.value) # Output: English
```
## TODO
- Add async support for database connection
- Add version without postgresql dependency

View File

@@ -1,63 +1,234 @@
from typing import Type, TypeVar
from typing import Type, TypeVar, overload, List
from uuid import UUID
from fastapi import HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
import asyncio
from .models.base import Base
T = TypeVar("T", bound=Base)
@overload
async def get_object_or_404(
db_class: Type[T],
id: UUID | str,
db: AsyncSession,
expunge: bool = False,
lookup_column: str = "id",
response_fields: List[str] = [],
) -> T:
pass
@overload
def get_object_or_404(
db_class: Type[T], id: UUID | str, db: Session, expunge: bool = False, lookup_column: str = "id"
db_class: Type[T],
id: UUID | str,
db: Session,
expunge: bool = False,
lookup_column: str = "id",
response_fields: List[str] = [],
) -> T:
obj = db.query(db_class).filter(getattr(db_class, lookup_column) == id).one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.")
if expunge:
db.expunge(obj)
return obj
pass
def get_object_or_404(
db_class: Type[T],
id: UUID | str,
db: Session | AsyncSession,
expunge: bool = False,
lookup_column: str = "id",
response_fields: List[str] = [],
) -> T:
async def _get_async_object() -> T:
if response_fields:
selected_columns = [
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
]
query = select(*selected_columns).select_from(db_class)
else:
query = select(db_class).filter(getattr(db_class, lookup_column) == id)
result = await db.execute(query)
obj = result.scalar_one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
if expunge:
await db.expunge(obj)
return obj
def _get_sync_object() -> T:
if response_fields:
selected_columns = [
getattr(db_class, field) for field in response_fields if hasattr(db_class, field)
]
query = db.query(*selected_columns).filter(getattr(db_class, lookup_column) == id)
else:
query = db.query(db_class).filter(getattr(db_class, lookup_column) == id)
obj = query.one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.") # type: ignore
if expunge:
db.expunge(obj)
return obj
if isinstance(db, AsyncSession):
return asyncio.ensure_future(_get_async_object()) # type: ignore
elif isinstance(db, Session):
return _get_sync_object()
else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
# TODO: Add testing
@overload
async def create_obj_from_data(
data: BaseModel,
model: Type[T],
db: AsyncSession,
additional_data: dict = {},
exclude: dict = {},
) -> T:
pass
@overload
def create_obj_from_data(
data: BaseModel, model: Type[T], db: Session, additional_data={}, exclude={}
data: BaseModel, model: Type[T], db: Session, additional_data: dict = {}, exclude: dict = {}
) -> T:
obj = model(**data.model_dump(exclude=exclude) | additional_data)
db.add(obj)
db.commit()
db.refresh(obj)
return obj
pass
def create_obj_from_data(
data: BaseModel, model: Type[T], db: Session | AsyncSession, additional_data={}, exclude={}
) -> T:
obj_data = data.model_dump(exclude=exclude) | additional_data
obj = model(**obj_data)
async def _create_async_obj():
db.add(obj)
await db.commit()
await db.refresh(obj)
return obj
def _create_sync_obj():
db.add(obj)
db.commit()
db.refresh(obj)
return obj
if isinstance(db, AsyncSession):
return asyncio.ensure_future(_create_async_obj()) # type: ignore
elif isinstance(db, Session):
return _create_sync_obj()
else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
# TODO: Add testing
@overload
async def update_obj_from_data(
data: BaseModel,
model: Type[T],
id: UUID | str,
db: AsyncSession,
partial: bool = True,
ignore_fields: list = [],
additional_data: dict = {},
exclude: dict = {},
) -> T:
pass
@overload
def update_obj_from_data(
data: BaseModel,
model: Type[T],
id: UUID | str,
db: Session,
partial: bool = True,
ignore_fields: list = [],
additional_data: dict = {},
exclude: dict = {},
) -> T:
pass
def update_obj_from_data(
data: BaseModel,
model: Type[T],
id: UUID | str,
db: Session | AsyncSession,
partial: bool = True,
ignore_fields=[],
additional_data={},
exclude={},
) -> T:
obj = get_object_or_404(model, id, db)
data_dict = data.model_dump(exclude_unset=partial, exclude=exclude)
data_dict.update(additional_data) # merge additional_data into data_dict
for field in data_dict:
if field not in ignore_fields:
setattr(obj, field, data_dict[field])
db.commit()
db.refresh(obj)
return obj
def _update_fields(obj: T):
data_dict = data.model_dump(exclude_unset=partial, exclude=exclude)
data_dict.update(additional_data)
for field in data_dict:
if field not in ignore_fields:
setattr(obj, field, data_dict[field])
async def _update_async_obj() -> T:
obj = await get_object_or_404(model, id, db)
_update_fields(obj)
await db.commit()
await db.refresh(obj)
return obj
def _update_sync_obj() -> T:
obj = get_object_or_404(model, id, db)
_update_fields(obj)
db.commit()
db.refresh(obj)
return obj
if isinstance(db, AsyncSession):
return asyncio.ensure_future(_update_async_obj()) # type: ignore
elif isinstance(db, Session):
return _update_sync_obj()
else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore
# TODO: Add testing
@overload
async def delete_object(db_class: Type[T], id: UUID | str, db: AsyncSession) -> None:
pass
@overload
def delete_object(db_class: Type[T], id: UUID | str, db: Session) -> None:
obj = db.query(db_class).filter(db_class.id == id).one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.")
db.delete(obj)
db.commit()
pass
def delete_object(db_class: Type[T], id: UUID | str, db: Session | AsyncSession) -> None:
async def _delete_async_obj() -> None:
query = select(db_class).filter(db_class.id == id)
result = await db.execute(query)
obj = result.scalar_one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.")
await db.delete(obj)
await db.commit()
def _delete_sync_obj() -> None:
obj = db.query(db_class).filter(db_class.id == id).one_or_none()
if obj is None:
raise HTTPException(status_code=404, detail="The object does not exist.")
db.delete(obj)
db.commit()
if isinstance(db, AsyncSession):
return asyncio.ensure_future(_delete_async_obj()) # type: ignore
elif isinstance(db, Session):
return _delete_sync_obj()
else:
raise HTTPException(status_code=404, detail="Invalid session type. Expected Session or AsyncSession.") # type: ignore

View File

@@ -1 +1,8 @@
from .session import * # noqa
try:
import sqlalchemy
from .async_session import *
from .helpers import *
from .session import *
except ImportError:
print("SQLAlchemy not installed. Database functionality will be disabled.")

View File

@@ -0,0 +1,22 @@
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from .common import SQLALCHEMY_DATABASE_URL, name, ssl_mode
async_engine = create_async_engine(
SQLALCHEMY_DATABASE_URL + name, pool_pre_ping=True, connect_args={"sslmode": ssl_mode}
)
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
autocommit=False,
)
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as db:
yield db

View File

@@ -0,0 +1,15 @@
import os
from dotenv import load_dotenv
load_dotenv()
host = os.getenv("POSTGRES_HOST", "localhost")
user = os.getenv("POSTGRES_USER", "postgres")
password = os.getenv("POSTGRES_PASSWORD", "root")
port = os.getenv("POSTGRES_PORT", "5432")
name = os.getenv("POSTGRES_DB", "fastapi")
ssl_mode = os.getenv("SSL_MODE", "require")
SQLALCHEMY_DATABASE_URL = f"postgresql+psycopg://{user}:{password}@{host}:{port}/"

View File

@@ -0,0 +1,8 @@
from sqlalchemy_utils import create_database, database_exists
def create_if_not_exists(db_name: str):
from .common import SQLALCHEMY_DATABASE_URL
if not database_exists(SQLALCHEMY_DATABASE_URL + db_name):
create_database(SQLALCHEMY_DATABASE_URL + db_name)

View File

@@ -1,23 +1,14 @@
import os
from typing import Generator
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session
load_dotenv()
from .common import SQLALCHEMY_DATABASE_URL, name, ssl_mode
host = os.getenv("POSTGRES_HOST", "localhost")
user = os.getenv("POSTGRES_USER", "postgres")
password = os.getenv("POSTGRES_PASSWORD", "root")
port = os.getenv("POSTGRES_PORT", "5432")
name = os.getenv("POSTGRES_DB", "fastapi")
SQLALCHEMY_DATABASE_URL = f"postgresql+psycopg://{user}:{password}@{host}:{port}/"
engine = create_engine(SQLALCHEMY_DATABASE_URL + name, pool_pre_ping=True)
engine = create_engine(
SQLALCHEMY_DATABASE_URL + name, pool_pre_ping=True, connect_args={"sslmode": ssl_mode}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

View File

@@ -1 +1,2 @@
from .base import * # noqa
from .mixins import * # noqa

View File

@@ -7,9 +7,11 @@ from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import as_declarative
from sqlalchemy.sql import func
from .mixins import AutoAnnotateMixin, AutoInitMixin
@as_declarative()
class Base:
class Base(AutoAnnotateMixin, AutoInitMixin):
__abstract__ = True
# Primary key as uuid
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
@@ -19,10 +21,21 @@ class Base:
__name__: str
# TODO: Add default representation string
# TODO: Add automated foreign key resolution
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
def __str__(self) -> str:
# if the object has a name, title or similar attribute, return it
if hasattr(self, "name"):
return str(self.name) # type: ignore
# if the object has a title attribute, return it
if hasattr(self, "title"):
return str(self.title) # type: ignore
# otherwise return the object's id
return str(self.id)

View File

@@ -0,0 +1,36 @@
from sqlalchemy import Column
from sqlalchemy.orm import Mapped
class AutoAnnotateMixin:
@classmethod
def __init_subclass__(cls) -> None:
super().__init_subclass__()
annotations = {}
for key, value in cls.__dict__.items():
if isinstance(value, Column):
annotations[key] = Mapped[value.type.python_type]
cls.__annotations__ = annotations
class AutoInitMixin:
@classmethod
def __init_subclass__(cls) -> None:
super().__init_subclass__()
init_params = []
for key, value in cls.__dict__.items():
if isinstance(value, Column):
if not value.nullable and not value.default and not value.server_default:
init_params.append((key, value.type.python_type))
def __init__(self, **kwargs):
super(cls, self).__init__()
for key, _ in init_params:
if key not in kwargs:
raise TypeError(f"Missing required argument: {key}")
setattr(self, key, kwargs[key])
for key, value in kwargs.items():
if key not in init_params and hasattr(self.__class__, key):
setattr(self, key, value)
cls.__init__ = __init__

View File

@@ -1,25 +1,43 @@
from contextlib import suppress
from math import ceil
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union
from typing import Any, Generic, Optional, Self, Sequence, TypeVar, Union, overload
from fastapi_pagination import Params
from fastapi_pagination.bases import AbstractPage, AbstractParams
from fastapi import Query
from fastapi_pagination.api import apply_items_transformer, create_page
from fastapi_pagination.bases import AbstractPage, AbstractParams, RawParams
from fastapi_pagination.ext.sqlalchemy import create_paginate_query
from fastapi_pagination.types import (
AdditionalData,
AsyncItemsTransformer,
GreaterEqualOne,
GreaterEqualZero,
AdditionalData,
ItemsTransformer,
SyncItemsTransformer,
)
from fastapi_pagination.api import create_page, apply_items_transformer
from fastapi_pagination.utils import verify_params
from fastapi_pagination.ext.sqlalchemy import create_paginate_query
from pydantic import BaseModel
from pydantic.json_schema import SkipJsonSchema
from sqlalchemy.sql.selectable import Select
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession, async_scoped_session
from sqlalchemy.orm.session import Session
from sqlalchemy import select, func
from sqlalchemy.sql.selectable import Select
from sqlalchemy.util import await_only, greenlet_spawn
T = TypeVar("T")
class PaginationParams(BaseModel, AbstractParams):
page: int = Query(1, ge=1, description="Page number")
size: int = Query(50, ge=1, description="Page size")
pagination: bool = Query(True, description="Toggle pagination")
def to_raw_params(self) -> RawParams:
if not self.pagination:
return RawParams(limit=None, offset=None)
return RawParams(limit=self.size, offset=(self.page - 1) * self.size)
# TODO: Add complete fastapi-pagination proxy here
# TODO: Add pagination off functionality
# SkipJsonSchema is used to avoid generating invalid JSON schema in FastAPI
@@ -32,7 +50,7 @@ class Page(AbstractPage[T], Generic[T]):
has_next: bool | SkipJsonSchema[None] = None
has_prev: bool | SkipJsonSchema[None] = None
__params_type__ = Params
__params_type__ = PaginationParams
@classmethod
def create(
@@ -43,7 +61,7 @@ class Page(AbstractPage[T], Generic[T]):
total: Optional[int] = None,
**kwargs: Any,
) -> Self:
if not isinstance(params, Params):
if not isinstance(params, PaginationParams):
raise TypeError("Page should be used with Params")
size = params.size or total or len(items)
@@ -94,27 +112,72 @@ def unwrap_scalars(
return [item[0] if force_unwrap else item for item in items]
def _get_sync_conn_from_async(conn: Any) -> Session: # pragma: no cover
if isinstance(conn, async_scoped_session):
conn = conn()
with suppress(AttributeError):
return conn.sync_session # type: ignore
with suppress(AttributeError):
return conn.sync_connection # type: ignore
raise TypeError("conn must be an AsyncConnection or AsyncSession")
@overload
def paginate(
connection: Session,
query: Select,
paginationFlag: bool = True,
params: Optional[AbstractParams] = None,
transformer: Optional[SyncItemsTransformer] = None,
additional_data: Optional[AdditionalData] = None,
):
params, _ = verify_params(params, "limit-offset", "cursor")
) -> Any:
pass
@overload
async def paginate(
connection: AsyncSession,
query: Select,
params: Optional[AbstractParams] = None,
transformer: Optional[AsyncItemsTransformer] = None,
additional_data: Optional[AdditionalData] = None,
) -> Any:
pass
def _paginate(
connection: Session,
query: Select,
params: Optional[AbstractParams] = None,
transformer: Optional[ItemsTransformer] = None,
additional_data: Optional[AdditionalData] = None,
async_: bool = False,
):
if async_:
def _apply_items_transformer(*args: Any, **kwargs: Any) -> Any:
return await_only(apply_items_transformer(*args, **kwargs, async_=True))
else:
_apply_items_transformer = apply_items_transformer
params, raw_params = verify_params(params, "limit-offset", "cursor")
count_query = create_count_query(query)
total = connection.scalar(count_query)
if paginationFlag is False and total > 0:
params = Params(page=1, size=total)
if params.pagination is False and total > 0:
params = PaginationParams(page=1, size=total)
else:
params = PaginationParams(page=params.page, size=params.size)
query = create_paginate_query(query, params)
items = connection.execute(query).all()
items = unwrap_scalars(items)
t_items = apply_items_transformer(items, transformer)
t_items = _apply_items_transformer(items, transformer)
return create_page(
t_items,
@@ -122,3 +185,19 @@ def paginate(
total=total,
**(additional_data or {}),
)
def paginate(
connection: Session,
query: Select,
params: Optional[AbstractParams] = None,
transformer: Optional[ItemsTransformer] = None,
additional_data: Optional[AdditionalData] = None,
):
if isinstance(connection, AsyncSession):
connection = _get_sync_conn_from_async(connection)
return greenlet_spawn(
_paginate, connection, query, params, transformer, additional_data, async_=True
)
return _paginate(connection, query, params, transformer, additional_data, async_=False)

View File

@@ -1 +1,2 @@
from .base import * # noqa
from .response_schema import * #noqa

View File

@@ -11,6 +11,6 @@ class BaseSchemaModelIN(BaseModel):
class BaseSchemaModelOUT(BaseSchemaModelIN):
id: UUID
id: UUID | str
created_at: datetime
updated_at: datetime

View File

@@ -0,0 +1,40 @@
from typing import List, Optional, Type
from pydantic import BaseModel, create_model
from fastapi import Query
class ResponseModelDependency:
def __init__(self, model_class: Type[BaseModel]):
self.model_class = model_class
def __call__(self, response_fields: Optional[List[str]] = Query(None)) -> Type[BaseModel]:
def process_result(result, fields=None):
if not fields:
return result
if hasattr(result, "_fields"):
row_fields = result._fields
return dict(zip(row_fields, result))
elif isinstance(result, tuple):
return dict(zip(fields, result))
elif isinstance(result, dict):
return result
else:
return {field: getattr(result, field) for field in fields if hasattr(result, field)}
if not response_fields:
return self.model_class, None, process_result
all_annotations = {}
for cls in self.model_class.__mro__:
if hasattr(cls, "__annotations__"):
all_annotations.update(cls.__annotations__)
fields = {}
for field in response_fields:
if field in all_annotations:
fields[field] = (all_annotations[field], None)
dynamic_model = create_model(f"Dynamic{self.model_class.__name__}", **fields)
return dynamic_model, response_fields, process_result

View File

@@ -41,7 +41,7 @@ class GenericClient(TestClient):
re = self.c.post(
url,
files={"file": file},
headers=self.default_headers | {"Content-Type": "application/json"},
headers=self.default_headers,
*args,
**kwargs,
)

View File

@@ -0,0 +1,143 @@
import json
from httpx import ASGITransport, AsyncClient
class AsyncGenericClient:
def __init__(self, app, headers={}):
self.c = AsyncClient(
transport=ASGITransport(app=app), base_url="http://testserver", follow_redirects=True
)
self.default_headers = headers
async def get(self, url: str, r_code: int = 200, parse_json=True):
re = await self.c.get(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
assert r_code == re.status_code
return re.json() if parse_json else re.content
async def delete(self, url: str, r_code: int = 204):
re = await self.c.delete(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
assert r_code == re.status_code
return re.json() if r_code != 204 else None
async def post(
self, url: str, obj: dict | str = {}, r_code: int = 201, raw_response=False, *args, **kwargs
):
re = await self.c.post(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
if not raw_response:
assert r_code == re.status_code
return re.json() if not raw_response else re
async def post_file(
self, url: str, file, r_code: int = 201, raw_response=False, *args, **kwargs
):
re = await self.c.post(
url,
files={"file": file},
headers=self.default_headers,
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
assert r_code == re.status_code
return re.json() if not raw_response else re
async def patch(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
):
re = await self.c.patch(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
assert r_code == re.status_code
return re.json() if not raw_response else re
async def put(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
):
re = await self.c.put(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers
| {
"Content-Type": "application/json",
"accept": "application/json",
},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
assert r_code == re.status_code
return re.json() if not raw_response else re
async def obj_lifecycle(
self,
input_obj: dict,
url: str,
pagination: bool = True,
id_field: str = "id",
created_at_check: bool = True,
):
# GET LIST
re = await self.get(url)
if pagination:
assert re["total"] == 0
assert len(re["results"]) == 0
else:
assert len(re) == 0
# CREATE
re = await self.post(url, obj=input_obj)
assert id_field in re
assert re[id_field] is not None
if created_at_check:
assert "created_at" in re
assert re["created_at"] is not None
obj_id = str(re[id_field])
# GET
re = await self.get(f"{url}{obj_id}/")
assert re[id_field] == obj_id
# GET LIST
re = await self.get(url)
if pagination:
assert re["total"] == 1
assert len(re["results"]) == 1
else:
assert len(re) == 1
# DELETE
await self.delete(f"{url}{obj_id}")
# GET LIST
re = await self.get(url)
if pagination:
assert re["total"] == 0
assert len(re["results"]) == 0
else:
assert len(re) == 0
# GET
await self.get(f"{url}{obj_id}", parse_json=False, r_code=404)

View File

@@ -0,0 +1,196 @@
import json
import unittest
from typing import Type
from httpx import ASGITransport, AsyncClient, Response
from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy_utils import create_database, database_exists, drop_database
from creyPY.fastapi.models.base import Base
class AbstractTestAPI(unittest.IsolatedAsyncioTestCase):
client: AsyncClient
default_headers: dict = {}
@classmethod
def setUpClass(cls, app, headers={}) -> None:
cls.client = AsyncClient(
transport=ASGITransport(app=app), base_url="http://testserver", follow_redirects=True
)
cls.default_headers = headers
@classmethod
def setup_database(
cls,
sync_db_url: str,
async_db_url: str,
base: Type[Base],
btree_gist: bool = False,
ssl_mode: str = "require",
):
cls.engine_s = create_engine(
sync_db_url,
echo=False,
pool_pre_ping=True,
connect_args={"sslmode": ssl_mode},
)
if database_exists(cls.engine_s.url):
drop_database(cls.engine_s.url)
create_database(cls.engine_s.url)
if btree_gist:
with cls.engine_s.begin() as conn:
conn.execute(text("CREATE EXTENSION IF NOT EXISTS btree_gist"))
# Migrate
base.metadata.create_all(cls.engine_s)
cls.engine = create_async_engine(
async_db_url,
echo=False,
pool_pre_ping=True,
connect_args={"sslmode": ssl_mode},
)
async def get(self, url: str, r_code: int = 200, parse_json=True) -> dict | bytes:
re = await self.client.get(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if parse_json else re.content
async def delete(self, url: str, r_code: int = 204) -> dict | None:
re = await self.client.delete(url, headers=self.default_headers)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if r_code != 204 else None
async def post(
self, url: str, obj: dict | str = {}, r_code: int = 201, raw_response=False, *args, **kwargs
):
re = await self.client.post(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
if not raw_response:
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def post_file(
self, url: str, file, r_code: int = 201, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.post(
url,
files={"file": file},
headers=self.default_headers,
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def patch(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.patch(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers | {"Content-Type": "application/json"},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def put(
self, url: str, obj: dict | str = {}, r_code: int = 200, raw_response=False, *args, **kwargs
) -> dict | bytes | Response:
re = await self.client.put(
url,
data=json.dumps(obj) if isinstance(obj, dict) else obj,
headers=self.default_headers
| {
"Content-Type": "application/json",
"accept": "application/json",
},
*args,
**kwargs,
)
if re.status_code != r_code:
print(re.content)
self.assertEqual(r_code, re.status_code)
return re.json() if not raw_response else re
async def obj_lifecycle(
self,
input_obj: dict,
url: str,
pagination: bool = True,
id_field: str = "id",
created_at_check: bool = True,
patch: dict | None = None,
):
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 0)
self.assertEqual(len(re["results"]), 0)
else:
self.assertEqual(len(re), 0)
# CREATE
re = await self.post(url, obj=input_obj)
self.assertIn(id_field, re)
self.assertIsNotNone(re[id_field])
if created_at_check:
self.assertIn("created_at", re)
self.assertIsNotNone(re["created_at"])
obj_id = str(re[id_field])
# GET
re = await self.get(f"{url}{obj_id}/")
self.assertEqual(re[id_field], obj_id)
# PATCH
if patch:
for key, value in patch.items():
input_obj[key] = value
re = await self.patch(f"{url}{obj_id}/", obj=input_obj)
for key, value in patch.items():
self.assertEqual(re[key], value)
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 1)
self.assertEqual(len(re["results"]), 1)
else:
self.assertEqual(len(re), 1)
# DELETE
await self.delete(f"{url}{obj_id}")
# GET LIST
re = await self.get(url)
if pagination:
self.assertEqual(re["total"], 0)
self.assertEqual(len(re["results"]), 0)
else:
self.assertEqual(len(re), 0)
# GET
await self.get(f"{url}{obj_id}", parse_json=False, r_code=404)

26
creyPY/helpers.py Normal file
View File

@@ -0,0 +1,26 @@
import secrets
import string
import csv
from pathlib import Path
def create_random_password(length: int = 12) -> str:
all_characters = string.ascii_letters + string.digits + string.punctuation
password = [
secrets.choice(string.ascii_lowercase),
secrets.choice(string.ascii_uppercase),
secrets.choice(string.digits),
secrets.choice(string.punctuation),
]
password += [secrets.choice(all_characters) for _ in range(length - 4)]
secrets.SystemRandom().shuffle(password)
return "".join(password)
def data_to_csv(file: Path, data: list) -> None:
with file.open(mode="w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys(), delimiter=";")
writer.writeheader()
writer.writerows(data)

View File

@@ -0,0 +1,2 @@
from .auth0 import * # noqa
from .stripe import * # noqa

View File

@@ -0,0 +1,4 @@
from .exceptions import * # noqa
from .manage import * # noqa
from .testing import * # noqa
from .utils import * # noqa

View File

@@ -0,0 +1,13 @@
import os
from dotenv import load_dotenv
load_dotenv()
AUTH0_DOMAIN = os.getenv("AUTH0_DOMAIN")
AUTH0_CLIENT_ID = os.getenv("AUTH0_CLIENT_ID")
AUTH0_CLIENT_SECRET = os.getenv("AUTH0_CLIENT_SECRET")
AUTH0_ALGORIGHM = os.getenv("AUTH0_ALGORIGHM", "RS256")
AUTH0_AUDIENCE = os.getenv("AUTH0_AUDIENCE")
AUTH0_ISSUER = os.getenv("AUTH0_ISSUER")

View File

@@ -0,0 +1,12 @@
from fastapi import HTTPException, status
class UnauthorizedException(HTTPException):
def __init__(self, detail: str, **kwargs):
"""Returns HTTP 403"""
super().__init__(status.HTTP_403_FORBIDDEN, detail=detail)
class UnauthenticatedException(HTTPException):
def __init__(self):
super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail="Requires authentication")

View File

@@ -0,0 +1,21 @@
import requests
from cachetools import TTLCache, cached
from .common import AUTH0_CLIENT_ID, AUTH0_CLIENT_SECRET, AUTH0_DOMAIN
cache = TTLCache(maxsize=100, ttl=600)
@cached(cache)
def get_management_token() -> str:
response = requests.post(
f"https://{AUTH0_DOMAIN}/oauth/token",
json={
"client_id": AUTH0_CLIENT_ID,
"client_secret": AUTH0_CLIENT_SECRET,
"audience": f"https://{AUTH0_DOMAIN}/api/v2/", # This should be the management audience
"grant_type": "client_credentials",
},
timeout=5, # Add a timeout parameter to avoid hanging requests
).json()
return response["access_token"]

View File

@@ -0,0 +1,93 @@
USER_OBJ = {
"auth0|testing": {
"created_at": "2023-08-15T13:25:31.507Z",
"email": "test@test.org",
"email_verified": True,
"identities": [
{
"connection": "Username-Password-Authentication",
"provider": "auth0",
"user_id": "testing",
"isSocial": False,
}
],
"name": "Test Tester",
"nickname": "testing",
"picture": "https://avatars.githubusercontent.com/u/15138480?v=4",
"updated_at": "2024-01-17T12:36:37.300Z",
"user_id": "auth0|testing",
"user_metadata": {},
"last_password_reset": "2024-01-17T11:42:08.761Z",
"last_ip": "127.0.0.1",
"last_login": "2024-01-17T11:43:09.620Z",
"logins_count": 1,
},
"auth0|new_user": {
"created_at": "2023-08-15T13:25:31.507Z",
"email": "test2@test.org",
"email_verified": True,
"identities": [
{
"connection": "Username-Password-Authentication",
"provider": "auth0",
"user_id": "testing",
"isSocial": False,
}
],
"name": "Test Tester 2",
"nickname": "testing 2",
"picture": "https://avatars.githubusercontent.com/u/15138481?v=4",
"updated_at": "2024-01-17T12:36:37.303Z",
"user_id": "auth0|new_user",
"user_metadata": {},
"last_password_reset": "2024-01-17T11:42:08.759Z",
"last_ip": "127.0.0.1",
"last_login": "2024-01-17T11:43:09.618Z",
"logins_count": 1,
},
}
def get_user_auth0(sub, *args, **kwargs) -> dict:
return USER_OBJ[sub]
def patch_user_auth0(input_obj: dict, sub, *args, **kwargs) -> dict:
USER_OBJ[sub].update(input_obj)
return get_user_auth0(sub)
def get_user_auth0_metadata(sub, *args, **kwargs) -> dict:
return USER_OBJ[sub]["user_metadata"]
def check_company_auth0(*args, **kwargs) -> bool:
return True
def auth0_sub_to_profile(sub: str) -> dict:
return {
"email": USER_OBJ[sub]["email"],
"name": USER_OBJ[sub]["name"],
"picture": USER_OBJ[sub]["picture"],
"company_ids": USER_OBJ[sub]["user_metadata"]["company_ids"],
}
def auth0_sub_to_public(sub: str) -> dict:
return {
"email": USER_OBJ[sub]["email"],
"name": USER_OBJ[sub]["name"],
"picture": USER_OBJ[sub]["picture"],
}
def patch_user_auth0_metadata(input_obj: dict, sub, *args, **kwargs) -> dict:
USER_OBJ[sub]["user_metadata"].update(input_obj)
return get_user_auth0_metadata(sub)
def set_company_id(sub: str, company_id: str):
if sub not in USER_OBJ:
USER_OBJ[sub] = {}
USER_OBJ[sub]["user_metadata"] = {"company_ids": [company_id]}

View File

@@ -0,0 +1,147 @@
from typing import Optional
import jwt
import requests
from fastapi import HTTPException, Request, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from creyPY.helpers import create_random_password
from .common import (
AUTH0_ALGORIGHM,
AUTH0_AUDIENCE,
AUTH0_CLIENT_ID,
AUTH0_DOMAIN,
AUTH0_ISSUER,
)
from .exceptions import UnauthenticatedException, UnauthorizedException
from .manage import get_management_token
JWKS_CLIENT = jwt.PyJWKClient(f"https://{AUTH0_DOMAIN}/.well-known/jwks.json")
async def verify(
request: Request,
token: Optional[HTTPAuthorizationCredentials] = Security(HTTPBearer(auto_error=False)),
) -> str:
if token is None:
raise UnauthenticatedException
# This gets the 'kid' from the passed token
try:
signing_key = JWKS_CLIENT.get_signing_key_from_jwt(token.credentials).key
except jwt.exceptions.PyJWKClientError as error:
raise UnauthorizedException(str(error))
except jwt.exceptions.DecodeError as error:
raise UnauthorizedException(str(error))
try:
payload = jwt.decode(
token.credentials,
signing_key,
algorithms=[AUTH0_ALGORIGHM],
audience=AUTH0_AUDIENCE,
issuer=AUTH0_ISSUER,
)
except Exception as error:
raise UnauthorizedException(str(error))
return payload["sub"]
### GENERIC AUTH0 CALLS ###
def get_user(sub) -> dict:
re = requests.get(
f"https://{AUTH0_DOMAIN}/api/v2/users/{sub}",
headers={"Authorization": f"Bearer {get_management_token()}"},
timeout=5,
)
if re.status_code != 200:
raise HTTPException(re.status_code, re.json())
return re.json()
def patch_user(input_obj: dict, sub) -> dict:
re = requests.patch(
f"https://{AUTH0_DOMAIN}/api/v2/users/{sub}",
headers={"Authorization": f"Bearer {get_management_token()}"},
json=input_obj,
timeout=5,
)
if re.status_code != 200:
raise HTTPException(re.status_code, re.json())
return re.json()
### USER METADATA CALLS ###
def get_user_metadata(sub) -> dict:
try:
return get_user(sub).get("user_metadata", {})
except:
return {}
def patch_user_metadata(input_obj: dict, sub) -> dict:
return patch_user({"user_metadata": input_obj}, sub)
def clear_user_metadata(sub) -> dict:
return patch_user({"user_metadata": {}}, sub)
def request_verification_mail(sub: str) -> None:
re = requests.post(
f"https://{AUTH0_DOMAIN}/api/v2/jobs/verification-email",
headers={"Authorization": f"Bearer {get_management_token()}"},
json={"user_id": sub},
timeout=5,
)
if re.status_code != 201:
raise HTTPException(re.status_code, re.json())
return re.json()
def create_user_invite(email: str, company_id: str) -> dict:
re = requests.post(
f"https://{AUTH0_DOMAIN}/api/v2/users",
headers={"Authorization": f"Bearer {get_management_token()}"},
json={
"email": email,
"connection": "Username-Password-Authentication",
"password": create_random_password(),
"verify_email": False,
"app_metadata": {"invitedToMyApp": True},
"user_metadata": {"company_ids": [company_id]},
},
timeout=5,
)
if re.status_code != 201:
raise HTTPException(re.status_code, re.json())
return re.json()
def delete_user_invite(user_id: str) -> None:
re = requests.delete(
f"https://{AUTH0_DOMAIN}/api/v2/users/{user_id}",
headers={"Authorization": f"Bearer {get_management_token()}"},
timeout=5,
)
if re.status_code != 204:
raise HTTPException(re.status_code, re.json())
def password_change_mail(email: str) -> bool:
re = requests.post(
f"https://{AUTH0_DOMAIN}/dbconnections/change_password",
headers={"Authorization": f"Bearer {get_management_token()}"},
json={
"client_id": AUTH0_CLIENT_ID,
"email": email,
"connection": "Username-Password-Authentication",
},
timeout=5,
)
if re.status_code != 200:
raise HTTPException(re.status_code, re.json())
return True

View File

@@ -0,0 +1 @@
from .testing import * # noqa

View File

@@ -0,0 +1,23 @@
class ItemReturn:
quantity = 1
class SubscriptionItem:
def retrieve(self, id: str = ""):
return ItemReturn
def modify(self, id: str, quantity: int):
return ItemReturn
class StripeAPI:
def __init__(self, key: str):
pass
@property
def SubscriptionItem(self):
return SubscriptionItem
def get_stripe_api():
return StripeAPI("test")

View File

@@ -0,0 +1,11 @@
import os
import stripe
from dotenv import load_dotenv
load_dotenv()
def get_stripe_api():
stripe.api_key = os.getenv("STRIPE_API_KEY", "")
return stripe

7
renovate.json Normal file
View File

@@ -0,0 +1,7 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:recommended",
":semanticCommitTypeAll(feat)"
]
}

7
requirements.auth0.txt Normal file
View File

@@ -0,0 +1,7 @@
cachetools>=5.5.0 # for caching
charset-normalizer>=3.4.0 # Auth0 API interactions
requests>=2.32.3 # Auth0 API interactions
pyjwt>=2.10.1 # Auth0 API interactions
cffi>=1.17.1 # Auth0 API interactions
cryptography>=43.0.3 # Auth0 API interactions
pycparser>=2.22 # Auth0 API interactions

View File

@@ -23,5 +23,3 @@ twine>=5.0.0
urllib3>=2.2.1
wheel>=0.43.0
zipp>=3.18.1
-r requirements.txt

5
requirements.pg.txt Normal file
View File

@@ -0,0 +1,5 @@
psycopg>=3.2.1 # PostgreSQL
psycopg-binary>=3.2.1 # PostgreSQL
psycopg-pool>=3.2.2 # PostgreSQL
asyncpg>=0.30.0 # SQLAlchemy
greenlet>=3.1.1 # Async

1
requirements.stripe.txt Normal file
View File

@@ -0,0 +1 @@
stripe==11.6.0 # Stripe

View File

@@ -11,13 +11,10 @@ starlette>=0.37.2 # FastAPI
fastapi-pagination>=0.12.26 # Pagination
sqlalchemy>=2.0.31 # SQLAlchemy
sqlalchemy-utils>=0.41.2 # For managing databases
python-dotenv>=1.0.1 # Environment variables
psycopg>=3.2.1 # PostgreSQL
psycopg-binary>=3.2.1 # PostgreSQL
psycopg-pool>=3.2.2 # PostgreSQL
h11>=0.14.0 # Testing
httpcore>=1.0.5 # Testing
httpx>=0.27.0 # Testing

View File

@@ -5,6 +5,18 @@ from setuptools import find_packages, setup
with open("requirements.txt") as f:
requirements = f.read().splitlines()
with open("requirements.build.txt") as f:
build_requirements = f.read().splitlines()
with open("requirements.pg.txt") as f:
pg_requirements = f.read().splitlines()
with open("requirements.auth0.txt") as f:
auth0_requirements = f.read().splitlines()
with open("requirements.stripe.txt") as f:
stripe_requirements = f.read().splitlines()
def get_latest_git_tag() -> str:
try:
@@ -27,12 +39,19 @@ setup(
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
author="Conrad Großer",
author_email="conrad@noah.tech",
author_email="code@grosser.group",
packages=find_packages(),
url="https://github.com/creyD/creyPY",
license="MIT",
python_requires=">=3.12",
install_requires=requirements,
extras_require={
"build": build_requirements,
"postgres": pg_requirements,
"auth0": auth0_requirements,
"stripe": stripe_requirements,
"all": build_requirements + pg_requirements + auth0_requirements + stripe_requirements,
},
keywords=[
"creyPY",
"Python",
@@ -40,7 +59,6 @@ setup(
"shortcuts",
"snippets",
"utils",
"personal library",
],
platforms="any",
)

View File

@@ -7,9 +7,7 @@ from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from creyPY.fastapi.app import generate_unique_id
from creyPY.fastapi.crud import (
get_object_or_404,
)
from creyPY.fastapi.crud import get_object_or_404
from creyPY.fastapi.models.base import Base
@@ -65,7 +63,7 @@ class TestMyFunction(unittest.TestCase):
def test_get_object_or_404_existing_object(self):
# Arrange
obj_id = UUID("123e4567-e89b-12d3-a456-426614174000")
obj = MockDBClass(obj_id)
obj = MockDBClass(id=obj_id)
self.db.add(obj)
self.db.commit()